Mykola Palamarchuk

11/02/2022, 2:50 PM
Hi team! I'm running a job that looks like this:
initial_data = initial_op()

data1 = processor1_op(initial_data)
persist1_op(data1)

data2 = processor2_op(initial_data)
persist2_op(data2)
I expected the ops to run in the order they appear in the job definition, but Dagster runs them like this:
initial_op -> processor1_op -> processor2_op -> persist1_op -> persist2_op
That's kind of bad: the results of processorX_op stay in the io_manager (in memory) until the very end of the job execution, but that memory could probably be released sooner if the op execution order were preserved. Is there any way to set execution ordering strategies or priorities? I know there are "nothing" dependencies, but this is a slightly different situation, as there is no implicit dependency.

alex

11/02/2022, 3:26 PM
Is there any way to set some execution ordering strategies or priorities?
You can set the dagster/priority tag on ops to influence execution order: https://docs.dagster.io/_apidocs/execution#dagster.in_process_executor

Megan Beckett

11/15/2022, 9:28 AM
Hi, is there an example for this? I found that metadata can be added to a job like this:
@job(
    metadata={
        "owner": "data team",  # will be converted to MetadataValue.text
        "docs": MetadataValue.url("https://docs.dagster.io"),
    }
)
def my_job_with_metadata():
    my_op()
But, trying something similar with ops doesn't work as the metadata argument is not recognised:
@op(
    required_resource_keys={"database"},
    metadata={
        "dagster/priority": 2
    },
)
def my_op(context):

Mykola Palamarchuk

11/15/2022, 11:08 AM
I used "tags" on ops.

Megan Beckett

11/15/2022, 11:08 AM
I see, thanks!

Mykola Palamarchuk

11/15/2022, 11:49 AM
But, unfortunately, it doesn't seem to work 😞. I'm checking it right now. @alex, could you please assist a little bit?
I've set priority tags on all ops, and now it works as expected, FYI @Megan Beckett

Megan Beckett

11/15/2022, 12:11 PM
Ok, thanks - so if you have a priority tag for one op, you have to have them for all ops in a job, even though the default is set to 0? As it says here:
Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.

Mykola Palamarchuk

11/15/2022, 12:14 PM
I'm not sure. Should be tested more to confirm. I'd like to know the algorithm behind the priorities graph resolution to make sure.

alex

11/15/2022, 3:29 PM
know the algorithm behind the priorities graph resolution
Each time the executor selects ops that are ready to execute (dependencies completed) to fill open execution slots, it sorts them by the priority tag, as the description quoted above says.
Using the in-process executor, where there is only one execution slot, together with priorities should let you control the execution order very precisely. If you are using the default multiprocess executor, there will be N execution slots (determined by the number of CPUs if not explicitly configured), and in that setup lower-priority ops can still end up executing when they are all that is available to fill one of the N slots.
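The selection rule described above can be sketched as a toy simulation in plain Python. This is not Dagster's implementation: the simulate function, the dependency map, and the priority values are all hypothetical, and it makes the simplifying assumption that every op started in the same round finishes before the next round begins.

```python
from typing import Dict, List, Set


def simulate(deps: Dict[str, Set[str]], priority: Dict[str, int], slots: int) -> List[str]:
    """Return the order in which ops start, filling up to `slots` per round."""
    done: Set[str] = set()
    order: List[str] = []
    while len(done) < len(deps):
        # An op is ready once all of its upstream ops have completed.
        ready = [name for name in deps if name not in done and deps[name] <= done]
        if not ready:
            raise ValueError("dependency cycle or unknown upstream op")
        # Higher priority first; missing priorities default to 0.
        ready.sort(key=lambda name: -priority.get(name, 0))
        started = ready[:slots]
        order.extend(started)
        done.update(started)
    return order


deps = {
    "initial_op": set(),
    "processor1_op": {"initial_op"},
    "persist1_op": {"processor1_op"},
    "processor2_op": {"initial_op"},
    "persist2_op": {"processor2_op"},
}
priority = {"processor1_op": 2, "persist1_op": 4, "processor2_op": 1, "persist2_op": 3}

print(simulate(deps, priority, slots=1))
# ['initial_op', 'processor1_op', 'persist1_op', 'processor2_op', 'persist2_op']
print(simulate(deps, priority, slots=2))
# ['initial_op', 'processor1_op', 'processor2_op', 'persist1_op', 'persist2_op']
```

With one slot the priorities fully determine the order. With two slots, processor2_op starts alongside processor1_op despite its lower priority, because it is the only other ready op available to fill the second slot.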