#ask-community

VxD

08/05/2022, 1:00 AM
Hi Dagster team! We have a run status sensor set up to perform operations on graph success, running every 10s with minimum_interval_seconds=10. We noticed that each time the sensor runs, it only processes one succeeded graph, even if 50 have succeeded over the past 10s. This is a serious problem because the sensor needs 10 minutes to process 60 completed jobs, which doesn't scale when we need to handle hundreds. Is there a way to get the sensor to process more than one pipeline in one go?
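The arithmetic behind that 10-minute figure can be sketched as follows. This is only an illustration: the helper name `drain_time_seconds` and the `runs_per_tick` parameter are made up for this example and are not part of Dagster.

```python
import math


def drain_time_seconds(pending_runs: int, interval_seconds: float, runs_per_tick: int = 1) -> float:
    """Seconds needed to work through a backlog when each tick handles a fixed number of runs."""
    ticks = math.ceil(pending_runs / runs_per_tick)
    return ticks * interval_seconds


# One run per tick, one tick every 10 s: 60 runs take 600 s.
print(drain_time_seconds(60, 10) / 60)  # 10.0 (minutes)

# Hypothetical: if a tick could handle 50 runs, the same backlog needs 2 ticks.
print(drain_time_seconds(60, 10, runs_per_tick=50))  # 20 (seconds)
```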

rex

08/05/2022, 2:29 AM
What version of Dagster are you running? We improved throughput for the sensor daemon in 0.15.5 by introducing a threadpool.

VxD

08/05/2022, 3:43 AM
@rex This is on Dagster 0.15.8
Each invocation of the run sensor only picks one succeeded job.
I guess we could lower the minimum_interval_seconds to 1, but I'm afraid of the impact on our DB.
This is 100% reproducible. All my pipelines have finished long ago. I can see in the Sensors view in Dagit that the run sensor is being started once every 10s.
Each click on a tick shows that only one pipeline is handled.

rex

08/05/2022, 4:29 AM
How have you deployed Dagster? The threadpool feature is currently opt-in, so it’s not enabled by default. If you’re using helm, you can use .Values.sensors.useThreads to enable the feature.

VxD

08/05/2022, 4:30 AM
Oh, let me give it a try immediately. Thanks for the heads up!
@rex Sadly, this does not seem to make any difference. 😞 My dagster.yaml now properly lists:
sensors:
  use_threads: true
  num_workers: 8
yet each tick of the run pipeline sensor only processes one pipeline.

rex

08/05/2022, 5:24 AM
is your daemon also reading from that dagster.yaml?

VxD

08/05/2022, 5:25 AM
It should? The file is present in the DAGSTER_HOME folder of the Daemon and the value is shown in Dagit's Status>Configuration page.
Is there a way I can tell for sure? Logs, maybe?

rex

08/05/2022, 5:28 AM

VxD

08/05/2022, 5:31 AM
Yes, I can see 8 threads in the output of pstree -a:
/app/.runtimes/python-3.10.0-2/bin/poetry run dagster-daemon run
    └─python /app/.runtimes/python-3.10.0-2/bin/dagster-daemon run
        └─8*[{python}]
A small but important detail: this sensor is shared between all our pipelines. It is defined as follows:
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=10,
    default_status=DefaultSensorStatus.RUNNING,
)
def on_graph_success(
    context: RunStatusSensorContext,
) -> SkipReason | PipelineRunReaction:
    [...]
    return PipelineRunReaction(context.dagster_run)

prha

08/05/2022, 6:26 AM
Yeah, I think the issue here is the fact that we don’t duplicate run_status_sensor invocations even if it’s used for multiple jobs. The sensor will fire just once per interval.

VxD

08/05/2022, 6:27 AM
Thanks for looking into it!
This greatly affects our ability to use Dagster at scale. Off the top of your head, can you maybe think of a workaround we could use until a fix is in place, please?
@prha The only (kind of) related issue on GitHub I was able to find is https://github.com/dagster-io/dagster/issues/8211. Do you need another one, or are you tracking this internally? This one is an absolute killer for us. 😥

prha

08/05/2022, 11:32 PM
The main thing I can think of is to bypass the run_status_sensor machinery and run your own sensor that queries the event log for run success events after a particular query… the body of your sensor could then execute your logic for each event that happens.
You’d have to do your own cursor management and maybe chunking to ensure you don’t hit any timeouts
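A rough sketch of the cursor management and chunking described above, in plain Python so it runs without Dagster. Everything here is illustrative: `SuccessEvent`, `next_chunk` and the in-memory event list are hypothetical stand-ins for whatever query returns run success events. In a real sensor the cursor string would presumably be read from `context.cursor` and written back with `context.update_cursor(...)`.

```python
from typing import List, NamedTuple, Optional, Sequence, Tuple


class SuccessEvent(NamedTuple):
    """Hypothetical record for one run success event."""
    storage_id: int  # assumed to increase monotonically
    run_id: str


def next_chunk(
    events: Sequence[SuccessEvent],
    cursor: Optional[str],
    chunk_size: int,
) -> Tuple[List[SuccessEvent], Optional[str]]:
    """Return up to chunk_size events newer than the cursor, plus the advanced cursor."""
    last_seen = int(cursor) if cursor else -1
    fresh = sorted(
        (e for e in events if e.storage_id > last_seen),
        key=lambda e: e.storage_id,
    )
    chunk = fresh[:chunk_size]
    # Only advance the cursor past events that were actually handed out.
    new_cursor = str(chunk[-1].storage_id) if chunk else cursor
    return chunk, new_cursor


# Simulate three ticks over five pending success events, two per tick.
events = [SuccessEvent(i, f"run-{i}") for i in range(1, 6)]
cursor = None
for _ in range(3):
    chunk, cursor = next_chunk(events, cursor, chunk_size=2)
    print([e.run_id for e in chunk], cursor)
```

Capping the chunk size keeps each tick short, which is the point of the timeout warning above; the leftover events are picked up on the next tick because the cursor only moves as far as the last event processed.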

VxD

08/05/2022, 11:33 PM
Oh I wasn't aware there was a way to get succeeded jobs from the event log; that's something to explore on our end yes, thanks!

prha

08/05/2022, 11:33 PM
well, there are success events for every run
Should also note that, with enough of a backlog, you might also start to get some bad performance and might see things falling behind / failing. Just things to keep in mind.

VxD

08/05/2022, 11:36 PM
Sounds like we may want to wait for Dagster to do it natively. 😅
As a potentially easy workaround, it would already help a lot if the RunStatusSensorContext could contain a list of DagsterRun instead of just listing one, then we could do the processing in parallel on our end.
Then if you got a PipelineRunReaction on one, you don't list it on the next tick. That would allow you to have a transition period during which the sensor context would fill both dagster_run with the last run that matched the event, and dagster_runs which lists all that match, allowing people to migrate.
Or keep the existing API and have Dagster kick off one sensor per job in parallel, which is probably preferable.
Hi! Has anyone had the chance to look at this? This one is killing us. 😵
Easy way to reproduce the issue:
• Create a sensor that starts jobs every 30s
• Create a sensor that activates on run success every 1min
• Jobs are created faster than they are dequeued by the run sensor 😨
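The backlog growth in that reproduction can be simulated with a few lines of plain Python. This is a simplified model, not Dagster code: it assumes every job succeeds the instant it is started, and `backlog_per_minute` and `runs_per_tick` are names invented for the sketch.

```python
from typing import List


def backlog_per_minute(
    duration_seconds: int,
    produce_every: int,
    consume_every: int,
    runs_per_tick: int = 1,
) -> List[int]:
    """Return the number of unprocessed successful runs at the end of each minute."""
    backlog = 0
    history = []
    for t in range(1, duration_seconds + 1):
        if t % produce_every == 0:
            backlog += 1  # a new run succeeded
        if t % consume_every == 0:
            backlog = max(0, backlog - runs_per_tick)  # the status sensor ticked
        if t % 60 == 0:
            history.append(backlog)
    return history


# Jobs every 30 s, one run handled per 60 s tick: the backlog grows by one each minute.
print(backlog_per_minute(600, produce_every=30, consume_every=60))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

With `runs_per_tick=2` in the same model the backlog stays at zero, which is why handling several runs per tick would resolve the problem.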

Stephen Bailey

08/23/2022, 1:36 PM
Want to call out that we just ran into this as well at Whatnot, when I switched the cursor logic from a "query the run records via GraphQL every 30 seconds" to a more "native" implementation using run_status_sensor, and it caused our workflows to get out of sync by several hours (and was pretty unclear to me why). Our situation was resolved by removing minimum_interval_seconds.
@VxD this is what our sensors looked like previously and it worked fine at our scale (not huge):
@sensor(job=my_yielded_job)
def my_job_checking_sensor(context):
    job_name_to_check = "foo_job"
 
    time_window_start = datetime.now() - timedelta(seconds=1800)
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name=job_name_to_check,
            statuses=[PipelineRunStatus.SUCCESS, PipelineRunStatus.FAILURE],
            updated_after=time_window_start,
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(run_key=str(run_record.storage_id))
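The sensor above re-reads a 30-minute window on every tick, so the same run records show up many times. That is safe because, as far as I understand Dagster's behaviour, a RunRequest whose run_key was already used by the same sensor is skipped. The plain-Python sketch below only mimics that deduplication; `launch_new_runs` and the key values are hypothetical.

```python
from typing import Iterable, List, Set


def launch_new_runs(candidate_keys: Iterable[str], seen_keys: Set[str]) -> List[str]:
    """Return only the keys not requested before, and remember them for later ticks."""
    launched = []
    for key in candidate_keys:
        if key in seen_keys:
            continue  # already requested on an earlier tick
        seen_keys.add(key)
        launched.append(key)
    return launched


seen: Set[str] = set()
# Tick 1: the window contains two finished runs.
print(launch_new_runs(["101", "102"], seen))  # ['101', '102']
# Tick 2: the overlapping window contains the same two plus one new run.
print(launch_new_runs(["101", "102", "103"], seen))  # ['103']
```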

VxD

08/23/2022, 2:25 PM
Ah, thanks Stephen! That looks like an elegant workaround.
With a run status sensor, if I remove minimum_interval_seconds on Dagster 0.15.8, it uses the default and dequeues one job every 30s, making it even worse. 😔

prha

08/23/2022, 8:26 PM
Yeah, I think we need to add better support for processing multiple runs per sensor iteration in the run status sensor, but @Stephen Bailey’s workaround of managing your own sensor is a good one. I’ll create a separate issue to track this framework improvement.
@Dagster Bot issue enable run status sensors to process multiple runs per iteration

Dagster Bot

08/23/2022, 8:27 PM

VxD

08/23/2022, 11:41 PM
Thanks for looking into it! Dagster has been working great for us so far! Getting it to scale horizontally is definitely an interesting challenge.