
Mark Nemec

05/17/2023, 6:26 PM
Hi, I’m trying to set up a (monthly partitioned?) asset for an API that gives me results in monthly batches. However, the data in the API has to be refreshed daily because it’s constantly being updated. My initial plan was to model this as an asset with a monthly partition definition running on a daily cron schedule, but that doesn’t work because the job doesn’t try to materialise the asset as a partitioned asset. I also looked into using a freshness policy with an auto-materialize policy, but that doesn’t seem to work for partitioned assets in the way I’d like. What would be a better way to model this?
this is the error I get when using a combination of partitioned asset and a daily cron schedule:
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
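For context on the error: define_asset_job plus a plain ScheduleDefinition launches an unpartitioned run, so anything inside a partitioned asset that depends on the partition key fails with exactly this message. Dagster’s built-in way to tie a schedule to a partitioned asset job is build_schedule_from_partitioned_job, but it fires on the partition’s own cadence (monthly here), so on its own it wouldn’t give the daily refresh being asked about. A minimal sketch, with illustrative names that aren’t from the thread:

from dagster import (
    MonthlyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2022-01-01", timezone="America/New_York", end_offset=1
)

@asset(partitions_def=monthly_partitions)
def invoices(context):
    ...

invoices_job = define_asset_job(name="invoices_job", selection=[invoices])

# Each tick targets a specific monthly partition, so the partition_key error
# goes away -- but this schedule only fires once per month, on the partition's
# own cadence, not daily.
monthly_refresh_schedule = build_schedule_from_partitioned_job(invoices_job)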

jamie

05/17/2023, 8:44 PM
hey @Mark Nemec could you share your sensor definition?

Mark Nemec

05/17/2023, 8:52 PM
hey @jamie I’m not using a sensor currently. Just the following asset definition
@asset(
    partitions_def=MonthlyPartitionsDefinition(
        start_date="2022-01-01", timezone="America/New_York", end_offset=1
    ),
)
def invoices(…)
and schedule
daily_refresh_schedule = ScheduleDefinition(
    job=define_asset_job(
        name="invoices_job",
        selection=[invoices],
    ),
    cron_schedule="0 0 * * *",
)
I think your question was all I needed… I replaced the schedule with a sensor that returns a run request for every partition of the asset, with the date included in the run_key so each partition runs at most once a day
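A minimal sketch of that sensor approach, reusing monthly_partitions and invoices_job from the snippet above (the sensor name and interval are assumptions, not the actual code from the thread): one RunRequest per partition, with today’s date baked into the run_key so Dagster skips any partition it has already launched that day.

from datetime import datetime

from dagster import RunRequest, sensor

@sensor(job=invoices_job, minimum_interval_seconds=3600)
def daily_partition_refresh_sensor(context):
    # Request a run for every existing monthly partition. Dagster deduplicates
    # on run_key, so including today's date means each partition is launched
    # at most once per day.
    today = datetime.now().strftime("%Y-%m-%d")
    for partition_key in monthly_partitions.get_partition_keys():
        yield RunRequest(
            run_key=f"{partition_key}-{today}",
            partition_key=partition_key,
        )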

jamie

05/19/2023, 4:12 PM
cool! so is it working how you expect now, or are you still running into issues?

Mark Nemec

05/19/2023, 4:13 PM
all good now
:partydagster: 1