# ask-community
Piotr:
Hello. How can I enforce the order in which ops are executed? I want to execute ops sequentially. Have a look at the snippet below:
```python
import random

from dagster import In, Out, Output, graph, job, op


@op(out={"result": Out(is_required=False)})
def dynamic_op_A():
    # Emits "result" only about half of the time.
    if random.random() > 0.5:
        yield Output(True, "result")


@op(out={"result": Out(is_required=False)})
def dynamic_op_B():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(ins={"foo": In()})
def optional_op_A(foo):
    # Runs only if dynamic_op_A yielded "result"; otherwise it is skipped.
    print(foo)


@op(ins={"foo": In()})
def optional_op_B(foo):
    print(foo)


@graph()
def graph_A():
    dynamic_result = dynamic_op_A()
    optional_op_A(dynamic_result)


@graph()
def graph_B():
    dynamic_result = dynamic_op_B()
    optional_op_B(dynamic_result)


@graph()
def main_graph():
    graph_A()
    graph_B()


@job(
    config={
        "execution": {
            "config": {
                "in_process": {},
            }
        },
    },
)
def my_job():
    main_graph()
```
I want the following order of execution when I run `my_job`:

1. `dynamic_op_A()`
2. `optional_op_A()` (if the above `dynamic_op_A()` yielded a result)
3. `dynamic_op_B()`
4. `optional_op_B()` (if the above `dynamic_op_B()` yielded a result)

In the snippet above I'm using a graph (`main_graph`, the one run in the job) that calls other graphs, but I could also do it as a single graph calling the ops (`dynamic_op_A`, `optional_op_A`, `dynamic_op_B`, `optional_op_B`) directly. Either way, I am unable to force it to run sequentially in the above order.

What I tried to make it work:
• Set `in_process` for execution in the job's config. It does run sequentially, but in a different order (I cannot see any pattern in how that order is determined).
• Set the tag `"dagster/priority"` on the ops. It did not work (maybe I did it wrong?).
• Make the ops depend on each other by passing a variable between them (`In` and `Out`), for example using the `Nothing` type. Besides making everything messy, it did not work, because I also have optional ops (`optional_op_A` and `optional_op_B`) that sometimes run, but it is entirely possible that they are skipped. If they are skipped, the following ops should still execute, but with this approach they did not.

Is there any way to achieve this? It seems like a pretty easy and common use case, but I've spent multiple days trying to make it work with Dagster. Any help appreciated!
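Why the `Nothing`-chaining attempt stalls can be sketched with a toy model in plain Python (this is not Dagster's API). It assumes the rule that an op is skipped when an upstream output it depends on was not produced, and that a skipped op emits nothing itself, so the skip cascades down the chain. The names `simulate`, `CHAINED` and `produces` are hypothetical; the `produces` flags stand in for the `random.random() > 0.5` checks.

```python
from typing import Dict, List, Tuple


def simulate(
    chain: List[Tuple[str, List[str]]],
    produces: Dict[str, bool],
) -> Dict[str, str]:
    """Return "ran" or "skipped" for every op in `chain` (toy model).

    `chain` lists (op name, upstream op names) in topological order.
    `produces[name]` says whether an op emitted its output; ops missing
    from the dict always emit it.
    """
    status: Dict[str, str] = {}
    produced: Dict[str, bool] = {}
    for name, upstream in chain:
        # An op runs only if every upstream op ran AND produced an output.
        if all(produced.get(u, False) for u in upstream):
            status[name] = "ran"
            produced[name] = produces.get(name, True)
        else:
            # A skipped op emits nothing, so the skip cascades downstream.
            status[name] = "skipped"
            produced[name] = False
    return status


# Hypothetical Nothing-based chaining: every op waits on the previous one.
CHAINED = [
    ("dynamic_op_A", []),
    ("optional_op_A", ["dynamic_op_A"]),
    ("dynamic_op_B", ["optional_op_A"]),  # ordering-only (Nothing) dependency
    ("optional_op_B", ["dynamic_op_B"]),
]

print(simulate(CHAINED, {"dynamic_op_A": False}))
```

In this model, whenever `dynamic_op_A` yields nothing, `optional_op_A` is skipped and the skip carries through the ordering-only dependency to `dynamic_op_B` and `optional_op_B`, which matches what the question reports.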
Claire:
Hi Piotr. I took a stab at making this work:
• Execution config is applied by the executor. In this case, your job is using the default multiprocess executor, so I think the `in_process` config you defined is not applying.
• I think the `dagster/priority` tag must be applied to all ops: the A ops should have a higher priority, and the B ops a lower priority.

So instead, you could do something like this, which should work:
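```python
import random

from dagster import In, Out, Output, graph, job, op


# Higher "dagster/priority" values are scheduled first, so the A ops
# (priority "1") are preferred over the B ops (priority "0") when both are ready.
@op(out={"result": Out(is_required=False)}, tags={"dagster/priority": "1"})
def dynamic_op_A():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(out={"result": Out(is_required=False)}, tags={"dagster/priority": "0"})
def dynamic_op_B():
    if random.random() > 0.5:
        yield Output(True, "result")


@op(ins={"foo": In()}, tags={"dagster/priority": "1"})
def optional_op_A(foo):
    print(foo)


@op(ins={"foo": In()}, tags={"dagster/priority": "0"})
def optional_op_B(foo):
    print(foo)


# ... graph_A, graph_B and main_graph as in the question ...


# max_concurrent=1 makes the multiprocess executor run one op at a time.
@job(
    config={"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}},
)
def my_job():
    main_graph()
```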
Notice that the execution config is now for the multiprocess executor, with `max_concurrent` set to 1, which effectively enforces that only one op can execute at a time.
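Why priorities plus `max_concurrent: 1` produce the requested order can be sketched with a toy single-worker scheduler in plain Python (not Dagster's executor). It assumes that whenever the single worker is free it starts the ready op with the highest priority, breaking ties by the order the ops are called in the job, and that an op whose upstream output was not yielded is skipped. The names `OPS`, `run_one_at_a_time` and `yields` are hypothetical. Because the A chain and the B chain share no dependency here, skipping `optional_op_A` cannot block `dynamic_op_B`.

```python
from typing import Dict, List, Set, Tuple

# (op name, priority, upstream op names), listed in the order the ops are
# called in the job. Priorities mirror the tags in the answer above.
Op = Tuple[str, int, List[str]]

OPS: List[Op] = [
    ("dynamic_op_A", 1, []),
    ("optional_op_A", 1, ["dynamic_op_A"]),
    ("dynamic_op_B", 0, []),
    ("optional_op_B", 0, ["dynamic_op_B"]),
]


def run_one_at_a_time(ops: List[Op], yields: Dict[str, bool]) -> List[str]:
    """Return the ops that actually ran, in execution order (toy model).

    `yields[name]` says whether an op emitted its optional output; ops
    missing from the dict always emit it.
    """
    done: Set[str] = set()      # ops that finished, whether they ran or were skipped
    produced: Set[str] = set()  # ops that emitted an output
    order: List[str] = []
    while len(done) < len(ops):
        # An op is ready once all of its upstream ops have finished.
        ready = [
            i
            for i, (name, _, upstream) in enumerate(ops)
            if name not in done and all(u in done for u in upstream)
        ]
        # Single worker: highest priority first, ties go to the op called earlier.
        i = min(ready, key=lambda j: (-ops[j][1], j))
        name, _, upstream = ops[i]
        done.add(name)
        if all(u in produced for u in upstream):
            order.append(name)
            if yields.get(name, True):
                produced.add(name)
        # Otherwise the op is skipped: marked done, but it produces nothing.
    return order


print(run_one_at_a_time(OPS, {}))
print(run_one_at_a_time(OPS, {"dynamic_op_A": False}))
```

The first call returns `dynamic_op_A`, `optional_op_A`, `dynamic_op_B`, `optional_op_B`; in the second, `optional_op_A` is skipped while the B ops still run, since nothing in the B chain depends on the A chain.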
Piotr:
Hey @claire, thanks a lot for your help! It works exactly as I needed. Many thanks for your support. 🙂 By the way, out of curiosity: how is the order of execution of ops determined after setting `in_process`?
Claire:
I believe that Dagster first respects the ordering indicated by `dagster/priority` tags. If no priority is specified, it's the order in which the ops are called in the job.
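The rule described above, applied to ops that are ready at the same moment, can be sketched as a stable sort on the priority tag. This is a toy model of the description, not Dagster's scheduler code; `pick_order` and `PRIORITY_TAG` are hypothetical names, and treating a missing tag as priority 0 is an assumption of the sketch.

```python
from typing import Dict, List, Tuple

PRIORITY_TAG = "dagster/priority"


def pick_order(ready_ops: List[Tuple[str, Dict[str, str]]]) -> List[str]:
    """Order ops that are ready at the same time (toy model of the rule above).

    `ready_ops` holds (op name, tags) in the order the ops are called in the
    job. Tag values are strings, as in the answer's code; a missing tag is
    treated as priority 0 (an assumption of this sketch).
    """
    # sorted() is stable, so ops with equal priority keep their call order.
    ranked = sorted(ready_ops, key=lambda op: -int(op[1].get(PRIORITY_TAG, "0")))
    return [name for name, _ in ranked]


# No priority tags: call order decides.
print(pick_order([("dynamic_op_A", {}), ("dynamic_op_B", {})]))
# Tags present: the higher priority wins regardless of call order.
print(pick_order([
    ("dynamic_op_A", {PRIORITY_TAG: "0"}),
    ("dynamic_op_B", {PRIORITY_TAG: "1"}),
]))
```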