https://dagster.io/ logo
#ask-community
Title
# ask-community
v

Vlad Efanov

09/04/2023, 9:56 PM
Hello everyone, I have a
@job
that involves executing a processing flow (
@graph video_processing_graph
) on each of 7 videos. The
@graph video_processing_graph
comprises two operations:
@op first_video_processing
and
@op second_video_processing
. Once all the videos have completed the processing flow, the
@op finalization_op
is executed. I want to run more than 10 jobs simultaneously. My goal is to assign concurrency limits to each of the mentioned operations in all runs. For instance: - Concurrency limit for first_video_processing: 2 - Concurrency limit for second_video_processing: 7 - Concurrency limit for finalization_op: 10 Could you guide me on how to achieve this? Here is my code:
Copy code
from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List


@op
def get_video_list(context) -> List[str]:
    # Replace with your logic to get list of videos
    return ["video1", "video2", "video3", "video4", "video5", "video6", "video7"]


@op
def first_video_processing(context, video: str):
    # Replace with your logic to process each video
    <http://context.log.info|context.log.info>(f"Processed {video}")
    return f"{video}_processed_1"


@op
def second_video_processing(context, video: str):
    # Replace with your logic to process each video
    <http://context.log.info|context.log.info>(f"Processed {video}")
    return f"{video}_processed_2"


@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    for video in videos:
        yield DynamicOutput(value=video, mapping_key=video)


@op
def finalization_op(context, processed_videos: List[str]):
    <http://context.log.info|context.log.info>(f"Finalized {processed_videos}")


@graph
def video_processing_graph(video: str):
    first_processed_video = first_video_processing(video)
    second_processed_video = second_video_processing(first_processed_video)
    return second_processed_video


@job
def video_processing_job():
    videos = get_video_list()
    processed_videos = run_processing_flow_for_each_video(videos).map(video_processing_graph).collect()
    finalization_op(processed_videos)
j

jamie

09/05/2023, 2:38 PM
v

Vlad Efanov

09/05/2023, 4:30 PM
@jamie Thank you for your answer. Everything works in one run. But if I run 2 runs, then each of them is executed according to the amount that I specified in the limit. I want a certain number of @ops to run across all parallel runs. How to do it? Here is modified code:
Copy code
import time

from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List


@op
def get_video_list(context) -> List[str]:
    # Replace with your logic to get list of videos
    return ["video1", "video2", "video3", "video4", "video5", "video6", "video7"]


@op(tags={"database": "redshift"})
def first_video_processing(context, video: str):
    # Replace with your logic to process each video
    <http://context.log.info|context.log.info>(f"Processing {video}")
    time.sleep(20)
    <http://context.log.info|context.log.info>(f"Processed {video}")
    return f"{video}_processed_1"


@op
def second_video_processing(context, video: str):
    # Replace with your logic to process each video
    <http://context.log.info|context.log.info>(f"Processing {video}")
    time.sleep(50)
    <http://context.log.info|context.log.info>(f"Processed {video}")
    return f"{video}_processed_2"


@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    for video in videos:
        yield DynamicOutput(value=video, mapping_key=video)


@op
def finalization_op(context, processed_videos: List[str]):
    <http://context.log.info|context.log.info>(f"Finalized {processed_videos}")


@graph
def video_processing_graph(video: str):
    first_processed_video = first_video_processing(video)
    second_processed_video = second_video_processing(first_processed_video)
    return second_processed_video


@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "redshift",
                            "limit": 2,
                        }
                    ],
                },
            }
        }
    }
)
def video_processing_job():
    videos = get_video_list()
    processed_videos = run_processing_flow_for_each_video(videos).map(video_processing_graph).collect()
    finalization_op(processed_videos)
In addition I didn't find how to configure dagster.yaml for this
j

jamie

09/05/2023, 4:58 PM
it sounds like you’ll need this flavor of concurrency https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#limiting-opasset-concurrency-across-runs the dagster.yaml file can be found in the folder pointed to by your
DAGSTER_HOME
environment variable. if there is no dagster.yaml file there, you can create one and add the configuration you need