mrdavidlaing
06/05/2021, 10:01 AM
pdpark
06/23/2021, 5:42 PM
Daniel Kim
08/08/2021, 1:10 AM
plant1_schedule = ScheduleDefinition(
    job=my_graph.to_job(
        name='plant1_job',
        config={
            "ops": {
                "do_some_task1": {"config": {"plant_code": "Plant1"}},
                "do_some_task2": {"config": {"plant_code": "Plant1"}},
                "do_some_task3": {"config": {"plant_code": "Plant1"}},
            }
        },
    ),
    cron_schedule="0 10 * * 1",
)

plant2_schedule = ScheduleDefinition(
    job=my_graph.to_job(
        name='plant2_job',
        config={
            "ops": {
                "do_some_task1": {"config": {"plant_code": "Plant2"}},
                "do_some_task2": {"config": {"plant_code": "Plant2"}},
                "do_some_task3": {"config": {"plant_code": "Plant2"}},
            }
        },
    ),
    cron_schedule="0 10 * * 1",
)

...

plant12_schedule = ScheduleDefinition(
    job=my_graph.to_job(
        name='plant12_job',
        config={
            "ops": {
                "do_some_task1": {"config": {"plant_code": "Plant12"}},
                "do_some_task2": {"config": {"plant_code": "Plant12"}},
                "do_some_task3": {"config": {"plant_code": "Plant12"}},
            }
        },
    ),
    cron_schedule="0 10 * * 1",
)
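A minimal sketch of one way to cut the repetition, assuming the same my_graph and op config shown above (make_plant_schedule is just an illustrative helper name, not anything from the thread):

from dagster import ScheduleDefinition


def make_plant_schedule(plant_number):
    plant_code = f"Plant{plant_number}"
    return ScheduleDefinition(
        job=my_graph.to_job(
            name=f"plant{plant_number}_job",
            config={
                "ops": {
                    "do_some_task1": {"config": {"plant_code": plant_code}},
                    "do_some_task2": {"config": {"plant_code": plant_code}},
                    "do_some_task3": {"config": {"plant_code": plant_code}},
                }
            },
        ),
        cron_schedule="0 10 * * 1",
    )


# Plant1 through Plant12, each with its own job and weekly schedule
plant_schedules = [make_plant_schedule(i) for i in range(1, 13)]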
Michael Russell
08/09/2021, 4:59 PM
@schedule(job=my_graph, cron_schedule="0 10 * * 1")
def plant_schedule(_):
    for i in range(1, 13):  # Plant1 through Plant12
        config = {
            "ops": {
                "do_some_task1": {"config": {"plant_code": f"Plant{i}"}},
                "do_some_task2": {"config": {"plant_code": f"Plant{i}"}},
                "do_some_task3": {"config": {"plant_code": f"Plant{i}"}},
            }
        }
        # give each RunRequest its own run_key so a single tick can launch all twelve runs
        yield RunRequest(run_key=f"plant_{i}", run_config=config)
I've looked into asset-based sensors, which would work for each launched run, and multi-asset sensors, which would involve writing out each plant code again (code example below, from the docs):
@sensor(pipeline_name="my_pipeline")
def multi_asset_sensor(context):
    cursor_dict = json.loads(context.cursor) if context.cursor else {}
    a_cursor = cursor_dict.get("a")
    b_cursor = cursor_dict.get("b")

    a_event_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("table_a"),
            after_cursor=a_cursor,
        ),
        ascending=False,
        limit=1,
    )
    b_event_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("table_b"),
            after_cursor=b_cursor,
        ),
        ascending=False,
        limit=1,
    )

    # only launch a run once both assets have new materializations
    if not a_event_records or not b_event_records:
        return

    yield RunRequest(run_key=None, run_config={})
    context.update_cursor(
        json.dumps(
            {"a": a_event_records[0].storage_id, "b": b_event_records[0].storage_id}
        )
    )
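If each plant materializes its own asset, the same get_event_records pattern can be driven by a loop instead of being written out twelve times. This is only a sketch: the per-plant asset keys (plant1_table, plant2_table, ...) are made up for illustration, and it uses the @sensor(job=...) form with the my_graph job from the schedule examples above rather than pipeline_name:

import json

from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    sensor,
)

PLANT_CODES = [f"Plant{i}" for i in range(1, 13)]


# my_graph is the graph from the schedule examples above
@sensor(job=my_graph.to_job())
def all_plants_sensor(context):
    cursor_dict = json.loads(context.cursor) if context.cursor else {}

    for plant_code in PLANT_CODES:
        # hypothetical per-plant asset key; substitute the real asset names
        records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=AssetKey(f"{plant_code.lower()}_table"),
                after_cursor=cursor_dict.get(plant_code),
            ),
            ascending=False,
            limit=1,
        )
        if not records:
            continue

        yield RunRequest(
            run_key=f"{plant_code}_{records[0].storage_id}",
            run_config={
                "ops": {
                    "do_some_task1": {"config": {"plant_code": plant_code}},
                    "do_some_task2": {"config": {"plant_code": plant_code}},
                    "do_some_task3": {"config": {"plant_code": plant_code}},
                }
            },
        )
        cursor_dict[plant_code] = records[0].storage_id

    context.update_cursor(json.dumps(cursor_dict))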
Michael
08/11/2021, 7:42 PM
Huib
08/16/2021, 2:48 PM
mrdavidlaing
12/02/2021, 10:20 PM
Paul Harrison
12/31/2021, 11:19 AM
Alex Service
01/04/2022, 3:31 PM
Adil Karim
02/07/2022, 11:11 AM
parallelism to 100 and it would only spin up pods as necessary. After collection, a quasi-reduce stage would take the outputs from those collection tasks and output them as a dataset / publish to Kafka. My question is: can I get better granularity with Dagster in an idiomatic way? Can I spin up 100K tasks and have the Run Coordinator limit concurrency to 100, or will I need to batch the tasks together? Will I need Celery to handle this load? I'd prefer not to have another task queue to manage!
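On the Run Coordinator point: the QueuedRunCoordinator supports a global concurrency cap, so runs submitted beyond the limit wait in the queue instead of executing, which avoids needing Celery purely for throttling. A sketch of the dagster.yaml fragment, with 100 taken from the question above; note that it limits concurrent runs, not individual ops inside a single run:

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 100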
Adil Karim
02/09/2022, 11:29 AM
spec:
  volumes:
    - name: config
      secret:
        secretName: my-service-config
  containers:
    volumeMounts:
      - mountPath: "/etc/my-service-config"
        readOnly: true
        name: "config"
From what I can see on the docs we can import that into the container using K8sRunLauncher. Is that right?
The main question is, can I merge configs? It would be good for all the production string values for resources to come from this precompiled YAML file, but op configuration we might want to handle in the launchpad - this would be great DX for our Data Scientists.
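On the volume part: per-job Kubernetes overrides can also be attached with the dagster-k8s/config tag, which the K8sRunLauncher applies when it creates the run's pod. A sketch only, reusing the secret name and mount path from the spec above; the op and job here are placeholders, and this covers the volume wiring rather than the config-merging question:

from dagster import job, op


@op
def do_work():
    ...


@job(
    tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                "volumes": [
                    {"name": "config", "secret": {"secret_name": "my-service-config"}}
                ]
            },
            "container_config": {
                "volume_mounts": [
                    {
                        "name": "config",
                        "mount_path": "/etc/my-service-config",
                        "read_only": True,
                    }
                ]
            },
        }
    }
)
def my_service_job():
    do_work()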