Philip Gunning
08/11/2022, 8:38 AM
@sensor(job=load_dependent_data)
def my_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="upstream_job",
            statuses=[PipelineRunStatus.SUCCESS],
            updated_after=datetime.now() - timedelta(days=2),
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    if len(run_records) > 0:
        print(f"run_records[0]: {run_records[0]}")
        # Just execute the last run
        yield RunRequest(run_key=run_records[0].pipeline_run.run_id)
And we want to move to something more like this:
@sensor(job=load_dependent_data)
def my_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_names=["upstream_job1", "upstream_job2"],
            statuses=[PipelineRunStatus.SUCCESS],
            updated_after=datetime.now() - timedelta(days=2),
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    if len(run_records) > 0:
        print(f"run_records[0]: {run_records[0]}")
        # Just execute the last run
        yield RunRequest(run_key=run_records[0].pipeline_run.run_id)
How can we pass in multiple jobs as the trigger for execution?
Additionally, can we parametrize the job listed in the decorator somehow, so we can reuse the sensor for multiple triggered jobs?
jamie
08/11/2022, 12:02 PM
For multiple upstream jobs, you can call get_run_records multiple times with different run filters and append the results.
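A minimal sketch of the "query once per job and append" suggestion, written without Dagster imports so it runs on its own. The helper name collect_run_records and its fetch_for_job and sort_key parameters are hypothetical and not part of Dagster. Inside a sensor, fetch_for_job would presumably wrap context.instance.get_run_records with a RunsFilter for a single job_name, and sort_key would read the record's update_timestamp.

```python
from typing import Callable, Iterable, List, Sequence, TypeVar

R = TypeVar("R")


def collect_run_records(
    job_names: Iterable[str],
    fetch_for_job: Callable[[str], Sequence[R]],
    sort_key: Callable[[R], float],
) -> List[R]:
    """Run one query per job name, append the results, and sort newest first.

    Hypothetical helper: fetch_for_job stands in for a call such as
    context.instance.get_run_records(filters=RunsFilter(job_name=name, ...)).
    """
    merged: List[R] = []
    for name in job_names:
        # One query per upstream job, since the filter takes a single job name.
        merged.extend(fetch_for_job(name))
    # Largest sort key (most recent update) first.
    return sorted(merged, key=sort_key, reverse=True)


# Stand-in data: (run_id, update_timestamp) tuples instead of real run records.
fake_records = {
    "upstream_job1": [("run_a", 1.0), ("run_b", 3.0)],
    "upstream_job2": [("run_c", 2.0)],
}

records = collect_run_records(
    job_names=["upstream_job1", "upstream_job2"],
    fetch_for_job=lambda name: fake_records[name],
    sort_key=lambda record: record[1],
)
latest_run_id = records[0][0] if records else None
```

With the merged list sorted newest first, the sensor body can keep its existing shape: check that the list is non-empty and yield a RunRequest keyed on the first record's run id.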
For passing multiple jobs, you can pass a list of jobs as jobs to @sensor and then specify which job you want to run in the RunRequest using the job_name parameter:
@sensor(jobs=[job_1, job_2])
def my_sensor():
    yield RunRequest(run_key=..., job_name="job_1")
    yield RunRequest(run_key=..., job_name="job_2")
fyi: using jobs on @sensor and job_name in RunRequest is considered experimental
Philip Gunning
08/11/2022, 12:05 PM
jamie
08/11/2022, 12:09 PM
def create_sensor(job):
    @sensor(job=job)
    def the_sensor():
        ...

    return the_sensor

# some other file
job_a_sensor = create_sensor(job_a)
is that what you were describing?
Philip Gunning
08/11/2022, 12:10 PM
Stefan Adelbert
08/16/2022, 4:33 AM
Where should I import PipelineRunStatus from? I used to import dagster.core.storage.pipeline_run.PipelineRunStatus, but recently had to change that to dagster._core.storage.pipeline_run.PipelineRunStatus after upgrading to 1.0.3. Maybe dagster._core.instance.PipelineRunStatus is better.
jamie
08/16/2022, 2:09 PM
PipelineRunStatus has been deprecated in favor of DagsterRunStatus. They are exactly the same, but we renamed it a while ago when we moved away from solids/pipelines
Stefan Adelbert
08/16/2022, 9:50 PM
dagster._core.instance.DagsterRunStatus?
jamie
08/17/2022, 1:06 PM
from dagster import DagsterRunStatus