Hello, I am trying to use an async multi-asset, bu...
# ask-community
p
Hello, I am trying to use an async multi-asset, but I am hitting a weird behavior where the overall async generator seems to stop after only one output:
Copy code
from dagster import AssetOut, Output, multi_asset

async def make_outputs():
    yield Output(1, output_name="A")
    yield Output(2, output_name="B")

@multi_asset(
    outs={"A": AssetOut(), "B": AssetOut()},
    can_subset=True,
)
async def test_asset(context):
    async for v in make_outputs():
        <http://context.log.info|context.log.info>(v.output_name)
        yield v
This shows:
Copy code
STEP_START Started execution of step "test_asset".
INFO A
STEP_OUTPUT Yielded output "A" of type "Any". (Type check passed).
ASSET_MATERIALIZATION Materialized value A.
HANDLED_OUTPUT Handled output "A" using IO manager "io_manager"
INFO op 'test_asset' did not fire outputs {'B'}
STEP_FAILURE dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "test_asset" did not return an output for non-optional output "B"
Any thoughts on what is happening here and why both outputs are not correctly produced? Thanks!
@alex I am wondering if it could be related to https://github.com/dagster-io/dagster/pull/12785/files. The Python docs for
asyncio.run
indicate
This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
This function cannot be called when another asyncio event loop is running in the same thread.
If debug is
True
, the event loop will be run in debug mode.
False
disables debug mode explicitly.
None
is used to respect the global Debug Mode settings.
This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.
and I think the issue here is the 'finalizing asynchronous generators', as well as the fact that it would create a new event loop. This means that for a multi-asset, it would close the event loop in between each output as well which would prevent using async clients correctly as well. Ideally, a single event loop would be used for all of the outputs produced by an asset, what do you think?
s
Hi @peay - I don't think we currently support
async
functions that are asset or op defs - have you used this successfully with Dagster before?
p
Hello @sandy - I have not used it before but I assumed from https://github.com/dagster-io/dagster/issues/2268#issuecomment-819072543 that it was supported. It seems to work fine with a single output, and I suspect it might have been working with multiple outputs as well before
gen_from_async_gen
switched from
run_until_complete
to
asyncio.run
(although I haven't checked)
s
Ah - I am misinformed here. @alex might be able to provide more info when he gets back from vacation on Monday.
đź‘Ť 1
a
yea I am able to repro and i this looks like the correct assessment. That “finalizing asynchronous generators” bit is good to know.