Drew You
06/15/2023, 6:03 PM
…RunRequest for the most recent partition. I have everything apparently working in Dagit. The hourly partitioned schedules all run as expected. However, the daily partitioned schedules all fail with dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key "2023-06-05".
When I inspect the job, it has a list of daily partitions that all seem to match that pattern. Am I missing something?

Drew You
# 06/15/2023, 6:03 PM
# Cron expression per interval: minute 0 of every hour / midnight every day.
CRON_SCHEDULES = dict(
    hours="0 * * * *",
    days="0 0 * * *",
)


def _hour_before_tick(context):
    """Return the hourly key one hour before the tick, e.g. "2023-06-05-23:00" for a 2023-06-06 00:00 tick."""
    previous_hour = context.scheduled_execution_time - timedelta(hours=1)
    return previous_hour.strftime("%Y-%m-%d-%H") + ":00"


def _day_before_tick(context):
    """Return the daily key one day before the tick, e.g. "2023-06-05" for a 2023-06-06 tick."""
    previous_day = context.scheduled_execution_time - timedelta(days=1)
    return previous_day.strftime("%Y-%m-%d")


# Interval name -> function(schedule context) -> partition key one interval
# before the tick's scheduled execution time.
# NOTE(review): build_partition creates these partitions with end_offset=-1,
# which drops the most recently completed partition, so the key returned here
# may not exist yet at tick time (DagsterUnknownPartitionError) — confirm
# before relying on these helpers.
PARTITION_FUNS = {"hours": _hour_before_tick, "days": _day_before_tick}
def dynamic_funding_schedule(t: str, interval: str, spec: dict) -> ScheduleDefinition:
    """Return a schedule that requests the newest existing partition of one funding asset.

    The partition key comes from the same partitions definition the asset is
    built with (``build_partition``), evaluated at the tick's scheduled
    execution time. ``build_partition`` uses ``end_offset=-1``, so the newest
    partition starts two intervals before the tick. Computing the key as
    "tick minus one interval" therefore asks for a partition that does not
    exist yet and fails with ``DagsterUnknownPartitionError``.

    Args:
        t: Instrument type, e.g. ``"swaps"`` or ``"futures"``.
        interval: ``"hours"`` or ``"days"``; selects the cron schedule and
            the partitioning.
        spec: Exchange spec; must provide ``spec["start_date"][t]`` and
            ``spec["exchange"]``.

    Returns:
        A ``ScheduleDefinition`` targeting the job named ``group_<asset name>``.
    """
    asset_key = ["group", dynamic_asset_key(t, interval, spec)]
    job_name = "_".join(asset_key)
    cron_schedule = CRON_SCHEDULES[interval]
    # Single source of truth: the same partitioning the asset itself uses.
    partitions_def = build_partition(interval, spec, t)

    def _eval(context: ScheduleEvaluationContext):
        # Newest partition that exists as of this tick (not "tick minus one interval").
        part = partitions_def.get_last_partition_key(
            current_time=context.scheduled_execution_time
        )
        if part is None:
            # No complete partition exists yet at this tick; request nothing.
            return
        yield RunRequest(
            # Must name the same job the schedule targets (and that
            # dynamic_jobs defines as "group_<asset name>").
            job_name=job_name,
            run_key=f"{job_name}:{part}",
            partition_key=part,
            asset_selection=[AssetKey(asset_key)],
        )

    return ScheduleDefinition(
        job_name=job_name,
        execution_fn=_eval,
        cron_schedule=cron_schedule,
    )
def _collect_dynamic_schedules():
    """Build one schedule per (instrument type, interval, spec).

    Only specs that list a start date for that instrument type are included.
    The order is swaps then futures, hours then days, then SPECS order.
    """
    schedules = []
    for instrument in ("swaps", "futures"):
        for cadence in ("hours", "days"):
            for exchange_spec in SPECS:
                # Skip exchanges that have no start date for this instrument type.
                if instrument not in exchange_spec["start_date"]:
                    continue
                schedules.append(
                    dynamic_funding_schedule(instrument, cadence, exchange_spec)
                )
    return schedules


dynamic_schedules = _collect_dynamic_schedules()
Drew You
06/15/2023, 6:04 PM
…hours interval assets, so do I have a wrong assumption about the keys for daily partitioned assets?

Drew You
06/15/2023, 6:10 PM

owen
06/15/2023, 6:19 PM

Drew You
# 06/15/2023, 6:25 PM
# Length of one partition for each supported interval name.
TIME_DELTAS = dict(
    days=timedelta(days=1),
    hours=timedelta(hours=1),
)
# Adjective form of each interval, embedded in asset names by dynamic_asset_key.
INTERVAL_STRINGS = dict(
    hours="hourly",
    days="daily",
)
def build_partition(interval: str, spec: dict, t: str) -> PartitionsDefinition:
    """Build the UTC time-window partitioning for one instrument type's asset.

    Args:
        interval: ``"days"`` for daily partitions, ``"hours"`` for hourly ones.
        spec: Exchange spec; ``spec["start_date"][t]`` is the first partition's date.
        t: Instrument type used as the key into ``spec["start_date"]``.

    Returns:
        A daily or hourly partitions definition. Both use ``end_offset=-1``,
        which per Dagster's ``end_offset`` semantics drops the most recently
        completed partition from the set.

    Raises:
        NotImplementedError: if ``interval`` is neither ``"days"`` nor ``"hours"``.
    """
    if interval not in ("days", "hours"):
        raise NotImplementedError(f"interval {interval} not supported")

    if interval == "hours":
        # Append hour:minute "00:00" so the first hourly partition starts at
        # midnight of the configured start date.
        return HourlyPartitionsDefinition(
            start_date=f"{spec['start_date'][t]}-00:00",
            timezone="Etc/UTC",
            end_offset=-1,
        )

    return DailyPartitionsDefinition(
        start_date=spec["start_date"][t],
        timezone="Etc/UTC",
        end_offset=-1,
    )
# Historical funding-rate URL per instrument type (presumably the exchange
# API endpoint the assets query — confirm against URL_HIST_* definitions).
ENDPOINT = dict(
    futures=URL_HIST_FUNDING_EXCH_FUTURE,
    swaps=URL_HIST_FUNDING_EXCH_SWAP,
)
# Upstream asset each instrument type's funding-rate asset consumes; it is
# wired in as the "funding_info" input.
INS = dict(
    futures=AssetIn(key=["group", "futures_funding_info"]),
    swaps=AssetIn(key=["group", "swaps_funding_info"]),
)
def dynamic_asset_key(t, interval, spec) -> str:
    """Return the asset name ``<t>_funding_rates_historical_<hourly|daily>_<exchange>``.

    ``interval`` is mapped through INTERVAL_STRINGS ("hours" -> "hourly",
    "days" -> "daily"), and the exchange is read from ``spec["exchange"]``.
    """
    interval_label = INTERVAL_STRINGS[interval]
    exchange = spec['exchange']
    return f"{t}_funding_rates_historical_{interval_label}_{exchange}"
def build_futures_funding_rates_historical_pair(
    interval: str, spec: dict, t: str
) -> Callable:
    """Build the partitioned funding-rate asset for one exchange, interval, and instrument type.

    Despite the intimidating shape, this is just a factory: it returns the
    AssetsDefinition produced by ``@asset``.

    Args:
        interval: ``"hours"`` or ``"days"``; selects the partitioning.
        spec: Exchange spec (start dates per instrument type, exchange name).
        t: Instrument type, ``"swaps"`` or ``"futures"``.
    """
    # Evaluated in the same order as the original decorator arguments.
    partitions = build_partition(interval, spec, t)
    upstream = INS[t]
    asset_name = dynamic_asset_key(t, interval, spec)

    @asset(
        partitions_def=partitions,
        # auto_materialize_policy=AUTO_MATERIALIZE_POLICIES[interval],
        ins={"funding_info": upstream},
        name=asset_name,
        required_resource_keys={"duckdb"},
    )
    def _asset(context: OpExecutionContext, funding_info):
        """
        Materialize one partition of historical funding rates.

        Per the original author: for now, this just queries the last 31 days
        of funding rates for all exchanges and instruments.
        """
        # NOTE(review): body is `...` here — presumably elided from this excerpt.
        ...

    return _asset
Drew You
06/15/2023, 6:40 PM

Drew You
# 06/15/2023, 7:53 PM
def _collect_dynamic_jobs():
    """Build one asset job per (instrument type, interval, spec).

    Each job is named ``group_<asset name>`` and selects the asset
    ``group/<asset name>``. Only specs that list a start date for the
    instrument type are included, in the same order the schedules are built.
    """
    jobs = []
    for instrument in ("swaps", "futures"):
        for cadence in ("hours", "days"):
            for exchange_spec in SPECS:
                # Skip exchanges that have no start date for this instrument type.
                if instrument not in exchange_spec["start_date"]:
                    continue
                asset_name = dynamic_asset_key(instrument, cadence, exchange_spec)
                jobs.append(
                    define_asset_job(
                        f"group_{asset_name}",
                        selection=[f"group/{asset_name}"],
                    )
                )
    return jobs


dynamic_jobs = _collect_dynamic_jobs()
Drew You
06/21/2023, 2:55 PM
…"2023-06-13" and still got the same partition-not-found error.

Drew You
06/21/2023, 2:56 PM

Drew You
06/23/2023, 5:59 PM

Drew You
06/23/2023, 6:00 PM
(Pdb) self
TimeWindowPartitionsDefinition(start=1568073600.0, timezone='Etc/UTC', fmt='%Y-%m-%d', end_offset=-1, cron_schedule='0 0 * * *')
but
self.get_partition_keys(current_time,dynamic_partitions_store)
['2019-09-10', '2019-09-11', ... , '2023-06-10', '2023-06-11', '2023-06-12']
Drew You
06/23/2023, 6:01 PM
…"2023-06-13" won't be a valid partition key, but this is unexpected given that today is 23 June.

Drew You
06/23/2023, 6:56 PM
…the Test Schedule button not only changes the time at which the schedule will be executed but also changes the partition for the scheduled run. So I was using constant dates that were after the test schedule's evaluation date, which caused the errors.

owen
06/23/2023, 11:13 PM