Charles Lariviere
02/03/2021, 2:45 PM
sandy
02/03/2021, 4:08 PM
Charles Lariviere
02/03/2021, 5:04 PM
> Would using the cross-product of the two dimensions as the set of partitions work for you?
Exactly! The pipeline expects `day` and `endpoint` parameters. Given a list of N endpoints, I would like to trigger a run for each endpoint, every day.
> In your schedule function, you could submit multiple pipeline runs: one for each endpoint.
Interesting! I did not realize you could submit multiple pipeline runs from a single schedule function, and given how I understand the schedule definition, it's not clear to me how I would do that. With the example schedule below, would I instead return a list of run configs? That seems to bypass the `Partition()` definition, though, so I must be approaching this the wrong way.
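```python
from datetime import datetime

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="example",
    start_date=datetime(2021, 2, 1),
)
def example_schedule(date):
    endpoints = ["endpoint_1", "endpoint_2"]
    # Returns a single run config, so only the first endpoint gets a run
    return {
        "solids": {
            "example_solid": {
                "inputs": {
                    "date": {"value": date.strftime("%Y-%m-%d")},
                    "endpoint": {"value": endpoints[0]},
                }
            }
        }
    }
```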
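To picture sandy's cross-product suggestion: with days and endpoints as the two dimensions, the partition set is every (day, endpoint) pair, so N endpoints over D days gives N × D partitions. The plain-Python sketch below enumerates those pairs with `itertools.product`. The `partition_keys` helper and the `|`-separated key format are hypothetical illustrations, not Dagster's `Partition` API.

```python
from datetime import date, timedelta
from itertools import product


def partition_keys(start, end, endpoints):
    """Hypothetical helper: one key per (day, endpoint) pair for days in [start, end]."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    # product() yields every combination: days in the outer loop, endpoints in the inner
    return [f"{d.isoformat()}|{ep}" for d, ep in product(days, endpoints)]


keys = partition_keys(date(2021, 2, 1), date(2021, 2, 2), ["endpoint_1", "endpoint_2"])
print(keys)
# ['2021-02-01|endpoint_1', '2021-02-01|endpoint_2', '2021-02-02|endpoint_1', '2021-02-02|endpoint_2']
```

Each key would then have to be split back into its `day` and `endpoint` parts when building the run config for that partition.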
sandy
02/03/2021, 6:58 PM
```python
from dagster import RunRequest, ScheduleDefinition, pipeline, repository, solid

ENDPOINTS = ["a", "b", "c"]


@solid(config_schema={"endpoint": str, "date": str})
def collect_data_from_endpoint(_):
    """Collect data from the endpoint"""


@pipeline
def my_pipeline():
    collect_data_from_endpoint()


def submit_runs(context):
    # Yield one RunRequest per endpoint on each schedule tick
    scheduled_execution_time = context.scheduled_execution_time
    for endpoint in ENDPOINTS:
        yield RunRequest(
            # Unique per (tick, endpoint) pair
            run_key=f"{scheduled_execution_time}_{endpoint}",
            run_config={
                "solids": {
                    "collect_data_from_endpoint": {
                        "config": {
                            "endpoint": endpoint,
                            "date": scheduled_execution_time.strftime("%Y-%m-%d"),
                        }
                    }
                }
            },
        )


my_schedule = ScheduleDefinition(
    name="my_schedule",
    execution_fn=submit_runs,
    pipeline_name="my_pipeline",
    cron_schedule="45 23 * * *",  # every day at 23:45
)


@repository
def multiple_runs_per_tick():
    return [my_pipeline, my_schedule]
```
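Because the fan-out in `submit_runs` is plain Python, one way to check it without a running scheduler is to pull it into a helper that takes the tick time directly. The sketch below assumes a hypothetical `build_run_requests` function that returns `(run_key, run_config)` pairs using the same config shape as `submit_runs`, and formats the date as `%Y-%m-%d` as in Charles's original schedule. It has no Dagster dependency.

```python
from datetime import datetime, timezone

ENDPOINTS = ["a", "b", "c"]


def build_run_requests(scheduled_execution_time, endpoints):
    """Hypothetical helper: (run_key, run_config) pairs, one per endpoint, for one tick."""
    date_str = scheduled_execution_time.strftime("%Y-%m-%d")
    requests = []
    for endpoint in endpoints:
        # Combining tick time and endpoint keeps every key distinct within a tick
        run_key = f"{scheduled_execution_time}_{endpoint}"
        run_config = {
            "solids": {
                "collect_data_from_endpoint": {
                    "config": {"endpoint": endpoint, "date": date_str}
                }
            }
        }
        requests.append((run_key, run_config))
    return requests


tick = datetime(2021, 2, 6, 23, 45, tzinfo=timezone.utc)
for run_key, run_config in build_run_requests(tick, ENDPOINTS):
    print(run_key)
```

Inside `submit_runs`, each pair would then be wrapped as `RunRequest(run_key=run_key, run_config=run_config)`.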
Charles Lariviere
02/03/2021, 7:27 PM