Hey all, Is it possible to provide optional inputs...
# ask-community
f
Hey all, is it possible to provide optional inputs to a graph? Basically I'm looking for a way to hook into a graph of graphs that has some conditional ops.
@graph(
    description="Optional sub-graph",
    tags={"dagster-k8s/config": k8s_tags()},
)
def merge_variants_graph(
    optional_input_1: Optional[Any],
    optional_input_2: Optional[Any]
) -> Any:
    merge_op([optional_input_1, optional_input_2])
Doing this throws the following error:
Operation name: PipelineExplorerRootQuery

Message: 'Optional.Any'

Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","inputDefinitions",0,"type"]

Locations: [{"line":238,"column":5}]
Update: The actual input can be `None`, but Dagster cannot handle the `Optional` type annotation on a graph input.
p
Can you include the definition of `merge_op`? The following works for me:
@op
def show_number(context, number: Optional[Int]):
    if number is None:
        context.log.info("No number")

    else:
        context.log.info(f"Number = {number}")


@graph
def my_graph(optional_input_1: Optional[Int], optional_input_2: Optional[Int]):
    show_number.alias('one')(optional_input_1)
    show_number.alias('two')(optional_input_2)
f
For sure, let me minimize the example a bit more real quick
This is somewhat what I have
@op(
    description="Optional",
    out={"optional_output": Out(Any, is_required=False)},
)
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")

@op(
    description="Merges",
)
def merge_op(context: OpExecutionContext, vcfs: list[Path]):
    for vcf in vcfs:
        context.log.info(vcf)

@graph(
    description="Merges multiple variant calling pipeline VCFs into a single MAF file output.",
    tags={"dagster-k8s/config": k8s_tags()},
)
def merge_variants_graph(
    vcf1: Optional[Path],
    vcf2: Optional[Path]
):
    merge_op([yield_optional(vcf1), yield_optional(vcf2)])
    vcf1: Optional[Path],
    vcf2: Optional[Path]
Removing that type annotation allows Dagit to pick up the graph. Otherwise I get the GraphQL error.
p
I was able to load the following (swapped in `Int` instead of `Path` to simplify):
@op(out={"optional_output": Out(Any, is_required=False)})
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")

@op(description="Merges")
def merge_op(context, vcfs: List[Int]):
    for vcf in vcfs:
        context.log.info(vcf)

@graph
def merge_variants_graph(vcf1: Optional[Int], vcf2: Optional[Int]):
    merge_op([yield_optional(vcf1), yield_optional(vcf2)])
f
Let me try `Int`
Weird, still getting:
Operation name: PipelineExplorerRootQuery

Message: 'Optional.Int'

Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","inputDefinitions",0,"type"]

Locations: [{"line":239,"column":5}]
I think this is more related to how I'm calling the graph with optional inputs then. Might be able to work with that - thanks!
p
Sure, let me know if there’s an example I can help dig into!
f
Thanks, I think I'm running into some confusion because ops/graphs can conditionally output optional data, but cannot take these conditional outputs as Python optional inputs (a Dagster optional Out doesn't truly map to a Python Optional type).
Instead, the downstream graph/op just doesn't run at all.
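To make the Python side of that distinction concrete, here is a minimal plain-Python sketch (no Dagster involved; `describe` is a hypothetical function used only for illustration). It shows that `Optional[int]` merely widens the accepted values to include `None`. The argument itself still has to be supplied, which is roughly why the annotation cannot stand in for an output that was never emitted.

```python
from typing import Optional, Union, get_args


def describe(number: Optional[int]) -> str:
    """Hypothetical helper: accepts an int or None, but the argument is still required."""
    if number is None:
        return "No number"
    return f"Number = {number}"


# Optional[int] is shorthand for Union[int, None].
assert Optional[int] == Union[int, None]
assert get_args(Optional[int]) == (int, type(None))

print(describe(3))     # a real value
print(describe(None))  # an explicit None is still a value that was passed

# Omitting the argument is a different situation: Python raises TypeError.
try:
    describe()  # type: ignore[call-arg]
    called_without_argument = True
except TypeError:
    called_without_argument = False
```

An upstream op that emits nothing is closer to the last case (no value at all) than to passing `None`.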
p
Right, the optional inputs (vcf1, vcf2) refer to the Python type, where the value is an `int` or `None`. Those get mapped to the actual inputs in `yield_optional`, which conditionally yields an Output.
Consider the following graph:
@graph
def example_graph(vcf1: Optional[Int], vcf2: Optional[Int]):
    one = yield_optional.alias('a')(vcf1)
    two = yield_optional.alias('b')(vcf2)
    process_one(one)
    process_two(two)
where `process_one`/`process_two` are also ops. If `vcf1=3` and `vcf2=None`, then the aliased op “a” and the aliased op “b” will both execute. Aliased op “a” will emit an output, and aliased op “b” will not. Then, `process_one` will execute and `process_two` will skip.
f
Makes sense, which means I should probably use the fan-in paradigm rather than optional inputs if I know the upstream graph/op produces output conditionally.
Unless there's some other way to force execution with Python `None` when upstream produces no output.
p
So, Optional here refers to the value that’s being passed in, but non-required Outs refer to the conditional branching: https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#conditional-branching
I think if you wanted to force the downstream execution, you would `yield Output(None)` in the `else` case…
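# the named output must be declared on the op; it is required here, so it is always emitted
@op(out={"optional_output": Out(Any)})
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")
    else:
        # still yield an output, it will just be None
        yield Output(None, "optional_output")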
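The two behaviours discussed in this thread can be modelled in plain Python. This is only a toy model under stated assumptions, not Dagster's API or implementation: `NOT_EMITTED`, `yield_always`, and `run_downstream` are hypothetical names, and `yield_optional` here is a plain function rather than an op. It assumes the rule described above: a downstream step is skipped when its upstream output was never emitted, but runs when the upstream emits `None` explicitly.

```python
from typing import Any, Callable, List

# Sentinel meaning "the upstream step produced no output at all".
# This is distinct from an output whose value happens to be None.
NOT_EMITTED = object()


def yield_optional(value: Any) -> Any:
    """Models a non-required output: emit only when the value is not None."""
    return value if value is not None else NOT_EMITTED


def yield_always(value: Any) -> Any:
    """Models the forced variant: always emit, even if the value is None."""
    return value


def run_downstream(upstream: Any, step: Callable[[Any], None]) -> str:
    """Run `step` unless the upstream output was never emitted."""
    if upstream is NOT_EMITTED:
        return "skipped"
    step(upstream)
    return "executed"


received: List[Any] = []

# Conditional branching: no output upstream, so the downstream step is skipped.
status_conditional = run_downstream(yield_optional(None), received.append)

# Forced execution: None is emitted as a value, so the downstream step runs.
status_forced = run_downstream(yield_always(None), received.append)

print(status_conditional, status_forced, received)
```

In the forced variant the downstream step has to handle `None` itself, which is where an `Optional[...]` annotation on its input becomes meaningful.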
f
Oh, good call!