# announcements
d
Can I get some help configuring a custom celery executor?
I have my Celery app configured for SQS and DynamoDB:
```python
# Celery app using an SQS broker and a DynamoDB result backend.
# aws_access_key, aws_secret_key, SQS_QUEUE, and DYNAMO_TABLE are defined elsewhere.
from celery import Celery

config = {
    "broker": f"sqs://{aws_access_key}:{aws_secret_key}@",
    "backend": f"dynamodb://{aws_access_key}:{aws_secret_key}@us-east-1/{DYNAMO_TABLE}",
    "task_default_queue": SQS_QUEUE,
    "task_queues": {"test": {"queue": SQS_QUEUE}},
}

app = Celery("dagster", **config)
```
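For reference, running this app as a plain Celery worker outside of Dagster uses the stock Celery CLI; a minimal sketch, assuming the module above is saved as `app.py`:

```shell
celery -A app worker --loglevel=info
```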
Using this as a Celery worker outside of Dagster, it works fine. I'm just having trouble getting it to run within Dagster. Reading through the docs, I'm getting lost about what to configure where between `celery.yaml`, `pipeline_run.yaml`, and the execution config. I can get the worker started with:
```shell
dagster-celery worker start --name worker_1 --app app
```
Launching the pipeline (the celery example), I set the broker and backend in the execution config. The tasks are sent to the broker but not to the backend, and I'm not sure what I'm missing. Also, am I adding extra steps? It seems cumbersome to have to write out the full broker and backend in the execution config.
m
fab to see you doing this
can we see the relevant chunk of execution config as well
d
```yaml
storage:
  filesystem:

execution:
  celery:
    config:
      broker: "sqs://xxx:xxx@"
      backend: "dynamodb://xxx:xxx@us-east-1/dagster-celery"
```
With xxx being my actual keys
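For reference, the dagster-celery executor config also accepts a `config_source` section for forwarding extra Celery settings through the execution config; a hedged sketch of the same config with an illustrative default-queue override (the queue name is an assumption):

```yaml
execution:
  celery:
    config:
      broker: "sqs://xxx:xxx@"
      backend: "dynamodb://xxx:xxx@us-east-1/dagster-celery"
      config_source:
        # illustrative: extra Celery settings such as the default queue
        task_default_queue: dagster-celery
```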
m
i've never used the dynamo backend, does it require any other config?
cc @cat
d
It just needs `backend`. What I set in my Celery app are `broker`, `backend`, `task_queues`, and `task_default_queue`.
m
and have you tried starting the dagster-celery worker with `-y` and pointing it to a yaml file containing that config
i don't think you need the boilerplate above and the `--app` flag
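A sketch of what that suggestion might look like; the yaml keys mirror the broker/backend from the execution config above, and the filename is illustrative:

```yaml
# celery.yaml -- placeholder credentials, same keys as the execution config
broker: "sqs://xxx:xxx@"
backend: "dynamodb://xxx:xxx@us-east-1/dagster-celery"
```

```shell
dagster-celery worker start --name worker_1 -y celery.yaml
```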
d
I just tried supplying the `celery.yaml`. I think I'm running into some problems with the default queue being used. But I'm also confused about the role of `celery.yaml` if the broker, backend, and everything else are already defined within the Celery app in the `app.py` used by the dagster-celery worker CLI command, with the connection then made via the execution config.
m
the celery.yaml is the preferred way to provide overrides
i can point you to where in the code these overrides are applied if you like
what issues did you see with the default queue?
d
I thought I was setting my default queue to `dagster-celery`, but then I saw that SQS was creating a new queue and the messages were just sitting there.
I'll keep playing around with this but may have some more questions later. This also closes out a question I brought up in Discussions: https://github.com/dagster-io/dagster/discussions/2960. I didn't see where I could close it myself.
thanks
c
with respect to “It seems cumbersome to have to write out the full broker and backend in the execution config”, we are working on merging the executor + run launcher concepts and then the celery config will likely be set on the instance and override-able in the playground / run config
👍 1
d
That’s exciting
😄 1
Also, for anyone curious, I got this working. The missing piece to bring it all together was specifying the queue with the `--queue` flag when launching the worker.
😅 1
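Pieced together from the thread, the full working invocation would look roughly like this (the queue name `dagster-celery` and the `celery.yaml` file come from the earlier messages):

```shell
dagster-celery worker start --name worker_1 -y celery.yaml --queue dagster-celery
```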
m
how could we improve the docs to make this clearer
d
I'm mainly chalking this up to my own confusion about the differences between the default queues that Celery and Dagster each create and listen on.
m
i feel like we should have an example of this setup though
d
If it would be helpful, I'm happy to include my AWS setup somewhere. Not sure where in the docs it would fit best.
m
can you send it over and i'll integrate it into the docs?
d
K. I can throw together a gist
https://gist.github.com/dehume-drizly/6c01cfcfd700e61eeb8238f208a61cb7 I'm still at the proof-of-concept stage with this, but I just wanted to use an SQS queue that already exists.