Gustavo Carvalho
02/19/2023, 8:31 PM
… `define_asset_job` with only one asset. However, this asset uses a multipartition (`MultiPartitionsDefinition`): the first dimension is a daily partition, and the second is a static partition with ~100 elements.
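To show the scale involved, here is a small sketch in plain Python (not Dagster code) that enumerates the partition keys of such a two-dimensional definition. The start date, the `item_NNN` static names, and the helpers `daily_keys`/`multi_partition_keys` are hypothetical. The point is that every daily key pairs with every static key, so each new day adds ~100 partitions, and a schedule targeting "the latest day" has ~100 partitions to launch per tick.

```python
from datetime import date, timedelta
from itertools import product


def daily_keys(start: date, end: date) -> list[str]:
    """Daily partition keys in ISO format, from start (inclusive) to end (exclusive)."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def multi_partition_keys(time_keys: list[str], static_keys: list[str]) -> list[tuple[str, str]]:
    """Cartesian product of both dimensions: one (date, static) pair per partition."""
    return list(product(time_keys, static_keys))


# Hypothetical static dimension with ~100 elements.
static_keys = [f"item_{i:03d}" for i in range(100)]
time_keys = daily_keys(date(2023, 1, 1), date(2023, 2, 19))

keys = multi_partition_keys(time_keys, static_keys)
print(len(time_keys), len(keys))  # 49 days -> 4900 partitions

# Partitions belonging to the most recent day: one per static key.
per_day = [k for k in keys if k[0] == time_keys[-1]]
print(len(per_day))  # 100
```

The total grows linearly with the number of days, but the per-tick fan-out stays fixed at the size of the static dimension.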
Any idea how to make this schedule work (and, hopefully, keep the code location from crashing, too)?
Feel free to ask for the code defining the entities described.
Gustavo Carvalho
02/21/2023, 12:32 PM
```yaml
# values.yaml
dagsterDaemon:
  # ...
  env:
    DAGSTER_GRPC_TIMEOUT_SECONDS: '180'
```
As feedback: this configuration option was rather hard to find; I had to dig into the code of `dagster._grpc.client`.
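Why a Helm `env` entry is enough: an environment variable set on the daemon container is visible to any code in that process that reads it via `os.environ`, and the gRPC timeout is presumably applied on the client side (the daemon calling the code location server). The sketch below shows that read-with-fallback pattern in isolation. The helper name `grpc_timeout_seconds` and the 60-second fallback are assumptions for illustration, not a copy of `dagster._grpc.client`.

```python
import os
from collections.abc import Mapping

# Assumed fallback, for illustration only; check your Dagster version for the real default.
DEFAULT_TIMEOUT_SECONDS = 60


def grpc_timeout_seconds(env: Mapping[str, str] = os.environ) -> int:
    """Read DAGSTER_GRPC_TIMEOUT_SECONDS from the environment, falling back to a default."""
    raw = env.get("DAGSTER_GRPC_TIMEOUT_SECONDS")
    if raw is None or not raw.strip():
        return DEFAULT_TIMEOUT_SECONDS
    # Env values are always strings (hence the quoted '180' in values.yaml).
    return int(raw)


print(grpc_timeout_seconds({}))  # 60
print(grpc_timeout_seconds({"DAGSTER_GRPC_TIMEOUT_SECONDS": "180"}))  # 180
```

Passing the environment as a mapping keeps the function testable without mutating the real process environment.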
The docs pointed me to https://docs.dagster.io/deployment/dagster-instance#grpc-servers, but that setting does not work (or at least I could not make it work) on a Dagster K8s deployment.
daniel
02/21/2023, 3:18 PM
Gustavo Carvalho
02/21/2023, 3:18 PM
Gustavo Carvalho
02/21/2023, 3:19 PM
daniel
02/21/2023, 3:19 PM
Gustavo Carvalho
02/21/2023, 3:20 PM
Gustavo Carvalho
02/21/2023, 3:20 PM
Gustavo Carvalho
02/21/2023, 3:21 PM
Gustavo Carvalho
02/21/2023, 4:21 PM
Gustavo Carvalho
02/21/2023, 4:21 PM
```python
ge_influxdb_daily_schedule = build_schedule_from_multipartitioned_job(
    ge_influxdb_daily_job,
    hour_of_day=1,
    minute_of_hour=5,
    tags={
        "dagster/priority": "10",
    },
)
```
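For a daily partition, `hour_of_day=1, minute_of_hour=5` should correspond to a five-field cron expression that fires at 01:05 every day in the schedule's execution timezone. The helper `daily_cron` below is hypothetical (it is not part of Dagster) and only illustrates that mapping.

```python
def daily_cron(minute_of_hour: int = 0, hour_of_day: int = 0) -> str:
    """Five-field cron string (minute hour day-of-month month day-of-week) for a daily run."""
    if not 0 <= minute_of_hour <= 59:
        raise ValueError("minute_of_hour must be in 0..59")
    if not 0 <= hour_of_day <= 23:
        raise ValueError("hour_of_day must be in 0..23")
    # Wildcards in the last three fields: every day of every month, any weekday.
    return f"{minute_of_hour} {hour_of_day} * * *"


print(daily_cron(minute_of_hour=5, hour_of_day=1))  # 5 1 * * *
```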
Gustavo Carvalho
02/21/2023, 4:22 PM
```python
import dagster._check as check
from dagster import (
    DefaultScheduleStatus,
    JobDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    ScheduleEvaluationContext,
    StaticPartitionsDefinition,
    TimeWindowPartitionsDefinition,
    schedule,
)


def build_schedule_from_multipartitioned_job(
    job: JobDefinition,
    description=None,
    name=None,
    minute_of_hour=None,
    hour_of_day=None,
    day_of_week=None,
    day_of_month=None,
    default_status=DefaultScheduleStatus.STOPPED,
    tags=None,
):
    # Assumes the primary dimension is the daily time window and the
    # secondary dimension is the static one.
    multipartitions_def: MultiPartitionsDefinition = job.partitions_def
    time_window_partitions_def: TimeWindowPartitionsDefinition = (
        multipartitions_def.primary_dimension.partitions_def
    )
    static_partitions_def: StaticPartitionsDefinition = (
        multipartitions_def.secondary_dimension.partitions_def
    )
    # Derive the cron string from the time dimension plus the requested offsets.
    cron_schedule = time_window_partitions_def.get_cron_schedule(
        minute_of_hour, hour_of_day, day_of_week, day_of_month
    )

    @schedule(
        cron_schedule=cron_schedule,
        job=job,
        default_status=default_status,
        execution_timezone=time_window_partitions_def.timezone,
        name=check.opt_str_param(name, "name", f"{job.name}_schedule"),
        tags=tags,
        description=check.opt_str_param(description, "description"),
    )
    def _schedule(context: ScheduleEvaluationContext):
        # Run for the latest partition.
        # Prior partitions will have been handled by prior ticks.
        time_key = time_window_partitions_def.get_last_partition_key(
            context.scheduled_execution_time
        )
        # One run request per static key (~100 here), all for the same day.
        for static_key in static_partitions_def.get_partition_keys():
            multipartition_key = MultiPartitionKey(
                {
                    multipartitions_def.primary_dimension.name: time_key,
                    multipartitions_def.secondary_dimension.name: static_key,
                }
            )
            yield job.run_request_for_partition(
                partition_key=multipartition_key,
                run_key=multipartition_key,
                tags=tags,
            )

    return _schedule
```
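Stripped of the Dagster API, a single tick of the schedule above amounts to one daily key (the last day that has fully ended before the scheduled time) crossed with every static key, so ~100 static elements mean ~100 run requests per tick. This sketch assumes a daily partition whose windows end at midnight in the execution timezone with no end offset. `last_complete_day`, `fan_out`, the `date`/`static` dimension names, and the `day|static` run-key format are all hypothetical. In the real builder each request also goes through `job.run_request_for_partition`, so any per-request work there is multiplied by the number of static keys.

```python
from datetime import datetime, timedelta


def last_complete_day(scheduled_time: datetime) -> str:
    """Key of the most recent daily window that fully ended before scheduled_time."""
    return (scheduled_time.date() - timedelta(days=1)).isoformat()


def fan_out(scheduled_time: datetime, static_keys: list[str]) -> list[dict[str, str]]:
    """One run-request-like dict per static key, all sharing the same daily key."""
    time_key = last_complete_day(scheduled_time)
    return [
        # Hypothetical run-key format: unique per (day, static) pair, like
        # using the multipartition key as the run_key in the builder above.
        {"date": time_key, "static": static_key, "run_key": f"{time_key}|{static_key}"}
        for static_key in static_keys
    ]


requests = fan_out(datetime(2023, 2, 21, 1, 5), [f"item_{i:03d}" for i in range(100)])
print(len(requests))  # 100
print(requests[0])  # {'date': '2023-02-20', 'static': 'item_000', 'run_key': '2023-02-20|item_000'}
```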
daniel
02/21/2023, 4:25 PM
Gustavo Carvalho
02/21/2023, 4:26 PM
Gustavo Carvalho
02/21/2023, 4:26 PM
Gustavo Carvalho
02/21/2023, 4:26 PM
Gustavo Carvalho
02/21/2023, 4:26 PM
Gustavo Carvalho
02/21/2023, 4:26 PM
daniel
02/21/2023, 4:27 PM
daniel
02/21/2023, 4:27 PM
Gustavo Carvalho
02/21/2023, 4:27 PM
Gustavo Carvalho
02/21/2023, 4:27 PM
daniel
02/21/2023, 4:28 PM
Gustavo Carvalho
02/21/2023, 4:28 PM
Gustavo Carvalho
02/21/2023, 4:28 PM
Gustavo Carvalho
02/21/2023, 5:03 PM
daniel
02/21/2023, 5:04 PM
Gustavo Carvalho
02/21/2023, 5:04 PM
Gustavo Carvalho
02/21/2023, 5:05 PM
daniel
02/21/2023, 5:06 PM
daniel
02/21/2023, 5:17 PM
Gustavo Carvalho
02/21/2023, 5:17 PM
Gustavo Carvalho
02/21/2023, 5:32 PM
Gustavo Carvalho
02/21/2023, 5:32 PM
daniel
02/21/2023, 5:44 PM
Gustavo Carvalho
02/21/2023, 5:45 PM
claire
02/21/2023, 6:15 PM
`run_request_for_partition` will iterate through each multipartition for each run request, which gets unwieldy.
daniel
02/21/2023, 6:17 PM
Gustavo Carvalho
03/03/2023, 12:19 PM
daniel
03/03/2023, 1:51 PM
Gustavo Carvalho
03/03/2023, 1:51 PM