Henry Tareque
03/07/2024, 10:11 PM

Scout
03/07/2024, 10:11 PM
1. Tag Concurrency Limits: You can configure tag concurrency limits on the `QueuedRunCoordinator` in your `dagster.yaml`:

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "job"
        value: "my_unique_job_tag"
        limit: 1
```
And in your job definition, you would include the tag:

```python
@job(tags={"job": "my_unique_job_tag"})
def my_job():
    ...
```
2. Custom Sensor Logic: If you are using sensors to trigger your job runs, you can write custom logic in your sensor to check whether any runs of the job are currently in progress or queued. If one is found, the sensor can skip triggering a new run. Here's a rough example of how you might implement this:

```python
from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    SkipReason,
    sensor,
)


@sensor(job=my_job)
def my_sensor(context):
    # Look for runs of this job that are executing or waiting in the queue.
    runs = context.instance.get_runs(
        filters=RunsFilter(
            job_name=my_job.name,
            statuses=[DagsterRunStatus.STARTED, DagsterRunStatus.QUEUED],
        ),
        limit=1,
    )
    if runs:
        # Skip if there's already a run in progress or queued.
        return SkipReason(f"{my_job.name} is already running or queued.")
    # No run in progress or queued, so trigger a new one. Note that a fixed
    # run_key would be deduplicated after the first run; use None (or a
    # value that changes each evaluation) to allow repeated runs.
    return RunRequest(run_key=None)
```
3. Queued Run Coordinator: The `QueuedRunCoordinator` can be configured to manage run queuing and execution, including setting concurrency limits. This is similar to the tag concurrency limits method, but managed through the run coordinator itself.

Please note that the exact implementation details may vary based on your specific Dagster version and setup. The provided examples are based on the documentation and should be adapted to fit your environment. If you need further assistance with setting this up, please refer to the Dagster documentation or consider reaching out to the community for more detailed guidance.
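For instance, if you want to cap concurrency across all jobs rather than per tag, a minimal `dagster.yaml` sketch using the coordinator's `max_concurrent_runs` setting might look like this (the limit value of `1` here is just an illustration; choose what fits your deployment):

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    # Only one run dequeues and executes at a time; all others wait in the queue.
    max_concurrent_runs: 1
```

With this setting, submitted runs still enter the queue normally, but the daemon will not launch a new run until fewer than `max_concurrent_runs` runs are in progress.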