x

Xu Zhang

11/21/2020, 7:53 PM
ok, i’ve tested it, and it does not support that: B and C will not execute until A has yielded all of its outputs. if we could support that, it would be a huge win. since Python’s asyncio is built on yielding under the hood, i think it is technically possible. it just needs a bit more tweaking.
d

daniel

11/21/2020, 11:06 PM
Hi Xu, which executor are you using for this?
x

Xu Zhang

11/22/2020, 1:28 AM
Just the default one @daniel
d

daniel

11/22/2020, 1:29 AM
I would need to double check this, but I bet the multi process executor would do what you want here
x

Xu Zhang

11/22/2020, 1:46 AM
the way i tested it is to sleep 5 seconds before yielding the foobar output:
import time

from dagster import Output, OutputDefinition, solid

@solid(
    output_defs=[
        OutputDefinition(name="foo", dagster_type=int),
        OutputDefinition(name="bar", dagster_type=int),
        OutputDefinition(name="foobar", dagster_type=int),
    ],
)
def test(_):
    yield Output(value=1, output_name="foo")
    yield Output(value=2, output_name="bar")
    
    time.sleep(5)
    yield Output(value=3, output_name="foobar")
am i doing it wrongly?
d

daniel

11/22/2020, 2:05 AM
Are you using the multiprocess executor? I thought you said you were using the default (single process) one
x

Xu Zhang

11/22/2020, 2:19 AM
let me try multiprocess executor
so i chose to use multiprocess executor
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="multiprocess",
            executor_defs=[multiprocess_executor],
        )
    ]
)
def simple_pipeline():
dagster.yaml
execution:
  multiprocess:
    config:
      max_concurrent: 6

storage:
  filesystem:
still waiting for A to complete before B and C can actually work. not sure if it is a configuration issue.
d

daniel

11/22/2020, 3:03 PM
Ah sorry, I may have misremembered :(
x

Xu Zhang

11/22/2020, 3:17 PM
So does that mean we don’t support what I described here? :(
a

alex

11/23/2020, 3:50 PM
Ya we currently don’t support the ability to operate on outputs before the solid/step is in a “success” state. The case to consider is what happens if test throws an exception after foo and bar have yielded but before foobar. If we allow downstream nodes to proceed on the already yielded outputs, test is in this partial failure state which we would need to model correctly. We have a lot of product surface area that operates against the current clear distinction between failure and success. Tweaking the engine code to start the downstream nodes is the easy part, making the rest of the experience coherent is the hard part.
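The partial-failure case described above can be reproduced with a plain Python generator, without dagster. This is only a sketch: `test_solid` and `run_step` are hypothetical stand-ins, not dagster APIs, and the tuples stand in for `Output` objects.

```python
def test_solid():
    """Stand-in for the `test` solid: two outputs succeed, then the body fails."""
    yield ("foo", 1)
    yield ("bar", 2)
    raise RuntimeError("failed before yielding foobar")


def run_step(step_fn):
    """Collect outputs as they are yielded and report the final step status."""
    outputs = {}
    try:
        for name, value in step_fn():
            # An eager engine could already start downstream work here.
            outputs[name] = value
    except Exception as exc:
        return outputs, f"failure: {exc}"
    return outputs, "success"


outputs, status = run_step(test_solid)
print(outputs, status)
```

The step ends in a failure state even though `foo` and `bar` were already emitted, which is the state a success/failure model would have to account for if downstream nodes had consumed them.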
x

Xu Zhang

11/23/2020, 4:57 PM
Understood! Another question: is anyone working on asyncio support? this issue tracks it: https://github.com/dagster-io/dagster/issues/2268
@solid(...)
async def fancy():
  x = await do_stuff()
  yield Output(x)
a

alex

11/23/2020, 5:07 PM
not actively at the moment but should be easier now that we are dropping python 2 support
x

Xu Zhang

11/23/2020, 5:23 PM
If you would guess a ship date, what would that date be? March 2021?
a

alex

11/24/2020, 3:50 PM
by then or likely sooner
x

Xu Zhang

11/24/2020, 7:55 PM
hey @alex, dumb question: if we have multiple dagster instances across multiple machines, will all solids belonging to one pipeline get executed on the same machine, or can they be distributed across multiple machines?
a

alex

11/24/2020, 7:57 PM
the choice/implementation of the executor allows you to control how solids are processed within a run
for example the CeleryExecutor will submit each solid/step to celery queues with the expectation that the workers have access to the same code so they can execute specific solids
x

Xu Zhang

11/24/2020, 8:18 PM
got it.
1. my use case does not involve any big data processing. if each solid only takes 1~10 seconds (calling some API), the inputs/outputs are only small Python dictionaries/strings, and the pipelines have < 10 solids, would it make more sense to just use an in-process executor for the pipeline instead of distributing it and incurring additional overhead?
2. i see we can configure the concurrency for a multiprocess executor, but is there any similar config to control how many pipelines a dagster instance can execute concurrently?
d

daniel

11/24/2020, 8:20 PM
2) pipeline-level concurrency is one of the big new features in 0.10.0 !
x

Xu Zhang

11/24/2020, 9:13 PM
2) nice, does it mean that currently (0.9) each dagster instance can only execute 1 pipeline at a given time?
a

alex

11/24/2020, 9:25 PM
no its currently unbounded
x

Xu Zhang

11/24/2020, 9:30 PM
got it!
since async support is not on the roadmap yet but is a dealbreaker for me, i have been trying to add async support to dagster for my use case: all solids of a pipeline get executed within 1 process, and all solids are async. I’ve tried two approaches. besides requiring changes in the solid decorator and a new executor, both seemed to work:
# 1 approach: change the whole call path starting from execute_pipeline to async, change all normal generators to async generators, and run execute_pipeline inside an event loop;
# 2 approach: bubble up the async generator/awaitable to the place where we execute steps of the same level in a DAG, run those async steps within an event loop, collect the results, and then yield them like other normal events.
the # 1 approach changes a lot more files as async is infectious. the # 2 approach only changes files that need to handle the bubbled async generator/awaitable. both approaches consider my urgent use case more than dagster as a whole. I’m wondering if I could get guidance from you guys on which approach you would prefer if you were to implement it, or whether there is a better way to do it. @alex @daniel @sashank
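The core of the second approach (run the async steps of one DAG level inside an event loop, collect the results, then yield them from ordinary synchronous code) might look roughly like the sketch below. It uses only the standard library; `call_api`, `run_level`, and `execute_level` are hypothetical names, and the sleep stands in for real I/O.

```python
import asyncio


async def call_api(name, delay):
    """Hypothetical async step body; the sleep stands in for an I/O call."""
    await asyncio.sleep(delay)
    return f"{name}-done"


async def run_level(steps):
    """Run all steps of one DAG level concurrently; results keep input order."""
    return await asyncio.gather(*(call_api(name, delay) for name, delay in steps))


def execute_level(steps):
    """Synchronous entry point: drive the event loop, then yield plain results."""
    for result in asyncio.run(run_level(steps)):
        yield result


level_results = list(execute_level([("b", 0.05), ("c", 0.01)]))
print(level_results)
```

Only `execute_level` needs to know about the event loop, which matches the observation that this approach touches fewer files than making the whole call path async.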
d

daniel

12/01/2020, 6:27 PM
Hi Xu, I just wanted to confirm one thing - does it specifically need to use async? if there was an executor that ran each solid in its own thread in a single process, that wouldn't work?
x

Xu Zhang

12/01/2020, 6:31 PM
@daniel multithreading executor should help a lot too, but we do not have it right now right?
d

daniel

12/01/2020, 6:32 PM
we don't, no - but adding one wouldn't require making any changes to the rest of the dagster codebase I don't think
x

Xu Zhang

12/01/2020, 6:35 PM
in my use case, each solid does a lot of I/O, which is why our current code is using a lot of async — but i guess multithreading should achieve the same (besides the overhead brought by threading). may i ask what would be the ETA of that support? if I need to do it myself, I guess I should look into the multiprocess executor and start from there? @daniel
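For I/O-bound step bodies, the thread-per-solid idea can be illustrated with the standard library alone. This is a sketch of the concurrency pattern, not a dagster executor: `fetch` and `run_in_threads` are hypothetical, and the sleep stands in for an API call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(name):
    """Hypothetical I/O-bound solid body; the sleep stands in for an API call."""
    time.sleep(0.1)
    return name.upper()


def run_in_threads(names, max_workers=4):
    """Run each step body on a worker thread and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, names))


start = time.perf_counter()
thread_results = run_in_threads(["a", "b", "c"])
elapsed = time.perf_counter() - start
print(thread_results, f"{elapsed:.2f}s")
```

Because the threads spend their time blocked on I/O, the three calls overlap instead of running back to back, which is the same benefit the async code gets from the event loop.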
a

alex

12/01/2020, 7:12 PM
ya you would write a new Executor subclass and @executor / ExecutorDefinition to attach to your ModeDefinition
it should work pretty similar to https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/core/executor/multiprocess.py
for async i think its going to look like approach #1, which as you noticed would be a very sprawling change so would be more difficult as an external contributor to pull off
there may be something clever you can do to have your custom executor drive async stuff without threads (and without changing dagster core) - im not sure exactly how that would work though
x

Xu Zhang

12/01/2020, 7:31 PM
@alex i tried to do that in approach #2 without touching the rest, but that is really hard as @solid does not expose the original fn that gets wrapped; all we have is a wrapped generator, compute_fn. If the original function were preserved inside the SolidDefinition, it would open up more possibilities and flexibility.
i wanted to make a PR to preserve it but i did not really have a good reason to justify it so i gave up; not sure if this conversation could make that happen. @alex
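What "preserving the original function" could mean is sketched below with a toy decorator. This is not dagster's @solid and does not reflect how SolidDefinition is implemented; `solid_like` and the `original_fn` attribute are hypothetical names used only for illustration.

```python
import functools
import inspect


def solid_like(fn):
    """Hypothetical decorator (not dagster's @solid) that keeps the wrapped function."""

    @functools.wraps(fn)  # also records the original as compute_fn.__wrapped__
    def compute_fn(*args, **kwargs):
        # Generator wrapper, similar in spirit to a wrapped compute function.
        yield fn(*args, **kwargs)

    compute_fn.original_fn = fn  # explicit handle a custom executor could inspect
    return compute_fn


@solid_like
async def fancy():
    return 42


# With the original preserved, an executor can tell that the step body is async.
is_async = inspect.iscoroutinefunction(fancy.original_fn)
print(is_async)
```

With only the generator wrapper available, the executor has no direct way to learn that the body is a coroutine function; with the original kept around, it can choose to await it on an event loop instead.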
a

alex

12/01/2020, 7:53 PM
if you want something more light weight than a PR you could make a gist from git diff or something and add it to https://github.com/dagster-io/dagster/issues/2268
x

Xu Zhang

12/01/2020, 8:08 PM
sure @alex. what i was saying is that in order to do some clever stuff you mentioned w/o touching the rest, it might require the solid definition to preserve the original function wrapped
Ping....
a

alex

12/03/2020, 3:47 PM
im sorry, whats the open question remaining here?
x

Xu Zhang

12/03/2020, 4:46 PM
Hey Alex, you mentioned that there might be a clever way to handle async functions within an executor without touching the rest of dagster; my question is how that is possible if we don’t have access to the original functions wrapped by @solid.
a

alex

12/03/2020, 5:00 PM
That was just speculation - I don’t know if it could actually work. Roughly what i was thinking was something like:
* set up an executor to drive an asyncio event loop and make it available using something like a process global
* have a custom decorator that wraps your async functions to turn them in to plain function solids, the async -> normal function wrapper drives the async function by interacting with the event loop defined in the executor
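One way to read the two bullets above is sketched here with the standard library only. It is an assumption-laden illustration, not a tested dagster integration: the loop is started at import time on a daemon thread rather than by an executor, and `_LOOP`, `sync_from_async`, and `do_stuff` are hypothetical names.

```python
import asyncio
import functools
import threading

# Process-global event loop running on a daemon thread. In the idea above the
# executor would own this setup; here it is created at import time for brevity.
_LOOP = asyncio.new_event_loop()
threading.Thread(target=_LOOP.run_forever, daemon=True).start()


def sync_from_async(async_fn):
    """Hypothetical decorator: expose an async function as a plain blocking one."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Schedule the coroutine on the shared loop from the calling thread.
        future = asyncio.run_coroutine_threadsafe(async_fn(*args, **kwargs), _LOOP)
        return future.result()  # block until the coroutine finishes

    return wrapper


@sync_from_async
async def do_stuff(x):
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return x * 2


value = do_stuff(21)
print(value)
```

Note that each call blocks its caller until the coroutine completes, so this wrapper alone does not make steps overlap; the executor would still have to submit several steps to the loop before waiting on any of them.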
x

Xu Zhang

12/03/2020, 5:12 PM
I agree with the first bullet. for the second bullet, my point was that if @solid can preserve a copy of the original function that it wraps, we might not even need to add a new decorator, since we can access those original functions from the step context.
a

alex

12/03/2020, 5:30 PM
ya you are right but I believe that is an invasive change that we are unlikely to do any faster than proper async/await support. We are heads down on our planned features for the 0.10.0 release for the remainder of the year.
maybe its not as bad as I anticipate - you could give it a shot and send a PR if it turns out to be reasonable
x

Xu Zhang

12/03/2020, 5:45 PM
cool thank you
https://github.com/dagster-io/dagster/pull/3340 @alex hey alex, good Friday. I made a PR to address this with detail mentioned.
@alex morning Alex, when you feel bored, would you mind giving the gist a glance to see whether that is the proper way to do it? If so, im going to productionize it.
a

alex

12/10/2020, 6:53 PM
looks reasonable to me - I don’t see anything that stands out as problematic
you’ll have to test it to be sure
x

Xu Zhang

12/10/2020, 6:57 PM
got it, thank you!