hello! wondering what the right way to setup async...
# ask-community
hello! wondering what the right way to setup async calls for ops - for context, we are making some async calls to gather features from an internal service and we have something like this:
Copy code
def feature_names(df: pd.DataFrame):
    for feature in HeuristicRankingFeaturesV1.features:
        yield DynamicOutput((feature, df), mapping_key=feature)

async def compute_feature(item: tuple[str, pd.DataFrame]) -> tuple[str, pd.Series]:

    feature_name, df = item

    features = HeuristicRankingFeaturesV1()
    return (feature_name, await getattr(features, feature_name)(df))
but when we run our test, we run into the following error (🧵 )
Copy code
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/definitions/graph_definition.py:690: in execute_in_process
    return core_execute_in_process(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/execute_in_process.py:77: in core_execute_in_process
    event_list = list(execute_run_iterable)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
    yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1091: in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/executor/in_process.py:38: in execute
    yield from iter(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
    yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:120: in inner_plan_execution_iterator
    for step_event in check.generator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:325: in dagster_event_sequence_for_step
    raise dagster_user_error.user_exception
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py:47: in solid_execution_error_boundary
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_utils/__init__.py:447: in iterate_with_context
    next_output = next(iterator)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py:106: in gen_from_async_gen
    loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x288a12e20>

    def get_event_loop(self):
        """Get the event loop for the current context.
        Returns an instance of EventLoop or raises an exception.
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
        if self._local._loop is None:
>           raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
E           RuntimeError: There is no current event loop in thread 'MainThread'.

../../../.pyenv/versions/3.9.12/lib/python3.9/asyncio/events.py:642: RuntimeError
We don’t currently have support for an async executor: https://github.com/dagster-io/dagster/issues/4041
@rex is this documentation wrong then? https://docs.dagster.io/_apidocs/ops
Copy code
@op supports async def functions as well, including async generators when yielding multiple events or outputs. Note that async ops will generally be run on their own unless using a custom Executor implementation that supports running them together.
@alex / @chris mind chiming in here? I’m unsure if this is expected behavior with using async ops with dynamic outs or if some pre-requisite setup is required
hm, looks like something must have set an event loop (looks like
is true based on stack trace) in your code so our call fails. Looks like maybe we should update to just use
In the meantime, is there anything I can modify to get around this?
but when we run our test, we run into the following error
does this only error in test? I think
should fix, at least in this execute in process case
we ended up writing our own wrapper for it but it would be great to get a fix out