#ask-community

Ruoyu Qian

08/15/2022, 7:37 PM
I am pretty new to Dagster. How can I trigger a job based on the completion of a materialization of software-defined assets? I read that a sensor might be able to do it. Is this the right direction to try?
🤖 1

sandy

08/15/2022, 8:44 PM
hey @Ruoyu Qian - yeah, that's exactly what asset sensors are for

Ruoyu Qian

08/15/2022, 9:07 PM
So I’m trying to have a job that makes an API call to a service, but I need this job to depend on another job that materializes a certain table first.

sandy

08/15/2022, 9:09 PM
that would be a good use case for asset sensors
❤️ 1

Ruoyu Qian

08/15/2022, 9:13 PM
I’m not exactly sure how that syntax works. I already have a job defined with define_asset_job:
from dagster import AssetSelection, define_asset_job

my_job = define_asset_job(
    name="my_job",
    description="""
        my_job materializes all dbt models tagged with 'needed', including upstream dependencies.
    """,
    selection=AssetSelection.groups("the_group_needed").upstream(),
)
How do I pass this define_asset_job into the sensor? I don’t see it in the documentation.

sandy

08/15/2022, 10:01 PM
if you want to launch a run after the materialization of a particular asset, you'd use an asset sensor: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors
if you want to launch a run after the completion of a particular job, you'd use a run status sensor: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#run-status-sensors
so you could create a run status sensor that monitors "my_job"

Ruoyu Qian

08/16/2022, 2:18 PM
Hey Sandy, I am trying to add the run_status_sensor decorator to my_job, which is an UnresolvedAssetJobDefinition. I think run_status_sensor expects a function or a class. How should I approach this?
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=my_next_job,
)
my_job = define_asset_job(
    name="vero_job",
    description="""...""",
    selection=AssetSelection.groups("a group").upstream(),
)

sandy

08/16/2022, 3:37 PM
it would look like this:
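from dagster import (
    AssetSelection,
    DagsterRunStatus,
    RunRequest,
    define_asset_job,
    run_status_sensor,
)

# The job whose successful runs the sensor watches.
vero_job = define_asset_job(
    name="vero_job",
    description="""...""",
    selection=AssetSelection.groups("a group").upstream(),
)

# my_next_job is the downstream job, assumed to be defined elsewhere.
@run_status_sensor(
    monitored_jobs=[vero_job],
    run_status=DagsterRunStatus.SUCCESS,
    request_job=my_next_job,
)
def my_run_status_sensor(context):
    # Request one run of my_next_job for each successful vero_job run.
    return RunRequest(run_key=None)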

Ruoyu Qian

08/16/2022, 4:19 PM
so in the
def my_run_status_sensor(context):
    return RunRequest(run_key=None)
I don’t need to specify anything?

sandy

08/16/2022, 4:26 PM
right

Ruoyu Qian

08/17/2022, 2:50 PM
Thanks, Sandy. Do I need to push the code to the cloud to see any changes in the sensor tab, or is there a way to test the sensor locally? When I run the upstream job locally, how do I know whether the sensor also worked once the job finished? This is what I used to trigger the upstream job:
dagster job execute -f analytics/repository.py --job my_upstream_job
Hey Sandy, is it possible to have a short Zoom call for Dagster support?