Marcel Steinbach
02/06/2024, 3:30 PMdefine_asset_job
method?Scout
02/06/2024, 3:30 PMdefine_asset_job
method, you can follow these steps: 1. Define your asset job with the define_asset_job
method and include the desired tags within the job definition. These tags will be used to apply concurrency limits. 2. Configure the run_coordinator
in your dagster.yaml
file to include tag_concurrency_limits
. This configuration will specify the concurrency limits based on the tags you set in your job definition. Here's an example of how you might define an asset job with a tag and set up the concurrency limits in the `dagster.yaml`: python # Define your asset job with a tag test_controlled_backfill = define_asset_job( name="test_controlled_backfill", selection=AssetSelection.groups("your_asset_group"), tags={ "serial_backfill": "test_controlled_backfill" } )
And in your `dagster.yaml`: yaml run_coordinator: module: dagster.core.run_coordinator class: QueuedRunCoordinator config: tag_concurrency_limits: - key: "serial_backfill" limit: 1 value: applyLimitPerUniqueValue: true
In this configuration, the tag_concurrency_limits
is set to allow only 1 concurrent run per unique value of the serial_backfill
tag. This means that backfills for the job test_controlled_backfill
will be limited to one run at a time. Please note that the exact configuration might vary depending on your specific use case and the version of Dagster you are using. Always refer to the latest Dagster documentation for the most accurate and up-to-date information.Marcel Steinbach
02/06/2024, 3:32 PMdagster.yaml
, can I also define the tag_concurrency_limits
in the `define_asset_job`'s config
attribute?Scout
02/06/2024, 3:32 PMtag_concurrency_limits
should be specified in the dagster.yaml
file under the run_coordinator
configuration, not directly within the `define_asset_job`'s config
attribute. The tag_concurrency_limits
configuration is part of the instance-wide settings that determine how runs are queued and executed. These settings are not part of the job's definition but rather part of the Dagster instance's operational configuration. To apply concurrency limits based on tags, you should update your dagster.yaml
file as shown in the previous example. This will ensure that the concurrency limits are respected by the Dagster instance when queuing and running jobs.Marcel Steinbach
02/06/2024, 3:35 PMBackfill failed for simple: dagster._core.errors.DagsterInvalidConfigError: Error in config for job meta_assets_job
Error 1: Received unexpected config entry "multiprocess" at path root:execution:config
Scout
02/06/2024, 3:35 PMmultiprocess
in the job configuration. This suggests that there might be a mismatch between the configuration you're providing and what Dagster expects. Based on the documentation, the tag_concurrency_limits
should be set in the dagster.yaml
for the Dagster instance, not within the job's config
attribute. The multiprocess
configuration entry is typically used to configure an executor within a job, but it seems like it's being placed incorrectly or is not expected in the context where you've added it. Here's what you should do: 1. Ensure that the tag_concurrency_limits
are specified in the dagster.yaml
file under the run_coordinator
configuration, not within the job's config
. 2. If you're trying to configure the executor for the job to use multiple processes, make sure you're using the correct configuration structure. For example, if you're using the multiprocess_executor
, it should be configured as follows: python from dagster import job, multiprocess_executor @job(executor_def=multiprocess_executor.configured({"max_concurrent": 4})) def my_job(): ...
3. If you're trying to set tag_concurrency_limits
for the job, this should be done in the dagster.yaml
as part of the run_coordinator
configuration: yaml run_coordinator: module: dagster.core.run_coordinator class: QueuedRunCoordinator config: tag_concurrency_limits: - key: "my_tag" limit: 1
4. If you continue to encounter issues, double-check the documentation for the version of Dagster you're using, as configuration options can change between versions. If you've followed the documentation and are still encountering issues, it might be helpful to provide more context or the specific configuration you're using so that we can troubleshoot further.