# ask-community
j
Hi Dagster! I have a project with about 50 repositories. I noticed that the whole codebase is loaded before each execution, which can take a considerable amount of time (about 1 minute in my case). I understand that this loading helps detect possible errors before execution, but in my case it slows the process down a lot. Is there a way to prevent this? Would it be possible to make this code loading optional?
y
Hi! What's your use case / workflow like? Curious to learn and see if you can structure the code differently so that it loads more efficiently
j
Hi! My code looks like this:
from dagster import AssetIn, ScheduleDefinition, asset, define_asset_job, repository


def build_assets(name):
    @asset(name=f"asset_1__job_{name}")
    def asset_1():
        return 1

    @asset(
        name=f"asset_2__job_{name}", ins={"asset_1": AssetIn(f"asset_1__job_{name}")}
    )
    def asset_2(asset_1):
        return asset_1 + 1

    @asset(
        name=f"asset_3__job_{name}", ins={"asset_2": AssetIn(f"asset_2__job_{name}")}
    )
    def asset_3(asset_2):
        return asset_2 + 1

    ...

    return [asset_1, asset_2, asset_3, ...]


def build_repo(repo_name):
    all_assets = []
    jobs = []
    schedules = []
    for job_name in range(1, 50):
        name = f"{repo_name}_{job_name}"
        assets = build_assets(name)
        job = define_asset_job(name=name, selection=assets)
        jobs.append(job)
        schedules.append(
            ScheduleDefinition(
                name=f"schedule_{job_name}",
                job=job,
                cron_schedule="0 0 * * *",
            )
        )
        all_assets.extend(assets)
    return [*jobs, *all_assets, *schedules]


@repository()
def A():
    return build_repo("A")

...

@repository()
def Z():
    return build_repo("Z")
Actually, I have trouble understanding the benefit of reading the whole codebase on each execution, especially in a deployment process where errors would already have been detected when reloading the workspace. If I remember correctly, I already discussed this a few months ago with @owen
o
At the very least, we do need to resolve the entire contents of the repository relevant to the run in order to find the definition of the relevant job within it. It's arguably reasonable for us not to run the repository functions for non-relevant repos, but when we're actually reading in the code we don't have a clear way of knowing what context it is being loaded in, so it's not trivial to decide whether a repository function should be executed or not. I understand that this is just an example, but it does seem like you're generating a lot of copies of very similar workflows, which could perhaps be represented in more compact ways (which would allow the repository code to load quickly enough for this not to be a concern). For example, static partition sets could potentially do a similar job, obviating the need to create as many copies. Around how many total assets (across all repositories) are being generated here? It's also possible that we have some low-hanging inefficiencies on our side whose removal could improve the performance.
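For illustration, the static-partitions idea mentioned above might look roughly like the sketch below; the client partition keys, fetch_data, and specific_transform are placeholders rather than anything from the thread. One partitioned asset stands in for the per-client copies, so far fewer definitions have to be constructed at load time.

from dagster import StaticPartitionsDefinition, asset, define_asset_job

# Hypothetical client list; in practice this could come from static config
# rather than an API call made at load time.
clients = StaticPartitionsDefinition(["client_1", "client_2", "client_3"])


@asset(partitions_def=clients)
def raw(context):
    # The partition key identifies which client this run is for.
    return fetch_data(context.partition_key)  # fetch_data: placeholder


@asset(partitions_def=clients)
def transform(raw):
    return specific_transform(raw)  # specific_transform: placeholder


# One job covers every client; each run targets a single partition.
clients_job = define_asset_job(name="clients_job", selection=[raw, transform])

A run of clients_job would then target a single partition (a single client) instead of needing a dedicated per-client job.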
j
Hi! Indeed, only loading the repository associated with the current run would be beneficial when launching a job.
It is true that the code I provided above is a bit synthetic and looks redundant, so let me quickly describe the context. We work with about 50 data providers whose data we have to process daily. Each provider has a certain number of clients (between 20 and 40). The data is first retrieved through an API connector and then processed through a series of provider-specific transformations. It is then split into several tables (transactions, contacts, etc.) whose schemas are common to all providers, so that it can be stored in a Redshift data warehouse. In our setup, a provider corresponds to a repository and a client corresponds to a job. Here is an extract of the code that is a little more complete than the previous one (in reality the assets are generated by a factory pattern). Maybe you have some ideas to help us optimize this code, but I don't really see how to do better.
from dagster import AssetIn, asset, define_asset_job, repository


def build_assets(repo_name, job_name, API, cred):

    @asset(name=f"raw__{job_name}")
    def raw(context):
        data = API(cred).fetch_data(context.output_asset_partition_key())
        return data

    @asset(
        name=f"transform__{job_name}", ins={"raw": AssetIn(f"raw__{job_name}")}
    )
    def transform(raw):
        import importlib

        transform = getattr(
            importlib.import_module(repo_name),
            "specific_transform",
        )(raw)
        return transform

    @asset(
        name=f"transactions__{job_name}", ins={"transform": AssetIn(f"transform__{job_name}")}
    )
    def transactions(transform):

        data_transactions = get_transactions(transform)

        return data_transactions

    ...

    return [raw, transform, transactions, ...]


def build_repo(repo_name, API):

    all_assets = []
    jobs = []
    schedules = []

    credentials = API.get_credentials()

    for cred in credentials:
        job_name = f"{repo_name}_{cred['database']}"
        assets = build_assets(repo_name, job_name, API, cred)
        job = define_asset_job(name=job_name, selection=assets)
        jobs.append(job)
        all_assets.extend(assets)
    return [*jobs, *all_assets]


@repository()
def provider_A():
    return build_repo(repo_name="provider_A", API=API_A)

...
In our case, some repositories have about a thousand assets and about fifty jobs, so I can understand that loading takes time. However, from what I've observed, the loading time of a repository seems to grow much faster than linearly with the number of assets and jobs.
Maybe I could split the repositories across several different workspaces to save time. I'm not a specialist: do you think I would need a new Docker image as well as a new gRPC server to set this up, or could adapting the following configuration be enough?
docker-compose.yaml
user_code:
    build:
      context: .
      dockerfile: ./Dockerfile
      target: user_code
    image: "$REGISTRY_URL/$ECR_USERCODE_REPO"
    container_name: user_code
    command: "dagster api grpc -h 0.0.0.0 -p 4000 -f repository.py"
    environment:
      ...
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2048M
workspace.yaml
load_from:
  - grpc_server:
      host: user_code
      port: 4000
      location_name: "dags"
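For reference, and not something from the thread itself: splitting the repositories across several code locations might look roughly like the sketch below. The file names repositories_a.py / repositories_b.py and the location names are hypothetical; the idea is that each location runs its own gRPC server process, although both containers can reuse the same Docker image.

# docker-compose.yaml (two containers built from the same user-code image)
user_code_a:
    image: "$REGISTRY_URL/$ECR_USERCODE_REPO"
    container_name: user_code_a
    command: "dagster api grpc -h 0.0.0.0 -p 4000 -f repositories_a.py"
user_code_b:
    image: "$REGISTRY_URL/$ECR_USERCODE_REPO"
    container_name: user_code_b
    command: "dagster api grpc -h 0.0.0.0 -p 4000 -f repositories_b.py"

# workspace.yaml (one entry per code location)
load_from:
  - grpc_server:
      host: user_code_a
      port: 4000
      location_name: "providers_a"
  - grpc_server:
      host: user_code_b
      port: 4000
      location_name: "providers_b"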
o
Got it (and sorry for the slow response!) -- I'm not an expert on the deployment side of things (I'd recommend posting a new question in this channel on that topic so it doesn't get lost). Now that I understand your model a bit more, the structure you have of breaking things into different repositories/jobs makes a lot of sense. I do wonder if there's any profiling you can do on your side to see specifically what is taking the most time (something to the effect of
time sudo py-spy record -o profile.svg -- python repository.py
would give you a pretty good idea). It's likely that there are some inefficiencies somewhere on our side that show up at this scale, and it's also possible that there could be some easy wins on your side (maybe interfacing with the API takes an unexpectedly long time, or something like that).
j
Thank you for your feedback. I will try to optimize the API calls to reduce the loading time. Even if it is complicated to change on your side, I find this behavior quite limiting, especially in a workspace with many jobs/repositories. Do you think that reloading the whole workspace on each execution could be addressed in the coming weeks/months?
o
That makes sense -- we do have some internal interfaces that allow you to cache the result of certain expensive calls when you're spinning up your gRPC server, then send that cached information to the run/step workers when runs are launched, in order to sidestep the expensive calls on each step. This would be CacheableAssetsDefinition. We use this in things like our Fivetran integration to avoid having to call out to those external APIs on each step. Basically, you split your asset-generating code into two steps: the first, "compute_cacheable_data", does the expensive work and only gets called when your code location is spun up; the second, "build_definitions", takes that serialized data and returns a set of AssetsDefinitions, and gets called in each step. I hesitate to recommend it as it's not a public API (and basically undocumented), but some users have had success subclassing it to avoid these sorts of problems.
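A very rough sketch of the subclassing pattern described above, with the caveat repeated: CacheableAssetsDefinition is a private and mostly undocumented API, so the import path and exact signatures may differ between Dagster versions. ProviderAssets is an invented name, and API_A and the reuse of build_assets are assumptions based on the code earlier in the thread.

from dagster import repository
from dagster._core.definitions.cacheable_assets import (
    AssetsDefinitionCacheableData,
    CacheableAssetsDefinition,
)


class ProviderAssets(CacheableAssetsDefinition):
    def __init__(self, repo_name, API):
        self._repo_name = repo_name
        self._API = API
        super().__init__(unique_id=f"provider_assets_{repo_name}")

    def compute_cacheable_data(self):
        # Runs only when the code location / gRPC server spins up: the
        # expensive credentials call happens once and the result is cached.
        # Assumes the credential dicts are JSON-serializable.
        return [
            AssetsDefinitionCacheableData(extra_metadata={"cred": cred})
            for cred in self._API.get_credentials()
        ]

    def build_definitions(self, data):
        # Runs in every process, but only rebuilds the asset definitions
        # from the cached metadata -- no API call here.
        defs = []
        for entry in data:
            cred = entry.extra_metadata["cred"]
            job_name = f"{self._repo_name}_{cred['database']}"
            defs.extend(build_assets(self._repo_name, job_name, self._API, cred))
        return defs


@repository()
def provider_A():
    # Per-client jobs/schedules would still need to be handled separately.
    return [ProviderAssets("provider_A", API_A)]

With this split, the expensive API.get_credentials() call happens once per code-location load instead of in every run and step process.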
j
I have started a new thread following this discussion: https://dagster.slack.com/archives/C01U954MEER/p1675874269573189