Tadas Barzdžius
11/18/2021, 12:44 PM@job
def job() -> None:
query, file = check_input_type()
file_path = generate_file(query)
file_path = get_file(file)
process_file(file_path)
Or other way of asking question - how should I merge conditional branches back? Something similar like here: https://docs.prefect.io/core/idioms/conditional.html#merging-branches-in-a-flowsandy
11/18/2021, 6:53 PMimport random
from dagster import Out, Output, job, op
@op(out={"branch_1": Out(is_required=False), "branch_2": Out(is_required=False)})
def branching_op():
num = random.randint(0, 1)
if num == 0:
yield Output(1, "branch_1")
else:
yield Output(2, "branch_2")
@op
def branch_1_op(input1):
return input1
@op
def branch_2_op(input1):
return input1
@op
def merge(input1):
assert input1 == [1] or input1 == [2]
@job
def branching():
branch_1, branch_2 = branching_op()
r1 = branch_1_op(branch_1)
r2 = branch_2_op(branch_2)
merge([r1, r2])
I created a github discussion to track this for posterity: https://github.com/dagster-io/dagster/discussions/5718Tadas Barzdžius
11/19/2021, 10:20 AM