#dagster-support

Mark Fickett

04/20/2022, 4:46 PM
Is it possible to extend the `@op` decorator to introduce some common setup code? I don't think a resource can do it since I want access to the op's `context` (and a single-process executor wouldn't call it for each op). I tried making a wrapping decorator but I get errors from Dagster's inspection of the argument lists:
def my_op(*args, **kwargs):
    """Dagster @op that opens a custom context manager based on some shared state."""
    def wrapper(func):
        @op(*args, **kwargs)
        def raw_op(context, *args, **kwargs):  # my wrapper that does a little extra work
            with my_custom_setup(context):
                func(context, *args, **kwargs) # the original function that would normally be decorated with @op
        return raw_op
    return wrapper

alex

04/20/2022, 4:57 PM
> I get errors from Dagster's inspection of the argument lists
If you use `wraps` / `update_wrapper`, it may get the function metadata corrected: https://docs.python.org/3/library/functools.html#functools.wraps
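For context on why this helps: `functools.wraps` sets `__wrapped__` on the wrapper, and `inspect.signature` follows that attribute by default, so signature-based inspection sees the original parameter list instead of `(*args, **kwargs)`. A minimal standard-library sketch, with no Dagster involved. The names `plain`, `with_wraps`, and `my_func` are made up for illustration, and it is an assumption here that Dagster's argument inspection behaves like `inspect.signature`:

```python
import functools
import inspect


def plain(func):
    # Wrapper without functools.wraps: the original signature is hidden.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def with_wraps(func):
    # functools.wraps copies __name__/__doc__ and sets wrapper.__wrapped__ = func.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def my_func(context, x, y=2):
    return x + y


plain_sig = str(inspect.signature(plain(my_func)))
wrapped_sig = str(inspect.signature(with_wraps(my_func)))
print(plain_sig)    # (*args, **kwargs)
print(wrapped_sig)  # (context, x, y=2)
```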

Mark Fickett

04/21/2022, 12:27 PM
That worked great. For posterity, I ended up with:
from functools import wraps

from dagster import op

# do_my_custom_setup is my own context manager (not shown here).
def my_op(*decorator_args, **decorator_kwargs):
    def wrapper(func):
        # If the wrapped op is missing a context argument, this will get an error like
        # "raw_op() missing 1 required positional argument: 'context'". But we need
        # the context below, and want Dagster to pass args according to the wrapped
        # function's args, so just require all @my_ops to take context.
        @op(*decorator_args, **decorator_kwargs)
        @wraps(func)
        def raw_op(context, *func_args, **func_kwargs):
            with do_my_custom_setup(context):
                return func(context, *func_args, **func_kwargs)
        return raw_op
    return wrapper
This is mostly working, but when I have a `@my_op` that yields multiple outputs, I'm having trouble. I wanted the decorated `func` to be inside the `do_my_custom_setup` context manager. That works fine with `@my_op`s with a single return value; if I look at the stack I see `raw_op` in the call stack. But when I `yield`, if I look at the stack within my decorated `func` I see iterator management and a `spawn`, but I don't see `raw_op` anywhere in my stack. Is there a way I can adjust this so that I can have `raw_op` in my call stack for the decorated `func`?
This works (single return):
-> for event in iterate_with_context(
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/utils/__init__.py(401)iterate_with_context()
-> next_output = next(iterator)
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(65)_coerce_solid_compute_fn_to_iterator()
-> result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  /home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/op_util.py(128)raw_op() <--- where I set my context manager
-> return func(context, *func_args, **func_kwargs)
> /home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/main.py(163)_list_test_ids_by_pipe() <--- my decorated function
But this doesn't work:
(Pdb) w
  <string>(1)<module>()
  /usr/lib/python3.9/multiprocessing/spawn.py(116)spawn_main()
-> exitcode = _main(fd, parent_sentinel)
  /usr/lib/python3.9/multiprocessing/spawn.py(129)_main()
-> return self._bootstrap(parent_sentinel)
  /usr/lib/python3.9/multiprocessing/process.py(315)_bootstrap()
-> self.run()
  /usr/lib/python3.9/multiprocessing/process.py(108)run()
-> self._target(*self._args, **self._kwargs)
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/executor/child_process_executor.py(70)_execute_command_in_child_process()
-> for step_event in command.execute():
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/executor/multiprocess.py(82)execute()
-> yield from execute_plan_iterator(
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/api.py(883)__iter__()
-> yield from self.iterator(
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py(87)inner_plan_execution_iterator()
-> for step_event in check.generator(dagster_event_sequence_for_step(step_context)):
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py(232)dagster_event_sequence_for_step()
-> for step_event in check.generator(step_events):
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py(354)core_dagster_event_sequence_for_step()
-> for user_event in check.generator(
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py(70)_step_output_error_checked_user_event_sequence()
-> for user_event in user_event_sequence:
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py(170)execute_core_compute()
-> for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute.py(138)_yield_compute_results()
-> for event in iterate_with_context(
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/utils/__init__.py(401)iterate_with_context()
-> next_output = next(iterator)
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(66)_coerce_solid_compute_fn_to_iterator()
-> for event in _validate_and_coerce_solid_result_to_iterator(result, context, output_defs):
  /home/mfickett/Documents/mfickett/dev/data-pipeline/.direnv/python-3.9.5/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py(86)_validate_and_coerce_solid_result_to_iterator()
-> for event in result:
> /home/mfickett/Documents/mfickett/dev/data-pipeline/orchestration/main.py(264)_decide_whether_to_do_metadata() <--- my decorated function, but no raw_op in its stack

alex

04/27/2022, 4:13 PM
Context managers and iterators are tricky business. What is happening is that in the `return func(...` line you are returning an iterator/generator object and closing the context manager (since `return` exits that scope). You will have to rewrite things such that you consume the iterator within the scope of the context manager, approximately (untested):
        def raw_op(context, *func_args, **func_kwargs):
            with do_my_custom_setup(context):
                res = func(context, *func_args, **func_kwargs)
                if inspect.isgenerator(res):
                    for x in res:
                        yield x
                else:
                    return res
        return raw_op
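The effect described above can be reproduced with plain Python, without Dagster. In this sketch, `setup`, `numbers`, `returns_generator`, and `consumes_inside` are hypothetical names, and `events` just records the order in which things happen:

```python
import contextlib

events = []


@contextlib.contextmanager
def setup():
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


def numbers():
    events.append("generator body runs")
    yield 1
    yield 2


def returns_generator():
    # Calling numbers() only creates the generator object; its body has not
    # started when `return` leaves the with block.
    with setup():
        return numbers()


def consumes_inside():
    # `yield from` makes this a generator that stays inside the with block
    # until numbers() is exhausted.
    with setup():
        yield from numbers()


events.clear()
list(returns_generator())
order_when_returned = list(events)

events.clear()
list(consumes_inside())
order_when_consumed = list(events)

print(order_when_returned)  # ['enter', 'exit', 'generator body runs']
print(order_when_consumed)  # ['enter', 'generator body runs', 'exit']
```

For plain iteration, `yield from res` behaves the same as the `for x in res: yield x` loop.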

Mark Fickett

04/27/2022, 4:15 PM
Ah, thanks. That's much less mysterious than I was worried it could be!
I just had to change `return res` to `yield Output(res, "result")`. 👌🏻
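Why the change was needed: once `raw_op` contains a `yield`, Python treats it as a generator function, and a `return res` inside a generator only ends iteration (the value rides on `StopIteration`) instead of being emitted to whoever is consuming it. A small sketch without Dagster; the `Output` class here is a hypothetical stand-in for `dagster.Output`, and `run_with_return` / `run_with_yield` are made-up names:

```python
import inspect
from dataclasses import dataclass
from typing import Any


@dataclass
class Output:
    # Hypothetical stand-in for dagster.Output, just enough for this sketch.
    value: Any
    output_name: str = "result"


def single_value():
    return 42


def run_with_return(func):
    res = func()
    if inspect.isgenerator(res):
        yield from res
    else:
        return res  # ends the generator; nothing is emitted to the caller


def run_with_yield(func):
    res = func()
    if inspect.isgenerator(res):
        yield from res
    else:
        yield Output(res, "result")  # emitted like any other yielded output


lost = list(run_with_return(single_value))
kept = list(run_with_yield(single_value))
print(lost)  # []
print(kept)  # [Output(value=42, output_name='result')]
```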