# ask-community
s
Hi, I'm testing out the Dagster Celery setup (not k8s) and running into an issue where I can't seem to specify the default Celery configuration in code. I managed to get the worker started, and when I pass the run config via the Dagit playground, it works fine, but when I try to specify the same config in code, it is just ignored. The code I'm using is this (let me know if you need the asset code as well, though I don't see how it would be relevant):
partitioned_asset_job = define_asset_job(
    name=f"materialize_{asset.key.to_user_string()}",
    selection=AssetSelection.assets(asset),
    partitions_def=partitions_definition,
    executor_def=celery_executor.configured(
        {"execution": {"config": {"broker": EnvVar("DAGSTER_CELERY_BROKER_URL")}}}
    ),
)
For the service setup, I have docker compose with separate containers for dagit, daemon, user code, celery broker and celery worker. Could anyone point out what I'm doing wrong here?
Also: I couldn't find anything in the documentation mentioning the executor.configured() method - if this is the way to set default configuration for executors, then it really needs to be documented.
y
executor_def=celery_executor.configured(
    {"broker": EnvVar("DAGSTER_CELERY_BROKER_URL")}
),
You should just need the actual config. execution:config will be constructed by Dagster when you use .configured()
s
What do you mean by 'the actual config'? Where should it be specified?
y
I meant you don't need the {"execution": {"config": bits and you just need the config fields
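To make the difference between the two shapes concrete, here is a minimal plain-Python sketch. It does not import Dagster; the helper names as_run_config and as_configured_arg are hypothetical and exist only for illustration. It assumes, as described above, that run config nests the executor fields under execution and config, while .configured() expects only the inner fields.

```python
from typing import Any, Dict


def as_run_config(executor_config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: wrap bare executor fields the way run config
    (e.g. what is pasted into the Dagit playground) nests them."""
    return {"execution": {"config": executor_config}}


def as_configured_arg(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: return only the inner executor fields, which is
    the shape passed to .configured(). Strips the wrapper if it is present."""
    execution = config.get("execution")
    if isinstance(execution, dict) and isinstance(execution.get("config"), dict):
        return execution["config"]
    return config


# Bare executor fields, as suggested in the reply above.
executor_fields = {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}

# Shape used in run config.
wrapped = as_run_config(executor_fields)

# Shape used for .configured(): the wrapper is gone again.
unwrapped = as_configured_arg(wrapped)
```

In this sketch, wrapped is what the playground would take and unwrapped is what would go into celery_executor.configured(...).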
s
Tried out this version, but it didn't work - doesn't even attempt to write to Celery:
partitioned_asset_job = define_asset_job(
    name=f"materialize_{asset.key.to_user_string()}",
    selection=AssetSelection.assets(asset),
    partitions_def=partitions_definition,
    executor_def=celery_executor.configured(
        {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}
    ),
)
What did work was setting it in definitions as the default executor:
defs = Definitions(
    assets=assets,
    jobs=jobs,
    schedules=schedules,
    executor=celery_executor.configured(
        {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}
    ),
)
Though this is not ideal, as we may want to run some jobs outside Celery in the future.
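For anyone reading along, the {"env": ...} form in the last two snippets defers the lookup to the environment of the process that resolves the config. The sketch below imitates that behaviour in plain Python. It is not Dagster's implementation, and resolve_string_source is a hypothetical name. It assumes the broker field accepts either a literal string or a one-key {"env": NAME} mapping, as the snippets suggest.

```python
import os
from typing import Mapping, Union


def resolve_string_source(value: Union[str, Mapping[str, str]]) -> str:
    """Hypothetical helper: return a literal string unchanged, or look up
    the variable named by an {"env": NAME} mapping in the current process."""
    if isinstance(value, str):
        return value
    var_name = value["env"]
    if var_name not in os.environ:
        # Fail loudly so a missing variable is not mistaken for ignored config.
        raise RuntimeError(f"environment variable {var_name} is not set")
    return os.environ[var_name]


# Illustrative value only; a real deployment would set this per container.
os.environ["DAGSTER_CELERY_BROKER_URL"] = "pyamqp://guest@broker:5672//"

broker_url = resolve_string_source({"env": "DAGSTER_CELERY_BROKER_URL"})
literal_url = resolve_string_source("pyamqp://guest@localhost//")
```

In a docker compose setup like the one described, the variable would need to be set in whichever container resolves the executor config. Which container that is depends on the deployment and is not confirmed in this thread.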