Kevin Otte
03/07/2023, 5:04 PM@op(out=DynamicOut())
def feature_names(df: pd.DataFrame):
for feature in HeuristicRankingFeaturesV1.features:
yield DynamicOutput((feature, df), mapping_key=feature)
@op
async def compute_feature(item: tuple[str, pd.DataFrame]) -> tuple[str, pd.Series]:
feature_name, df = item
features = HeuristicRankingFeaturesV1()
return (feature_name, await getattr(features, feature_name)(df))
but when we run our test, we run into the following error (🧵 )Kevin Otte
03/07/2023, 5:04 PM_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/definitions/graph_definition.py:690: in execute_in_process
return core_execute_in_process(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/execute_in_process.py:77: in core_execute_in_process
event_list = list(execute_run_iterable)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1091: in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/executor/in_process.py:38: in execute
yield from iter(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/api.py:1187: in __iter__
yield from self.iterator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:120: in inner_plan_execution_iterator
for step_event in check.generator(
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py:325: in dagster_event_sequence_for_step
raise dagster_user_error.user_exception
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py:47: in solid_execution_error_boundary
yield
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_utils/__init__.py:447: in iterate_with_context
next_output = next(iterator)
../../../.pyenv/versions/3.9.12/envs/default/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py:106: in gen_from_async_gen
loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x288a12e20>
def get_event_loop(self):
"""Get the event loop for the current context.
Returns an instance of EventLoop or raises an exception.
"""
if (self._local._loop is None and
not self._local._set_called and
threading.current_thread() is threading.main_thread()):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
> raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
E RuntimeError: There is no current event loop in thread 'MainThread'.
../../../.pyenv/versions/3.9.12/lib/python3.9/asyncio/events.py:642: RuntimeError
rex
03/07/2023, 5:08 PMKevin Otte
03/07/2023, 5:12 PM@op supports async def functions as well, including async generators when yielding multiple events or outputs. Note that async ops will generally be run on their own unless using a custom Executor implementation that supports running them together.
rex
03/07/2023, 5:53 PMalex
03/07/2023, 5:59 PMself._local._set_called
is true based on stack trace) in your code so our call fails. Looks like maybe we should update to just use asyncio.run
Kevin Otte
03/07/2023, 6:18 PMalex
03/07/2023, 8:00 PMbut when we run our test, we run into the following errordoes this only error in test? I think
asyncio.set_event_loop(asyncio.new_event_loop())
should fix, at least in this execute in process caseKevin Otte
03/08/2023, 3:32 PMalex
03/08/2023, 5:20 PM