
Charles Lariviere

02/03/2021, 2:45 PM
Hey folks 👋 Does Dagster support partitions that have more than 1 dimension, or pipeline runs with multiple partitions? For example, I have a pipeline that pulls data from multiple endpoints of an API every day — therefore, hoping to partition that pipeline along “days” and “endpoints”. If for a given day, the pipeline fails for a given endpoint, it feels optimal to only re-run this specific day/endpoint, instead of re-running the pipeline for this given day on all endpoints.

sandy

02/03/2021, 4:08 PM
Would using the cross-product of the two dimensions as the set of partitions work for you?
In your schedule function, you could submit multiple pipeline runs - one for each endpoint.
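To make the cross-product idea concrete, here is a minimal plain-Python sketch (it does not call any Dagster API) that enumerates one partition key per (day, endpoint) pair. The `|` separator, the key format, and the helper names `days_between`, `make_partition_keys` and `parse_partition_key` are illustrative assumptions, not anything Dagster defines.

```python
from datetime import date, timedelta
from itertools import product
from typing import List, Tuple

# Hypothetical endpoint names, matching the example schedule in this thread.
ENDPOINTS = ["endpoint_1", "endpoint_2"]


def days_between(start: date, end: date) -> List[date]:
    """Every calendar day from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def make_partition_keys(start: date, end: date, endpoints: List[str]) -> List[str]:
    """One key per (day, endpoint) pair, i.e. the cross-product of both dimensions."""
    return [
        f"{day.isoformat()}|{endpoint}"
        for day, endpoint in product(days_between(start, end), endpoints)
    ]


def parse_partition_key(key: str) -> Tuple[date, str]:
    """Split a key back into its day and endpoint."""
    day_str, endpoint = key.split("|", 1)
    return date.fromisoformat(day_str), endpoint


keys = make_partition_keys(date(2021, 2, 1), date(2021, 2, 3), ENDPOINTS)
print(len(keys))  # 6 (3 days x 2 endpoints)
print(keys[0])  # 2021-02-01|endpoint_1

# Retrying one failed day/endpoint means launching just that one key.
print(parse_partition_key("2021-02-02|endpoint_2"))
```

With keys shaped like this, a failure for one endpoint on one day maps to exactly one partition, so only that pair needs to be re-run.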

Charles Lariviere

02/03/2021, 5:04 PM
> Would using the cross-product of the two dimensions as the set of partitions work for you?
Exactly! The pipeline expects `day` and `endpoint` parameters. Given a list of N endpoints, I would like to trigger a run for each endpoint, every day.
> In your schedule function, you could submit multiple pipeline runs - one for each endpoint.
Interesting! I did not realize you could submit multiple pipeline runs within the same schedule function. Given my understanding of the schedule definition, it's not clear to me how I would do that. With the example schedule below, would I instead return a list of run configs? That seems to bypass the `Partition()` definition though, so I must be approaching this the wrong way.
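```python
from datetime import datetime

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="example",
    start_date=datetime(2021, 2, 1)
)
def example_schedule(date):
    endpoints = ["endpoint_1", "endpoint_2"]
    # Only the first endpoint gets a run; the other endpoints are never scheduled.
    return {
        "solids": {
            "example_solid": {
                "inputs": {
                    "date": {"value": date.strftime("%Y-%m-%d")},
                    "endpoint": {"value": endpoints[0]}
                }
            }
        }
    }
```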
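The schedule above hard-codes `endpoints[0]`. One way to see the shape of the fan-out is to factor its run config into a helper that takes the endpoint as a parameter, then build one config per endpoint. This is a plain-Python sketch: `run_config_for` and `run_configs_for_day` are hypothetical helper names, and the config shape simply copies the `example_solid` inputs from the schedule. As far as the thread suggests, a `daily_schedule` function yields a single run per tick, so a list like this would still have to be submitted as separate runs.

```python
from datetime import date
from typing import Any, Dict, List

ENDPOINTS = ["endpoint_1", "endpoint_2"]


def run_config_for(day: date, endpoint: str) -> Dict[str, Any]:
    """Same structure as the schedule's run config, parameterized by endpoint."""
    return {
        "solids": {
            "example_solid": {
                "inputs": {
                    "date": {"value": day.strftime("%Y-%m-%d")},
                    "endpoint": {"value": endpoint},
                }
            }
        }
    }


def run_configs_for_day(day: date, endpoints: List[str]) -> List[Dict[str, Any]]:
    """One run config per endpoint for the given day."""
    return [run_config_for(day, endpoint) for endpoint in endpoints]


configs = run_configs_for_day(date(2021, 2, 1), ENDPOINTS)
print(len(configs))  # 2
```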

sandy

02/03/2021, 6:58 PM
I haven't tested this yet, but I believe something like the following should work:
```python
from dagster import pipeline, repository, solid, ScheduleDefinition, RunRequest

ENDPOINTS = ["a", "b", "c"]


@solid(config_schema={"endpoint": str, "date": str})
def collect_data_from_endpoint(context):
    """Collect data from the endpoint"""
    endpoint = context.solid_config["endpoint"]
    date = context.solid_config["date"]
    context.log.info(f"Collecting data from {endpoint} for {date}")


@pipeline
def my_pipeline():
    collect_data_from_endpoint()


def submit_runs(context):
    scheduled_execution_time = context.scheduled_execution_time
    # One run per endpoint on every tick; the run_key is unique per (tick, endpoint) pair.
    for endpoint in ENDPOINTS:
        yield RunRequest(
            run_key=f"{scheduled_execution_time}_{endpoint}",
            run_config={
                "solids": {
                    "collect_data_from_endpoint": {
                        "config": {
                            "endpoint": endpoint,
                            # Pass just the day, e.g. "2021-02-03", not the full timestamp.
                            "date": scheduled_execution_time.strftime("%Y-%m-%d"),
                        }
                    }
                }
            },
        )


# Every day at 23:45.
my_schedule = ScheduleDefinition(
    execution_fn=submit_runs, pipeline_name="my_pipeline", cron_schedule="45 23 * * *"
)


@repository
def multiple_runs_per_tick():
    return [my_pipeline, my_schedule]
```
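`submit_runs` yields one `RunRequest` per endpoint, keyed by tick time plus endpoint. Since the original goal was to retry only the failed day/endpoint pair, the plain-Python sketch below mirrors that run-key scheme and builds requests only for the endpoints that failed. It does not call Dagster: `RunSpec` is a hypothetical stand-in for `RunRequest`, and the `failed` set is an assumed input (for example, collected from failed runs). In Dagster itself you would launch a single run with the matching run config.

```python
from datetime import datetime
from typing import Dict, Iterable, List, NamedTuple, Set

ENDPOINTS = ["a", "b", "c"]


class RunSpec(NamedTuple):
    """Hypothetical stand-in for a RunRequest: a run key plus its run config."""
    run_key: str
    run_config: Dict


def run_spec(tick: datetime, endpoint: str) -> RunSpec:
    # Same key scheme as submit_runs: unique per (tick, endpoint) pair.
    return RunSpec(
        run_key=f"{tick}_{endpoint}",
        run_config={
            "solids": {
                "collect_data_from_endpoint": {
                    "config": {"endpoint": endpoint, "date": tick.strftime("%Y-%m-%d")}
                }
            }
        },
    )


def all_runs(tick: datetime, endpoints: Iterable[str]) -> List[RunSpec]:
    """What one scheduled tick launches: one run per endpoint."""
    return [run_spec(tick, e) for e in endpoints]


def retry_runs(tick: datetime, failed: Set[str]) -> List[RunSpec]:
    """Only the (day, endpoint) pairs that failed, in ENDPOINTS order."""
    return [run_spec(tick, e) for e in ENDPOINTS if e in failed]


tick = datetime(2021, 2, 3, 23, 45)
print(len(all_runs(tick, ENDPOINTS)))  # 3
print([spec.run_key for spec in retry_runs(tick, {"b"})])  # ['2021-02-03 23:45:00_b']
```

Including the endpoint in the key keeps the three runs of a tick distinct, which is what lets a single day/endpoint pair be identified and re-run on its own.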

Charles Lariviere

02/03/2021, 7:27 PM
Awesome, I’ll give that a try — thanks Sandy 🙏