Hello everyone, I want to write a job that allows ...
# ask-community
b
Hello everyone, I want to write a job that chains a graph or op with a job created by the define_asset_job() function. Or maybe design a graph that uses AssetSelection.all() instead of define_asset_job(). Is there any way to achieve this?
j
Hi @BC A, can you explain your use case a bit more? If you want a job that will materialize all of your assets, you can use define_asset_job with no selection parameter:
define_asset_job(name="all_assets_job")
https://docs.dagster.io/concepts/ops-jobs-graphs/jobs#from-software-defined-assets
b
Hi @jamie, the thing is I have a couple of ops wrapped into a graph that needs to run before materializing my asset job
define_asset_job(name="all_assets_job")
and the assets that I want to materialize are not declared as depending on this graph (even though they should be at some point). The graph doesn't produce any materializations; it just fills BigQuery tables that are used as the data source entry point for my ELT. (The graph I'm talking about triggers Google Data Transfer and BigQuery transfer jobs.)
j
I see. In that case, I think you'll want to write a run_status_sensor that triggers whenever your first graph succeeds. Within the sensor you can start a run of the asset job. It would look something like this:
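from dagster import DagsterRunStatus, RunRequest, define_asset_job, run_status_sensor

all_asset_job = define_asset_job(name="all_assets_job")
big_query_job = ...  # placeholder: the job that populates BigQuery with your data

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[big_query_job],  # monitored_jobs takes a list of jobs
    request_job=all_asset_job,
)
def report_status_sensor(context):
    # Request a run of the asset job each time big_query_job succeeds
    return RunRequest(run_key=None)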
b
Thank you very much. I initially started implementing a sensor, but it looks like I missed this run status feature.
Hey @jamie, I've got another blocking point: I use define_asset_job() to define my asset materialization job, but this function returns an UnresolvedAssetJobDefinition, which is not supported by run_status_sensor for the request_job param. It expects either a JobDefinition or a GraphDefinition. I managed to work around the issue by using:
from dagster._core.definitions.assets_job import build_assets_job
build_assets_job("myjob", assets=dbt_assets, resource_defs=resources_definitions)
But this is making a mess of my code, and I don't know whether I am supposed to use this build_assets_job function at all. So I was wondering if there is an alternative way to turn the result of define_asset_job() (very handy with the asset selection feature) into a resolved job.
j
Are you getting an error passing the asset job to the sensor?
b
The error I was getting before launching dagit:
Argument of type "UnresolvedAssetJobDefinition" cannot be assigned to parameter "request_job" of type "GraphDefinition | JobDefinition | None" in function "run_status_sensor"
  Type "UnresolvedAssetJobDefinition" cannot be assigned to type "GraphDefinition | JobDefinition | None"
    "UnresolvedAssetJobDefinition" is incompatible with "GraphDefinition"
    "UnresolvedAssetJobDefinition" is incompatible with "JobDefinition"
And the code:
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[update_upstream_assets_job],
    request_job=define_asset_job("elt_job", selection="*"),
)
But when I actually launch with dagit, this works just fine. I don't really understand why, but it was my mistake. Thanks again, and I'd welcome an explanation if you have one!
j
Yeah, I think this is a type checking error on our side. We have other sensors that work with UnresolvedAssetJobDefinition, but run_status_sensor has a type annotation that only allows jobs and graphs. I don't see a reason we can't expand that to include unresolved asset jobs. I'll put a PR up and hopefully get a fix in soon!
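For readers wondering why the editor reported an error while the code still ran, here is a minimal sketch of the general mechanism. It assumes nothing about Dagster's internals: the three classes and the `describe_request_job` function are toy stand-ins invented for illustration. The point is only that a static checker such as Pylance compares the argument against the annotation, while Python itself does not enforce annotations when the code runs.

```python
from typing import Optional, Union


class GraphDefinition:
    """Toy stand-in for illustration only (not Dagster's class)."""


class JobDefinition:
    """Toy stand-in for illustration only (not Dagster's class)."""


class UnresolvedAssetJobDefinition:
    """Toy stand-in for illustration only (not Dagster's class)."""

    def __init__(self, name: str) -> None:
        self.name = name


def describe_request_job(
    request_job: Optional[Union[GraphDefinition, JobDefinition]] = None,
) -> str:
    # The annotation is narrower than what the body can handle:
    # any object with a `name` attribute works at runtime.
    return getattr(request_job, "name", "<no job>")


# A static type checker would flag this call as an incompatible argument
# type, but Python does not enforce annotations, so it runs normally.
result = describe_request_job(UnresolvedAssetJobDefinition("elt_job"))
print(result)
```

Widening the annotation to include the third class would silence the checker without changing runtime behavior, which matches the kind of fix described above.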
Hey, just looping back on this. I wasn't able to replicate the issue in my own setup. Can you try running this code snippet to see if you get the same error as before?
from dagster import asset, run_status_sensor, define_asset_job, Definitions, RunRequest, DagsterRunStatus, op, job

@asset
def one():
    return 1

@asset
def two():
    return 2

all_assets_job = define_asset_job("all_assets")

@op
def succeeds():
    return True

@job
def good_job():
    succeeds()

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=all_assets_job,
    monitored_jobs=[good_job]
)
def my_run_status_sensor(context):
    return RunRequest(run_key=None)

defs = Definitions(
    assets=[one, two],
    jobs=[good_job, all_assets_job],
    sensors=[my_run_status_sensor]
)
I still think we should put up a PR to fix the type signatures, but in the meantime I want to make sure there isn't an additional bug to investigate.
b
Hey, it seems I'm not able to reproduce it either, for a reason I don't know, but it was just a Pylance error message.
j
OK! I think the PR to change the type signature should remove the error for good then! It'll come out in next week's release.