# announcements
x
ok, i’ve tested it, and it does not support that: B and C will not execute unless A has yielded all of those outputs. if we could support that, it would be a huge win. since Python’s asyncio is yielding under the hood, i think it is technically possible. it just needs a bit more tweaking.
d
Hi Xu, which executor are you using for this?
x
Just the default one @daniel
d
I would need to double check this, but I bet the multi process executor would do what you want here
x
the way i tested it is to sleep 5 seconds before yielding the `foobar` output:
```python
import time

from dagster import Output, OutputDefinition, solid


@solid(
    output_defs=[
        OutputDefinition(name="foo", dagster_type=int),
        OutputDefinition(name="bar", dagster_type=int),
        OutputDefinition(name="foobar", dagster_type=int),
    ],
)
def test(_):
    yield Output(value=1, output_name="foo")
    yield Output(value=2, output_name="bar")

    time.sleep(5)
    yield Output(value=3, output_name="foobar")
```
am i doing it wrong?
d
Are you using the multiprocess executor? I thought you said you were using the default (single process) one
x
let me try multiprocess executor
so i chose to use the multiprocess executor
```python
from dagster import ModeDefinition, multiprocess_executor, pipeline


@pipeline(
    mode_defs=[
        ModeDefinition(
            name="multiprocess",
            executor_defs=[multiprocess_executor],
        )
    ]
)
def simple_pipeline():
    test()  # the solid from above
```
dagster.yaml
```yaml
execution:
  multiprocess:
    config:
      max_concurrent: 6

storage:
  filesystem:
```
still waiting for A to complete before B and C can actually work. not sure if it is a configuration issue.
d
Ah sorry, I may have misremembered :(
x
So does that mean we don’t support what I described here? :(
a
Ya, we currently don’t support the ability to operate on outputs before the solid/step is in a “success” state. The case to consider is what happens if `test` throws an exception after `foo` and `bar` have been yielded but before `foobar`. If we allow downstream nodes to proceed on the already-yielded outputs, `test` is in this *partial failure* state, which we would need to model correctly. We have a lot of product surface area that operates against the current clear distinction between failure and success. Tweaking the engine code to start the downstream nodes is the easy part; making the rest of the experience coherent is the hard part.
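(A minimal sketch of the partial-failure case described above, reusing the shape of the `test` solid from earlier; the solid name and exception are illustrative, not from this thread.)
```python
from dagster import Output, OutputDefinition, solid


@solid(
    output_defs=[
        OutputDefinition(name="foo", dagster_type=int),
        OutputDefinition(name="bar", dagster_type=int),
        OutputDefinition(name="foobar", dagster_type=int),
    ],
)
def test_partial_failure(_):
    yield Output(value=1, output_name="foo")
    yield Output(value=2, output_name="bar")
    # the step now fails even though two outputs were already yielded;
    # `foobar` never materializes, so any downstream node consuming
    # `foo`/`bar` would be running against a step in a partial-failure state
    raise Exception("boom")
```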
x
Understood! Another question: is anyone working on asyncio support? https://github.com/dagster-io/dagster/issues/2268 is the issue about it.
```python
@solid(...)
async def fancy():
    x = await do_stuff()
    yield Output(x)
```
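(Until native support lands, one possible interim workaround is to drive the coroutine to completion inside an ordinary sync solid; a sketch, where `do_stuff` and `fancy_sync` are illustrative names.)
```python
import asyncio

from dagster import Output, solid


async def do_stuff():
    await asyncio.sleep(0.1)  # stand-in for real async I/O
    return 42


@solid
def fancy_sync(_):
    # run the coroutine on a fresh event loop and yield its result
    x = asyncio.run(do_stuff())
    yield Output(x)
```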
a
not actively at the moment but should be easier now that we are dropping python 2 support
x
If you would guess a ship date, what would that date be? March 2021?
a
by then or likely sooner
x
hey @alex, dumb question: if we have multiple dagster instances across multiple machines, will all solids belonging to 1 pipeline get executed on the same machine, or can they be distributed across multiple machines?
a
the choice/implementation of the `executor` allows you to control how solids are processed within a run
for example, the `CeleryExecutor` will submit each solid/step to celery queues, with the expectation that the workers have access to the same code so they can execute specific solids
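(For reference, a sketch of how such an executor attaches to a pipeline’s mode; `celery_executor` comes from the `dagster_celery` package, and the pipeline name here is made up.)
```python
from dagster import ModeDefinition, default_executors, pipeline
from dagster_celery import celery_executor


@pipeline(
    mode_defs=[
        ModeDefinition(executor_defs=default_executors + [celery_executor])
    ]
)
def distributed_pipeline():
    ...  # each solid/step may run on whichever machine hosts a celery worker
```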
x
got it!
1. my use case is not doing any big data processing, so if each solid only takes 1~10 seconds (calling some API), the inputs/outputs are only small Python dictionaries/strings, and the pipelines have < 10 solids, would it make more sense to just use an in-process executor for the pipeline instead of distributing it and incurring additional overhead?
2. i see we can configure the concurrency for a multiprocess executor, but is there any similar config to control how many pipelines a dagster instance can execute concurrently?
d
2) pipeline-level concurrency is one of the big new features in 0.10.0!
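(For context: this shipped as run queueing, configured on the instance via `dagster.yaml`; a sketch, assuming 0.10’s queued run coordinator, with the `max_concurrent_runs` value chosen arbitrarily.)
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 4
```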
x
2) nice, does it mean that currently (0.9) each dagster instance can only execute 1 pipeline at a given time?
a
no, it’s currently unbounded
x
got it!
since async support is not on the roadmap yet but is a dealbreaker for me, i have been trying to add `async` support to dagster for my use case: all solids of a pipeline get executed within 1 process, and all solids are async. i’ve tried two approaches; besides requiring changes to the solid decorator and a new executor, both seemed to work:
1. change the whole call path starting from `execute_pipeline` to async, change all normal generators to async generators, and run `execute_pipeline` inside an event loop;
2. bubble the async generator/awaitable up to the place where we execute steps of the same level in the DAG, run those async steps within an event loop, collect the results, and then yield them like other normal events.
approach #1 changes a lot more files, as `async` is infectious. approach #2 only changes the files that need to handle the bubbled-up async generator/awaitable. neither approach considers dagster as a whole, just my urgent use case. i’m wondering if i could get guidance from you guys on which approach would be preferable if you were to implement it, or whether there is a better way to do it. @alex @daniel @sashank
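(A conceptual sketch of approach #2, not dagster internals: collect the async steps of one DAG level and drive them together on an event loop.)
```python
import asyncio


def run_level(async_steps):
    # `async_steps` is a list of zero-argument coroutine functions that are
    # independent of each other (the same level of the DAG); a real executor
    # would also emit dagster step events around these calls
    async def gather_all():
        return await asyncio.gather(*(step() for step in async_steps))

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gather_all())
    finally:
        loop.close()
```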
d
Hi Xu, I just wanted to confirm one thing - does it specifically need to use async? if there was an executor that ran each solid in its own thread in a single process, that wouldn't work?
x
@daniel a multithreading executor should help a lot too, but we do not have one right now, right?
d
we don't, no - but adding one wouldn't require making any changes to the rest of the dagster codebase I don't think
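(A minimal, dagster-agnostic sketch of that idea: run the independent steps of one level concurrently in threads within a single process.)
```python
from concurrent.futures import ThreadPoolExecutor


def run_level_threaded(steps, max_workers=4):
    # `steps` are zero-argument callables with no dependencies on each other;
    # an actual dagster Executor subclass would also plan steps and emit events
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(step) for step in steps]
        return [f.result() for f in futures]
```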
x
in my use case, each solid does a lot of I/O, which is why our current code uses a lot of async; i guess multithreading should achieve the same (minus the overhead brought by threading). may i ask what the ETA of that support would be? if i need to do it myself, i guess i should look into the multiprocess executor and start from there? @daniel
a
ya, you would write a new `Executor` subclass and an `@executor`/`ExecutorDefinition` to attach to your `ModeDefinition`
it should work pretty similar to https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/core/executor/multiprocess.py
for `async`, i think it’s going to look like approach #1, which as you noticed would be a very sprawling change, so it would be more difficult for an external contributor to pull off
there may be something clever you can do to have your custom executor drive `async` stuff without threads (and without changing dagster core) - i’m not sure exactly how that would work though
x
@alex i tried to do that in approach #2 without touching the rest, but that is really hard, as @solid does not expose the original `fn` that gets wrapped; all we have is a wrapped generator, `compute_fn`. if the original function were preserved inside the SolidDefinition, it would open up more possibility and flexibility.
i wanted to make a PR to preserve it, but i did not really have a good reason to justify it, so i gave up; not sure if this conversation could make that happen. @alex
a
if you want something more lightweight than a PR, you could make a `gist` from `git diff` or something and add it to https://github.com/dagster-io/dagster/issues/2268
x
sure @alex. what i was saying is that in order to do the clever stuff you mentioned w/o touching the rest, it might require the solid definition to preserve the original wrapped function
Ping....
a
im sorry, whats the open question remaining here?
x
Hey Alex, you mentioned that there might be a clever way to handle async functions within an executor without touching the rest of dagster; my question is how that is possible if we don’t have access to the original functions wrapped by @solid.
a
That was just speculation - I don’t know if it could actually work. Roughly what i was thinking was something like:
* set up an executor to drive an asyncio event loop and make it available using something like a process global
* have a custom decorator that wraps your `async` functions to turn them into plain-function solids; the async -> normal function wrapper drives the async function by interacting with the event loop defined in the executor
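(A rough sketch of that speculation; the process-global loop, the `async_solid` decorator, and the example solid are all hypothetical and untested.)
```python
import asyncio
import functools

from dagster import solid

# hypothetical process-global event loop; in the full idea, a custom executor
# would create and own this loop rather than creating it at import time
_EVENT_LOOP = asyncio.new_event_loop()


def async_solid(async_fn):
    # wrap a coroutine function into a plain sync solid by driving it
    # to completion on the shared event loop
    @functools.wraps(async_fn)
    def sync_fn(context):
        return _EVENT_LOOP.run_until_complete(async_fn(context))

    return solid(sync_fn)


@async_solid
async def fetch(context):
    await asyncio.sleep(0.1)  # stand-in for real async I/O
    return 42
```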
x
I agree with the first bullet; for the second bullet, my point was that if @solid preserved a copy of the original function it wraps, we might not even need to add a new decorator, since we could access those original functions from the step context.
a
ya, you are right, but I believe that is an invasive change that we are unlikely to do any faster than proper async/await support. We are heads down on our planned features for the 0.10.0 release for the remainder of the year.
maybe it’s not as bad as I anticipate - you could give it a shot and send a PR if it turns out to be reasonable
x
cool thank you
https://github.com/dagster-io/dagster/pull/3340 @alex hey alex, happy Friday. I made a PR to address this, with the details described there.
@alex morning Alex, when you feel bored, would you mind giving the gist a glance to see whether that is the proper way to do it? If so, I’m going to productionize it.
a
looks reasonable to me - I don’t see anything that stands out as problematic
you’ll have to test it to be sure
x
got it, thank you!