Xu Zhang
11/21/2020, 7:53 PM
daniel
11/21/2020, 11:06 PM
Xu Zhang
11/22/2020, 1:28 AM
daniel
11/22/2020, 1:29 AM
Xu Zhang
11/22/2020, 1:46 AM
foobar
import time
from dagster import Output, OutputDefinition, solid

@solid(
    output_defs=[
        OutputDefinition(name="foo", dagster_type=int),
        OutputDefinition(name="bar", dagster_type=int),
        OutputDefinition(name="foobar", dagster_type=int),
    ],
)
def test(_):
    # foo and bar are yielded immediately; foobar only after a 5-second delay
    yield Output(value=1, output_name="foo")
    yield Output(value=2, output_name="bar")
    time.sleep(5)
    yield Output(value=3, output_name="foobar")
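To make the timing concrete, here is a dagster-independent sketch: a plain Python generator stands in for the solid body (sleep shortened to 0.2 seconds), and a hypothetical `run_eagerly` helper hands each output to a downstream callback as soon as it is yielded. This is an assumption about how an engine *could* dispatch per output, not a description of dagster's engine.

```python
import time
from typing import Callable, Dict, Iterator, List, Tuple


def compute() -> Iterator[Tuple[str, int]]:
    """Stand-in for the solid body: yields (output_name, value) pairs."""
    yield "foo", 1
    yield "bar", 2
    time.sleep(0.2)  # shortened from the original 5 seconds
    yield "foobar", 3


def run_eagerly(
    outputs: Iterator[Tuple[str, int]],
    downstream: Dict[str, Callable[[int], object]],
) -> List[Tuple[str, float]]:
    """Hypothetical helper (not a dagster API): dispatch each output on arrival.

    Returns (output_name, seconds since start) for every output seen.
    """
    start = time.monotonic()
    arrivals = []
    for name, value in outputs:
        arrivals.append((name, time.monotonic() - start))
        if name in downstream:
            # Called inline here for simplicity; a real engine would launch it
            # concurrently. Either way it runs before foobar exists.
            downstream[name](value)
    return arrivals


received: Dict[str, int] = {}
arrivals = run_eagerly(compute(), {"foo": lambda v: received.setdefault("foo", v)})
for name, t in arrivals:
    print(f"{name} available after {t:.2f}s")
```

foo and bar show up almost immediately, while foobar only appears after the sleep; that gap is the window in which downstream work on foo could start.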
daniel
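11/22/2020, 2:05 AM
Xu Zhang
11/22/2020, 2:19 AM
from dagster import ModeDefinition, multiprocess_executor, pipeline

@pipeline(
    mode_defs=[
        ModeDefinition(
            name="multiprocess",
            executor_defs=[multiprocess_executor],
        )
    ]
)
def simple_pipeline():
    ...  # solid invocations were not included in the original message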
execution:
  multiprocess:
    config:
      max_concurrent: 6
storage:
  filesystem:
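Roughly, max_concurrent: 6 caps how many steps the multiprocess executor runs at the same time. The stdlib sketch below models only that cap: it uses a thread pool instead of child processes for brevity, and `fake_step` and the counters are hypothetical, not dagster internals.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 6  # mirrors max_concurrent in the run config above

_lock = threading.Lock()
_running = 0
peak = 0


def fake_step(i: int) -> int:
    """Hypothetical step: records how many steps are running at once."""
    global _running, peak
    with _lock:
        _running += 1
        peak = max(peak, _running)
    time.sleep(0.05)  # simulate work
    with _lock:
        _running -= 1
    return i * i


# 20 steps are submitted, but at most MAX_CONCURRENT are ever in flight.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
    results = list(pool.map(fake_step, range(20)))

print("peak concurrency:", peak)
```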
daniel
11/22/2020, 3:03 PM
Xu Zhang
11/22/2020, 3:17 PM
alex
11/23/2020, 3:50 PM
test throws an exception after foo and bar have yielded but before foobar. If we allow downstream nodes to proceed on the already-yielded outputs, test is in this partial-failure state, which we would need to model correctly.
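A small dagster-independent model of that partial-failure state. The status names (SUCCESS, FAILURE, PARTIAL_FAILURE) and the `run_step` helper are hypothetical; the point is only that once downstream steps may consume earlier outputs, a step's outcome is no longer a simple success/failure pair.

```python
from enum import Enum
from typing import Dict, Iterator, Tuple


class StepStatus(Enum):
    """Hypothetical step outcomes, not dagster's event types."""
    SUCCESS = "success"
    FAILURE = "failure"  # raised before yielding anything
    PARTIAL_FAILURE = "partial_failure"  # yielded some outputs, then raised


def partial_solid() -> Iterator[Tuple[str, int]]:
    """Like the solid above, but raises after foo and bar, so foobar never arrives."""
    yield "foo", 1
    yield "bar", 2
    raise RuntimeError("failed before yielding foobar")


def run_step(outputs: Iterator[Tuple[str, int]]) -> Tuple[StepStatus, Dict[str, int]]:
    """Consume a step's outputs and classify how the step ended."""
    yielded: Dict[str, int] = {}
    try:
        for name, value in outputs:
            # In an eager engine, downstream steps might already be running on these.
            yielded[name] = value
    except Exception:
        status = StepStatus.PARTIAL_FAILURE if yielded else StepStatus.FAILURE
        return status, yielded
    return StepStatus.SUCCESS, yielded


status, outputs = run_step(partial_solid())
print(status, outputs)
```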
We have a lot of product surface area that operates against the current clear distinction between failure and success. Tweaking the engine code to start the downstream nodes is the easy part; making the rest of the experience coherent is the hard part.
Xu Zhang
11/23/2020, 4:57 PM
@solid(...)
async def fancy():
    x = await do_stuff()
    yield Output(x)
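Because fancy both awaits and yields, Python treats it as an async generator. One way a synchronous engine could still consume such a function, without threads, is to step it on an event loop that the engine owns. A minimal stdlib sketch: `drive_async_gen` and the stand-in `do_stuff` are hypothetical, and plain values replace dagster's Output.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


async def do_stuff() -> int:
    """Hypothetical stand-in for the awaited call in fancy()."""
    await asyncio.sleep(0.01)
    return 42


async def fancy() -> AsyncIterator[int]:
    """Same shape as the solid body above, yielding a plain value instead of Output."""
    x = await do_stuff()
    yield x


def drive_async_gen(agen: AsyncIterator[T], loop: asyncio.AbstractEventLoop) -> Iterator[T]:
    """Hypothetical adapter: expose an async generator as a plain generator."""

    async def next_item() -> T:
        return await agen.__anext__()

    while True:
        try:
            # Run the event loop just long enough to produce the next item.
            value = loop.run_until_complete(next_item())
        except StopAsyncIteration:
            return
        yield value


# In an executor, this loop would be created and owned by the executor (no threads).
loop = asyncio.new_event_loop()
try:
    values = list(drive_async_gen(fancy(), loop))
finally:
    loop.close()
print(values)
```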
alex
11/23/2020, 5:07 PM
Xu Zhang
11/23/2020, 5:23 PM
alex
11/24/2020, 3:50 PM
Xu Zhang
11/24/2020, 7:55 PM
alex
11/24/2020, 7:57 PM
The executor allows you to control how solids are processed within a run. The CeleryExecutor will submit each solid/step to Celery queues, with the expectation that the workers have access to the same code so they can execute specific solids.
Xu Zhang
11/24/2020, 8:18 PM
daniel
11/24/2020, 8:20 PM
Xu Zhang
11/24/2020, 9:13 PM
alex
11/24/2020, 9:25 PM
Xu Zhang
11/24/2020, 9:30 PM
async support to dagster for my use case: all solids of a pipeline get executed within one process, and all solids are async.
I’ve tried two approaches. Both require changes to the solid decorator and a new executor, and both seemed to work:
Approach #1: change the whole call path, starting from execute_pipeline, to async, convert all normal generators to async generators, and run execute_pipeline inside an event loop.
Approach #2: bubble the async generator/awaitable up to the place where we execute the steps of the same level in the DAG, run those async steps within an event loop, collect the results, and then yield them like other normal events.
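Approach #1 can be sketched in a few lines: every layer between the entry point and the step becomes `async def`, and the only synchronous boundary is a single asyncio.run at the top. All names here (`execute_step`, `execute_plan`, `execute_pipeline_async`, the toy steps) are hypothetical stand-ins, not dagster's actual call path; the sketch only shows how the async keyword propagates upward.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, List, Tuple

Step = Callable[[], Awaitable[int]]


async def execute_step(name: str, fn: Step) -> AsyncIterator[Tuple[str, int]]:
    """Lowest layer (hypothetical): an async generator of (step_name, result) events."""
    yield name, await fn()


async def execute_plan(steps: Dict[str, Step]) -> AsyncIterator[Tuple[str, int]]:
    """Middle layer: must also be async because it iterates execute_step."""
    for name, fn in steps.items():
        async for event in execute_step(name, fn):
            yield event


async def execute_pipeline_async(steps: Dict[str, Step]) -> List[Tuple[str, int]]:
    """Top layer: still async, so every caller needs an event loop."""
    return [event async for event in execute_plan(steps)]


async def one() -> int:
    await asyncio.sleep(0)
    return 1


async def two() -> int:
    await asyncio.sleep(0)
    return 2


# The single sync/async boundary: the whole run lives inside one event loop.
events = asyncio.run(execute_pipeline_async({"one": one, "two": two}))
print(events)
```

Every function that touches a step had to become async, which is why this style of change spreads across so many files.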
Approach #1 changes a lot more files, since async is infectious.
Approach #2 only changes the files that need to handle the bubbled-up async generator/awaitable.
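Approach #2, by contrast, keeps the engine synchronous. A hedged sketch: the DAG is split into levels (steps whose dependencies are all satisfied), each level's async steps run concurrently in an event loop via asyncio.gather, and the collected results are then yielded as ordinary events. The DAG encoding and helper names (`levels`, `run_plan`, the toy steps) are assumptions for illustration only.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterator, List, Set, Tuple

Step = Callable[[Dict[str, int]], Awaitable[int]]


def levels(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into topological levels (Kahn's algorithm, one level at a time)."""
    remaining = {name: set(d) for name, d in deps.items()}
    result: List[List[str]] = []
    while remaining:
        ready = sorted(n for n, d in remaining.items() if not d)
        if not ready:
            raise ValueError("cycle detected")
        result.append(ready)
        for n in ready:
            del remaining[n]
        for d in remaining.values():
            d.difference_update(ready)
    return result


def run_plan(steps: Dict[str, Step], deps: Dict[str, Set[str]]) -> Iterator[Tuple[str, int]]:
    """Hypothetical synchronous engine: only this function knows about the event loop."""
    results: Dict[str, int] = {}
    for level in levels(deps):
        async def run_level() -> List[int]:
            # Steps in one level don't depend on each other, so run them concurrently.
            return await asyncio.gather(*(steps[n](results) for n in level))

        values = asyncio.run(run_level())
        for name, value in zip(level, values):
            results[name] = value
            yield name, value  # yielded like any other, non-async event


async def load_a(results: Dict[str, int]) -> int:
    await asyncio.sleep(0.01)
    return 1


async def load_b(results: Dict[str, int]) -> int:
    await asyncio.sleep(0.01)
    return 2


async def combine(results: Dict[str, int]) -> int:
    return results["a"] + results["b"]


steps = {"a": load_a, "b": load_b, "c": combine}
deps = {"a": set(), "b": set(), "c": {"a", "b"}}
events = list(run_plan(steps, deps))
print(events)
```

Callers of run_plan stay synchronous, which matches the smaller footprint described for approach #2.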
Neither approach considers dagster as a whole; both target my urgent use case. I’m wondering if I could get guidance from you on which approach you would prefer if you were implementing it, or whether there is a better way to do it. @alex @daniel @sashank
daniel
12/01/2020, 6:27 PM
Xu Zhang
12/01/2020, 6:31 PM
daniel
12/01/2020, 6:32 PM
Xu Zhang
12/01/2020, 6:35 PM
alex
12/01/2020, 7:12 PM
Executor subclass and @executor / ExecutorDefinition to attach to your ModeDefinition; it should work pretty similarly to https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/core/executor/multiprocess.py
async
I think it’s going to look like approach #1, which, as you noticed, would be a very sprawling change, so it would be more difficult for an external contributor to pull off.
async stuff without threads (and without changing dagster core); I’m not sure exactly how that would work, though.
Xu Zhang
12/01/2020, 7:31 PM
fn that gets wrapped, all we have is a wrapped generator compute_fn. If the original function were preserved inside the SolidDefinition, it would open up more possibilities and flexibility.
alex
12/01/2020, 7:53 PM
gist from git diff or something and add it to https://github.com/dagster-io/dagster/issues/2268
Xu Zhang
12/01/2020, 8:08 PM
alex
12/03/2020, 3:47 PM
Xu Zhang
12/03/2020, 4:46 PM
alex
12/03/2020, 5:00 PM
async functions to turn them into plain function solids; the async -> normal function wrapper drives the async function by interacting with the event loop defined in the executor.
Xu Zhang
12/03/2020, 5:12 PM
alex
12/03/2020, 5:30 PM
Xu Zhang
12/03/2020, 5:45 PM
alex
12/10/2020, 6:53 PM
Xu Zhang
12/10/2020, 6:57 PM