peay
04/06/2023, 8:55 AMfrom dagster import AssetOut, Output, multi_asset
async def make_outputs():
yield Output(1, output_name="A")
yield Output(2, output_name="B")
@multi_asset(
outs={"A": AssetOut(), "B": AssetOut()},
can_subset=True,
)
async def test_asset(context):
async for v in make_outputs():
<http://context.log.info|context.log.info>(v.output_name)
yield v
This shows:
STEP_START Started execution of step "test_asset".
INFO A
STEP_OUTPUT Yielded output "A" of type "Any". (Type check passed).
ASSET_MATERIALIZATION Materialized value A.
HANDLED_OUTPUT Handled output "A" using IO manager "io_manager"
INFO op 'test_asset' did not fire outputs {'B'}
STEP_FAILURE dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "test_asset" did not return an output for non-optional output "B"
Any thoughts on what is happening here and why both outputs are not correctly produced? Thanks!peay
04/06/2023, 12:42 PMasyncio.run
indicate
This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
This function cannot be called when another asyncio event loop is running in the same thread.
If debug is, the event loop will be run in debug mode.True
disables debug mode explicitly.False
is used to respect the global Debug Mode settings.None
This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.and I think the issue here is the 'finalizing asynchronous generators', as well as the fact that it would create a new event loop. This means that for a multi-asset, it would close the event loop in between each output as well which would prevent using async clients correctly as well. Ideally, a single event loop would be used for all of the outputs produced by an asset, what do you think?
sandy
04/06/2023, 5:47 PMasync
functions that are asset or op defs - have you used this successfully with Dagster before?peay
04/07/2023, 7:52 AMgen_from_async_gen
switched from run_until_complete
to asyncio.run
(although I haven't checked)sandy
04/07/2023, 3:17 PMalex
04/10/2023, 6:01 PMalex
04/10/2023, 7:02 PM