wangm23456
06/03/2021, 1:21 PM
import random
from dagster import Output, OutputDefinition, pipeline, solid, composite_solid
@solid(
    output_defs=[
        OutputDefinition(name="branch_1", is_required=False),
        OutputDefinition(name="branch_2", is_required=False),
    ]
)
def branching_solid_test(context):
    """Emit exactly one of the two optional outputs, selected by ``num``.

    Yields ``Output(1, "branch_1")`` when ``num == 0`` and
    ``Output(2, "branch_2")`` otherwise. ``num`` is currently hard-coded to
    0, so only ``branch_1`` is ever emitted.

    Args:
        context: Execution context supplied by Dagster; used here only for
            logging via ``context.log``.
    """
    num = 0
    # The pasted source had chat link markup wrapped around this call
    # (``<http://...|context.log.info>``), which is not valid Python.
    context.log.info(str(num))
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")
@solid
def branch_1_solid(_input):
    """Return the constant 1; the value of ``_input`` is not used."""
    return 1
@solid
def branch_2_solid(_input):
    """Return the constant 2; the value of ``_input`` is not used."""
    return 2
@solid
def s1_solid(_input):
    """Return the constant 1; the value of ``_input`` is not used."""
    return 1
@solid
def s2_solid(_input):
    """Return the constant 2; the value of ``_input`` is not used."""
    return 2
@composite_solid
def solid1(_input):
    """Invoke ``branch_1_solid`` and ``s1_solid`` on ``_input``; return the latter's result.

    Both inner solids are called with the same ``_input``. Only the result
    of ``s1_solid`` is returned from the composite.
    """
    # The result of this call is not used anywhere in the composite.
    # NOTE(review): if the two solids were meant to be chained, s1_solid
    # should receive this result instead of _input -- confirm intent.
    branch_1_solid(_input)
    return s1_solid(_input)
@composite_solid
def solid2(_input):
    """Invoke ``branch_2_solid`` and ``s2_solid`` on ``_input``; return the latter's result.

    Both inner solids are called with the same ``_input``. Only the result
    of ``s2_solid`` is returned from the composite.
    """
    # The result of this call is not used anywhere in the composite.
    # NOTE(review): if the two solids were meant to be chained, s2_solid
    # should receive this result instead of _input -- confirm intent.
    branch_2_solid(_input)
    return s2_solid(_input)
@solid
def final(_s):
    """Return ``_s`` unchanged."""
    return _s
@pipeline
def branching_pipeline():
    """Wire the two outputs of ``branching_solid_test`` into ``solid1`` and ``solid2``.

    ``branch_1`` feeds ``solid1`` and ``branch_2`` feeds ``solid2``. Only the
    result of ``solid2`` is passed on to ``final``.
    """
    first_branch, second_branch = branching_solid_test()
    # The result of solid1 is not connected to anything downstream.
    # NOTE(review): final depends solely on the branch_2 path; if it should
    # run after whichever branch fires, the wiring needs to change -- confirm.
    solid1(first_branch)
    final(solid2(second_branch))