Mark Fickett
04/20/2022, 4:46 PM
Is there a way to wrap the @op decorator to introduce some common setup code? I don't think a resource can do it, since I want access to the op's context (and a single-process executor wouldn't call it for each op). I tried making a wrapping decorator, but I get errors from Dagster's inspection of the argument lists:
from dagster import op


def my_op(*args, **kwargs):
    """Dagster @op that opens a custom context manager based on some shared state."""
    def wrapper(func):
        @op(*args, **kwargs)
        def raw_op(context, *args, **kwargs):  # my wrapper that does a little extra work
            with my_custom_setup(context):
                return func(context, *args, **kwargs)  # the original function that would normally be decorated with @op
        return raw_op
    return wrapper
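A stdlib-only sketch (no Dagster needed) of what any signature-based inspection sees with a wrapper like the one above: raw_op advertises its own generic (context, *args, **kwargs) parameters rather than the wrapped function's. naive_wrap and list_test_ids are hypothetical names; that this generic signature is what Dagster's checks trip over is an assumption consistent with the error described.

```python
import inspect


def naive_wrap(func):
    # No functools.wraps: raw_op keeps its own name and signature.
    def raw_op(context, *args, **kwargs):
        return func(context, *args, **kwargs)
    return raw_op


def list_test_ids(context, pipe_name: str):
    # Hypothetical op body with one named input besides context.
    return [pipe_name]


wrapped = naive_wrap(list_test_ids)
print(wrapped.__name__)                  # raw_op
print(inspect.signature(list_test_ids))  # (context, pipe_name: str)
print(inspect.signature(wrapped))        # (context, *args, **kwargs)
print(wrapped("ctx", "pipe-a"))          # ['pipe-a']
```

The call itself still works; only the advertised parameter list is lost.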
alex
04/20/2022, 4:57 PM
> I get errors from Dagster's inspection of the argument lists
If you use wraps / update_wrapper, it may get the function metadata corrected:
https://docs.python.org/3/library/functools.html#functools.wraps
Mark Fickett
04/21/2022, 12:27 PM
from functools import wraps

from dagster import op


def my_op(*decorator_args, **decorator_kwargs):
    def wrapper(func):
        @op(*decorator_args, **decorator_kwargs)
        @wraps(func)
        # If the wrapped op is missing a context argument, this will get an error like
        # "raw_op() missing 1 required positional argument: 'context'". But we need
        # the context below, and want Dagster to pass args according to the wrapped
        # function's args, so just require all @otel_ops to take context.
        def raw_op(context, *func_args, **func_kwargs):
            with do_my_custom_setup(context):
                return func(context, *func_args, **func_kwargs)
        return raw_op
    return wrapper
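Why @wraps helps, sketched without Dagster: functools.wraps copies __name__, __doc__ and related attributes and sets __wrapped__, and inspect.signature follows __wrapped__ by default, so signature-based tooling sees the original parameters. call_like_dagster is a hypothetical stand-in modeled on the line `fn(context, **kwargs) if context_arg_provided else fn(**kwargs)` from Dagster's compute_generator.py in the traceback later in this thread; it only treats a first parameter literally named context as the context argument, which is a simplification. It also reproduces the missing-context error the comment warns about.

```python
import functools
import inspect
from contextlib import contextmanager


@contextmanager
def do_my_custom_setup(context):
    # Hypothetical stand-in for the real setup context manager.
    yield


def my_wrap(func):
    # Same body as my_op's wrapper, minus the @op(...) line (no Dagster here).
    @functools.wraps(func)  # copies __name__/__doc__ and sets raw_op.__wrapped__ = func
    def raw_op(context, *func_args, **func_kwargs):
        with do_my_custom_setup(context):
            return func(context, *func_args, **func_kwargs)
    return raw_op


def call_like_dagster(fn, context, inputs):
    # Hypothetical stand-in for: fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
    params = list(inspect.signature(fn).parameters)  # follows __wrapped__
    context_arg_provided = bool(params) and params[0] == "context"
    return fn(context, **inputs) if context_arg_provided else fn(**inputs)


@my_wrap
def with_context(context, x):
    return x + 1


@my_wrap
def without_context(x):
    return x + 1


print(inspect.signature(with_context))                  # (context, x)
print(call_like_dagster(with_context, "ctx", {"x": 1}))  # 2
try:
    call_like_dagster(without_context, "ctx", {"x": 1})
except TypeError as exc:
    print(exc)  # mentions the missing 'context' argument; the function name shown varies by Python version
```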
For an @my_op that yields multiple outputs, I'm having trouble. I want the decorated func to run inside the do_my_custom_setup context manager. That works fine for @my_op ops with a single return value: if I look at the stack, I see raw_op in the call stack. But when I yield, the stack within my decorated func shows iterator management and a spawn, but no raw_op anywhere. Is there a way I can adjust this so that raw_op is in the call stack for the decorated func?
This works (single return):
-> for event in iterate_with_context(
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/utils/__init__.py(401)iterate_with_context()
-> next_output = next(iterator)
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(65)_coerce_solid_compute_fn_to_iterator()
-> result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
/home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/op_util.py(128)raw_op() <--- where I set my context manager
-> return func(context, *func_args, **func_kwargs)
> /home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/main.py(163)_list_test_ids_by_pipe() <--- my decorated function
But this doesn't work:
(Pdb) w
<string>(1)<module>()
/usr/lib/python3.9/multiprocessing/spawn.py(116)spawn_main()
-> exitcode = _main(fd, parent_sentinel)
/usr/lib/python3.9/multiprocessing/spawn.py(129)_main()
-> return self._bootstrap(parent_sentinel)
/usr/lib/python3.9/multiprocessing/process.py(315)_bootstrap()
-> self.run()
/usr/lib/python3.9/multiprocessing/process.py(108)run()
-> self._target(*self._args, **self._kwargs)
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/executor/child_process_executor.py(70)_execute_command_in_child_process()
-> for step_event in command.execute():
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/executor/multiprocess.py(82)execute()
-> yield from execute_plan_iterator(
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/api.py(883)__iter__()
-> yield from self.iterator(
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py(87)inner_plan_execution_iterator()
-> for step_event in check.generator(dagster_event_sequence_for_step(step_context)):
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py(232)dagster_event_sequence_for_step()
-> for step_event in check.generator(step_events):
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py(354)core_dagster_event_sequence_for_step()
-> for user_event in check.generator(
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py(70)_step_output_error_checked_user_event_sequence()
-> for user_event in user_event_sequence:
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py(170)execute_core_compute()
-> for step_output in _yield_compute_results(step_context, inputs, compute_fn):
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py(138)_yield_compute_results()
-> for event in iterate_with_context(
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/utils/__init__.py(401)iterate_with_context()
-> next_output = next(iterator)
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(66)_coerce_solid_compute_fn_to_iterator()
-> for event in _validate_and_coerce_solid_result_to_iterator(result, context, output_defs):
/home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(86)_validate_and_coerce_solid_result_to_iterator()
-> for event in result:
> /home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/main.py(264)_decide_whether_to_do_metadata() <--- my decorated function, but no raw_op in its stack
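Both symptoms can be reproduced without Dagster. In this sketch, do_my_custom_setup is a hypothetical stand-in that flips a flag while active, and raw_op takes func directly instead of closing over it. Calling raw_op runs the with-block and returns; for a generator function, that return hands back an unstarted generator, so by the time anything iterates it, the setup has already exited and raw_op's frame is gone.

```python
import inspect
from contextlib import contextmanager

setup_active = False


@contextmanager
def do_my_custom_setup(context):
    # Hypothetical stand-in: the flag is True only while the setup is in effect.
    global setup_active
    setup_active = True
    try:
        yield
    finally:
        setup_active = False


def raw_op(context, func):
    # Same shape as the @wraps version: return func(...) inside the with-block.
    with do_my_custom_setup(context):
        return func(context)


def snapshot():
    # (is the setup active?, is raw_op anywhere on the current call stack?)
    names = []
    frame = inspect.currentframe()
    while frame is not None:
        names.append(frame.f_code.co_name)
        frame = frame.f_back
    return setup_active, "raw_op" in names


def single_return(context):
    return snapshot()


def multi_output(context):
    yield snapshot()


print(raw_op("ctx", single_return))       # (True, True)
print(list(raw_op("ctx", multi_output)))  # [(False, False)]
```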
alex
04/27/2022, 4:13 PM
On the return func(...) line, you are returning an iterator/generator object and closing the context manager (since return exits that scope). You will have to rewrite things so that you consume the iterator within the scope of the context manager.
Approximately (untested):
    # replaces raw_op inside wrapper(func); keep the @op/@wraps decorators; requires import inspect
    def raw_op(context, *func_args, **func_kwargs):
        with do_my_custom_setup(context):
            res = func(context, *func_args, **func_kwargs)
            if inspect.isgenerator(res):
                for x in res:
                    yield x
            else:
                return res
    return raw_op
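One thing to watch in this sketch: because raw_op contains yield, Python compiles it as a generator function, so every call returns a generator and the return res branch never hands res to the caller; the value only rides along on StopIteration.value. A stdlib-only check, with raw_op simplified to take func directly and do_my_custom_setup as a hypothetical logging stand-in:

```python
import inspect
from contextlib import contextmanager

events = []


@contextmanager
def do_my_custom_setup(context):
    # Hypothetical stand-in that logs when the setup starts and ends.
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


def raw_op(context, func):
    # Suggested shape: consume the generator inside the with-block.
    with do_my_custom_setup(context):
        res = func(context)
        if inspect.isgenerator(res):
            for x in res:
                yield x
        else:
            return res  # raw_op is a generator function, so this does not reach the caller


def multi_output(context):
    events.append("body")
    yield 1
    yield 2


def single_return(context):
    return 42


print(list(raw_op("ctx", multi_output)))   # [1, 2]
print(events)                              # ['enter', 'body', 'exit']
print(list(raw_op("ctx", single_return)))  # []
try:
    next(raw_op("ctx", single_return))
except StopIteration as stop:
    print(stop.value)  # 42
```

The generator case now runs its body between enter and exit; the single-return case needs its value yielded rather than returned.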
Mark Fickett
04/27/2022, 4:15 PM
Changed return res to yield Output(res, "result"). 👌🏻
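Putting the thread's fix together as a stdlib-only sketch. Output here is a hypothetical NamedTuple stand-in for Dagster's Output (in a real pipeline you would import Output from dagster), "result" is used as the single-output name as in the message above, and with_setup is a hypothetical name for the wrapper minus its @op(...) line. Every path yields, so raw_op behaves the same whether func returns or yields, and the context manager stays open while the body runs.

```python
import functools
import inspect
from contextlib import contextmanager
from typing import Any, NamedTuple

events = []


class Output(NamedTuple):
    # Hypothetical stand-in for dagster.Output(value, output_name).
    value: Any
    output_name: str = "result"


@contextmanager
def do_my_custom_setup(context):
    # Hypothetical stand-in that logs when the setup starts and ends.
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


def with_setup(func):
    @functools.wraps(func)
    def raw_op(context, *func_args, **func_kwargs):
        with do_my_custom_setup(context):
            res = func(context, *func_args, **func_kwargs)
            if inspect.isgenerator(res):
                yield from res  # body runs while the setup is still active
            else:
                yield Output(res, "result")  # raw_op is a generator, so yield instead of return
    return raw_op


@with_setup
def single_return(context, x):
    return x * 2


@with_setup
def multi_output(context):
    events.append("body")
    yield Output(1, "a")
    yield Output(2, "b")


print(list(single_return("ctx", 21)))  # [Output(value=42, output_name='result')]
print(list(multi_output("ctx")))       # [Output(value=1, output_name='a'), Output(value=2, output_name='b')]
print(events)                          # ['enter', 'exit', 'enter', 'body', 'exit']
```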