# ask-ai
s

Slackbot

02/06/2024, 3:27 PM
This message was deleted.
s

Scout

02/06/2024, 3:27 PM
To run a backfill job for an asset with limited concurrency based on a tag, you can follow these steps:
1. Define the Asset Job with Tags: When defining your asset job, you can specify tags that will be used to control concurrency. For example, you can define an asset job with a specific tag that indicates it should be run serially.
python
    from dagster import asset, define_asset_job

    @asset(op_tags={"my_concurrency_tag": "my_value"})
    def my_asset():
        ...

    assets_job = define_asset_job(
        name="assets_job",
        # ... other configurations ...
        tags={"my_concurrency_tag": "my_value"},
    )
2. Configure the Run Queue: In your `dagster.yaml` configuration file, set up a run queue with `tag_concurrency_limits` to control the concurrency based on the tag you've defined. You can apply a limit for each unique value of a tag using `applyLimitPerUniqueValue`.
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "my_concurrency_tag"
            limit: 1
            value:
              applyLimitPerUniqueValue: true
With this configuration, each unique value of the `my_concurrency_tag` tag will be limited to one concurrent run.
3. Launch the Backfill: When launching the backfill from the Dagster UI, ensure that the tags you've defined in the asset job are included. If you're using the Dagster UI, you can supply a set of tags in the backfill dialog.
By following these steps, you can control the concurrency of backfill jobs for assets using tags. The asset job will have the necessary tags, and the run queue will enforce the concurrency limits based on those tags. Please note that the exact syntax for defining assets and jobs, as well as the configuration for the run queue, may vary depending on the version of Dagster you are using. Always refer to the latest documentation for the most accurate and up-to-date information.
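To make the per-unique-value behavior concrete, here is a minimal pure-Python sketch of the check such a limit implies. It is an illustration only, not Dagster's implementation; the function name `can_launch` and the representation of runs as plain tag dictionaries are assumptions made for this example.

```python
from collections import Counter
from typing import List, Mapping


def can_launch(
    candidate_tags: Mapping[str, str],
    in_progress: List[Mapping[str, str]],
    key: str,
    limit: int,
) -> bool:
    """Hypothetical per-unique-value check for a single tag key.

    Counts in-progress runs that share the candidate's value for `key`
    and allows the candidate only while that count is below `limit`.
    """
    value = candidate_tags.get(key)
    if value is None:
        # Runs that do not carry the tag are not constrained by this rule.
        return True
    running = Counter(tags.get(key) for tags in in_progress)
    return running[value] < limit


in_progress = [{"my_concurrency_tag": "my_value"}]
print(can_launch({"my_concurrency_tag": "my_value"}, in_progress, "my_concurrency_tag", 1))  # False
print(can_launch({"my_concurrency_tag": "other"}, in_progress, "my_concurrency_tag", 1))  # True
```

A run with a different tag value is counted separately, which is why the second call is allowed while the first is held back.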
m

Marcel Steinbach

02/06/2024, 3:48 PM
@Scout can I also configure the concurrency limit for the tag in `define_asset_job`?
s

Scout

02/06/2024, 3:48 PM
No, you cannot configure the concurrency limit for a tag directly within the `define_asset_job` function. The `define_asset_job` function is used to define a job that materializes a set of assets, and while you can specify tags for the job using the `tags` parameter, the concurrency limits themselves are not set at the job level. To configure concurrency limits based on tags, you need to use the run queue configuration in your `dagster.yaml` file. The `tag_concurrency_limits` configuration within the run coordinator section is where you specify the limits for tags. Here's an example of how you might configure this in `dagster.yaml`:
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "my_concurrency_tag"
            limit: 1
            value:
              applyLimitPerUniqueValue: true
This configuration ensures that runs with the same tag value will be limited according to the specified concurrency limit. The `define_asset_job` function will only associate the tags with the job, but the actual enforcement of concurrency limits based on those tags is handled by the run coordinator as specified in the `dagster.yaml` configuration.
If you need to apply concurrency limits directly within the job definition, you would typically do so by configuring the executor within the job's configuration, but this would not be based on tags; it would be a general concurrency limit for the job's execution. For example, using the
`multiprocess_executor`, you can limit the number of processes:
python
    from dagster import define_asset_job

    assets_job = define_asset_job(
        name="assets_job",
        # Config path as used with the job's default executor
        config={
            "execution": {
                "config": {
                    "multiprocess": {
                        "max_concurrent": 3,  # limits concurrent asset computations to 3
                    },
                }
            }
        },
    )
Please note that the exact syntax and capabilities may vary depending on the version of Dagster you are using, and you should always refer to the latest documentation for the most accurate and up-to-date information.
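If the executor limit needs to vary between jobs, the config mapping can be built by a small helper. The sketch below is plain Python with no Dagster import; the helper name `multiprocess_run_config` and its guard against values below 1 are assumptions for illustration, and the nested key path simply mirrors the example in this thread.

```python
from typing import Any, Dict


def multiprocess_run_config(max_concurrent: int) -> Dict[str, Any]:
    """Build the executor config mapping used in the example above.

    Hypothetical helper: the key path mirrors the thread's example; the
    lower-bound check is a sketch-level guard, not a Dagster rule.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    return {
        "execution": {
            "config": {
                "multiprocess": {"max_concurrent": max_concurrent},
            }
        }
    }


config = multiprocess_run_config(3)
print(config["execution"]["config"]["multiprocess"]["max_concurrent"])  # 3
```

The returned dictionary could then be passed as the `config` argument shown above.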
m

Marcel Steinbach

02/06/2024, 3:50 PM
@Scout how do I then apply different concurrency limits for different assets?
s

Scout

02/06/2024, 3:50 PM
To apply different concurrency limits for different assets, you can use tags in combination with the `tag_concurrency_limits` configuration in your `dagster.yaml` file. This allows you to specify limits for assets based on the tags associated with them. Here's how you can do it:
1. Tag Your Assets: When defining your assets, you can use the `op_tags` parameter to apply tags that will be used for concurrency control.
python
    from dagster import asset

    @asset(op_tags={"concurrency_group": "group_a"})
    def asset_group_a():
        ...

    @asset(op_tags={"concurrency_group": "group_b"})
    def asset_group_b():
        ...
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined.
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "concurrency_group"
            value: "group_a"
            limit: 1  # Only one run at a time for assets tagged with group_a
          - key: "concurrency_group"
            value: "group_b"
            limit: 2  # Up to two runs at a time for assets tagged with group_b
With this configuration, runs tagged with `concurrency_group: group_a` will be limited to one concurrent run, while runs tagged with `concurrency_group: group_b` will be allowed up to two concurrent runs.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on the tags on each run. Note that the run coordinator's `tag_concurrency_limits` match run tags, so the tag needs to be present on the run itself (for example via the `tags` of the job that launches it, or the tags supplied at launch), not only in `op_tags`.
By using tags and configuring `tag_concurrency_limits` in the `dagster.yaml` file, you can control the concurrency of asset runs at a granular level. This setup allows different assets to have different concurrency limits based on their resource requirements or other considerations. Remember to always check the latest Dagster documentation for any updates or changes to how concurrency limits and tags are handled.
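The effect of fixed-value limits on a queue can be sketched in plain Python. This is a simplified model, not the run coordinator's actual logic: the `dequeue` function, the tuple form of the limits, and the `name` field on each run are all hypothetical and exist only for this illustration.

```python
from typing import Dict, List, Tuple

# (tag key, tag value, max concurrent runs) -- hypothetical representation
Limit = Tuple[str, str, int]


def dequeue(
    queued: List[Dict[str, str]],
    in_progress: List[Dict[str, str]],
    limits: List[Limit],
) -> List[Dict[str, str]]:
    """Walk the queue in order and return the runs that may start now."""
    active = list(in_progress)
    launched = []
    for run in queued:
        blocked = False
        for key, value, limit in limits:
            if run.get(key) != value:
                continue  # this limit does not apply to the run
            count = sum(1 for other in active if other.get(key) == value)
            if count >= limit:
                blocked = True
                break
        if not blocked:
            launched.append(run)
            active.append(run)
    return launched


limits = [("concurrency_group", "group_a", 1), ("concurrency_group", "group_b", 2)]
queued = [
    {"name": f"{group[-1]}{i}", "concurrency_group": group}
    for group in ("group_a", "group_b")
    for i in (1, 2, 3)
]
print([run["name"] for run in dequeue(queued, [], limits)])  # ['a1', 'b1', 'b2']
```

With three queued runs per group, one `group_a` run and two `group_b` runs start, and the rest stay queued until capacity frees up.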
m

Marcel Steinbach

02/06/2024, 3:51 PM
@Scout I need this in combination with a limit on the partition tag. How do I do that?
s

Scout

02/06/2024, 3:52 PM
To apply different concurrency limits for different assets in combination with a limit on the partition tag, you can use a combination of asset-specific tags and partition tags. Here's how you can set this up: 1. Tag Your Assets: Define your assets with specific tags that identify their concurrency group. Additionally, ensure that your assets are partitioned and that runs are tagged with partition information.
python
    from dagster import asset

    @asset(op_tags={"concurrency_group": "group_a"})
    def asset_group_a_partitioned():
        ...

    @asset(op_tags={"concurrency_group": "group_b"})
    def asset_group_b_partitioned():
        ...
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined. You can include limits for both the concurrency group and the partition tag.
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "concurrency_group"
            value: "group_a"
            limit: 1  # Only one run at a time for assets tagged with group_a
          - key: "concurrency_group"
            value: "group_b"
            limit: 2  # Up to two runs at a time for assets tagged with group_b
          - key: "dagster/partition"
            limit: 1
            value:
              applyLimitPerUniqueValue: true  # Only one run at a time per unique partition value
With this configuration, you enforce that only one run at a time can execute for each unique partition value due to the `dagster/partition` tag limit. Additionally, you enforce concurrency limits for assets in different concurrency groups.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on both the asset-specific tags and the partition tags.
By combining asset-specific tags with partition tags, you can control concurrency at two levels: ensuring that runs for the same partition do not overlap and that assets within the same concurrency group adhere to their specified limits. Please note that the `dagster/partition` tag is automatically applied by Dagster when you launch partitioned runs. If you have custom partition tags or use a different tagging strategy, you'll need to adjust the tag keys in the `tag_concurrency_limits` configuration accordingly. Always refer to the latest Dagster documentation to ensure that your configuration aligns with the current capabilities and syntax of the platform.
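Because the per-unique-value flag is easy to misplace, a small check over the limit entries can catch a wrong nesting before the file is deployed. The sketch below assumes the entry shape used at the start of this thread (`key`, `limit`, and `value` as either a string or a mapping containing `applyLimitPerUniqueValue`); the function `limit_entry_problems` is a hypothetical helper, not part of Dagster.

```python
from typing import Any, Dict, List


def limit_entry_problems(entry: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in one tag_concurrency_limits entry."""
    problems = []
    if "key" not in entry:
        problems.append("missing 'key'")
    limit = entry.get("limit")
    if not isinstance(limit, int) or isinstance(limit, bool):
        problems.append("'limit' must be an integer")
    if "applyLimitPerUniqueValue" in entry:
        # Assumed shape: the flag belongs inside the 'value' mapping.
        problems.append("'applyLimitPerUniqueValue' must be nested under 'value'")
    value = entry.get("value")
    if value is not None and not isinstance(value, (str, dict)):
        problems.append("'value' must be a string or a mapping")
    return problems


nested = {"key": "dagster/partition", "limit": 1, "value": {"applyLimitPerUniqueValue": True}}
flat = {"key": "dagster/partition", "limit": 1, "applyLimitPerUniqueValue": True}
print(limit_entry_problems(nested))  # []
print(limit_entry_problems(flat))  # ["'applyLimitPerUniqueValue' must be nested under 'value'"]
```

The entries here are written as Python dictionaries; loading them from `dagster.yaml` would need a YAML parser, which is left out to keep the sketch dependency-free.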
m

Marcel Steinbach

02/06/2024, 4:13 PM
@Scout I have two assets, both using the same partition scheme. I need the first asset to run for only one partition at a time, the second can run up to 10 partitions at a time.
s

Scout

02/06/2024, 4:13 PM
To achieve different concurrency limits for two assets that use the same partition scheme, you can use a combination of asset-specific tags and the `tag_concurrency_limits` configuration in your `dagster.yaml` file. Here's how you can set this up:
1. Tag Your Assets: Define your assets with specific tags that identify their concurrency group. Make sure that these tags are unique to each asset if they need different concurrency behaviors.
python
    from dagster import asset

    @asset(op_tags={"concurrency_limit": "single_partition"})
    def asset_single_partition():
        ...

    @asset(op_tags={"concurrency_limit": "multiple_partitions"})
    def asset_multiple_partitions():
        ...
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined. You can include limits for both the concurrency group and the partition tag.
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "concurrency_limit"
            value: "single_partition"
            limit: 1  # Only one run at a time for the asset tagged with single_partition
          - key: "concurrency_limit"
            value: "multiple_partitions"
            limit: 10  # Up to ten runs at a time for the asset tagged with multiple_partitions
          - key: "dagster/partition_set"
            limit: 1
            value:
              applyLimitPerUniqueValue: true  # Only one run at a time per unique partition set value
With this configuration, you enforce that only one run at a time can execute for each unique partition set value due to the `dagster/partition_set` tag limit. Additionally, you enforce concurrency limits for each asset based on their specific concurrency tags.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on both the asset-specific tags and the partition tags.
By using asset-specific tags and configuring `tag_concurrency_limits` in the `dagster.yaml` file, you can control concurrency at two levels: ensuring that runs for the same partition set do not overlap and that each asset adheres to its specified concurrency limit. Please note that the `dagster/partition_set` tag is an example, and you should use the actual tag that Dagster applies for partitioned runs. If you have custom partition tags or use a different tagging strategy, you'll need to adjust the tag keys in the `tag_concurrency_limits`
configuration accordingly. Always refer to the latest Dagster documentation to ensure that your configuration aligns with the current capabilities and syn
tax of the platform.
m

Marcel Steinbach

02/06/2024, 4:18 PM
tax of the platform.
@Scout 😂 what do you mean by that?
s

Scout

02/06/2024, 4:18 PM
Apologies for the confusion—that was a typographical error. I meant to refer to the "syntax of the platform," which means the correct way to write configurations and code according to the rules and capabilities of the Dagster platform. It's important to follow the syntax outlined in the Dagster documentation to ensure that your configurations and code work as expected. If you have any more questions or need further clarification, feel free to ask!
m

Marcel Steinbach

02/06/2024, 4:20 PM
@Scout the two assets would not have completely independent scheduling behaviour though.
s

Scout

02/06/2024, 4:20 PM
I understand your concern. If you have two assets that use the same partition scheme but require different concurrency limits for their runs, you can achieve this by using a combination of tags and the `tag_concurrency_limits` configuration in your `dagster.yaml` file. Here's how you can set up independent scheduling behavior for each asset:
1. Define Your Assets with Tags: When defining your assets, use the `op_tags` parameter to apply unique tags that will be used for concurrency control. Make sure to include a tag that represents the partition, which will be common to both assets, and a tag that represents the concurrency group, which will be unique to each asset.
python
    from dagster import asset, OpExecutionContext, DailyPartitionsDefinition

    # start_date is required; the value here is a placeholder
    daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

    @asset(partitions_def=daily_partitions, op_tags={"concurrency_group": "asset_1"})
    def asset_1(context: OpExecutionContext):
        date = context.partition_key
        # Your computation logic here

    @asset(partitions_def=daily_partitions, op_tags={"concurrency_group": "asset_2"})
    def asset_2(context: OpExecutionContext):
        date = context.partition_key
        # Your computation logic here
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined.
yaml
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        tag_concurrency_limits:
          - key: "concurrency_group"
            value: "asset_1"
            limit: 1  # Only one run at a time for asset_1
          - key: "concurrency_group"
            value: "asset_2"
            limit: 10  # Up to ten runs at a time for asset_2
With this configuration, `asset_1` will only allow one run at a time across all of its partitions, while `asset_2` will allow up to ten concurrent runs for different partitions.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on the tags on each run. The concurrency group tag allows you to set a different limit for each asset; since this configuration places no limit on the partition tag, the two assets are throttled independently of each other.
By using this approach, you can have two assets with the same partition scheme but with different concurrency behaviors, as enforced by the run coordinator based on the tags and the `tag_concurrency_limits` configuration. Please ensure that the tags on your runs match the ones specified in the `tag_concurrency_limits` configuration and that your assets are correctly partitioned. Always refer to the latest Dagster documentation for the most accurate guidance on configuring concurrency limits and tags.
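To see what independent limits of 1 and 10 mean for a backfill, here is a rough pure-Python model. It assumes every run takes exactly one scheduling tick and that each asset is throttled only by its own limit; the function `ticks_to_finish` and the figure of 30 partitions are hypothetical values chosen for the illustration.

```python
from typing import Dict


def ticks_to_finish(partitions_per_asset: int, limits: Dict[str, int]) -> Dict[str, int]:
    """Simulate a backfill and return the tick at which each asset completes.

    Simplifying assumptions: one tick per run, and one independent
    concurrency limit per asset.
    """
    remaining = {asset: partitions_per_asset for asset in limits}
    finished: Dict[str, int] = {}
    tick = 0
    while any(remaining.values()):
        tick += 1
        for asset in list(remaining):
            left = remaining[asset]
            if left == 0:
                continue
            # At most `limit` partitions of this asset run during one tick.
            remaining[asset] = left - min(left, limits[asset])
            if remaining[asset] == 0:
                finished[asset] = tick
    return finished


print(ticks_to_finish(30, {"asset_1": 1, "asset_2": 10}))  # {'asset_2': 3, 'asset_1': 30}
```

Under these assumptions `asset_2` clears 30 partitions in 3 ticks while `asset_1` needs 30, which is the behavior asked for earlier in the thread.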