# announcements
a
I'm trying out the migration to v0.8 now and wanted to seek some clarifications. When running dagster in a k8s cluster with celery workers, if I call `execute_pipeline` or `execute_pipeline_iterator`, which worker is the child pipeline executed in? The same worker? Or will it be enqueued as a new job in the broker queue, to be picked up later by another worker? I'm assuming it's the former.
And assuming I am right, what is a sound way to launch new pipelines as new jobs in the broker, to be picked up by any available worker, now that `RemoteDagitRunLauncher` is on the deprecation path?
n
by default all execution step tasks are enqueued on a “dagster” celery queue, and if you have a few workers picking up tasks from that queue they’ll pick up and execute those tasks
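For reference, spinning up a worker on that default queue looks roughly like the sketch below; this assumes the 0.8-era `dagster-celery` CLI and its `-q` queue flag, so double-check against `dagster-celery worker start --help`:

```sh
# start a celery worker that consumes from the default "dagster" queue
dagster-celery worker start -q dagster
```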
you’re using the `celery-k8s` executor?
a
yea
n
yeah we’re going to be working on documentation for this soon, but take a look at https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery/dagster_celery/tasks.py#L75 if you want to see what happens on each worker
👍 1
(this function is executed on the worker when it picks up a task)
a
i was using `RemoteDagitRunLauncher` mostly because I wanted the parent pipeline to launch the new pipelines and then exit immediately, without needing to wait for the child pipelines to complete. As I understand it, `execute_pipeline` is synchronous, and `execute_pipeline_iterator` requires the parent to handle the events through the returned iterator for the pipeline to execute. But I might be mistaken in my mental model.
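A minimal sketch of that difference with the 0.8-era API (the toy pipeline here is hypothetical):

```python
from dagster import execute_pipeline, execute_pipeline_iterator, pipeline, solid

@solid
def say_hello(_):
    return "hello"

@pipeline
def toy_pipeline():
    say_hello()

# execute_pipeline blocks until the run completes and returns its result
result = execute_pipeline(toy_pipeline)
assert result.success

# execute_pipeline_iterator yields events lazily; execution advances only
# as the caller consumes the iterator
for event in execute_pipeline_iterator(toy_pipeline):
    print(event.event_type_value)
```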
n
and this is what happens on the run master https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery/dagster_celery/engine.py#L36 - each step in the execution plan is submitted to the celery queue via `_submit_task_k8s_job`
a
thanks for the link! i'll go take a look now
oh sorry, i'm using `celery` and not `celery-k8s`. i misread. but your links still have the appropriate references, so it's fine
n
yeah, the main difference between the two is that with `celery`, step execution will happen in-process in the workers
with `celery-k8s` it will happen in launched K8s Jobs which are created by the workers
a
ah. so then `execute_pipeline` with the `celery` executor will execute the child pipeline's steps in the same process?
n
you’re calling `execute_pipeline` from another pipeline?
a
yep
n
yeah, it depends on your pipeline run config when you invoke `execute_pipeline`. if you specify `celery` there, it should kick off execution and push new steps out to the celery queue
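A sketch of that setup, assuming the child pipeline's mode offers the celery executor; the broker URL is a placeholder:

```python
from dagster import ModeDefinition, default_executors, execute_pipeline, pipeline, solid
from dagster_celery import celery_executor

@solid
def child_solid(_):
    return 1

# the mode must include celery_executor for "celery" to be selectable in run config
@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [celery_executor])])
def child_pipeline():
    child_solid()

# selecting "celery" under "execution" pushes each step out to the celery queue
execute_pipeline(
    child_pipeline,
    {"execution": {"celery": {"config": {"broker": "pyamqp://guest@rabbitmq:5672//"}}}},
)
```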
a
got it. I just took some time to look at the `execute_pipeline` logic. Aside from the use of `RemoteDagitRunLauncher`, how would I launch new pipelines with asynchronous behavior? That is, I want pipeline A to launch pipeline B, and I want A to exit as soon as it's done with all its steps. I don't need it to wait around until B is done.
a
This is a use case we haven’t really designed for, but “launching” the run is what you are looking for, though it may be a bit cumbersome to do at the moment. A `DagsterInstance` has a `RunLauncher`. You can configure it via `dagster.yaml` in `$DAGSTER_HOME`, or use the default one, which launches in a subprocess. For your case you may want to write a custom one that works for your constraints. From within a pipeline execution, you can theoretically call `context.instance.launch_run` from within a solid, though satisfying that API may prove challenging
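A rough sketch of that from inside a solid; `child_pipeline` is hypothetical, and the launcher tolerating `external_pipeline=None` is an assumption discussed further below:

```python
from dagster import solid

@solid
def kick_off_child(context):
    from my_repo import child_pipeline  # hypothetical: defined elsewhere in your repo

    # create the run record on the instance, then hand it to the RunLauncher;
    # passing external_pipeline=None assumes the configured launcher ignores it
    run = context.instance.create_run_for_pipeline(child_pipeline)
    context.instance.launch_run(run.run_id, external_pipeline=None)
    # return immediately; this pipeline does not wait for the child run
```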
a
got it. i'm working on that at the moment, and i'm just working my way around the code base trying to figure out the details around `external_pipeline`.
a
that is indeed the tricky part, and honestly i think that API is likely to change over the coming weeks since `ExternalPipeline` isn’t quite the right object to require there
but for now lets seeeeee
a
well, i think the silver lining is i get to peek under the hood
a
well actually, if you write your own `RunLauncher`, you can just ignore that argument
i don’t believe we have any type checks against it outside of our `RunLauncher` implementations
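A minimal sketch of such a launcher against the 0.8-era interface; the launch helper is hypothetical, and in practice you'd also mix in `ConfigurableClass` so it can be loaded from `dagster.yaml`:

```python
from dagster.core.launcher import RunLauncher

class FireAndForgetRunLauncher(RunLauncher):
    """Accepts external_pipeline but deliberately never touches it."""

    def launch_run(self, instance, run, external_pipeline=None):
        _start_run_somewhere(run)  # hypothetical helper that actually starts the run
        return run

    def can_terminate(self, run_id):
        return False

    def terminate(self, run_id):
        raise NotImplementedError()
```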
a
ah, i'm afraid if i do write my own `RunLauncher`, it'll start resembling the `RemoteDagitRunLauncher`. In my mental model, if my solid is executing within a celery worker and I want to launch a new run, the solid needs to be told “hey, there's a dagit instance here. use it to launch this new run”.
a
how are you deploying celery?
a
in a kubernetes cluster. i've got dagit running in a pod, and celery running in other pods, and they communicate over a rabbitmq broker
a
did you use our `helm` chart or do it all yourself?
a
and the dagit instance is a long-running instance
i used the helm chart as a guide but did it by myself
a
if you go to `/instance` in dagit (or click the “Instance Details” button), what does it have set for `RunLauncher` currently?
a
```
Run Launcher:
     module: dagster.core.launcher.cli_api_run_launcher
     class: CliApiRunLauncher
     config:
       {}
```
(In 0.7.15, I used `RemoteDagitRunLauncher` in `dagster.yaml` and the pipelines were being launched as expected. In the process of migrating to 0.8, I temporarily removed the RunLauncher configuration from `dagster.yaml`.)
a
cool cool - ya, now dagit execution goes through this machinery every time, so the dagit box needs to have this one (or I guess the `K8sRunLauncher`) set to successfully execute
theoretically - you could set a forked `RemoteDagitRunLauncher` that didn’t require `external_pipeline` in the `dagster.yaml` that gets mounted in to the `celery` boxes, and then in solid execution `context.instance.launch_run(run_id=run_id, external_pipeline=None)` should work
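In `dagster.yaml` terms that would look something like this; module, class, and config keys are all placeholders for the hypothetical fork:

```yaml
# dagster.yaml mounted into the celery worker pods
run_launcher:
  module: my_launchers                 # hypothetical module holding the fork
  class: ForkedRemoteDagitRunLauncher  # hypothetical fork that skips external_pipeline
  config:
    dagit_host: dagit                  # placeholder: your dagit service name
    dagit_port: 3000
```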
a
hmmmmmmmm
a
alternatively, you could just make a GraphQL request to the `dagit` box from your solid
a
that's an interesting idea! let me think about this
a
i think thats probably the way to go
a
okay this was very helpful! thanks! i have something to chew on now. i'll report back
a
you could also exec `dagster-graphql -p launchPipelineExecution -v <json encoded variables> --remote <dagit hostname>`
you can crack open the websocket section of the network inspector in dagit and click the `launch` button to see an example of what the input structure looks like
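The captured variables look roughly like the following sketch; double-check it against what dagit actually sends, since the config key may be `runConfigData` or `environmentConfigData` depending on the exact version, and the selector values here are placeholders:

```json
{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "my_location",
      "repositoryName": "my_repo",
      "pipelineName": "pipeline_b"
    },
    "runConfigData": {},
    "mode": "default"
  }
}
```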
a
oh that's very useful, that i can figure out what the query needs to look like by inspecting dagit. i'm contemplating between graphql and a forked `RemoteDagitRunLauncher`
a
all the `RemoteDagitRunLauncher` is doing is making the same GraphQL call, so doing it directly is just skipping unneeded steps
a
i'm happy to report back that the graphql approach works great. I'm just issuing a POST to http://dagit-url/graphql with the `LAUNCH_PIPELINE_EXECUTION_MUTATION` query and with the appropriate executionParameters set. this approach might be leaking the abstraction a little bit, since I have to peek underneath the dagster hood, but it's a way forward and I think I can improve upon this. thanks for the suggestion!
❤️ 1
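A minimal sketch of that POST, assuming an in-cluster dagit service at `dagit:3000`, a trimmed-down mutation, and the placeholder selector from the payload sketched earlier:

```python
import requests

# trimmed-down mutation; dagit's own frontend requests many more result fields
LAUNCH_PIPELINE_EXECUTION_MUTATION = """
mutation LaunchPipelineExecution($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename
  }
}
"""

variables = {
    "executionParams": {
        "selector": {  # placeholders, as in the payload sketched above
            "repositoryLocationName": "my_location",
            "repositoryName": "my_repo",
            "pipelineName": "pipeline_b",
        },
        "runConfigData": {},
        "mode": "default",
    }
}

resp = requests.post(
    "http://dagit:3000/graphql",  # placeholder: in-cluster dagit URL
    json={"query": LAUNCH_PIPELINE_EXECUTION_MUTATION, "variables": variables},
)
resp.raise_for_status()
print(resp.json())
```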
a
awesome - glad it works. Dagit was always meant to be a GraphQL server that could be used for programmatic API access, so I wouldn’t consider anything about this approach wrong.
keep us posted on any improvements you make or insights you have