BC A
01/31/2023, 4:24 PM
jamie
01/31/2023, 5:35 PM
define_asset_job with no selection parameter:
define_asset_job(name="all_assets_job")
https://docs.dagster.io/concepts/ops-jobs-graphs/jobs#from-software-defined-assets
BC A
01/31/2023, 5:38 PM
define_asset_job(name="all_assets_job")
and the assets that I want to materialize are not defined as dependencies of this graph (even though they should be at some point). The graph doesn't produce any materializations; it just fills BigQuery tables, which are used as the data source entry point for my ELT.
(the graph I'm talking about triggers Google Data Transfer & BigQuery transfer jobs)
jamie
01/31/2023, 5:45 PM
from dagster import DagsterRunStatus, RunRequest, define_asset_job, run_status_sensor

all_asset_job = define_asset_job(name="all_assets_job")
big_query_job = ...  # the job that populates bigquery with your data

# when big_query_job succeeds, request a run of all_asset_job
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[big_query_job],
    request_job=all_asset_job,
)
def report_status_sensor(context):
    return RunRequest(run_key=None)
BC A
01/31/2023, 5:53 PM
from dagster._core.definitions.assets_job import build_assets_job
build_assets_job("myjob", assets=dbt_assets, resource_defs=resources_definitions)
But this is making a mess of my code, and I don't know if I'm supposed to use this build_assets_job function anyway. So I was wondering if there is an alternative way to turn the result of define_asset_job() (very handy with the asset selector feature) into a resolved asset job.
jamie
01/31/2023, 7:31 PM
BC A
01/31/2023, 11:42 PM
Argument of type "UnresolvedAssetJobDefinition" cannot be assigned to parameter "request_job" of type "GraphDefinition | JobDefinition | None" in function "run_status_sensor"
  Type "UnresolvedAssetJobDefinition" cannot be assigned to type "GraphDefinition | JobDefinition | None"
    "UnresolvedAssetJobDefinition" is incompatible with "GraphDefinition"
    "UnresolvedAssetJobDefinition" is incompatible with "JobDefinition"
and the code:
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[update_upstream_assets_job],
    request_job=define_asset_job("elt_job", selection="*"),
)
But actually, when launching with Dagit this works just fine. I don't fully understand why, but the mistake was on my side.
Thanks again, I'd welcome an explanation if you have one!
jamie
02/01/2023, 3:44 PM
from dagster import (
    DagsterRunStatus,
    Definitions,
    RunRequest,
    asset,
    define_asset_job,
    job,
    op,
    run_status_sensor,
)

@asset
def one():
    return 1

@asset
def two():
    return 2

all_assets_job = define_asset_job("all_assets")

@op
def succeeds():
    return True

@job
def good_job():
    succeeds()

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=all_assets_job,
    monitored_jobs=[good_job],
)
def my_run_status_sensor(context):
    return RunRequest(run_key=None)

defs = Definitions(
    assets=[one, two],
    jobs=[good_job, all_assets_job],
    sensors=[my_run_status_sensor],
)
I still think we should put up a PR to fix the type signatures, but in the meantime I want to make sure there isn't an additional bug to investigate.
BC A
02/03/2023, 4:46 PM
jamie
02/03/2023, 5:01 PM
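
For anyone reading this thread later: the type-checker error quoted above and the run that works in Dagit are not contradictory. Python does not enforce annotations at runtime, so a call can fail static checking and still execute if the function only uses what the passed object actually provides. Below is a minimal sketch of that situation. The three classes are hypothetical stand-ins that only borrow the names from the error message (they are not Dagster's real classes), and request_job_name is an invented helper, assumed to read nothing but a name attribute.

```python
from typing import Optional, Union, cast


# Stand-in classes, named after the types in the error message.
# They are NOT Dagster's implementations; each only carries a name.
class GraphDefinition:
    def __init__(self, name: str) -> None:
        self.name = name


class JobDefinition:
    def __init__(self, name: str) -> None:
        self.name = name


class UnresolvedAssetJobDefinition:
    def __init__(self, name: str) -> None:
        self.name = name


def request_job_name(
    request_job: Optional[Union[GraphDefinition, JobDefinition]] = None,
) -> Optional[str]:
    # Hypothetical consumer: the annotation is narrow, but the body only
    # reads `.name`, which the unresolved stand-in also has.
    return request_job.name if request_job is not None else None


elt_job = UnresolvedAssetJobDefinition("elt_job")

# Without the ignore comment, a static checker reports an "incompatible
# type" error for this argument. The interpreter runs the call regardless.
flagged = request_job_name(elt_job)  # type: ignore

# typing.cast is another way to quiet the checker; it returns the object
# unchanged at runtime and performs no conversion or validation.
silenced = request_job_name(cast(JobDefinition, elt_job))
```

Either workaround only hides the mismatch from the checker. Widening the annotation itself, as the PR suggested above would do, is what actually removes it.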