Vlad Efanov
09/04/2023, 9:56 PM
I have a @job that involves executing a processing flow (@graph video_processing_graph) on each of 7 videos. The @graph video_processing_graph comprises two operations: @op first_video_processing and @op second_video_processing. Once all the videos have completed the processing flow, the @op finalization_op is executed.
I want to run more than 10 jobs simultaneously. My goal is to assign concurrency limits to each of the mentioned operations in all runs. For instance:
- Concurrency limit for first_video_processing: 2
- Concurrency limit for second_video_processing: 7
- Concurrency limit for finalization_op: 10
Could you guide me on how to achieve this?
Here is my code:
from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List
@op
def get_video_list(context) -> List[str]:
    """Return the names of the videos to process.

    Placeholder implementation: yields a fixed list of seven video names.
    """
    # Replace with your logic to get list of videos
    video_names = [
        "video1",
        "video2",
        "video3",
        "video4",
        "video5",
        "video6",
        "video7",
    ]
    return video_names
@op
def first_video_processing(context, video: str):
    """Run the first processing stage for a single video.

    Logs a message naming the video and returns the video name with the
    ``_processed_1`` suffix appended.
    """
    # Replace with your logic to process each video
    context.log.info(f"Processed {video}")
    return f"{video}_processed_1"
@op
def second_video_processing(context, video: str):
    """Run the second processing stage for a single video.

    Logs a message naming the input and returns the input with the
    ``_processed_2`` suffix appended.
    """
    # Replace with your logic to process each video
    context.log.info(f"Processed {video}")
    return f"{video}_processed_2"
@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    """Fan the video list out into one dynamic output per video.

    Each video name is used both as the output value and as its mapping key.
    """
    yield from (
        DynamicOutput(value=name, mapping_key=name) for name in videos
    )
@op
def finalization_op(context, processed_videos: List[str]):
    """Log the collected results once every video has been processed."""
    context.log.info(f"Finalized {processed_videos}")
@graph
def video_processing_graph(video: str):
    """Chain the two processing ops for one video.

    The output of ``first_video_processing`` feeds
    ``second_video_processing``, whose output is the graph's result.
    """
    return second_video_processing(first_video_processing(video))
@job
def video_processing_job():
    """Process every video through the graph, then finalize.

    The video list is fanned out dynamically, each item is mapped through
    ``video_processing_graph``, and the collected results are passed to
    ``finalization_op``.
    """
    per_video_inputs = run_processing_flow_for_each_video(get_video_list())
    per_video_results = per_video_inputs.map(video_processing_graph)
    finalization_op(per_video_results.collect())
jamie
09/05/2023, 2:38 PM
Vlad Efanov
09/05/2023, 4:30 PM
import time
from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List
@op
def get_video_list(context) -> List[str]:
    """Return the names of the videos to process.

    Placeholder implementation: yields a fixed list of seven video names.
    """
    # Replace with your logic to get list of videos
    video_names = [
        "video1",
        "video2",
        "video3",
        "video4",
        "video5",
        "video6",
        "video7",
    ]
    return video_names
@op(tags={"database": "redshift"})
def first_video_processing(context, video: str):
    """Run the first processing stage for a single video.

    The op carries the ``database=redshift`` tag. It logs before and after
    a 20-second sleep that stands in for real work, then returns the video
    name with the ``_processed_1`` suffix appended.
    """
    # Replace with your logic to process each video
    context.log.info(f"Processing {video}")
    time.sleep(20)  # simulated work
    context.log.info(f"Processed {video}")
    return f"{video}_processed_1"
@op
def second_video_processing(context, video: str):
    """Run the second processing stage for a single video.

    Logs before and after a 50-second sleep that stands in for real work,
    then returns the input with the ``_processed_2`` suffix appended.
    """
    # Replace with your logic to process each video
    context.log.info(f"Processing {video}")
    time.sleep(50)  # simulated work
    context.log.info(f"Processed {video}")
    return f"{video}_processed_2"
@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    """Fan the video list out into one dynamic output per video.

    Each video name is used both as the output value and as its mapping key.
    """
    yield from (
        DynamicOutput(value=name, mapping_key=name) for name in videos
    )
@op
def finalization_op(context, processed_videos: List[str]):
    """Log the collected results once every video has been processed."""
    context.log.info(f"Finalized {processed_videos}")
@graph
def video_processing_graph(video: str):
    """Chain the two processing ops for one video.

    The output of ``first_video_processing`` feeds
    ``second_video_processing``, whose output is the graph's result.
    """
    return second_video_processing(first_video_processing(video))
@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    # Cap on how many ops execute at the same time.
                    "max_concurrent": 4,
                    # Additional cap for ops tagged database=redshift.
                    # NOTE(review): executor-level limits are documented as
                    # applying within a single run, not across concurrent
                    # runs -- confirm against the Dagster concurrency docs.
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "redshift",
                            "limit": 2,
                        }
                    ],
                },
            }
        }
    }
)
def video_processing_job():
    """Process every video through the graph, then finalize.

    The video list is fanned out dynamically, each item is mapped through
    ``video_processing_graph``, and the collected results are passed to
    ``finalization_op``.
    """
    per_video_inputs = run_processing_flow_for_each_video(get_video_list())
    per_video_results = per_video_inputs.map(video_processing_graph)
    finalization_op(per_video_results.collect())
Vlad Efanov
09/05/2023, 4:32 PM
jamie
09/05/2023, 4:58 PM
DAGSTER_HOME environment variable. If there is no dagster.yaml file there, you can create one and add the configuration you need.