#announcements
alir  06/09/2020, 4:24 PM
Are there any primitives in dagster I can use to respond to celery worker failures and restarts?
I have dagster set up in a k8s environment, with dagit passing jobs to celery workers with rabbitmq. Just as a test, I simulated a failure by killing and restarting the celery pod while a long-running solid was being executed by the worker.
The outcome of this test was: dagit thinks the pipeline is still running and the new celery worker pod did not re-execute the solid.
Ideally, I would use dagster primitives to seamlessly restart failed jobs. But I wouldn't mind building them if need be. I just wanted to know what the recommendation was. (I couldn't find past chats on slack on this topic)
max  06/09/2020, 5:29 PM
@nate
alir  06/09/2020, 5:34 PM
i'm thinking maybe the correct way to do this is at the celery/rabbitmq level: make sure celery doesn't ack a task message in the broker until the task is complete. i don't know what the celery defaults are, or whether dagster changes them, so I'm looking into that at the moment
if my reading of the celery documentation is right, I think a combination of enabling `task_reject_on_worker_lost` and `task_acks_late` would address this: https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_acks_late
but i can't pass these arguments to celery via dagster, can I ?
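The two settings named above can be collected into a plain config dict; a minimal sketch (the app name and broker URL in the comment are illustrative placeholders, not from this thread):

```python
# Celery settings for surviving worker loss.
# With celery's default early acking, the broker drops a task message as soon
# as a worker picks it up, so a worker killed mid-task loses the task forever.
RELIABILITY_SETTINGS = {
    # Ack the message only after the task completes, not on receipt.
    "task_acks_late": True,
    # If the worker process is killed mid-task, requeue the message instead
    # of acking it, so another worker can pick it up.
    "task_reject_on_worker_lost": True,
}

# Applied to a celery app, this would look like:
#   app = Celery("tasks", broker="amqp://rabbitmq:5672//")
#   app.conf.update(RELIABILITY_SETTINGS)
```

Note that with both options enabled, a task interrupted by a worker crash is re-delivered, which is exactly why the idempotency concern discussed later in the thread comes up.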
max  06/09/2020, 6:09 PM
you should be able to pass celery arguments through the dagster engine config
alir  06/09/2020, 6:09 PM
ah lemme take a look
max  06/09/2020, 6:10 PM
it's the `config_source` argument to `CeleryConfig`
an awful name, but there's inconsistent naming in the underlying celery APIs
i think there are certain kinds of retry / acks-late behavior you should be able to configure at the celery level
there's a broader issue, which is that right now there's no heartbeating for dagster execution steps, so across the various execution paths there are ways to get a pipeline into an eternally-running state that probably shouldn't exist
but yes, the intention with the celery engine is to give you as much flexibility as possible when configuring the underlying celery cluster
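As a sketch of how the options from earlier in the thread might flow through `config_source` in run-config YAML (the nesting around `config_source` is an assumption about the 0.8-era dagster_celery schema, which may differ by version):

```yaml
execution:
  celery:
    config:
      # config_source is forwarded to the underlying celery app's configuration
      config_source:
        task_acks_late: true
        task_reject_on_worker_lost: true
```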
alir  06/09/2020, 6:21 PM
ok, `config_source` works perfectly. I saw it before but it slipped my mind. and enabling the two celery options makes this test case pass. of course, there are implications here: all solids must now be idempotent, and we'll have to make sure any side effects won't kill us, but that's good practice anyway.
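The idempotency concern above, sketched independently of any dagster API (function and file names are illustrative): if a re-delivered task overwrites its output atomically instead of appending, re-execution after a worker loss converges to the same end state.

```python
import json
import os
import tempfile

def write_result_idempotently(path, result):
    """Write `result` to `path` so that re-running after a crash is safe.

    Writing to a temp file and atomically renaming it over the target means
    a re-executed task produces the same final state instead of a duplicate
    or a half-written file.
    """
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(result, f)
        os.replace(tmp, path)  # atomic on POSIX: readers see old or new, never half
    finally:
        if os.path.exists(tmp):  # only true if the write failed before the rename
            os.remove(tmp)
```

Running this twice with the same inputs leaves one identical file rather than two appended records, which is the property that makes the re-delivery from `task_acks_late` safe.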
re: heartbeats and eternal pipelines, perhaps one mitigation would be to at least have a CLI command to terminate all currently running pipelines. that could be a sledgehammer one could use in the entrypoint script of a dagster container to kill all currently running pipelines before starting a new one, but maybe that's a bit too coarse
i'll be interested in discussions around this topic, especially with the upcoming 0.8 release
in any case, thanks for the assist!
max  06/09/2020, 6:53 PM
yep, though with a variety of execution engines, we will also need to persist information about how to go find the pipelines we want to terminate, etc
i suspect we'll be looking at this as we plan for 0.9.0
so starting next week 🙂