How can I conditionally skip ops? I’d like to skip...
# ask-community
b
How can I conditionally skip ops? I’d like to skip downstream ops if my extract op doesn’t return any data.
c
Hi Bennett. You can declare an optional output and yield Output objects from within the upstream op. If the upstream op does not yield the output that a downstream op takes as input, Dagster will skip execution of the downstream op. Some untested code:
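from dagster import Out, Output, op


@op(out={"my_output": Out(is_required=False)})
def upstream_op():
    should_return = False  # placeholder: replace with your own "is there data?" check
    if should_return:
        yield Output(5, output_name="my_output")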
b
Great, thank you! This worked when the downstream nodes are ops, but not when they are subgraphs. Dagit gave me a KeyError when trying to launch the run:
KeyError: 'result'
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/grpc/impl.py", line 91, in core_execute_run
    yield from execute_run_iterator(
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/execution/api.py", line 878, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/utils/__init__.py", line 465, in generate_setup_events
    obj = next(self.generator)
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/execution/context_creation_pipeline.py", line 321, in orchestration_context_event_generator
    context_creation_data = create_context_creation_data(
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/execution/context_creation_pipeline.py", line 139, in create_context_creation_data
    resource_keys_to_init=get_required_resource_keys_to_init(
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/execution/resources_init.py", line 341, in get_required_resource_keys_to_init
    get_required_resource_keys_for_step(pipeline_def, step, execution_plan)
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/execution/resources_init.py", line 395, in get_required_resource_keys_for_step
    output_def = solid_def.output_def_named(step_output.name)
  File "/Users/apple/opt/anaconda3/envs/pudl-usage-metrics/lib/python3.10/site-packages/dagster/core/definitions/node_definition.py", line 124, in output_def_named
    return self._output_dict[name]
Is it possible to skip downstream subgraphs?
c
Dagster flattens nested graphs into a single input/output mapping across all ops, both at the top level and inside nested graphs. This means that any op output you pass into a graph must in turn be passed as an input to an op within that graph. As long as you do this, skipping ops will still work as expected. For example:
from dagster import Out, Output, graph, job, op


@op(out={"my_output": Out(is_required=False)})
def upstream_op():
    if False:
        yield Output(5, output_name="my_output")


@op
def downstream_op(my_input):
    return my_input


@graph
def my_graph(my_input):
    downstream_op(my_input)


@job
def my_job():
    my_graph(upstream_op())
In the example, downstream_op is always skipped because its input is never yielded.
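To make the flattening idea more concrete, here is a rough toy model in plain Python. It is not Dagster's implementation and uses no Dagster APIs. The `run_flat_plan` helper, the `report_op` step, and the dotted step names are all made up for illustration, and the plan is assumed to be listed in dependency order. It only sketches why a missing optional output causes every op fed by it, including ops nested in a graph, to be skipped.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Toy model only: each step maps a name to (function, upstream step names).
# A function returning None stands in for "the optional output was not yielded".
Step = Tuple[Callable[..., Optional[int]], List[str]]


def run_flat_plan(steps: Dict[str, Step]) -> Tuple[Dict[str, int], List[str]]:
    """Run steps in listed order, skipping any step that is missing an input."""
    outputs: Dict[str, int] = {}
    skipped: List[str] = []
    for name, (fn, upstream) in steps.items():
        # If any upstream step produced no output, this step cannot run.
        if any(dep not in outputs for dep in upstream):
            skipped.append(name)
            continue
        result = fn(*(outputs[dep] for dep in upstream))
        if result is not None:
            outputs[name] = result
    return outputs, skipped


def upstream_op() -> Optional[int]:
    return None  # nothing extracted, so no output is produced


def downstream_op(my_input: int) -> int:
    return my_input


def report_op(value: int) -> int:  # hypothetical second op inside the graph
    return value + 1


# The graph boundary is gone after flattening: nested ops are just more steps.
plan: Dict[str, Step] = {
    "upstream_op": (upstream_op, []),
    "my_graph.downstream_op": (downstream_op, ["upstream_op"]),
    "my_graph.report_op": (report_op, ["my_graph.downstream_op"]),
}

outputs, skipped = run_flat_plan(plan)
print(skipped)
```

In this sketch upstream_op itself still runs. Only the steps that depend on its missing output, directly or transitively, end up in the skipped list.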
b
Fantastic, thank you!