# ask-community
Hi all - I need advice on how to organize a large volume of Assets across multiple days.

The use case:
• Every minute, a business process creates a new file (roughly 500 MB).
• There is no end date, so new minutes of data will keep accumulating indefinitely.
• A @sensor should detect that a new file has been written and trigger asset materialization.
• The @asset should process the file and store the result via an IO manager.
  ◦ (The processing of each file is significant enough to warrant its own asset.)
• Several downstream assets should materialize based on the above asset, e.g.
  ◦ Each hour of minute-files should auto-materialize a combined-hour asset.
  ◦ Each day of minute-files should auto-materialize a combined-day asset.

The issue: I see 2 major ways of designing/implementing this.

Option 1: Infinitely long time-based partition of minutes
I guess this technically works, assuming I can create a TimeWindowPartitionsDefinition with no end date that keeps creating new minute partitions forever. But this will quickly become unwieldy in the UI, both to look at/backfill and performance-wise: we plan to run this pipeline for years, which will create 1*365*24*60=525600 partitions per year. And based on https://github.com/dagster-io/dagster/issues/12441 I guess Dagster can't handle this scale.

Option 2: Dynamic asset definitions
Something like what is discussed in https://github.com/dagster-io/dagster/discussions/15119. The idea is that I would dynamically create an Asset Group for each new day. Each Asset Group would contain 1 day's worth of the minute-file partitioned asset, and each downstream materialization would operate only on the data within that Asset Group's date (because the assets themselves would contain the date in their name, I guess). Unfortunately, as referenced in the linked discussion, this is also not supported.

As a hacky workaround, I'm contemplating restarting the Dagster definitions container on some schedule in order to reload the definitions and create Assets, Asset Groups, sensors, etc. for the new dates. This may bring other challenges as well: turning off old days' sensors, dynamic function names with decorators, and other complications in specifying the dynamic relationships between sensors, downstream asset materializations, etc.

Any guidance here is appreciated! Maybe there is a 3rd option that I haven't thought of.
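To make the scale and the hour/day roll-up concrete, here is a small standard-library sketch (no Dagster involved). It generates one day's worth of minute keys and buckets them by hour, which is roughly the grouping a combined-hour asset would need. The key format and the helper names `minute_keys` and `group_by_hour` are assumptions for illustration only.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List

# Assumed partition-key format; adjust to whatever the partitions actually use.
KEY_FMT = "%Y-%m-%d %H:%M:%S"


def minute_keys(day: datetime) -> List[str]:
    """Return all minute keys for the calendar day containing `day`."""
    start = datetime(day.year, day.month, day.day)
    return [
        (start + timedelta(minutes=i)).strftime(KEY_FMT)
        for i in range(24 * 60)
    ]


def group_by_hour(keys: List[str]) -> Dict[str, List[str]]:
    """Bucket minute keys under the key of the hour they fall in."""
    buckets: Dict[str, List[str]] = defaultdict(list)
    for key in keys:
        ts = datetime.strptime(key, KEY_FMT)
        hour_key = ts.replace(minute=0, second=0).strftime(KEY_FMT)
        buckets[hour_key].append(key)
    return dict(buckets)


keys = minute_keys(datetime(2023, 9, 1))
hours = group_by_hour(keys)

# One day is 1440 minute keys in 24 hourly buckets; a full year of
# minute partitions is 365 times that.
partitions_per_year = 365 * len(keys)
```

Splitting by day keeps each partition set at 1440 keys, whereas a single open-ended definition grows by 525600 keys every year.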
Update on this - I was able to accomplish what I want using Option 2. Asset Groups are really great - although I think I read something about them being deprecated, I really hope that isn't the case!

See the first attached screenshot for what the UI looks like when dynamically creating assets (and sensors) based on the date and grouping them into Asset Groups by date. The second screenshot shows the dynamic sensors, plus one sensor that uses the Dagster GraphQL API to reload the code location on a time interval, which creates additional dynamic date assets etc. as needed.

And here is the code for it:
import os
from datetime import datetime, timedelta
from typing import Dict
from minio import Minio
from dagster import (asset, AssetIn, FilesystemIOManager, Definitions, ConfigurableResource,
    sensor, RunRequest, Config,
    AutoMaterializePolicy, AssetExecutionContext,
    TimeWindowPartitionsDefinition, AssetSelection, job, op)
from dagster_graphql import DagsterGraphQLClient
from pydantic import PrivateAttr


BUCKET_NAME = "files"
START_DATE = datetime(2023, 9, 1)
END_DATE = datetime.now() + timedelta(days=1)


class S3ClientResource(ConfigurableResource):
    _client: Minio = PrivateAttr()

    def setup_for_execution(self, context) -> None:
        self._client = Minio(...)

    def client(self):
        return self._client


assets = []
sensors = []

cur_date = START_DATE
while cur_date < END_DATE:
    dt_suffix = cur_date.strftime("%Y%m%d")
    group_name = f"FILE_{dt_suffix}"

    s3_bucket_asset_name = f"s3_bucket_{dt_suffix}"
    @asset(
        group_name=group_name,
        auto_materialize_policy=AutoMaterializePolicy.eager(),
        name=s3_bucket_asset_name,
    )
    def s3_bucket_asset(context: AssetExecutionContext, s3: S3ClientResource) -> str:
        ...
        return BUCKET_NAME
    assets.append(s3_bucket_asset)


    raw_minute_files_asset_name = f"raw_minute_files_{dt_suffix}"
    @asset(
        group_name=group_name,
        partitions_def=TimeWindowPartitionsDefinition(
            cron_schedule="* * * * *",
            start=cur_date,
            end=cur_date + timedelta(hours=1),
            fmt="%Y-%m-%d %H:%M:%S",
        ),
        ins={"bucket": AssetIn(s3_bucket_asset_name)},
        name=raw_minute_files_asset_name,
    )
    def raw_minute_files_asset(
        context: AssetExecutionContext,
        bucket: str,
        s3: S3ClientResource
    ) -> str:
        dt_range = context.partition_key_range
        context.log.info(f"minute files, bucket: '{bucket}', date: '{dt_range.start}'")
        if dt_range.start != dt_range.end:
            raise NotImplementedError("Dagster doesn't support multi-partition runs yet")
        context.add_output_metadata(metadata={"partition": dt_range.start})
        return "..."
    assets.append(raw_minute_files_asset)


    combined_hour_file_asset_name = f"combined_hour_file_{dt_suffix}"
    @asset(
        group_name=group_name,
        auto_materialize_policy=AutoMaterializePolicy.eager(),
        ins={
            "bucket": AssetIn(s3_bucket_asset_name),
            "minute_files": AssetIn(raw_minute_files_asset_name)
        },
        name=combined_hour_file_asset_name,
    )
    def combined_hour_file_asset(
        context: AssetExecutionContext,
        bucket: str,
        minute_files: Dict[str, str],
        s3: S3ClientResource
    ) -> str:
        context.log.info(f"Combined asset, bucket: '{bucket}', minute_files: '{minute_files}'")
        return "..."
    assets.append(combined_hour_file_asset)

    raw_file_sensor_name = f"raw_file_{dt_suffix}"
    @sensor(
        asset_selection=AssetSelection.keys(raw_minute_files_asset_name),
        minimum_interval_seconds=30,
        name=raw_file_sensor_name,
    )
    def raw_file_sensor(context) -> None:
        partition = "..."
        yield RunRequest(
            run_key=partition,
            partition_key=partition,
        )
    sensors.append(raw_file_sensor)

    cur_date = cur_date + timedelta(days=1)

@asset(group_name=datetime.now().strftime("FOO_%Y%m%d_%H%M%S"))
def dummy_asset(context):
    return ""
assets.append(dummy_asset)

@op
def dummy_op(context):
    return

@job
def dummy_job():
    dummy_op()

@sensor(job=dummy_job, minimum_interval_seconds=30)
def reload_code_sensor(context):
    client = DagsterGraphQLClient("webserver", port_number=3000)
    status = client.reload_repository_location("example_user_code")
    context.log.info(str(status))
sensors.append(reload_code_sensor)


defs = Definitions(
    assets=assets,
    resources={
        "io_manager": FilesystemIOManager(base_dir="..."),
        "s3": S3ClientResource(),
    },
    sensors=sensors,
    jobs=[dummy_job],
)
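The sensor body above leaves the partition lookup elided (`partition = "..."`). As a rough sketch of one way such a lookup could work, the standard-library helpers below turn bucket object names into partition keys in the same `%Y-%m-%d %H:%M:%S` format used by the partitions definition. The object naming scheme (`<YYYYMMDD>_<HHMM>.dat`) and the helper names `partition_key_for` and `new_partition_keys` are purely hypothetical; the real file names are not shown in this thread.

```python
import re
from datetime import datetime
from typing import Iterable, List, Optional, Set

# Same key format as the TimeWindowPartitionsDefinition in the code above.
KEY_FMT = "%Y-%m-%d %H:%M:%S"

# HYPOTHETICAL naming scheme for the minute files: "<YYYYMMDD>_<HHMM>.dat".
OBJECT_NAME_RE = re.compile(r"^(\d{8})_(\d{4})\.dat$")


def partition_key_for(object_name: str) -> Optional[str]:
    """Map an object name to a partition key, or None if it doesn't parse."""
    match = OBJECT_NAME_RE.match(object_name)
    if match is None:
        return None
    try:
        ts = datetime.strptime(match.group(1) + match.group(2), "%Y%m%d%H%M")
    except ValueError:
        # Digits matched the pattern but are not a real date/time.
        return None
    return ts.strftime(KEY_FMT)


def new_partition_keys(object_names: Iterable[str], seen: Set[str]) -> List[str]:
    """Return sorted partition keys for objects that have not been seen yet."""
    keys = {partition_key_for(name) for name in object_names}
    return sorted(k for k in keys if k is not None and k not in seen)
```

Inside a sensor, each returned key could be used as both `run_key` and `partition_key`, as in the `RunRequest` above, so that a file listed on several ticks only launches one run.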
Now I just need to figure out how to auto-enable asset materializations and turn sensors on/off automatically without using the UI... but I think I can handle that.
I also need to find a way to sort the Asset Groups in reverse order; it makes sense to have the most recent date first, in the upper left of the UI.