# dagster-support

Philip Gunning (08/11/2022, 8:38 AM)
Hi all, we use sensors to trigger some of our dependent jobs, for example a job that gathers reference data which we use to call APIs in a later job. This works fine; however, we want to start implementing more segregated logic in the jobs, which would mean downstream tasks have multiple upstream dependencies. Generalised, it currently looks like this:
```python
from datetime import datetime, timedelta

from dagster import RunRequest, RunsFilter, sensor
# The import location of PipelineRunStatus is discussed further down
# in this thread; the public replacement is DagsterRunStatus.

@sensor(job=load_dependent_data)
def my_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="upstream_job",
            statuses=[PipelineRunStatus.SUCCESS],
            updated_after=datetime.now() - timedelta(days=2),
        ),
        order_by="update_timestamp",
        ascending=False,
    )

    if run_records:
        print(f"run_records[0]: {run_records[0]}")
        # Just execute the last (most recent) run
        yield RunRequest(run_key=run_records[0].pipeline_run.run_id)
```
And we want to move to something more like this:
```python
@sensor(job=load_dependent_data)
def my_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            # hypothetical: RunsFilter has no job_names parameter (see reply below)
            job_names=["upstream_job1", "upstream_job2"],
            statuses=[PipelineRunStatus.SUCCESS],
            updated_after=datetime.now() - timedelta(days=2),
        ),
        order_by="update_timestamp",
        ascending=False,
    )

    if run_records:
        print(f"run_records[0]: {run_records[0]}")
        # Just execute the last (most recent) run
        yield RunRequest(run_key=run_records[0].pipeline_run.run_id)
```
How can we pass in multiple jobs as the trigger for execution? And can we parametrize the job listed in the decorator somehow, so the sensor can be reused for multiple triggered jobs?

jamie (08/11/2022, 12:02 PM)
Hi @Philip Gunning, the `RunsFilter` object only accepts a single job name; however, you could call `get_run_records` multiple times with different run filters and append the results. For passing multiple jobs, you can pass a list of jobs as `jobs` to `@sensor` and then specify which job you want to run in the `RunRequest` using the `job_name` parameter:
```python
@sensor(jobs=[job_1, job_2])
def my_sensor():
    yield RunRequest(run_key=..., job_name="job_1")
    yield RunRequest(run_key=..., job_name="job_2")
```
FYI: using `jobs` on `@sensor` and `job_name` in `RunRequest` is considered experimental.
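
Putting the two suggestions together, here is a minimal sketch (the upstream job names, downstream job objects, and sensor name are illustrative, not from the thread):

```python
from datetime import datetime, timedelta

from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor

UPSTREAM_JOBS = ["upstream_job1", "upstream_job2"]

@sensor(jobs=[downstream_job_1, downstream_job_2])  # assumed to be defined elsewhere
def multi_upstream_sensor(context):
    # RunsFilter only takes a single job_name, so query once per upstream
    # job and pool the results, as suggested above.
    run_records = []
    for upstream in UPSTREAM_JOBS:
        run_records.extend(
            context.instance.get_run_records(
                filters=RunsFilter(
                    job_name=upstream,
                    statuses=[DagsterRunStatus.SUCCESS],
                    updated_after=datetime.now() - timedelta(days=2),
                ),
                order_by="update_timestamp",
                ascending=False,
            )
        )

    if run_records:
        # Each per-job query is sorted, but the pooled list is not, so
        # pick the most recently updated record explicitly.
        latest = max(run_records, key=lambda record: record.update_timestamp)
        for name in ["downstream_job_1", "downstream_job_2"]:
            # Distinct run_keys give each downstream job its own dedupe key.
            yield RunRequest(
                run_key=f"{latest.pipeline_run.run_id}:{name}",
                job_name=name,
            )
```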

Philip Gunning (08/11/2022, 12:05 PM)
For the latter, what I meant was an effort to parameterize the job passed in, making the sensor reusable as a util function I could import in multiple places.

jamie (08/11/2022, 12:09 PM)
Ah ok, I think I'm following. My first thought is to make a factory function for producing the sensor:
```python
def create_sensor(job):
    @sensor(job=job)
    def the_sensor():
        ...  # sensor body goes here

    return the_sensor

# some other file

job_a_sensor = create_sensor(job_a)
```
Is that what you were describing?
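
A fuller sketch of that factory, assuming the query logic from Philip's original sensor; the `name` argument (a standard `@sensor` parameter) keeps the generated sensors uniquely named if the factory is called more than once:

```python
from datetime import datetime, timedelta

from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor

def create_sensor(job, upstream_job_name):
    # A unique name per generated sensor avoids collisions when the
    # factory is reused across several jobs.
    @sensor(job=job, name=f"{upstream_job_name}_triggers_{job.name}")
    def the_sensor(context):
        run_records = context.instance.get_run_records(
            filters=RunsFilter(
                job_name=upstream_job_name,
                statuses=[DagsterRunStatus.SUCCESS],
                updated_after=datetime.now() - timedelta(days=2),
            ),
            order_by="update_timestamp",
            ascending=False,
        )
        if run_records:
            yield RunRequest(run_key=run_records[0].pipeline_run.run_id)

    return the_sensor

# some other file
job_a_sensor = create_sensor(job_a, "upstream_job")
```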

Philip Gunning (08/11/2022, 12:10 PM)
That sounds great, thanks.

Stefan Adelbert (08/16/2022, 4:33 AM)
@Philip Gunning Where are you importing `PipelineRunStatus` from? I used to import `dagster.core.storage.pipeline_run.PipelineRunStatus`, but recently had to change that to `dagster._core.storage.pipeline_run.PipelineRunStatus` after upgrading to 1.0.3. Perhaps `dagster._core.instance.PipelineRunStatus` is better.

jamie (08/16/2022, 2:09 PM)
Just an FYI for you both: `PipelineRunStatus` has been deprecated in favor of `DagsterRunStatus`. They are exactly the same, but we renamed it a while ago when we moved away from solids/pipelines.

Stefan Adelbert (08/16/2022, 9:50 PM)
Thanks @jamie. I noticed that type and suspected it should be used instead. Where is the "right" place to import it from? `dagster._core.instance.DagsterRunStatus`?

jamie (08/17/2022, 1:06 PM)
It's part of our public API, so you can just do `from dagster import DagsterRunStatus`.
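
For instance, the filter from the sensor at the top of this thread would become (a sketch using only the public import):

```python
from dagster import DagsterRunStatus, RunsFilter

filters = RunsFilter(
    job_name="upstream_job",
    statuses=[DagsterRunStatus.SUCCESS],
)
```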