#ask-community

Rahul Sharma

12/23/2021, 9:06 AM
Hello all, I am working with the dagster-celery executor. I can run a pipeline with the celery executor on dagster 0.13, but the same process does not work for me on dagster 0.12.14. Please let me know if anyone has any idea. I am following these docs: https://docs.dagster.io/0.12.14/deployment/guides/celery

daniel

12/23/2021, 2:19 PM
Hi Rahul - could you share more details about what didn't work (reproduction steps, the nature of the failure, and the stack traces of any error messages you saw)?

Rahul Sharma

12/24/2021, 7:15 AM
I saw no error. I think dagster is unable to communicate with the celery executor (the same process works for me on dagster 0.13.12).
These screenshots are all from dagster 0.13.12.
But when I change dagster from 0.13.12 to 0.12.14, nothing appears in the celery terminal and no data shows in Flower. Please help.
Copy code
from dagster_celery import celery_executor
from dagster import ModeDefinition, default_executors, fs_io_manager, pipeline, solid

celery_mode_defs = [
    ModeDefinition(
        resource_defs={"io_manager": fs_io_manager},
        executor_defs=default_executors + [celery_executor],
    )
]

@solid
def get_name():
    return "dagster"

@solid
def get_name_one():
    return "dagster"

@solid
def get_name_two():
    return "dagster"

@solid
def get_name_three():
    return "dagster"

# @pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [celery_executor])])
@pipeline(mode_defs=celery_mode_defs)
def parallel_job():
    get_name()
    get_name_one()
    get_name_three()
    get_name_two()
@daniel please have a look above

daniel

12/24/2021, 1:52 PM
It might take a little longer than normal to respond to your question due to the holidays, but when I'm back at a keyboard I can try to reproduce on 0.12.14. I assume there's some reason you are stuck on 0.12.14? I don't think there are breaking changes or any migrations required to switch to 0.13.

Rahul Sharma

12/24/2021, 2:03 PM
ok thanks for your response.

daniel

12/24/2021, 9:49 PM
ok, from your screenshots, it looks like your job is still finishing, it's just not using the celery executor? What are you using as your run config in dagit?
you need to set the following config in dagit to tell it to use celery:
Copy code
execution:
  celery:
(that would be true in 0.13.12 too, though. Are you sure you used the same run config and job code in the two different dagster versions?)
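
To make the point about run config concrete, here is a toy sketch of how a run ends up on one executor or another. It is not dagster's own code: `pick_executor` and `AVAILABLE` are hypothetical names, and the list only assumes that the mode from the thread offers the two default executors plus celery. With no `execution` key the sketch falls back to the in-process default, which matches a run that finishes without ever reaching the celery workers.

```python
from typing import Any, Dict, Optional

# Assumed executor names for the mode in this thread:
# the two defaults plus the celery executor.
AVAILABLE = ["in_process", "multiprocess", "celery"]


def pick_executor(run_config: Optional[Dict[str, Any]]) -> str:
    """Toy model: return the executor name that a run config selects."""
    execution = (run_config or {}).get("execution") or {}
    if not execution:
        # Nothing selected, so fall back to the in-process default.
        return "in_process"
    if len(execution) != 1:
        raise ValueError("select exactly one executor under 'execution'")
    (name,) = execution
    if name not in AVAILABLE:
        raise ValueError(f"unknown executor: {name}")
    return name


# An empty Launchpad box corresponds to an empty run config.
print(pick_executor({}))
# The two-line YAML "execution: / celery:" parses to a nested mapping
# whose innermost value is None.
print(pick_executor({"execution": {"celery": None}}))
```

The first call prints `in_process` and the second prints `celery`.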

Rahul Sharma

12/25/2021, 2:02 AM
Copy code
execution:
  celery:
    broker: 'pyamqp://admin:pwd123@0.0.0.0:5672//'
    backend: 'rpc://' 
    include: ['my_module']
This is the file I am using, but I am not using any custom config.

daniel

12/25/2021, 2:23 AM
for dagit to know that it should use the celery executor, you need to specify run config
(in both 0.12.14 and 0.13.12 if you're using the @pipeline decorator)
so you will want to make sure that in dagit you use that config you posted when you launch the run:
Copy code
execution:
  celery:
    broker: 'pyamqp://admin:pwd123@0.0.0.0:5672//'
    backend: 'rpc://' 
    include: ['my_module']

Rahul Sharma

12/25/2021, 4:07 AM
Copy code
dagster-celery worker start -y celery_config.yaml
I used the above command, but it does not work on 0.12.14, while everything works on 0.13.12.
Will my_module.py be the same for both versions?
Can we connect for a while? I think that would be better.

daniel

12/25/2021, 4:20 AM
can you answer my question above about what run config you set on the run in dagit? Can you paste what you typed into the Launchpad box in the dagit UI? The run config in dagit needs to include the same execution config that you specified when starting the celery workers
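
One way to check the "same execution config" requirement is to compare the two parsed configs directly. The sketch below is only an illustration: `celery_section`, `mismatched_keys`, and `SHARED_KEYS` are hypothetical helpers, not part of dagster or dagster-celery, and the dictionaries stand in for the worker YAML file and the Launchpad text after parsing.

```python
from typing import Any, Dict, List

# Settings that both sides should agree on (taken from the config in this thread).
SHARED_KEYS = ("broker", "backend", "include")


def celery_section(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the mapping under execution -> celery, or {} if it is absent."""
    return (config.get("execution") or {}).get("celery") or {}


def mismatched_keys(worker_config: Dict[str, Any], run_config: Dict[str, Any]) -> List[str]:
    """List the shared settings whose values differ between the two configs."""
    worker = celery_section(worker_config)
    run = celery_section(run_config)
    return [key for key in SHARED_KEYS if worker.get(key) != run.get(key)]


worker_config = {
    "execution": {
        "celery": {
            "broker": "pyamqp://admin:pwd123@0.0.0.0:5672//",
            "backend": "rpc://",
            "include": ["my_module"],
        }
    }
}

# An empty Launchpad box disagrees with the worker file on every setting.
print(mismatched_keys(worker_config, {}))
# Pasting the same config into the Launchpad leaves nothing mismatched.
print(mismatched_keys(worker_config, worker_config))
```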

Rahul Sharma

12/25/2021, 4:21 AM
Copy code
execution:
  celery:
    broker: 'pyamqp://admin:pwd123@0.0.0.0:5672//'
    backend: 'rpc://' 
    include: ['my_module']
this one
Copy code
$ dagster-celery worker start -A dagster_celery.app

$ docker run -p 5672:5672 rabbitmq:3.8.2

$ dagit -f celery_pipeline.py

$ celery flower -A dagster_celery:app --port=5555
I am using these commands. They work on 0.13 but not on 0.12.

daniel

12/25/2021, 4:23 AM
when you launch a run in dagit, there is a box on the Launchpad where you enter configuration for the run. here is a screenshot of it:
in that box, you need to tell it to launch the run in celery by pasting the run configuration there:
Copy code
execution:
  celery:
    broker: 'pyamqp://admin:pwd123@0.0.0.0:5672//'
    backend: 'rpc://' 
    include: ['my_module']
https://docs.dagster.io/concepts/configuration/config-schema#dagit says more about how to apply configuration to runs

Rahul Sharma

12/25/2021, 4:25 AM
But why does it work on 0.13 without doing anything?

daniel

12/25/2021, 4:26 AM
did you change the code at all between v13 and v12?
the code of your dagster job/pipeline i mean

Rahul Sharma

12/25/2021, 4:28 AM
I am using the two pieces of code below for 0.13 and 0.12.

daniel

12/25/2021, 4:29 AM
Got it - it's because @job and @pipeline handle defaults differently. The confusing situation you ran into in v12 has been fixed in v13
and less config is required to make it run using celery by default
so if using @job and v13 is an option for you, it has fixed a number of problems and confusing things, including this one

Rahul Sharma

12/25/2021, 4:32 AM
so what should i do now ??

daniel

12/25/2021, 4:33 AM
I gave you some steps above for how to launch your run in dagit using celery in 0.12.14 - paste your config into the box I posted a screenshot of, then press launch run
or switch to 0.13 (this is my top recommendation if that is an option)

Rahul Sharma

12/25/2021, 4:34 AM
Got it, thanks.
Is there any way to load this config file directly from the pipeline script?

daniel

12/25/2021, 1:32 PM
You can use this API to configure the celery executor in code, and the code can load the config from a file: https://docs.dagster.io/concepts/configuration/configured
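
As a rough sketch of the "load the config from a file" half of that suggestion, the snippet below pulls the celery settings out of an already parsed config mapping. `executor_settings` is a hypothetical helper, and the `parsed` dictionary is a stand-in for what a YAML parser would return for the file posted earlier in the thread. The optional `config` nesting is an assumption about alternative layouts, not something shown in this thread. The call into dagster's configured API is deliberately left out, since its exact form depends on the dagster version; see the linked page for that step.

```python
from typing import Any, Dict


def executor_settings(parsed_file: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the celery settings from a parsed config file."""
    section = (parsed_file.get("execution") or {}).get("celery") or {}
    # Assumption: some layouts nest the settings one level deeper under "config".
    return dict(section.get("config") or section)


# Stand-in for the parsed contents of celery_config.yaml from the thread.
parsed = {
    "execution": {
        "celery": {
            "broker": "pyamqp://admin:pwd123@0.0.0.0:5672//",
            "backend": "rpc://",
            "include": ["my_module"],
        }
    }
}

settings = executor_settings(parsed)
print(settings["broker"])
print(sorted(settings))
```

The resulting `settings` mapping holds only the executor-level keys (`backend`, `broker`, `include`), which is the kind of value one would then hand to the configured API.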