
Airton Neto

11/29/2022, 8:18 PM
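Hi there, how can I create a schedule for multi-partitioned assets (MultiPartitionKey)? I'm trying to do:
from dagster import (
    AssetSelection,
    MultiPartitionsDefinition,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# ncar_assets maps a name to a multi-partitioned asset;
# ncar_partition_defs maps the same name to that asset's dimension definitions.
ncar_jobs = {
    key: define_asset_job(
        key + "_job",
        selection=AssetSelection.assets(ast),
        partitions_def=MultiPartitionsDefinition(ncar_partition_defs[key]),
    )
    for key, ast in ncar_assets.items()
}

ncar_schedules = {
    key: build_schedule_from_partitioned_job(
        job,
        name=key + "_schedule",
    )
    for key, job in ncar_jobs.items()
}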
where each ast is a multi-partitioned asset, and the jobs are multi-partitioned as well.
By the way, each `ncar_partition_defs[key]` looks like:
{
    "date": daily_partition_definition,
    "fct_step": fct_step_partition_definition,
}
The assets and jobs load fine.
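For a concrete picture of what these jobs are partitioned over: a multi-partitions definition spans every combination of its dimensions' keys, so each day of "date" pairs with every "fct_step" key. The plain-Python sketch below illustrates that fan-out without Dagster; the dates and step names are hypothetical, and all_multipartition_keys is an illustrative helper, not a Dagster API.

```python
from itertools import product
from typing import Dict, List


def all_multipartition_keys(dimensions: Dict[str, List[str]]) -> List[Dict[str, str]]:
    """Enumerate every combination of dimension keys, one dict per combination."""
    names = sorted(dimensions)  # fixed dimension order so the output is deterministic
    return [
        dict(zip(names, combo))
        for combo in product(*(dimensions[name] for name in names))
    ]


# Hypothetical stand-ins for daily_partition_definition and fct_step_partition_definition
dims = {
    "date": ["2022-11-28", "2022-11-29"],
    "fct_step": ["step_a", "step_b", "step_c"],
}
keys = all_multipartition_keys(dims)
print(len(keys))  # 6 (2 dates x 3 steps)
print(keys[0])  # {'date': '2022-11-28', 'fct_step': 'step_a'}
```

With D days and S steps there are D × S partitions, which is why a daily schedule for such a job has to emit S run requests per tick rather than one.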

claire

11/29/2022, 11:51 PM
Hi Airton. How do you want your schedule to work? Do you want the job to run on a daily basis, with a run request for each multi-partition key for the current day?

Airton Neto

11/29/2022, 11:52 PM
Just a question: if I want to run a multi-partitioned job just once, what is the best way to do that?
As for the schedule, I need the "date" partition to run every day, covering all "fct_step" partitions, like a secondary partition.
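The behavior described here can be sketched in plain Python: at each daily tick, pick the most recently completed "date" partition and pair it with every "fct_step" key. This assumes date keys in "%Y-%m-%d" format, ticks at or after midnight, and naive datetimes (no timezone handling); latest_completed_day and keys_for_tick are hypothetical helpers, not Dagster APIs.

```python
from datetime import datetime, timedelta
from typing import Dict, List

DATE_FORMAT = "%Y-%m-%d"  # assumed format of the daily "date" partition keys


def latest_completed_day(tick: datetime) -> str:
    """Key of the most recent daily partition whose window has fully ended at `tick`."""
    # The window for day D is [D 00:00, D+1 00:00), so at any time on day D+1
    # the latest complete day is the calendar day before tick's date.
    return (tick.date() - timedelta(days=1)).strftime(DATE_FORMAT)


def keys_for_tick(tick: datetime, fct_steps: List[str]) -> List[Dict[str, str]]:
    """One (date, fct_step) key per step, all for the latest completed day."""
    day = latest_completed_day(tick)
    return [{"date": day, "fct_step": step} for step in fct_steps]


steps = ["step_a", "step_b"]  # hypothetical fct_step partition keys
print(keys_for_tick(datetime(2022, 11, 30, 0, 0), steps))
# [{'date': '2022-11-29', 'fct_step': 'step_a'}, {'date': '2022-11-29', 'fct_step': 'step_b'}]
```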

claire

11/29/2022, 11:53 PM
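It's probably easiest to select the specific partition from the Dagit UI. Alternatively, you could use:
from dagster import MultiPartitionKey

# Replace the dimension names and keys with your own (e.g. "date" and "fct_step")
job.execute_in_process(
    partition_key=MultiPartitionKey(
        {"dim_1_name": "dim_1_partition_key", "dim_2_name": "dim_2_partition_key"}
    )
)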
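One detail that makes this work: Dagster's MultiPartitionKey subclasses str, so it can be passed wherever a plain partition key string is expected (it is also used as a run_key later in this thread). The sketch below is a simplified, hypothetical stand-in (DemoMultiPartitionKey, not Dagster's class) showing the idea; its "|"-joined, name-sorted string form is an assumption chosen for illustration.

```python
from typing import Mapping


class DemoMultiPartitionKey(str):
    """Hypothetical stand-in: a str whose value encodes one key per dimension."""

    def __new__(cls, keys_by_dimension: Mapping[str, str]):
        # Sort by dimension name so the same dimensions in any order give the same string.
        names = sorted(keys_by_dimension)
        obj = super().__new__(cls, "|".join(keys_by_dimension[n] for n in names))
        obj._keys_by_dimension = {n: keys_by_dimension[n] for n in names}
        return obj

    @property
    def keys_by_dimension(self) -> Mapping[str, str]:
        # Return a copy so callers cannot mutate the key's stored dimensions.
        return dict(self._keys_by_dimension)


key = DemoMultiPartitionKey({"fct_step": "step_a", "date": "2022-11-29"})
print(key)  # 2022-11-29|step_a
print(key.keys_by_dimension["fct_step"])  # step_a
```

Because the key is still a str, it compares, hashes, and serializes like any other partition key, while the per-dimension values stay accessible.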

Airton Neto

11/29/2022, 11:54 PM
Indeed, I think that could solve my problem. Thanks!

claire

11/30/2022, 12:10 AM
We don't have a built-in way to build a schedule from a multi-partitioned job, but you could define a custom schedule by doing something like this (untested code below):
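from typing import List, Mapping

from dagster import MultiPartitionsDefinition, schedule

# static_partitions, time_window_partitions and my_job are placeholders for your own
# static partitions definition, time-window partitions definition (e.g. daily) and
# the multi-partitioned job. composite should match the partitions_def of my_job.
composite = MultiPartitionsDefinition(
    {
        "abc": static_partitions,
        "date": time_window_partitions,
    }
)


def get_multipartition_keys_with_dimension_value(
    partition_def: MultiPartitionsDefinition, dimension_values: Mapping[str, str]
) -> List[str]:
    """Return every multi-partition key whose dimensions match all of dimension_values."""
    matching_keys = []
    for partition_key in partition_def.get_partition_keys():
        # keys_by_dimension is a property (not a method): dimension name -> partition key
        keys_by_dimension = partition_key.keys_by_dimension
        if all(
            keys_by_dimension.get(dimension) == value
            for dimension, value in dimension_values.items()
        ):
            matching_keys.append(partition_key)
    return matching_keys


@schedule(
    cron_schedule=time_window_partitions.get_cron_schedule(),
    job=my_job,
    execution_timezone=time_window_partitions.timezone,
    name="my_schedule",
)
def schedule_def(context):
    time_partitions = time_window_partitions.get_partition_keys(context.scheduled_execution_time)
    if not time_partitions:
        # No time partition has completed yet, so there is nothing to run.
        return

    # Run for the latest time partition. Prior partitions will have been handled by prior ticks.
    curr_date = time_partitions[-1]

    # One run request per "abc" key for the current date.
    for multipartition_key in get_multipartition_keys_with_dimension_value(
        composite, {"date": curr_date}
    ):
        yield my_job.run_request_for_partition(
            partition_key=multipartition_key,
            run_key=multipartition_key,
        )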
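A note on the helper above: get_multipartition_keys_with_dimension_value scans the whole partition space on every tick, and that space grows by one key per static partition each day. When only the "date" dimension is fixed, the matching keys can also be built directly. The plain-Python sketch below (hypothetical filter_keys and keys_for_date helpers, with dicts standing in for multi-partition keys) checks that both approaches agree.

```python
from itertools import product
from typing import Dict, List, Mapping


def filter_keys(
    all_keys: List[Dict[str, str]], dimension_values: Mapping[str, str]
) -> List[Dict[str, str]]:
    """Keep keys whose dimensions match every value in dimension_values (full scan)."""
    return [
        key
        for key in all_keys
        if all(key.get(dim) == value for dim, value in dimension_values.items())
    ]


def keys_for_date(date_key: str, abc_keys: List[str]) -> List[Dict[str, str]]:
    """Build only the keys for one date, without scanning the whole partition space."""
    return [{"abc": abc, "date": date_key} for abc in abc_keys]


abc_keys = ["x", "y"]  # hypothetical static partition keys
dates = ["2022-11-28", "2022-11-29", "2022-11-30"]
all_keys = [{"abc": a, "date": d} for a, d in product(abc_keys, dates)]

scanned = filter_keys(all_keys, {"date": "2022-11-29"})
direct = keys_for_date("2022-11-29", abc_keys)
print(scanned == direct)  # True
```

The scan is simpler to write and works for any combination of fixed dimensions; building keys directly only pays off once the date range gets long.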
I can file a feature request for this, since it would be nice to have this functionality built in.
@Dagster Bot issue build schedule from multi-partitioned job

Dagster Bot

11/30/2022, 12:11 AM

Airton Neto

11/30/2022, 12:12 AM
Thanks for the detailed answer, nice work! I will try this out.