alir

06/12/2020, 5:42 PM
I'm trying out the migration to v0.8 now and wanted to seek some clarifications. When running dagster in a k8s cluster with celery workers, if I call execute_pipeline or execute_pipeline_iterator, which worker is the child pipeline executed in? The same worker? Or will it be enqueued as a new job in the broker queue, to be picked up later by another worker? I'm assuming it's the former.
And assuming I am right, then what is a sound way of being able to just launch new pipelines as new jobs in the broker, to be picked up by any available worker, now that RemoteDagitRunLauncher is on the deprecation path?

nate

06/12/2020, 5:44 PM
by default all execution step tasks are enqueued on a “dagster” celery queue, and if you have a few workers picking up tasks from that queue they’ll pick up and execute those tasks
you’re using the celery-k8s executor?

alir

06/12/2020, 5:44 PM
yea

nate

06/12/2020, 5:45 PM
yeah we’re going to be working on documentation for this soon, but take a look at https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery/dagster_celery/tasks.py#L75 if you want to see what happens on each worker
(this function is executed on the worker when it picks up a task)

alir

06/12/2020, 5:46 PM
i was using RemoteDagitRunLauncher mostly because I wanted the parent pipeline to launch the new pipelines and then exit immediately, without needing to wait for the child pipelines to complete. As I understand it, execute_pipeline is synchronous and execute_pipeline_iterator requires the parent to handle the events through the returned iterator for the pipeline to execute. But I might be mistaken in my mental model.
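
(For reference, a rough sketch of the two call styles being compared here, using a toy child pipeline made up for illustration; with the default in-process setup both behave as described above:)

    from dagster import execute_pipeline, execute_pipeline_iterator, pipeline, solid

    @solid
    def say_hello(context):
        context.log.info("hello from the child pipeline")

    @pipeline
    def child_pipeline():
        say_hello()

    # Synchronous: blocks the caller until the child run finishes, then returns a result.
    result = execute_pipeline(child_pipeline)

    # Iterator variant: execution only makes progress as the caller consumes
    # the event stream, so the parent has to drain the iterator.
    for event in execute_pipeline_iterator(child_pipeline):
        print(event)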

nate

06/12/2020, 5:46 PM
and this is what happens on the run master https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery/dagster_celery/engine.py#L36 - each step in the execution plan is submitted to the celery queue via _submit_task_k8s_job

alir

06/12/2020, 5:46 PM
thanks for the link! i'll go take a look now
oh sorry, i'm using celery and not celery-k8s. i misread. but your links still have the appropriate references so it's fine

nate

06/12/2020, 5:49 PM
yeah, the main difference between the two is that with celery, step execution will happen in-process in the workers
with celery-k8s it will happen in launched K8s Jobs which are created by the workers

alir

06/12/2020, 5:50 PM
ah. so then execute_pipeline in the celery executor will execute the child pipeline's steps in the same process?

nate

06/12/2020, 5:52 PM
you’re calling execute_pipeline from another pipeline?

alir

06/12/2020, 5:53 PM
yep

nate

06/12/2020, 5:53 PM
yeah it depends on your pipeline run config when you invoke execute_pipeline. if you specify celery there it should kick off execution and push new steps out to the celery queue
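
(A hedged sketch of what that might look like, assuming the 0.8-era celery_executor / ModeDefinition wiring; the broker URL and the pipeline itself are illustrative, so check the dagster-celery docs for your exact version:)

    from dagster import ModeDefinition, execute_pipeline, pipeline, solid
    from dagster_celery import celery_executor

    @solid
    def child_step(context):
        context.log.info("running on a celery worker")

    # The child pipeline's mode has to offer the celery executor for "celery"
    # to be selectable in the run config.
    @pipeline(mode_defs=[ModeDefinition(executor_defs=[celery_executor])])
    def child_pipeline():
        child_step()

    # Broker URL is illustrative; point it at your rabbitmq service.
    child_run_config = {
        "execution": {"celery": {"config": {"broker": "pyamqp://guest@rabbitmq:5672//"}}}
    }

    # This still blocks in the calling process, but the individual steps are
    # pushed onto the celery queue and picked up by whichever workers are listening.
    result = execute_pipeline(child_pipeline, run_config=child_run_config)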

alir

06/12/2020, 6:07 PM
got it. I just took some time to look at the execute_pipeline logic. Aside from the use of RemoteDagitRunLauncher, how would I launch new pipelines with asynchronous behavior? That is, I want pipeline A to launch pipeline B, and I want A to exit as soon as it's done with all its steps. I don't need it to wait around until B is done.

alex

06/12/2020, 6:31 PM
This is a use case we haven’t really designed for, but “launching” the run is what you are looking for, though it may be a bit cumbersome to do at the moment. A DagsterInstance has a RunLauncher. You can configure it via dagster.yaml in $DAGSTER_HOME or use the default one, which launches in a subprocess. For your case you may want to write a custom one that works for your constraints. From within a pipeline execution, you can theoretically call context.instance.launch_run from within a solid, though satisfying that API may prove challenging

alir

06/12/2020, 6:33 PM
got it. i'm working on that at the moment, and i'm just working my way around the code base trying to figure out the details around external_pipeline.

alex

06/12/2020, 6:34 PM
that is indeed the tricky part, and honestly i think that API is likely to change over the coming weeks since ExternalPipeline isn’t quite the right object to require there
but for now lets seeeeee

alir

06/12/2020, 6:35 PM
well, i think the silver lining is i get to peek under the hood

alex

06/12/2020, 6:36 PM
well actually
if you write your own RunLauncher - you can just ignore that argument
i don’t believe we have any type checks against it outside of our RunLauncher implementations

alir

06/12/2020, 6:38 PM
ah i'm afraid if i do write my own RunLauncher, it'll start resembling the RemoteDagitRunLauncher. In my mental model, if my solid is executing within a celery worker, and I want to launch a new run, the solid needs to be told "hey, there's a dagit instance here. use it to launch this new run".

alex

06/12/2020, 6:40 PM
how are you deploying celery?

alir

06/12/2020, 6:40 PM
in a kubernetes cluster. i have dagit running in a pod, and celery running in other pods, and they are communicating over a rabbitmq broker

alex

06/12/2020, 6:41 PM
did you use our helm chart or do it all yourself?

alir

06/12/2020, 6:41 PM
and the dagit instance is a long-running instance
i used the helm chart as a guide but did it by myself

alex

06/12/2020, 6:44 PM
if you go to /instance in dagit (or click the “Instance Details” button) what does it have set for RunLauncher currently?

alir

06/12/2020, 6:46 PM
Run Launcher:
     module: dagster.core.launcher.cli_api_run_launcher
     class: CliApiRunLauncher
     config:
       {}
(In 0.7.15, I used RemoteDagitRunLauncher in dagster.yaml and the pipelines were being launched as expected. In the process of migrating to 0.8, i temporarily removed the RunLauncher configuration from dagster.yaml.)

alex

06/12/2020, 6:48 PM
cool cool - ya now dagit execution goes through this machinery every time so the dagit box needs to have this one (or I guess the K8sRunLauncher) set to successfully execute
theoretically - you could set a forked RemoteDagitRunLauncher that didn’t require external_pipeline in the dagster.yaml that gets mounted in to the celery boxes, and then in solid execution context.instance.launch_run(run_id=run_id, external_pipeline=None) should work
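
(In solid code that would look roughly like the sketch below; creating the PipelineRun record that run_id points at is deliberately elided, since that is the part of the API that was in flux, and passing external_pipeline=None only works with a launcher that ignores the argument:)

    from dagster import solid

    @solid
    def launch_child_run(context):
        # run_id must reference a PipelineRun already created on this instance;
        # how that record gets created is left out here on purpose.
        run_id = ...
        # Only works if the configured RunLauncher (e.g. a forked
        # RemoteDagitRunLauncher) tolerates external_pipeline=None.
        context.instance.launch_run(run_id=run_id, external_pipeline=None)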

alir

06/12/2020, 6:50 PM
hmmmmmmmm

alex

06/12/2020, 6:51 PM
alternatively you could just make a GraphQL request to the dagit box from your solid

alir

06/12/2020, 6:52 PM
that's an interesting idea! let me think about this

alex

06/12/2020, 6:53 PM
i think that’s probably the way to go

alir

06/12/2020, 6:54 PM
okay this was very helpful! thanks! i have something to chew on now. i'll report back

alex

06/12/2020, 6:55 PM
you could also exec dagster-graphql -p launchPipelineExecution -v <json encoded variables> --remote <dagit hostname>
you can crack open the websocket section of the network inspector in dagit and click the launch button to see an example of what the input structure looks like

alir

06/12/2020, 6:58 PM
oh that's very useful, that i can figure out what the query needs to look like by inspecting dagit. i'm contemplating between graphql and a forked RemoteDagitRunLauncher

alex

06/12/2020, 6:59 PM
all the RemoteDagitRunLauncher is doing is making the same GraphQL call, so doing it directly is just skipping unneeded steps

alir

06/12/2020, 10:34 PM
i'm happy to report back that the graphql approach works great. I'm just issuing a POST to http://dagit-url/graphql with the LAUNCH_PIPELINE_EXECUTION_MUTATION query and with the appropriate executionParameters set. this approach might be leaking the abstraction a little bit, since I have to peek underneath the dagster hood. but it's a way forward and I think I can improve upon this. thanks for the suggestion!
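
(For anyone reading along, a minimal sketch of that POST using the requests library; the mutation text and variable shape are approximations of what dagit sent around 0.8 and the pipeline name is hypothetical, so grab the exact payload from dagit's network inspector, as suggested above, rather than trusting these field names:)

    import requests

    # Approximate shape of the launchPipelineExecution mutation circa 0.8; field
    # names (e.g. runConfigData vs environmentConfigData) changed between releases,
    # so capture the real payload from dagit's network inspector.
    LAUNCH_PIPELINE_EXECUTION_MUTATION = """
    mutation LaunchPipelineExecution($executionParams: ExecutionParams!) {
      launchPipelineExecution(executionParams: $executionParams) {
        __typename
      }
    }
    """

    variables = {
        "executionParams": {
            "selector": {"name": "pipeline_b"},  # hypothetical child pipeline name
            "mode": "default",
            # run config payload goes here, keyed however your dagit version expects
        }
    }

    resp = requests.post(
        "http://dagit-url/graphql",  # the dagit service URL from the thread
        json={"query": LAUNCH_PIPELINE_EXECUTION_MUTATION, "variables": variables},
    )
    resp.raise_for_status()
    print(resp.json())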

alex

06/12/2020, 11:01 PM
awesome - glad it works. Dagit was always meant to be a GraphQL server that could be used for programmatic API access so I wouldn’t consider anything about this approach wrong.
keep us posted on any improvements you make or insights you have