# ask-community
d
So, I am dynamically generating about 20 assets (time-partitioned by either day or hour), a job that targets each asset, and a schedule with an evaluation function that returns a RunRequest for the most recent partition. Everything appears to work in dagit, and the hourly partitioned schedules all run as expected. However, the daily partitioned schedules all fail with
dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key "2023-06-05".
When I inspect the job, it has a list of daily partitions that all seem to match that pattern. Am I missing something?
Here's my schedule generation code:
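from datetime import timedelta

from dagster import (
    AssetKey,
    RunRequest,
    ScheduleDefinition,
    ScheduleEvaluationContext,
)

CRON_SCHEDULES = {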
    "hours": "0 * * * *",
    "days": "0 0 * * *",
}

PARTITION_FUNS = {
    "hours": lambda c: (c.scheduled_execution_time - timedelta(hours=1)).strftime(
        "%Y-%m-%d-%H"
    )
    + ":00",
    "days": lambda c: (c.scheduled_execution_time - timedelta(days=1)).strftime(
        "%Y-%m-%d"
    ),
}


def dynamic_funding_schedule(t: str, interval: str, spec: dict) -> ScheduleDefinition:
    """Returns a schedule definition that yields the appropriate RunRequest
    for either hourly or daily funding over the desired period"""
    asset_key = ["group", dynamic_asset_key(t, interval, spec)]
    job_name = "_".join(asset_key)

    def _eval(context: ScheduleEvaluationContext):
        # key of the previous hour if hourly, otherwise of the previous day
        part = PARTITION_FUNS[interval](context)

        yield RunRequest(
            job_name=f"{job_name}_job",
            run_key=f"{job_name}:{part}",
            partition_key=part,
            asset_selection=[AssetKey(asset_key)],
        )

    return ScheduleDefinition(
        job_name=f"{job_name}",
        execution_fn=_eval,
        cron_schedule=CRON_SCHEDULES[interval],
    )


dynamic_schedules = [
    dynamic_funding_schedule(t, interval, spec)
    for t in ["swaps", "futures"]
    for interval in ["hours", "days"]
    for spec in SPECS
    if t in spec["start_date"]
]
These schedules work for the hours interval assets, so do I have a wrong assumption about the keys for daily partitioned assets?
The reason for all this complexity is to expand which partitions are requested once the simple version works.
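For reference, here is a minimal stdlib-only sketch of the key strings that the two lambdas in PARTITION_FUNS produce for a given tick. The helper name previous_partition_key and the tick time of 2023-06-06 00:00 UTC are assumptions made for illustration; that tick is simply one for which the daily lambda yields the key from the error message.

```python
from datetime import datetime, timedelta, timezone


def previous_partition_key(scheduled_time: datetime, interval: str) -> str:
    """Mirror PARTITION_FUNS: format the hour or day before the scheduled time."""
    if interval == "hours":
        # Same result as strftime("%Y-%m-%d-%H") + ":00" in the original lambda.
        return (scheduled_time - timedelta(hours=1)).strftime("%Y-%m-%d-%H") + ":00"
    if interval == "days":
        return (scheduled_time - timedelta(days=1)).strftime("%Y-%m-%d")
    raise NotImplementedError(f"interval {interval} not supported")


# Hypothetical tick time, chosen only for illustration.
tick = datetime(2023, 6, 6, 0, 0, tzinfo=timezone.utc)
daily_key = previous_partition_key(tick, "days")
hourly_key = previous_partition_key(tick, "hours")
print(daily_key, hourly_key)
```

The formats themselves look consistent with the daily and hourly key patterns shown on the job page, which suggests the problem is whether the key exists at evaluation time rather than how it is formatted.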
o
would you mind sharing the code where you're generating the assets?
d
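from datetime import timedelta
from typing import Callable

from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    HourlyPartitionsDefinition,
    OpExecutionContext,
    PartitionsDefinition,
    asset,
)

TIME_DELTAS = {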
    "days": timedelta(days=1),
    "hours": timedelta(hours=1),
}

INTERVAL_STRINGS = {
    "hours": "hourly",
    "days": "daily",
}


def build_partition(interval: str, spec: dict, t: str) -> PartitionsDefinition:
    if interval == "days":
        return DailyPartitionsDefinition(
            start_date=spec["start_date"][t],
            timezone="Etc/UTC",
            end_offset=-1,
        )
    elif interval == "hours":
        return HourlyPartitionsDefinition(
            start_date=f"{ spec['start_date'][t] }-00:00",
            timezone="Etc/UTC",
            end_offset=-1,
        )
    raise NotImplementedError(f"interval {interval} not supported")


ENDPOINT = {
    "futures": URL_HIST_FUNDING_EXCH_FUTURE,
    "swaps": URL_HIST_FUNDING_EXCH_SWAP,
}

INS = {
    "futures": AssetIn(key=["group", "futures_funding_info"]),
    "swaps": AssetIn(key=["group", "swaps_funding_info"]),
}


def dynamic_asset_key(t, interval, spec) -> str:
    return (
        f"{t}_funding_rates_historical_{INTERVAL_STRINGS[interval]}_{spec['exchange']}"
    )


def build_futures_funding_rates_historical_pair(
    interval: str, spec: dict, t: str
) -> Callable:
    """
    This looks intimidating, but it is just a function that returns an AssetsDefinition
    for a specific exchange, interval, and instrument type.
    """

    @asset(
        partitions_def=build_partition(interval, spec, t),
        # auto_materialize_policy=AUTO_MATERIALIZE_POLICIES[interval],
        ins={"funding_info": INS[t]},
        name=dynamic_asset_key(t, interval, spec),
        required_resource_keys={"duckdb"},
    )
    def _asset(context: OpExecutionContext, funding_info):
        """
        For now, this just queries the last 31 days of funding rates for all
        exchanges and instruments.
        """
        ...
    

    return _asset
and in dagit, on the job page, I have the correct partitions and can launch them.
here are the job definitions as well:
dynamic_jobs = [
    define_asset_job(
        f"group_{dynamic_asset_key(t, interval, spec)}",
        selection=[f"group/{dynamic_asset_key(t, interval, spec)}"],
    )
    for t in ["swaps", "futures"]
    for interval in ["hours", "days"]
    for spec in SPECS
    if t in spec["start_date"]
]
do you have any thoughts on how I might debug this? I tried always returning a static partition key "2023-06-13" and still got the same partition not found error.
I'm also seeing this on my schedule -- is the lack of partition set here a problem?
so, I set a breakpoint at partition.py:268 and poked around in pdb.
(Pdb) self
TimeWindowPartitionsDefinition(start=1568073600.0, timezone='Etc/UTC', fmt='%Y-%m-%d', end_offset=-1, cron_schedule='0 0 * * *')
but
self.get_partition_keys(current_time,dynamic_partitions_store)
['2019-09-10', '2019-09-11', ... , '2023-06-10', '2023-06-11', '2023-06-12']
so "2023-06-13" won't be a valid partition key, but this is unexpected given that today is 23 June.
Ok, so the final answer here is that it was somewhat unexpected for me that the Test Schedule button not only changes the time at which the schedule is evaluated but also changes which partitions are available to the scheduled run. So, I was using constant dates that fell after the test schedule date, and that caused the errors.
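To make that concrete, here is a rough stdlib-only model of how the set of valid daily keys depends on the evaluation time. It is a simplified sketch and not Dagster's implementation: the function name daily_partition_keys is hypothetical, the start date comes from the pdb output above, and the two evaluation times are assumptions picked so that the first reproduces the key list seen in pdb.

```python
from datetime import datetime, timedelta, timezone

ONE_DAY = timedelta(days=1)


def daily_partition_keys(
    start: datetime, current_time: datetime, end_offset: int = 0
) -> list[str]:
    """List daily keys whose window has fully ended by current_time.

    A negative end_offset drops that many keys from the end of the list.
    """
    complete_days = max(0, (current_time - start) // ONE_DAY)
    count = max(0, complete_days + end_offset)
    return [(start + i * ONE_DAY).strftime("%Y-%m-%d") for i in range(count)]


# start=1568073600.0 in the pdb output is 2019-09-10 00:00 UTC.
START = datetime(2019, 9, 10, tzinfo=timezone.utc)

# Assumed evaluation time picked via Test Schedule (illustrative).
test_time = datetime(2023, 6, 14, 12, 0, tzinfo=timezone.utc)
keys_at_test = daily_partition_keys(START, test_time, end_offset=-1)

# Assumed wall-clock time on 23 June (illustrative).
real_time = datetime(2023, 6, 23, 9, 0, tzinfo=timezone.utc)
keys_at_real = daily_partition_keys(START, real_time, end_offset=-1)

print(keys_at_test[-1], "2023-06-13" in keys_at_test)
print(keys_at_real[-1], "2023-06-13" in keys_at_real)
```

Under this model the same constant key is missing at the earlier evaluation time and present at the later one, which matches what I saw. It also shows that end_offset=-1 hides one more day than scheduled_execution_time minus one day accounts for, so that may be worth double-checking for the real daily ticks.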
o
ah sorry to leave you hanging on this! that's definitely somewhat surprising (I hadn't realized that Test Schedule was involved, but even if I had I wouldn't have guessed that). glad things seem to be working now