VxD
08/05/2022, 1:00 AM
Our run status sensor is configured with minimum_interval_seconds=10.
We noticed that each time the sensor runs, it only processes one succeeded graph, even if 50 have succeeded over the past 10s.
This is heavily problematic: the sensor needs 10 minutes to work through 60 completed jobs, which doesn't scale when we need to handle hundreds.
Is there a way we can get the sensor to process more than one pipeline in one go?
rex
08/05/2022, 4:29 AM
Set .Values.sensors.useThreads to enable the feature.
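[For reference, a minimal sketch of where that flag lives in the Helm chart's values.yaml, assuming the chart exposes a top-level sensors block with a numWorkers companion setting — verify the key names against your chart version:]

# values.yaml (sketch -- assumed layout, check your Dagster Helm chart version)
sensors:
  useThreads: true
  numWorkers: 8   # assumed companion knob sizing the daemon's worker pool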
VxD
08/05/2022, 5:18 AM
dagster.yaml now properly lists:

sensors:
  use_threads: true
  num_workers: 8

yet each tick of the run status sensor only processes one pipeline.
VxD
08/05/2022, 5:25 AM
The dagster.yaml is in the DAGSTER_HOME folder of the Daemon, and the value is shown in Dagit's Status > Configuration page.

VxD
08/05/2022, 5:31 AM
pstree -a:
/app/.runtimes/python-3.10.0-2/bin/poetry run dagster-daemon run
└─python /app/.runtimes/python-3.10.0-2/bin/dagster-daemon run
    └─8*[{python}]
VxD
08/05/2022, 6:01 AM
from dagster import (
    DagsterRunStatus,
    DefaultSensorStatus,
    PipelineRunReaction,
    RunStatusSensorContext,
    SkipReason,
    run_status_sensor,
)

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=10,
    default_status=DefaultSensorStatus.RUNNING,
)
def on_graph_success(
    context: RunStatusSensorContext,
) -> SkipReason | PipelineRunReaction:
    [...]
    return PipelineRunReaction(context.dagster_run)
prha
08/05/2022, 11:32 PM
You could bypass the run_status_sensor machinery and run your own sensor that queries the event log for run success events after a particular query… the body of your sensor could then execute your logic for each event that happens.
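[A minimal sketch of that approach, assuming a plain @sensor paging through RUN_SUCCESS events with instance.get_event_records, EventRecordsFilter, and the sensor cursor; process_succeeded_run and the batch size of 50 are hypothetical placeholders, not anything from this thread:]

from dagster import DagsterEventType, EventRecordsFilter, SkipReason, sensor

def process_succeeded_run(record):
    ...  # hypothetical stand-in for your own per-run logic

@sensor(minimum_interval_seconds=10)
def on_graph_success_batch(context):
    # Resume from the last event acknowledged on a previous tick.
    after_cursor = int(context.cursor) if context.cursor else None
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.RUN_SUCCESS,
            after_cursor=after_cursor,
        ),
        limit=50,        # handle up to 50 successes per tick
        ascending=True,  # oldest first so the cursor only moves forward
    )
    if not records:
        return SkipReason("no new run successes")
    for record in records:
        process_succeeded_run(record)
    # Advance the cursor past everything just processed.
    context.update_cursor(str(records[-1].storage_id))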
VxD
08/05/2022, 11:51 PM
If RunStatusSensorContext could contain a list of DagsterRun instead of just listing one, then we could do the processing in parallel on our end.

VxD
08/05/2022, 11:54 PM
Once you return a PipelineRunReaction on one, you don't list it on the next tick.
That would allow you to have a transition period during which the sensor context would fill both dagster_run with the last run that matched the event, and dagster_runs, which lists all that match, allowing people to migrate.
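[For illustration only, a sketch of the shape being proposed — dagster_runs is hypothetical and does not exist on RunStatusSensorContext; this is the requested API, not a real Dagster one:]

from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=10)
def on_graph_success(context: RunStatusSensorContext):
    # Hypothetical attribute from the proposal: every run matching the
    # event since the last tick, not just the single context.dagster_run.
    for run in context.dagster_runs:
        ...  # process each succeeded run, potentially in parallel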
Stephen Bailey
08/23/2022, 1:36 PM
We hit this too with run_status_sensor, and it caused our workflows to get out of sync by several hours (and it was pretty unclear to me why). Our situation was resolved by removing minimum_interval_seconds.
@VxD this is what our sensors looked like previously, and it worked fine at our scale (not huge):
from datetime import datetime, timedelta

from dagster import PipelineRunStatus, RunRequest, RunsFilter, sensor

@sensor(job=my_yielded_job)
def my_job_checking_sensor(context):
    job_name_to_check = "foo_job"
    # Look at runs updated within the last 30 minutes.
    time_window_start = datetime.now() - timedelta(seconds=1800)
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name=job_name_to_check,
            statuses=[PipelineRunStatus.SUCCESS, PipelineRunStatus.FAILURE],
            updated_after=time_window_start,
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        # run_key deduplicates: a RunRequest whose key was already launched
        # is skipped on later ticks, so re-querying the window is safe.
        yield RunRequest(run_key=str(run_record.storage_id))