fahad
05/06/2022, 8:36 PM
@graph(
    description="Optional sub-graph",
    tags={"dagster-k8s/config": k8s_tags()},
)
def merge_variants_graph(
    optional_input_1: Optional[Any],
    optional_input_2: Optional[Any],
) -> Any:
    merge_op([optional_input_1, optional_input_2])
Doing this throws the following error:
Operation name: PipelineExplorerRootQuery
Message: 'Optional.Any'
Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","inputDefinitions",0,"type"]
Locations: [{"line":238,"column":5}]
None
but dagster cannot handle the Optional type annotation on a graph input
prha
05/06/2022, 8:55 PM
merge_op?
The following works for me:
@op
def show_number(context, number: Optional[Int]):
    if number is None:
        context.log.info("No number")
    else:
        context.log.info(f"Number = {number}")

@graph
def my_graph(optional_input_1: Optional[Int], optional_input_2: Optional[Int]):
    show_number.alias('one')(optional_input_1)
    show_number.alias('two')(optional_input_2)
fahad
05/06/2022, 9:04 PM
@op(
    description="Optional",
    out={"optional_output": Out(Any, is_required=False)},
)
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")

@op(
    description="Merges",
)
def merge_op(context: OpExecutionContext, vcfs: list[Path]):
    for vcf in vcfs:
        context.log.info(vcf)

@graph(
    description="Merges multiple variant calling pipeline VCFs into a single MAF file output.",
    tags={"dagster-k8s/config": k8s_tags()},
)
def merge_variants_graph(
    vcf1: Optional[Path],
    vcf2: Optional[Path],
):
    merge_op([yield_optional(vcf1), yield_optional(vcf2)])
vcf1: Optional[Path],
vcf2: Optional[Path]
Removing that type annotation allows dagit to pick up the graph. Otherwise I get the graphql error.
prha
05/06/2022, 9:19 PM
Int instead of Path to simplify):
@op(out={"optional_output": Out(Any, is_required=False)})
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")

@op(description="Merges")
def merge_op(context, vcfs: List[Int]):
    for vcf in vcfs:
        context.log.info(vcf)

@graph
def merge_variants_graph(vcf1: Optional[Int], vcf2: Optional[Int]):
    merge_op([yield_optional(vcf1), yield_optional(vcf2)])
fahad
05/06/2022, 9:19 PM
Int
Operation name: PipelineExplorerRootQuery
Message: 'Optional.Int'
Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","inputDefinitions",0,"type"]
Locations: [{"line":239,"column":5}]
prha
05/06/2022, 9:27 PM
fahad
05/06/2022, 9:28 PM
prha
05/06/2022, 9:32 PM
int or None. Those get mapped to the actual inputs in yield_optional, which conditionally yields an Output.
@graph
def example_graph(vcf1: Optional[Int], vcf2: Optional[Int]):
    one = yield_optional.alias('a')(vcf1)
    two = yield_optional.alias('b')(vcf2)
    process_one(one)
    process_two(two)
where process_one / process_two are also ops.
If vcf1=3 and vcf2=None, then the aliased op “a” and the aliased op “b” will both execute. Aliased op “a” will emit an output, and aliased op “b” will not. Then, process_one will execute and process_two will skip.
fahad
05/06/2022, 9:38 PM
None when upstream produces no output
prha
05/06/2022, 9:39 PM
yield Output(None) in the else case…
@op(out={"optional_output": Out(Any)})
def yield_optional(optional_input: Any) -> Any:
    if optional_input is not None:
        yield Output(optional_input, "optional_output")
    else:
        # still yield an output, it will just be None
        yield Output(optional_input, "optional_output")
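The difference between the two variants of yield_optional discussed in this thread (emit nothing when the input is None, versus always emit an output that may be None) can be modeled in plain Python. This is only a toy sketch of the described skip behavior, not Dagster code: NO_OUTPUT, run_downstream, simulate, and the always_emit flag are hypothetical names introduced for illustration, and only yield_optional, process_one, and process_two echo names from the thread.

```python
from typing import Any, Callable

# Sentinel meaning "the upstream step emitted no output at all".
NO_OUTPUT = object()


def yield_optional(value: Any, always_emit: bool = False) -> Any:
    """Emit the value; when it is None, emit nothing unless always_emit is set."""
    if value is not None or always_emit:
        return value
    return NO_OUTPUT


def run_downstream(step: Callable[[Any], Any], upstream: Any) -> str:
    """Run a downstream step only if its upstream output exists."""
    if upstream is NO_OUTPUT:
        return "skipped"
    step(upstream)
    return "executed"


def simulate(vcf1: Any, vcf2: Any, always_emit: bool = False) -> dict:
    """Mimic example_graph: two optional producers feeding two consumers."""
    received = []  # values the downstream steps actually saw
    one = yield_optional(vcf1, always_emit)
    two = yield_optional(vcf2, always_emit)
    return {
        "process_one": run_downstream(received.append, one),
        "process_two": run_downstream(received.append, two),
        "received": received,
    }


if __name__ == "__main__":
    # No output for vcf2, so process_two is skipped.
    print(simulate(3, None))
    # Always emitting means process_two runs and receives None.
    print(simulate(3, None, always_emit=True))
```

With always_emit left off, the consumer of the missing value never runs; with it on, every consumer runs and has to handle None itself, which matches the trade-off between the two suggestions above.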
fahad
05/06/2022, 9:45 PM