dhume

09/30/2020, 7:18 PM
Can I get some help configuring a custom celery executor?
I have my Celery app configured for sqs and dynamo
config = {
    "broker": f"sqs://{aws_access_key}:{aws_secret_key}@",
    "backend": f"dynamodb://{aws_access_key}:{aws_secret_key}@us-east-1/{DYNAMO_TABLE}",
    "task_default_queue": SQS_QUEUE,
    "task_queues": {"test": {"queue": SQS_QUEUE}},
}

app = Celery("dagster", **config)
Using this as a Celery worker outside of Dagster, it’s working fine. I’m just having trouble getting it to run within Dagster. Reading through the docs, I’m getting lost on where to configure what between celery.yaml, pipeline_run.yaml, and the execution config. I can get the worker started doing
dagster-celery worker start --name worker_1 --app app
Launching the pipeline (the celery example), I set the broker and backend in the execution config. The tasks are sent to the broker but not to the backend. I’m not sure what I’m missing. Also, am I adding extra steps? It seems cumbersome to have to write out the full broker and backend in the execution config.

max

09/30/2020, 7:20 PM
fab to see you doing this
can we see the relevant chunk of execution config as well?

dhume

09/30/2020, 7:21 PM
storage:
  filesystem:

execution:
  celery:
    config:
      broker: sqs://xxx:xxx@
      backend: dynamodb://xxx:xxx@us-east-1/dagster-celery
With xxx being my actual keys

max

09/30/2020, 7:28 PM
i've never used the dynamo backend, does it require any other config?
cc @cat

dhume

09/30/2020, 7:31 PM
It just needs backend. What I set in my Celery app are broker, backend, task_queues, and task_default_queue.

max

09/30/2020, 7:31 PM
and have you tried starting the dagster-celery worker with -y and pointing it to a yaml file containing that config?
i don't think you need the boilerplate above or the --app flag
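A sketch of the invocation being suggested here, assuming the yaml file is the celery.yaml referenced elsewhere in this thread and contains the same execution config block posted above (the file name and its contents are assumptions for illustration, not something confirmed in the thread):

dagster-celery worker start --name worker_1 -y celery.yaml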

dhume

09/30/2020, 7:48 PM
I just tried supplying the celery.yaml. I think I’m running into some problems with the default queue being used. But I’m also just confused about the role of the celery.yaml if the broker, backend, and everything else are already defined within the Celery app in the app.py that the dagster-celery worker CLI command uses, and the connection is then made via the execution config.

max

09/30/2020, 7:49 PM
the celery.yaml is the preferred way to provide overrides
i can point you to where in the code these overrides are applied if you like
what issues did you see with the default queue?

dhume

09/30/2020, 7:57 PM
I thought I was setting my default queue to dagster-celery, but then I saw that SQS was creating a new queue and the messages were just remaining there.
I’ll keep playing around with this but may have some more questions later. We can also close out an issue I brought up in Discussions: https://github.com/dagster-io/dagster/discussions/2960. I didn’t see where I could close it myself.
thanks

cat

09/30/2020, 8:01 PM
with respect to “It seems cumbersome to have to write out the full broker and backend in the execution config”: we are working on merging the executor and run launcher concepts, and then the celery config will likely be set on the instance and be override-able in the playground / run config.
👍 1

dhume

09/30/2020, 8:02 PM
That’s exciting
😄 1
Also, for anyone curious, I got this working. The missing piece to bring it all together was specifying the queue with the --queue flag when launching the worker.
😅 1
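Putting the pieces of the thread together, the working invocation presumably looks something like the line below, where dagster-celery is the intended default queue mentioned earlier and celery.yaml holds the broker/backend config; the combined command is an inference from the messages above, not something posted verbatim:

dagster-celery worker start --name worker_1 --queue dagster-celery -y celery.yaml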

max

10/01/2020, 10:56 PM
how could we improve the docs to make this clearer?

dhume

10/02/2020, 11:52 AM
I’m mainly chalking this up to me being dumb and getting confused about which default queues get created and listened on between Celery and Dagster

max

10/02/2020, 1:37 PM
i feel like we should have an example of this setup though

dhume

10/02/2020, 1:39 PM
If it would be helpful, I’m happy to include my AWS setup somewhere. Not sure where in the docs it would fit best.

max

10/02/2020, 1:45 PM
can you send it over and i'll integrate it into the docs?

dhume

10/02/2020, 1:47 PM
K. I can throw together a gist
https://gist.github.com/dehume-drizly/6c01cfcfd700e61eeb8238f208a61cb7 I’m still at the POC stage with this, but I just wanted to use an SQS queue that already exists