Aki Iwa
03/13/2024, 10:08 PM

Scout
03/13/2024, 10:08 PM
You can set tag_concurrency_limits in your dagster.yaml configuration file to ensure that a job with a particular tag does not run more than once concurrently. Here's an example from the documentation where a unique tag is used to prevent the same job from running more than once at the same time:

```yaml
tag_concurrency_limits:
  - key: "single_run_per_job"
    value:
      applyLimitPerUniqueValue: true
    limit: 1
```
In this configuration, the single_run_per_job tag enforces that at most one run per unique value of that tag can execute at a time. If a run with a given tag value is in progress, any subsequent run with the same value is held back until the current run completes. This approach lets you manage concurrency on a per-job basis, ensuring that jobs do not interfere with each other when resources are limited or when jobs are not designed to run in parallel.

Aki Iwa
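For context, the tag_concurrency_limits block above normally sits under the run_coordinator section of dagster.yaml (assuming the queued run coordinator is in use); a minimal sketch:

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "single_run_per_job"
        value:
          applyLimitPerUniqueValue: true
        limit: 1
```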
03/13/2024, 10:14 PM

Scout
03/13/2024, 10:14 PM
The tag_concurrency_limits setting mentioned earlier is designed to control the execution of jobs as a whole. If you want to control the execution of a specific op within a job, you would typically structure your job and resources so that the op respects the desired concurrency, for example by using resources or custom logic within the op to check whether it should proceed or wait.

If you have a use case where you need to ensure that a specific op does not execute concurrently across multiple job runs, you might consider using a shared resource, such as a database lock or a distributed lock service (e.g., Redis, ZooKeeper), to coordinate the execution of that op. This involves implementing custom logic within the op to acquire the lock before executing and release it afterward.

The documentation does not provide a built-in feature for setting concurrency limits directly on individual ops. For more advanced concurrency patterns, you would need to implement custom logic within your ops or use external services to manage concurrency at the op level.

Aki Iwa
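As a concrete illustration of the locking idea, here is a minimal single-machine sketch using an advisory file lock as a stand-in for a distributed lock service like Redis or ZooKeeper. The lock-file path and the pattern of wrapping the op body in a context manager are assumptions for illustration, not a Dagster API:

```python
import fcntl
import os
from contextlib import contextmanager

@contextmanager
def exclusive_lock(path):
    # Advisory file lock: a single-machine stand-in for a distributed
    # lock service. LOCK_NB makes acquisition fail fast (raising
    # BlockingIOError) instead of blocking while another holder exists.
    fd = os.open(path, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)

# Inside an op body, the non-concurrent work would be wrapped like:
# with exclusive_lock("/tmp/my_op.lock"):
#     ...do the work that must not run twice at once...
```

In a real multi-machine deployment the file lock would be replaced by the distributed lock service, but the acquire/do-work/release shape stays the same.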
03/13/2024, 10:20 PM

Scout
03/13/2024, 10:20 PM
You can use the QueuedRunCoordinator with tag_concurrency_limits. This setup lets you specify concurrency limits based on tags, which helps you control the number of concurrent runs of the job that contains the op you want to manage. Here's a step-by-step approach to prevent a rerun of a failed op from running multiple times concurrently:

1. Assign a Unique Tag to Your Job: Assign a unique tag to the job whose concurrency you want to control. This tag will be used to apply concurrency limits.

2. Configure the QueuedRunCoordinator: In your dagster.yaml configuration file, set up the QueuedRunCoordinator with tag_concurrency_limits that reference the unique tag you've assigned to your job:

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "my_unique_tag"
        limit: 1
```

3. Use the Tag When Launching Runs: When launching runs of the job, ensure that the unique tag is included. This causes the QueuedRunCoordinator to enforce the concurrency limit based on the tag.

4. Handle Failed Ops: If an op within the job fails and you want to rerun it, you can trigger a rerun of the entire job or just the failed steps. Since the concurrency limit is applied at the job level, the QueuedRunCoordinator will ensure that only one instance of the job runs at a time, even if a rerun is triggered.

By following these steps, you can control the concurrency of job runs, including reruns of failed ops. Note, however, that this approach applies to the entire job, not to individual ops within it. If you need more granular control at the op level, you would need to implement custom logic within your ops, potentially using external systems for distributed locking.

Remember that the QueuedRunCoordinator works at the job level, so it will not differentiate between initial runs and reruns; it simply applies the concurrency limits to all runs of the job with the specified tag.

Aki Iwa
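The gating behavior the steps above rely on can be pictured with a small, hypothetical model. This is only an illustration of the idea (count active runs carrying the limited tag and hold a queued run while the count is at the limit), not Dagster's actual implementation:

```python
def can_dequeue(candidate_tags, in_progress_runs, limits):
    """Hypothetical sketch of a tag-based concurrency check.

    candidate_tags:   dict of tags on the queued run
    in_progress_runs: list of tag dicts, one per active run
    limits:           list of {"key": ..., "limit": ...} rules
    """
    for rule in limits:
        key = rule["key"]
        if key in candidate_tags:
            # Count active runs that also carry the limited tag key.
            active = sum(1 for tags in in_progress_runs if key in tags)
            if active >= rule["limit"]:
                return False  # stay in the queue for now
    return True

limits = [{"key": "my_unique_tag", "limit": 1}]

# With a limit of 1, a second tagged run must wait for the first:
print(can_dequeue({"my_unique_tag": "x"}, [{"my_unique_tag": "x"}], limits))  # False
print(can_dequeue({"my_unique_tag": "x"}, [], limits))                        # True
```

Because the check only looks at tags, it treats initial runs and reruns identically, which is exactly the job-level behavior described above.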
03/13/2024, 10:25 PM

Scout
03/13/2024, 10:25 PM
1. Assign the Tag in the Job Definition: You can assign the unique tag directly in the @job decorator:

```python
from dagster import job

@job(tags={"my_unique_tag": "unique_value"})
def my_scheduled_job():
    # Define the ops that make up this job
    ...
```

2. Configure the Scheduler: If you're using a scheduler to run the job, define a schedule that targets it. The unique tag you've assigned to the job will automatically be applied to runs created by the schedule:

```python
from dagster import schedule

@schedule(cron_schedule="0 0 * * *", job=my_scheduled_job)
def my_schedule_definition():
    # This function returns the run configuration for the scheduled runs
    return {}
```

3. Set Up Concurrency Limits: In your dagster.yaml configuration file, set up the QueuedRunCoordinator with tag_concurrency_limits that reference the unique tag you've assigned to your job, as described in the previous message.

4. Launch Runs from the Scheduler: When the scheduler triggers a run of the job, the unique tag will be included, and the QueuedRunCoordinator will enforce the concurrency limit based on that tag.

If you want to set a unique tag directly from Dagit's Launchpad (the UI for launching runs), you can add tags in the Launchpad interface before launching the run. For scheduled runs, however, it's more common to define the tags in the job code itself, since the scheduler uses the job definition to create runs at the specified intervals.

By following these steps, the unique tag is consistently applied to all runs of the job, whether they are triggered manually or launched by a scheduler, and the concurrency limits will be enforced accordingly.