Saulius Beinorius
06/21/2023, 12:57 PM
partitioned_asset_job = define_asset_job(
    name=f"materialize_{asset.key.to_user_string()}",
    selection=AssetSelection.assets(asset),
    partitions_def=partitions_definition,
    executor_def=celery_executor.configured(
        {"execution": {"config": {"broker": EnvVar("DAGSTER_CELERY_BROKER_URL")}}}
    ),
)
For the service setup, I have docker compose with separate containers for dagit, daemon, user code, celery broker and celery worker.
Could anyone point out what I'm doing wrong here?
Saulius Beinorius
06/21/2023, 1:46 PM
executor.configured() method - if this is the way to set default configuration for executors, then it really needs to be documented.
yuhan
06/21/2023, 5:41 PM
executor_def=celery_executor.configured(
    {"broker": EnvVar("DAGSTER_CELERY_BROKER_URL")}
),
you should just need the actual config. execution:config will be constructed by dagster when you use .configured()
Saulius Beinorius
06/22/2023, 7:19 AM
yuhan
06/22/2023, 4:20 PM
{"execution": {"config": bits and you just need the config fields
Saulius Beinorius
06/26/2023, 7:58 AM
partitioned_asset_job = define_asset_job(
    name=f"materialize_{asset.key.to_user_string()}",
    selection=AssetSelection.assets(asset),
    partitions_def=partitions_definition,
    executor_def=celery_executor.configured(
        {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}
    ),
)
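The snippet above passes only the executor's own fields, as suggested earlier in the thread. The sketch below is a toy model of why the extra wrapper gets rejected. It is not Dagster's implementation: `wrap_executor_config` and `ALLOWED_FIELDS` are hypothetical names, and the schema is reduced to the single `broker` field mentioned in the thread.

```python
# Toy model only: NOT Dagster's implementation, just an illustration of the
# nesting described in the replies above.

# Field names this toy executor schema accepts. "broker" comes from the thread;
# a real executor accepts more fields.
ALLOWED_FIELDS = {"broker"}


def wrap_executor_config(fields: dict) -> dict:
    """Hypothetical helper: validate executor fields, then add the run-config wrapper."""
    unknown = set(fields) - ALLOWED_FIELDS
    if unknown:
        raise ValueError(f"unexpected config field(s): {sorted(unknown)}")
    return {"execution": {"config": fields}}


broker_field = {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}

# Passing only the executor's fields works; the wrapper is added for us.
good = wrap_executor_config(broker_field)

# Passing an already-wrapped dict fails: "execution" is not an executor field.
try:
    wrap_executor_config({"execution": {"config": broker_field}})
    error = None
except ValueError as exc:
    error = str(exc)
```

In this model the wrapper is something the framework adds on the way out, so the caller only ever supplies what sits inside it.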
Saulius Beinorius
06/26/2023, 7:59 AM
defs = Definitions(
    assets=assets,
    jobs=jobs,
    schedules=schedules,
    executor=celery_executor.configured(
        {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}
    ),
)
Though this is not ideal, as we may want to run some jobs without Celery in the future.
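The later snippets in this thread write the broker URL as `{"env": "DAGSTER_CELERY_BROKER_URL"}` rather than `EnvVar(...)`. As a rough illustration of what that dict form means, here is a toy resolver that swaps each `{"env": NAME}` entry for the value of the named environment variable. `resolve_env_fields` is a hypothetical helper, not a Dagster function, and the broker URL is a placeholder.

```python
import os
from typing import Mapping


def resolve_env_fields(config: dict, environ: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical helper: replace {"env": NAME} entries with the value of NAME."""
    resolved = {}
    for key, value in config.items():
        if isinstance(value, dict) and set(value) == {"env"}:
            # Looked up when this function runs, so the literal URL never sits in code.
            resolved[key] = environ[value["env"]]
        else:
            resolved[key] = value
    return resolved


# Placeholder URL for illustration only; the real value is deployment-specific.
fake_env = {"DAGSTER_CELERY_BROKER_URL": "pyamqp://guest@localhost//"}
resolved = resolve_env_fields(
    {"broker": {"env": "DAGSTER_CELERY_BROKER_URL"}}, fake_env
)
```

With a setup like the Docker Compose one described at the top of the thread, every container that loads this config would presumably need the variable set, since the lookup happens wherever the config is resolved.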