d
What's the best way of launching a pipeline through code in a non-blocking way? Both `execute_pipeline` and `execute_pipeline_iterator` seem to always block, and I tried copying how the CLI does it (https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/cli/pipeline.py#L405-L413) but both my code and the CLI are throwing this:
$ DAGSTER_HOME=tmp/dagster dagster pipeline launch my_pipeline -y pipelines/repository.yaml -e pipelines/my_pipeline_config.yaml 
Traceback (most recent call last):
  File "/home/danny/.pyenv/versions/project-3.7.7/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/cli/__init__.py", line 38, in main
    cli(obj={})  # pylint:disable=E1123
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/core/telemetry.py", line 231, in wrap
    result = f(*args, **kwargs)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/cli/pipeline.py", line 417, in pipeline_launch_command
    return instance.launch_run(pipeline_run.run_id)
  File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 788, in launch_run
    return self._run_launcher.launch_run(self, run)
AttributeError: 'NoneType' object has no attribute 'launch_run'
In case it matters: I haven't configured a dagster.yaml file anywhere. If it's a `run_launcher` config that I'm missing, what should it be if I'm running my code or the CLI on the same machine where dagit is running on localhost?
$ dagster --version
dagster, version 0.7.12
a
What's the context? A `run_launcher` is probably the right answer, but we likely don't have one that satisfies your constraints yet (we're working on it at the moment).
m
@sashank ^^ if we had `launch_run` this would be straightforward
d
Thanks. @alex, will adding a `run_launcher` config to dagster.yaml fix the exception? If so, even if it doesn't give me the non-blocking behaviour I want, what should it be for targeting a local dagit instance? This doesn't work:
$ cat dagster.yaml
run_launcher:
  module: dagster.core.launcher
  class: RunLauncher
RE working on a way to launch via code in a non-blocking way: is there already a GitHub issue where I can track this work?
a
> what should it be for targeting a local dagit instance?
That's a piece we don't really have right now, but are working on. Can you talk more about what exactly you are doing? Depending on your setup, you may be able to just use the `threading` or `multiprocessing` libraries in Python to move your `execute_pipeline` call off the main thread and make it non-blocking.
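A minimal sketch of the threading approach suggested above. `run_sibling_pipeline` here is a hypothetical stand-in for a function that wraps dagster's `execute_pipeline` call; only the stdlib `threading` usage is shown for real.

```python
# Sketch: fire off a pipeline run without blocking the caller.
# `run_sibling_pipeline` is a hypothetical placeholder; in real code it
# would call dagster's execute_pipeline(...) with your pipeline and config.
import threading

results = []

def run_sibling_pipeline():
    # e.g. execute_pipeline(sibling_pipeline, environment_dict=config)
    results.append("sibling pipeline finished")

t = threading.Thread(target=run_sibling_pipeline)
t.start()  # returns immediately; the calling solid can keep going
# ... do other work here ...
t.join()   # optionally wait at the end if you need the result
```

`multiprocessing.Process` has the same `start()`/`join()` interface if you want the run in a separate interpreter rather than a thread of the current one.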
d
I'm trying to get a solid to launch a sibling pipeline, which should run in parallel to the current one, and this solid should immediately cede control back to its own pipeline so it can continue
@alex threading/multiprocessing is a good workaround for the non-blocking issue, thanks. RE targeting local dagit: should I look into setting up and targeting a local GraphQL endpoint via a `dagster_graphql.launcher.RemoteDagitRunLauncher` config for the CLI?
a
That could work in the interim. That dagit would have to stay up, and it can't be the same one you initiated the initial launch from, due to a weird Flask bug with making a request to yourself.
It's worth trying for now.
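For reference, a dagster.yaml sketch for pointing a `run_launcher` at a running dagit. The module and class names come from the discussion above; the `config`/`address` keys are an assumption based on the 0.7.x-era setup, so verify them against the deploy docs for your version:

```yaml
# dagster.yaml (in $DAGSTER_HOME) - hypothetical sketch; verify the
# config keys against your dagster version before relying on it
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher
  config:
    address: http://localhost:3000  # wherever your dagit webserver listens
```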
d
My dagit would be up permanently.
> it cant be the same one you initiated the initial launch from
@alex just to clarify: do you mean that if my dagit is executing a solid, and that solid is the one that starts a new thread/process to `execute_pipeline()` the sibling pipeline via the localhost GraphQL endpoint, I'll hit this Flask bug?
a
I think the only real issue is hitting the `Launch Run` button in dagit if you are pointing at yourself, but keep an eye out for weird timeouts trying to reach the dagit server.
d
@alex ok, thanks so much for the help! Really solid project, I'm very much enjoying my current deep dive into it 👍 👍
a
And just to be clear: I would consider this particular approach risky and likely difficult to migrate forward, since we are actively changing how these systems work.
But I am curious if it works.
d
@alex I just noticed the tutorial shows clearly that a local CLI can execute a pipeline in a local dagit server: https://docs.dagster.io/docs/tutorial/basics#executing-our-first-pipeline. So I'm a bit confused. I guess I was looking at the wrong command (`pipeline launch` instead of `pipeline execute`)? Why would I need GraphQL at all if I can just call `execute_pipeline()` in my code via threading/multiprocessing and get local non-blocking execution?
a
> a local cli can execute a pipeline in a local dagit server
That CLI doesn't go through dagit - the CLI command runs `execute_pipeline` in-process.
d
Ahh. Gotcha 🙂
a
dagit is a webserver, so when you click `Start Execution` in the web tool, a request is sent to the server, which creates a subprocess and effectively runs `execute_pipeline` in there.
d
Right, makes sense
a
A `RunLauncher` is used to transfer responsibility of execution to some other process/system.
So the difference between threading/multiprocessing/`subprocess.Popen` vs using `launch_run` is who owns the process doing the execution, and all the related complexity of that process's life cycle.
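To make that ownership distinction concrete, here is a stdlib-only sketch of the "caller owns the process" side of the trade-off. The spawned command is a placeholder standing in for something like a `dagster pipeline execute ...` invocation:

```python
# Sketch: hand execution to a child process that WE own.
# The command below is a placeholder workload; in real code it might be
# ["dagster", "pipeline", "execute", ...] with your repository config.
import subprocess
import sys

child = subprocess.Popen(
    [sys.executable, "-c", "print('pipeline ran')"]
)
# The caller owns this process: crashes, hangs, and cleanup are our
# problem. A launch_run-style hand-off transfers that responsibility
# to another system instead.
returncode = child.wait()
```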
d
Yep. So GraphQL seems like the way to go here. It lets me run a central dagit webserver that'll accept requests and won't care if they come from a local or remote CLI.
a
Ya, the `RemoteDagitRunLauncher` is a way to say "there is dagit running at `$WEB_ADDRESS`, go tell it to do this pipeline execution".
You'll need to make sure that you're using the same `DagsterInstance` across the board: https://docs.dagster.io/docs/deploying/instance
d
> we are actively changing how these systems work
Does "change" here mean you're tweaking the API, or do you mean you're potentially phasing out GraphQL and/or planning some other major architectural change to everything you've just described above?
I don't mind tweaking my code to call new APIs, but I'd rather not bake in an entire architecture that is orthogonal to where the project is going...
a
Well, there's currently nothing besides `pipeline_name` used to communicate that we are in fact talking about the same pipeline when we do hand-offs, so a lot of the changes are introducing the structure necessary to improve that situation.
We've discussed removing `RemoteDagitRunLauncher`, but `RunLauncher` is meant to be a user-extendable class, so you could always implement your own variant. We are also hoping to write better ones - i.e. a background daemon responsible for processes instead of a web server - so it might just be a replacement situation.
`dagit` and `dagster-graphql` are not going anywhere, but a lot is in flux, so I just wanted to give you fair warning.
d
Got it, thanks
w
@Danny I had a similar use case and am launching pipelines with a GraphQL request like this: https://gist.github.com/w-bonelli/ac73d9430236b1947a4a202ad1143392