# ask-community
m
Hi, we have a repository where we create a job per client. The list of clients, along with their configurations, is fetched through a REST endpoint. Loading this repository takes very long (~40 seconds) compared to the cadence at which Dagster reloads it automatically (every minute). We are trying to move to the lazy-loading approach, but we couldn't figure out how to parametrise the callables used by lazy loading, since we need to know at least the client id in order to build its assets graph.
• We are going for option 2 here: https://docs.dagster.io/_apidocs/repositories
• Consider the following simplified case:
import datetime

from dagster import (
    define_asset_job,
    AssetSelection,
    TimeWindowPartitionsDefinition,
    JobDefinition,
    repository
)


class MyService:

    def to_job_name(self, client) -> str:
        return f"my_job_{client.id}"

    def get_job_for_client(self, client) -> JobDefinition:
        assets_partitions = TimeWindowPartitionsDefinition(
            start=datetime.date(2023, 1, 1),
            fmt="%Y-%m-%d",
            cron_schedule=client.config.running_schedule
        )
        my_assets = ...
        run_config = ...  # run config is built based on the client config

        my_job = define_asset_job(
            name=self.to_job_name(client),
            selection=AssetSelection.assets(*my_assets),
            partitions_def=assets_partitions,
            config={
                "ops": run_config
            }
        )
        return my_job


@repository(name="my_repo")
def dagster():
    api = MyAPI()
    service = MyService()
    clients = api.load_all_clients()
    jobs = {
        service.to_job_name(client): service.get_job_for_client(client) # this call is not conformant with the interface specified by Dagster
        for client in clients
    }

    repository_data = {
        'jobs': jobs,
        # 'schedules': ...
    }

    return repository_data
Thank you!
s
Hi, not sure I fully understand, but I think you could just wrap the `service.get_job_for_client(client)` call in a lambda:
@repository(name="my_repo")
def dagster():
    api = MyAPI()
    service = MyService()
    clients = api.load_all_clients()
    jobs = {
        service.to_job_name(client): lambda: service.get_job_for_client(client)
        for client in clients
    }
That should lazy-load jobs as needed, though the call to `api.load_all_clients()` appears unavoidable. If the client list doesn't change much, you could use some caching scheme and just reuse the cached result until X minutes have elapsed.
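A minimal sketch of such a time-based cache (hypothetical helper names; the 5-minute TTL is an arbitrary choice, tune it to how often client configurations actually change):

```python
import time

# Module-level cache: the client list plus the timestamp of the last fetch.
_cache = {"clients": None, "loaded_at": 0.0}
TTL_SECONDS = 300  # arbitrary; tune to how often client configs change


def load_all_clients_cached(api):
    """Return the cached client list, refetching only after the TTL expires."""
    now = time.time()
    if _cache["clients"] is None or now - _cache["loaded_at"] > TTL_SECONDS:
        _cache["clients"] = api.load_all_clients()  # the expensive REST call
        _cache["loaded_at"] = now
    return _cache["clients"]
```

With this, repeated repository reloads within the TTL window skip the REST round-trip entirely.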
m
@sean the issue here is that Dagster expects a parameterless function to generate the job (see https://docs.dagster.io/_apidocs/repositories, lazy-loaded repository). The expected interface is:
'jobs': Dict[str, Callable[[], JobDefinition]]
Therefore a mapping of
job_name: get_job_for_client(client)
would not work, due to the `client` parameter being present.
s
jobs = {
        service.to_job_name(client): lambda: service.get_job_for_client(client)
        for client in clients
    }
Maybe I’m being dense here, but `lambda: service.get_job_for_client(client)` is a parameterless function, no? The client id here is defined in the closure, not as a function parameter.
m
The issue here is that `lambda: service.get_job_for_client(client)` does not defer the resolution of `client`. That means that if we have client A and client B, the result is:
jobs = {
  "A": service.get_job_for_client("B"),
  "B": service.get_job_for_client("B"),
}
which is not correct. But thanks to your hint, I adapted the lambda to be parameterised in a way that still allows calling it without arguments, via a default value. The final solution that got it working is:
jobs = {
        service.to_job_name(client): lambda c=client: service.get_job_for_client(c)
        for client in clients
    }
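For reference, the scoping behaviour can be reduced to a minimal, Dagster-free sketch (illustrative names only):

```python
# Python closures bind variables late: every lambda in `late` sees the
# *final* value of x. A default argument, by contrast, captures the value
# at definition time, which is why the `c=client` trick works.
late = [lambda: x for x in [1, 2, 3]]
bound = [lambda x=x: x for x in [1, 2, 3]]

print([f() for f in late])   # => [3, 3, 3]
print([f() for f in bound])  # => [1, 2, 3]
```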
s
Ah, the classic variable-scoping gotcha. Good solution. FWIW you can also get around it by creating the lambda inside a separate function (its own scope):
funcs = []

def make_func(x):
    return lambda: x

for x in [1, 2, 3]:
    funcs.append(make_func(x))
    
print([f() for f in funcs])  # => [1, 2, 3]
m
Thank you @sean for your answer 🙂
Hi @sean, the repository is now loading successfully, but I have noticed that it is still not lazy-loading the job definitions: the method `get_job_for_client` is actually being called for all clients during repository load time. I have also tried the example from the Dagster docs, and it behaves similarly, so `make_expensive_job` is also getting called during load time. Are we missing something, or do you think there is an issue with that? Thank you!
a
The lazy loading prevents a definition from being loaded in contexts where that job is not needed from the repository (like executing a different job), but for serving the Dagster UI the server needs all the definitions, so it will end up loading them all.
Regarding "the cadence that Dagster uses to reload it automatically (every minute)": is this `dagster dev` or a different context?
m
Thank you @alex for clarifying this. It would be very helpful to have this added to the documentation explicitly. Now, after your message, when I read:
"This form is intended to allow definitions to be created lazily when accessed by name"
I can interpret it as you described. As for the reload cadence of every minute, it used to happen with `dagster api grpc`, but I have just tried it out and it is not happening anymore, only with `dagster dev` as you stated.