Hello everyone, I have a `@job` that involves exe...
# ask-community
Hello everyone, I have a `@job` that involves executing a processing flow (`@graph video_processing_graph`) on each of 7 videos. The `@graph video_processing_graph` comprises two operations: `@op first_video_processing` and `@op second_video_processing`. Once all the videos have completed the processing flow, the `@op finalization_op` is executed. I want to run more than 10 jobs simultaneously. My goal is to assign concurrency limits to each of the mentioned operations across all runs. For instance:
- Concurrency limit for first_video_processing: 2
- Concurrency limit for second_video_processing: 7
- Concurrency limit for finalization_op: 10
Could you guide me on how to achieve this? Here is my code:
from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List

@op
def get_video_list(context) -> List[str]:
    # Replace with your logic to get list of videos
    return ["video1", "video2", "video3", "video4", "video5", "video6", "video7"]

@op
def first_video_processing(context, video: str):
    # Replace with your logic to process each video
    context.log.info(f"Processed {video}")
    return f"{video}_processed_1"

@op
def second_video_processing(context, video: str):
    # Replace with your logic to process each video
    context.log.info(f"Processed {video}")
    return f"{video}_processed_2"

@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    # Emit one dynamic output per video so the graph is mapped over each of them
    for video in videos:
        yield DynamicOutput(value=video, mapping_key=video)

@op
def finalization_op(context, processed_videos: List[str]):
    context.log.info(f"Finalized {processed_videos}")

@graph
def video_processing_graph(video: str):
    first_processed_video = first_video_processing(video)
    second_processed_video = second_video_processing(first_processed_video)
    return second_processed_video

@job
def video_processing_job():
    videos = get_video_list()
    processed_videos = run_processing_flow_for_each_video(videos).map(video_processing_graph).collect()
    # Runs once, after every mapped video has finished the processing flow
    finalization_op(processed_videos)
@jamie Thank you for your answer. Everything works within a single run. But if I launch 2 runs, each of them applies the limit I specified independently of the other. I want a fixed number of @ops to run across all parallel runs. How can I do that? Here is the modified code:
import time

from dagster import op, job, DynamicOut, DynamicOutput, graph
from typing import List

@op
def get_video_list(context) -> List[str]:
    # Replace with your logic to get list of videos
    return ["video1", "video2", "video3", "video4", "video5", "video6", "video7"]

@op(tags={"database": "redshift"})
def first_video_processing(context, video: str):
    # Replace with your logic to process each video
    context.log.info(f"Processing {video}")
    context.log.info(f"Processed {video}")
    return f"{video}_processed_1"

@op
def second_video_processing(context, video: str):
    # Replace with your logic to process each video
    context.log.info(f"Processing {video}")
    context.log.info(f"Processed {video}")
    return f"{video}_processed_2"

@op(out=DynamicOut())
def run_processing_flow_for_each_video(context, videos: List[str]):
    # Emit one dynamic output per video so the graph is mapped over each of them
    for video in videos:
        yield DynamicOutput(value=video, mapping_key=video)

@op
def finalization_op(context, processed_videos: List[str]):
    context.log.info(f"Finalized {processed_videos}")

@graph
def video_processing_graph(video: str):
    first_processed_video = first_video_processing(video)
    second_processed_video = second_video_processing(first_processed_video)
    return second_processed_video

# These limits are enforced by the executor, so they apply within each run separately
@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "redshift",
                            "limit": 2,
                        }
                    ],
                },
            }
        }
    }
)
def video_processing_job():
    videos = get_video_list()
    processed_videos = run_processing_flow_for_each_video(videos).map(video_processing_graph).collect()
    # Runs once, after every mapped video has finished the processing flow
    finalization_op(processed_videos)
In addition, I couldn't find how to configure dagster.yaml for this.
it sounds like you'll need this flavor of concurrency: https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#limiting-opasset-concurrency-across-runs
the dagster.yaml file can be found in the folder pointed to by your `DAGSTER_HOME` environment variable. if there is no dagster.yaml file there, you can create one and add the configuration you need