Hi Dagster, I need some support on the lazy loadin...
# ask-community
m
Hi Dagster, I need some support on the lazy loading aspect of dagster. This is related to https://docs.dagster.io/_apidocs/repositories and https://dagster.slack.com/archives/C01U954MEER/p1689751159913119. I made a minimal example trying to show what I want to achieve. Basically we have an asset_job that materializes a selection of Assets. The job runs on a schedule, but we have multiple instances of the job built dynamically (here the lazy loading would come in handy as building a job is an expensive operation).
from datetime import datetime

from dagster import (
    repository, TimeWindowPartitionsDefinition, asset, define_asset_job, AssetSelection, ScheduleDefinition
)
from dagster._core.definitions.asset_graph import AssetGraph

assets_partitions = TimeWindowPartitionsDefinition(
    start=datetime(2023, 8, 29),
    fmt="%Y-%m-%d",
    cron_schedule="* * * * *",
)

@asset(partitions_def=assets_partitions)
def initialization():
    return "foo"


def make_expensive_job(id):
    # consider this an expensive function
    assets = [initialization]
    job = define_asset_job(
        name=f"expensive_job_{id}",
        selection=AssetSelection.assets(*assets),
        partitions_def=assets_partitions
    )
    return job.resolve(asset_graph=AssetGraph.from_assets(assets))

def make_expensive_schedule(id):
    return ScheduleDefinition(name=f"expensive_schedule_{id}", cron_schedule="* * * * *", job_name=f"expensive_job_{id}")

@repository
def repository():
    jobs = {}
    schedules = {}
    for id in range(1000):
        job = (lambda number=id: make_expensive_job(number))
        schedule = (lambda number=id: make_expensive_schedule(number))
        jobs[f"expensive_job_{id}"] = job
        schedules[f"expensive_schedule_{id}"] = schedule
    return {
        'jobs': jobs,
        'schedules': schedules
    }
When triggering the job through the UI, everything works fine (as the partition is selected in the UI). However, when the job runs on a schedule, the tag dagster/partition is not set and we receive the following error:
Cannot access partition_key for a non-partitioned run
Any idea on how to fix this? build_schedule_from_partitioned_job fixed this for us in the past, but I need an instance of the job here and I would like to avoid generating the job twice as it is very expensive.
s
Hi Manuel - might it make sense to use functools.lru_cache on make_expensive_job so that you can "create" it twice without actually creating it?
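A minimal sketch of the caching suggestion, using only the standard library. The dict-returning make_expensive_job and make_expensive_schedule below are hypothetical stand-ins for the Dagster objects in the question, and build_calls exists only to show how often the expensive build actually runs:

```python
from functools import lru_cache

build_calls = []  # records every real (expensive) build, for illustration only


@lru_cache(maxsize=None)
def make_expensive_job(job_id: int) -> dict:
    # Hypothetical stand-in for the expensive job construction in the question.
    build_calls.append(job_id)
    return {"name": f"expensive_job_{job_id}"}


def make_expensive_schedule(job_id: int) -> dict:
    # Needs the job instance; the cached call avoids building it a second time.
    job = make_expensive_job(job_id)
    return {"name": f"expensive_schedule_{job_id}", "job": job}


# Lazy loaders, shaped like the repository dict: nothing is built at this point.
jobs = {f"expensive_job_{i}": (lambda n=i: make_expensive_job(n)) for i in range(3)}
schedules = {
    f"expensive_schedule_{i}": (lambda n=i: make_expensive_schedule(n)) for i in range(3)
}

# Resolving the schedule builds job 0 once; resolving the job afterwards hits the cache.
schedule_0 = schedules["expensive_schedule_0"]()
job_0 = jobs["expensive_job_0"]()
```

Only the jobs that are actually requested get built, so the loaders stay lazy; the cache just ensures that a job needed by both its schedule and its own loader is constructed once per process.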
m
I can definitely do that, but I am trying to get my head around it first… Is a ScheduleDefinition not capable of handling partitioned jobs? Having “lazy loading” support from dagster, but having to instantiate and cache the items that should be “lazy loaded”, defeats the original purpose. I would rather use it the way dagster intends it to be used.
s
"Is a ScheduleDefinition not capable of handling partitioned jobs?"
It is - if you look at the implementation of build_schedule_from_partitioned_job, it invokes the schedule decorator, which creates a ScheduleDefinition under the covers.
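The piece a plain ScheduleDefinition is missing in the example is the partition key for each tick. A rough sketch of how such a key could be derived from the scheduled time follows; it is not Dagster's actual implementation. The helper name last_completed_partition_key and the fixed-length window parameter are assumptions for illustration, and the example uses daily windows rather than the every-minute cron from the question:

```python
from datetime import datetime, timedelta


def last_completed_partition_key(
    scheduled_time: datetime,
    partitions_start: datetime,
    window: timedelta,
    fmt: str,
) -> str:
    """Return the key of the most recent window that ended at or before scheduled_time."""
    elapsed = scheduled_time - partitions_start
    completed_windows = elapsed // window  # timedelta // timedelta yields an int
    if completed_windows < 1:
        raise ValueError("no completed partition window before the scheduled time")
    window_start = partitions_start + (completed_windows - 1) * window
    return window_start.strftime(fmt)


# Illustrative values: daily windows starting on the date used in the question.
key = last_completed_partition_key(
    scheduled_time=datetime(2023, 8, 31),
    partitions_start=datetime(2023, 8, 29),
    window=timedelta(days=1),
    fmt="%Y-%m-%d",
)

# The tag the question reports as missing on scheduled runs.
run_tags = {"dagster/partition": key}
```

In a schedule function, one option (an assumption, not verified against this setup) would be to return a RunRequest carrying these tags, so the scheduled run has a partition without the schedule needing the job instance up front.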