Airton Neto
11/29/2022, 8:18 PMncar_jobs = {
key: define_asset_job(
key + "_job",
selection=AssetSelection.assets(ast),
partitions_def=MultiPartitionsDefinition(ncar_partition_defs[key]),
)
for key, ast in ncar_assets.items()
}
ncar_schedules = {
key: build_schedule_from_partitioned_job(
job,
name=key + "_schedule",
)
for key, job in ncar_jobs.items()
}
where ast are multi-partitioned assets and jobs are multipartitioned as well.ncar_partition_defs[key]
are like
{
"date": daily_partition_definition,
"fct_step": fct_step_partition_definition,
}
assets and jobs are loading okclaire
11/29/2022, 11:51 PMAirton Neto
11/29/2022, 11:52 PMclaire
11/29/2022, 11:53 PMjob.execute_in_process(partition_key=MultiPartitionKey({"dim_1_name": "dim_1_partition_key", "dim_2_name": "dim_2_partition_key"}))
Airton Neto
11/29/2022, 11:54 PMclaire
11/30/2022, 12:10 AMcomposite = MultiPartitionsDefinition(
{
"abc": static_partitions,
"date": time_window_partitions,
}
)
def get_multipartition_keys_with_dimension_value(
partition_def: MultiPartitionsDefinition, dimension_values: Mapping[str, str]
) -> List[str]:
matching_keys = []
for partition_key in partition_def.get_partition_keys():
keys_by_dimension = partition_key.keys_by_dimension()
if all(
[
keys_by_dimension.get(dimension, None) == value
for dimension, value in dimension_values.items()
]
):
matching_keys.append(partition_key)
return matching_keys
@schedule(
cron_schedule=time_window_partitions.get_cron_schedule(),
job=my_job,
execution_timezone=time_window_partitions.timezone,
name="my_schedule",
)
def schedule_def(context):
time_partitions = time_window_partitions.get_partition_keys(context.scheduled_execution_time)
# Run for the latest time partition. Prior partitions will have been handled by prior ticks.
curr_date = time_partitions[-1]
for multipartition_key in get_multipartition_keys_with_dimension_value(
composite, {"date": curr_date}
):
yield my_job.run_request_for_partition(
partition_key=multipartition_key,
run_key=multipartition_key,
)
Dagster Bot
11/30/2022, 12:11 AMAirton Neto
11/30/2022, 12:12 AM