Piotr Danielczyk
06/01/2023, 5:47 PM
```python
import random

from dagster import In, Out, Output, graph, job, op


@op(out={"result": Out(is_required=False)})
def dynamic_op_A():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(out={"result": Out(is_required=False)})
def dynamic_op_B():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(ins={"foo": In()})
def optional_op_A(foo):
    print(foo)


@op(ins={"foo": In()})
def optional_op_B(foo):
    print(foo)


@graph()
def graph_A():
    dynamic_result = dynamic_op_A()
    optional_op_A(dynamic_result)


@graph()
def graph_B():
    dynamic_result = dynamic_op_B()
    optional_op_B(dynamic_result)


@graph()
def main_graph():
    graph_A()
    graph_B()


@job(
    config={
        "execution": {
            "config": {
                "in_process": {},
            }
        },
    },
)
def my_job():
    main_graph()
```
I want the following order of execution when I run `my_job`:
1. `dynamic_op_A()`
2. `optional_op_A()` (if `dynamic_op_A()` above yielded a result)
3. `dynamic_op_B()`
4. `optional_op_B()` (if `dynamic_op_B()` above yielded a result)
In the snippet above I'm using a graph (`my_main_graph_thats_run_in_job`) that calls other graphs, but I could also do it as a graph calling the ops (`dynamic_op_A`, `optional_op_A`, `dynamic_op_B`, `optional_op_B`) directly.
I am unable to force it to run sequentially in that order. Here is what I tried:
• Setting `in_process` for execution in the job's config: it does run sequentially, but in a different order (I cannot see any pattern in how that order is determined).
• Setting the `dagster/priority` tag on the ops: it did not work (maybe I did it wrong?).
• Making the ops depend on each other by passing a value (`In` and `Out`) between them, for example using the `Nothing` type. Besides making everything messy, it did not work, because I also have optional ops (`optional_op_A` and `optional_op_B`) that sometimes run but may just as well be skipped. If they are skipped, the following ops should still execute, but with this approach they did not.
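Why the third attempt breaks can be seen in a toy model of skip propagation. This is a sketch under two assumptions: each op in the chain depends on the previous one (for example through a `Nothing` input), and an op whose upstream emitted nothing is skipped and therefore emits nothing itself. `walk_chain` is a hypothetical helper, not part of Dagster.

```python
def walk_chain(chain: list[str], yields: dict[str, bool]) -> tuple[list[str], list[str]]:
    """Toy model: each op in `chain` depends on the previous one.

    yields[name] is False when that op runs but finishes without emitting
    its output; ops missing from `yields` are assumed to emit normally.
    """
    ran: list[str] = []
    skipped: list[str] = []
    upstream_emitted = True
    for name in chain:
        if not upstream_emitted:
            # Upstream produced nothing, so this op is skipped and emits nothing either.
            skipped.append(name)
            continue
        ran.append(name)
        upstream_emitted = yields.get(name, True)
    return ran, skipped


chain = ["dynamic_op_A", "optional_op_A", "dynamic_op_B", "optional_op_B"]
ran, skipped = walk_chain(chain, {"dynamic_op_A": False})
print(ran)      # ['dynamic_op_A']
print(skipped)  # ['optional_op_A', 'dynamic_op_B', 'optional_op_B']
```

With `dynamic_op_A` yielding nothing, the skip cascades down the chain and `dynamic_op_B` never runs, which is the behaviour described in the last bullet.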
Is there any way to achieve this? It seems like a pretty easy and common use case, but I've spent multiple days trying to make it work with Dagster. Any help appreciated!
claire
06/01/2023, 6:27 PM
• The `in_process` config as defined is not being applied.
• I think the `dagster/priority` tag must be applied to all ops: the A ops should have a higher priority, and the B ops a lower one.
So instead, you could do something like this, which should work:
```python
import random

from dagster import In, Out, Output, graph, job, op


@op(out={"result": Out(is_required=False)}, tags={"dagster/priority": "1"})
def dynamic_op_A():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(out={"result": Out(is_required=False)}, tags={"dagster/priority": "0"})
def dynamic_op_B():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(ins={"foo": In()}, tags={"dagster/priority": "1"})
def optional_op_A(foo):
    print(foo)


@op(ins={"foo": In()}, tags={"dagster/priority": "0"})
def optional_op_B(foo):
    print(foo)


...


@job(
    config={"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}},
)
def my_job():
    main_graph()
```
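The effect of the two suggested changes (priority tags on every op plus a single concurrent step) can be illustrated with a toy scheduler that repeatedly picks the highest-priority step whose upstream steps have finished. This is a sketch of the idea, not Dagster's executor: `Step` and `run_one_at_a_time` are hypothetical, priorities are plain integers standing in for the `dagster/priority` tag values, and ties are assumed to fall back to definition order. The ops are deliberately listed B-first to show that priority, not listing order, decides.

```python
from dataclasses import dataclass, field


@dataclass
class Step:
    name: str
    priority: int = 0  # stands in for the dagster/priority tag value
    deps: list[str] = field(default_factory=list)
    emits: bool = True  # False models an optional output that was not yielded


def run_one_at_a_time(steps: list[Step]) -> tuple[list[str], list[str]]:
    """Toy model of a max_concurrent=1 executor (not Dagster's actual code)."""
    index = {s.name: i for i, s in enumerate(steps)}
    produced: dict[str, bool] = {}  # step name -> did it emit its output?
    pending = {s.name: s for s in steps}
    executed: list[str] = []
    skipped: list[str] = []
    while pending:
        # A step is ready once every upstream step has finished (run or skipped).
        ready = [s for s in pending.values() if all(d in produced for d in s.deps)]
        if not ready:
            raise ValueError("dependency cycle or unknown dependency")
        # Skip ready steps whose upstream produced no output; skips propagate downstream.
        to_skip = [s for s in ready if not all(produced[d] for d in s.deps)]
        if to_skip:
            for s in to_skip:
                produced[s.name] = False
                skipped.append(s.name)
                del pending[s.name]
            continue
        # Run exactly one step: highest priority first, ties broken by definition order.
        step = max(ready, key=lambda s: (s.priority, -index[s.name]))
        produced[step.name] = step.emits
        executed.append(step.name)
        del pending[step.name]
    return executed, skipped


steps = [
    Step("dynamic_op_B", priority=0, emits=False),
    Step("optional_op_B", priority=0, deps=["dynamic_op_B"]),
    Step("dynamic_op_A", priority=1),
    Step("optional_op_A", priority=1, deps=["dynamic_op_A"]),
]
executed, skipped = run_one_at_a_time(steps)
print(executed)  # ['dynamic_op_A', 'optional_op_A', 'dynamic_op_B']
print(skipped)   # ['optional_op_B']
```

Even though `dynamic_op_B` is ready from the start, it waits until both A ops are done, and `optional_op_B` is skipped because `dynamic_op_B` yielded nothing in this run. The skip blocks nothing else, since the ordering comes from priorities rather than from extra dependencies between the A and B ops.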
claire
06/01/2023, 6:28 PM
Piotr Danielczyk
06/02/2023, 8:18 AM
`in_process`?
claire
06/02/2023, 5:55 PM
`dagster/priority` tags. If no priority is specified, it's the order in which the ops are called in the job.
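The ordering rule described above amounts to a sort key over the ops that are ready to run. A minimal sketch, assuming untagged ops count as priority 0 and that "the order the ops are called in the job" is the order of the calls in the graph body; `pick_next` is a hypothetical helper, not a Dagster API.

```python
def pick_next(ready: list[str], call_order: list[str], priority: dict[str, int]) -> str:
    """Choose which ready op runs next.

    Highest priority wins; ties go to whichever op was called first.
    `priority` holds dagster/priority tag values; untagged ops default to 0 (assumption).
    """
    return min(ready, key=lambda name: (-priority.get(name, 0), call_order.index(name)))


call_order = ["dynamic_op_A", "optional_op_A", "dynamic_op_B", "optional_op_B"]
ready = ["dynamic_op_B", "dynamic_op_A"]
print(pick_next(ready, call_order, {}))                   # dynamic_op_A (no tags: call order decides)
print(pick_next(ready, call_order, {"dynamic_op_B": 1}))  # dynamic_op_B (priority overrides call order)
```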