#ask-community

Kevin Otte

03/07/2023, 5:04 PM
Hello! I'm wondering what the right way is to set up async calls for ops. For context, we are making some async calls to gather features from an internal service, and we have something like this:
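import pandas as pd
from dagster import DynamicOut, DynamicOutput, op

# HeuristicRankingFeaturesV1 is an internal class; its definition is not shown.


@op(out=DynamicOut())
def feature_names(df: pd.DataFrame):
    # Fan out: emit one (feature name, DataFrame) pair per feature.
    for feature in HeuristicRankingFeaturesV1.features:
        yield DynamicOutput((feature, df), mapping_key=feature)


@op
async def compute_feature(item: tuple[str, pd.DataFrame]) -> tuple[str, pd.Series]:
    feature_name, df = item
    features = HeuristicRankingFeaturesV1()
    # Look up the async feature method by name and await its result.
    return (feature_name, await getattr(features, feature_name)(df))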
But when we run our test, we run into the following error (🧵):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/definitions/graph_definition.py:690: in execute_in_process
    return core_execute_in_process(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/execute_in_process.py:77: in core_execute_in_process
    event_list = list(execute_run_iterable)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
    yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1091: in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/executor/in_process.py:38: in execute
    yield from iter(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
    yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:120: in inner_plan_execution_iterator
    for step_event in check.generator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:325: in dagster_event_sequence_for_step
    raise dagster_user_error.user_exception
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py:47: in solid_execution_error_boundary
    yield
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_utils/__init__.py:447: in iterate_with_context
    next_output = next(iterator)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py:106: in gen_from_async_gen
    loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x288a12e20>

    def get_event_loop(self):
        """Get the event loop for the current context.
    
        Returns an instance of EventLoop or raises an exception.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())
    
        if self._local._loop is None:
>           raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
E           RuntimeError: There is no current event loop in thread 'MainThread'.

../../../.pyenv/versions/3.9.12/lib/python3.9/asyncio/events.py:642: RuntimeError

rex

03/07/2023, 5:08 PM
We don’t currently have support for an async executor: https://github.com/dagster-io/dagster/issues/4041

Kevin Otte

03/07/2023, 5:12 PM
@rex is this documentation wrong then? https://docs.dagster.io/_apidocs/ops
@op supports async def functions as well, including async generators when yielding multiple events or outputs. Note that async ops will generally be run on their own unless using a custom Executor implementation that supports running them together.
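One way to read "run on their own" in the docs sentence above, sketched with plain asyncio rather than Dagster: each coroutine gets its own event loop run, so nothing overlaps unless something gathers the coroutines onto a single loop. The names `fake_op`, `run_separately`, and `run_together` are hypothetical and only illustrate the idea; this is not how any Dagster executor is implemented.

```python
import asyncio

events: list[str] = []


async def fake_op(name: str) -> None:
    # Hypothetical stand-in for an async op body.
    events.append(f"{name} start")
    await asyncio.sleep(0)  # yield control once, like an awaited I/O call
    events.append(f"{name} end")


def run_separately() -> list[str]:
    # One event loop run per coroutine: "a" finishes before "b" starts.
    events.clear()
    for name in ("a", "b"):
        asyncio.run(fake_op(name))
    return list(events)


async def _together() -> None:
    await asyncio.gather(fake_op("a"), fake_op("b"))


def run_together() -> list[str]:
    # Both coroutines share one loop, so their steps interleave.
    events.clear()
    asyncio.run(_together())
    return list(events)
```

Comparing the two returned lists shows the difference: the separate runs never interleave, while the gathered run starts both coroutines before either one finishes.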

rex

03/07/2023, 5:53 PM
@alex / @chris mind chiming in here? I’m unsure whether this is expected behavior when using async ops with dynamic outputs, or whether some prerequisite setup is required.

alex

03/07/2023, 5:59 PM
hm, looks like something must have set an event loop (looks like self._local._set_called is true based on stack trace) in your code so our call fails. Looks like maybe we should update to just use asyncio.run
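A minimal sketch of the state described here, using only the standard library. It assumes that something earlier in the process called `asyncio.set_event_loop(None)`; the thread does not say what actually set the loop. The coroutine `double` is a hypothetical stand-in for an async op body.

```python
import asyncio


async def double(x: int) -> int:
    # Hypothetical stand-in for an async op body.
    await asyncio.sleep(0)
    return x * 2


# Assumed trigger: after this call the policy's _set_called flag is true and
# no loop is installed, which matches the branch shown in the traceback.
asyncio.set_event_loop(None)

try:
    asyncio.get_event_loop()  # the same lookup that fails in the traceback
    lookup_failed = False
except RuntimeError:
    # On the Python 3.9 shown in the traceback, this branch is taken.
    lookup_failed = True

# asyncio.run does not rely on an existing loop: it creates a new one,
# runs the coroutine to completion, and closes the loop afterwards.
value = asyncio.run(double(21))
```

Because `asyncio.run` manages its own loop, it works regardless of whether a loop was set or cleared beforehand, as long as it is not called from inside an already running loop.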

Kevin Otte

03/07/2023, 6:18 PM
In the meantime, is there anything I can modify to get around this?

alex

03/07/2023, 8:00 PM
"but when we run our test, we run into the following error"
Does this only error in tests? I think asyncio.set_event_loop(asyncio.new_event_loop()) should fix it, at least in this execute_in_process case.
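A sketch of that suggestion in isolation, using only the standard library: install a fresh loop before any code that calls `asyncio.get_event_loop()`, then clean up afterwards. The helper `install_fresh_loop` and the coroutine `fetch_feature` are hypothetical names; in a real test the `run_until_complete` line would be replaced by the `execute_in_process` call, which is assumed here to look up the loop the same way the traceback shows.

```python
import asyncio


async def fetch_feature(name: str) -> tuple[str, int]:
    # Hypothetical stand-in for the awaited internal-service call.
    await asyncio.sleep(0)
    return name, len(name)


def install_fresh_loop() -> asyncio.AbstractEventLoop:
    # Create a new loop and make it the current loop for this thread.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop


loop = install_fresh_loop()
try:
    # With a loop installed, the get_event_loop() lookup succeeds.
    result = asyncio.get_event_loop().run_until_complete(fetch_feature("clicks"))
finally:
    # Close the loop and clear it so later tests start from a known state.
    loop.close()
    asyncio.set_event_loop(None)
```

In a pytest suite, the install and cleanup steps could live in a fixture so every test that executes async ops gets its own loop.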

Kevin Otte

03/08/2023, 3:32 PM
we ended up writing our own wrapper for it but it would be great to get a fix out
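The wrapper itself is not shown in the thread. One plausible shape for such a workaround, offered only as an assumption, is a decorator that turns a coroutine function into a blocking one by driving it with `asyncio.run`. The decorator name `run_sync` is hypothetical, and the Dagster `@op` decorator that would presumably be stacked on top is omitted so the sketch runs without Dagster installed.

```python
import asyncio
import functools
import inspect


def run_sync(async_fn):
    """Wrap a coroutine function so callers see an ordinary blocking function."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a fresh loop, runs the coroutine, then closes it.
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


@run_sync
async def compute_feature(item: tuple[str, list[float]]) -> tuple[str, float]:
    name, values = item
    await asyncio.sleep(0)  # stand-in for the awaited internal-service call
    return name, sum(values)


# The wrapped function is called like any synchronous function.
result = compute_feature(("total", [1.0, 2.0, 3.5]))
is_still_async = inspect.iscoroutinefunction(compute_feature)
```

Since the wrapped function is no longer a coroutine function, a framework inspecting it would treat it as synchronous and never touch the current event loop. The trade-off is that `asyncio.run` raises if it is called from inside an already running loop.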