Slackbot
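02/06/2024, 3:27 PM
Scout
02/06/2024, 3:27 PM
```python
from dagster import asset, define_asset_job


@asset(op_tags={"my_concurrency_tag": "my_value"})  # op tag: attached to this asset's step
def my_asset():
    ...


assets_job = define_asset_job(
    name="assets_job",
    # ... other configurations ...
    tags={"my_concurrency_tag": "my_value"},  # run tag: what the run queue's tag_concurrency_limits match
)
```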
2. Configure the Run Queue: In your `dagster.yaml` configuration file, set up a run queue with `tag_concurrency_limits` to control the concurrency based on the tag you've defined. You can apply a limit for each unique value of a tag using `applyLimitPerUniqueValue`.
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "my_concurrency_tag"
        limit: 1
        value:
          applyLimitPerUniqueValue: true
```
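To make the matching rule concrete, here is a minimal pure-Python model of how one `tag_concurrency_limits` entry could decide whether a queued run may start. It is a sketch under simplifying assumptions, not Dagster's implementation: `bucket_for` and `can_dequeue` are hypothetical names, only tags on the run itself are considered, and the real `QueuedRunCoordinator` applies further checks such as `max_concurrent_runs`.

```python
from collections import Counter
from typing import Dict, List, Optional

Tags = Dict[str, str]


def bucket_for(rule: dict, run_tags: Tags) -> Optional[str]:
    """Return the bucket a run counts against under `rule`, or None if the rule ignores it."""
    key = rule["key"]
    if key not in run_tags:
        return None  # runs without the key are never limited by this rule
    value = rule.get("value")
    if isinstance(value, dict) and value.get("applyLimitPerUniqueValue"):
        return run_tags[key]  # one bucket per distinct tag value
    if isinstance(value, str):
        return value if run_tags[key] == value else None  # only the exact value is limited
    return "*"  # key-only rule: all runs with the key share one bucket


def can_dequeue(rule: dict, run_tags: Tags, in_progress: List[Tags]) -> bool:
    """Hypothetical check: may a queued run with `run_tags` start under this one rule?"""
    bucket = bucket_for(rule, run_tags)
    if bucket is None:
        return True
    counts = Counter(bucket_for(rule, tags) for tags in in_progress)
    return counts[bucket] < rule["limit"]


rule = {"key": "my_concurrency_tag", "limit": 1, "value": {"applyLimitPerUniqueValue": True}}
running = [{"my_concurrency_tag": "a"}]
print(can_dequeue(rule, {"my_concurrency_tag": "a"}, running))  # False: value "a" already has a run in progress
print(can_dequeue(rule, {"my_concurrency_tag": "b"}, running))  # True: value "b" is a separate bucket
```

With a plain string `value`, only runs whose tag equals that string are counted; with no `value` at all, every run carrying the key shares a single limit.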
With this configuration, each unique value of the `my_concurrency_tag` tag will be limited to one concurrent run.
3. Launch the Backfill: When you launch the backfill from the Dagster UI, make sure the runs it creates carry the tags you've defined on the asset job; the backfill dialog lets you supply a set of tags.
By following these steps, you can control the concurrency of backfill jobs for assets using tags. The asset job carries the necessary tags, and the run queue enforces the concurrency limits based on those tags.
Please note that the exact syntax for defining assets and jobs, as well as the configuration for the run queue, may vary depending on the version of Dagster you are using. Always refer to the latest documentation for the most accurate and up-to-date information.
Marcel Steinbach
02/06/2024, 3:48 PM
`define_asset_job`?
Scout
02/06/2024, 3:48 PM
`define_asset_job` function. The `define_asset_job` function is used to define a job that materializes a set of assets, and while you can specify tags for the job using the `tags` parameter, the concurrency limits themselves are not set at the job level. To configure concurrency limits based on tags, you need to use the run queue configuration in your `dagster.yaml` file. The `tag_concurrency_limits` configuration within the run coordinator section is where you specify the limits for tags. Here's an example of how you might configure this in `dagster.yaml`:
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "my_concurrency_tag"
        limit: 1
        value:
          applyLimitPerUniqueValue: true
```
This configuration ensures that runs with the same tag value will be limited according to the specified concurrency limit. The `define_asset_job` function will only associate the tags with the job, but the actual enforcement of concurrency limits based on those tags is handled by the run coordinator as specified in the `dagster.yaml` configuration.
Scout
02/06/2024, 3:49 PM
`define_asset_job` function. The `define_asset_job` function is used to define a job that materializes a set of assets, and while you can specify tags for the job using the `tags` parameter, the concurrency limits based on tags are typically configured at the instance level in the `dagster.yaml` file, not within the job definition itself. To enforce concurrency limits based on tags, you would use the `tag_concurrency_limits` configuration in the `dagster.yaml` file under the `run_coordinator` section. This is where you specify the limits for tags that will be respected by the run coordinator when queuing runs. Here's an example of how you might configure this in `dagster.yaml`:
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "my_concurrency_tag"
        limit: 1
        value:
          applyLimitPerUniqueValue: true
```
This configuration ensures that runs with the same tag value will be limited according to the specified concurrency limit. The `define_asset_job` function will only associate the tags with the job, but the actual enforcement of concurrency limits based on those tags is handled by the run coordinator as specified in the `dagster.yaml` configuration.
If you need to apply concurrency limits directly within the job definition, you would typically do so by configuring the executor in the job's run config. Executor-level limits apply to the steps inside a single run rather than to how many runs the queue launches (the multiprocess executor can also key its limits on op tags, but only within that run). For example, with Dagster's default executor, which runs each step in its own process, you can cap how many steps run in parallel:
```python
from dagster import define_asset_job

assets_job = define_asset_job(
    name="assets_job",
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,  # at most 3 steps (asset computations) run in parallel within each run
                },
            }
        }
    },
)
```
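Besides `max_concurrent`, the multiprocess executor config also accepts its own `tag_concurrency_limits`. Unlike the run queue's limits, these match op tags (such as the `op_tags` set on `@asset`) and only throttle steps inside one run. Below is a sketch of such a run config, assuming Dagster's default executor and reusing the tag from the first example; the limit of 1 is an arbitrary illustration:

```python
# Run config for an asset job on Dagster's default executor (sketch).
# It would be passed as define_asset_job(name=..., config=run_config).
run_config = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 3,  # at most 3 steps run in parallel within one run
                "tag_concurrency_limits": [
                    # at most 1 step whose op tags include my_concurrency_tag=my_value
                    {"key": "my_concurrency_tag", "value": "my_value", "limit": 1},
                ],
            }
        }
    }
}

limits = run_config["execution"]["config"]["multiprocess"]["tag_concurrency_limits"]
print(limits[0]["limit"])  # 1
```

Run-level limits in `dagster.yaml` and step-level limits in the executor config are independent, so a backfill can use both: the run queue decides how many runs start, and the executor decides how many steps each run executes at once.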
Please note that the exact syntax and capabilities may vary depending on the version of Dagster you are using, and you should always refer to the latest documentation for the most accurate and up-to-date information.
Marcel Steinbach
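02/06/2024, 3:50 PM
Scout
02/06/2024, 3:50 PM
`tag_concurrency_limits` configuration in your `dagster.yaml` file. This allows you to specify limits for assets based on the tags associated with them. Here's how you can do it:
1. Tag Your Assets: When defining your assets, you can use the `op_tags` parameter to apply tags that will be used for concurrency control. Keep in mind that `op_tags` are attached to the asset's op (step), while the run coordinator's `tag_concurrency_limits` match tags on the run, so the same key and value also need to be set as run tags, for example with `define_asset_job(tags=...)` or when launching the backfill.
```python
from dagster import asset


@asset(op_tags={"concurrency_group": "group_a"})
def asset_group_a():
    ...  # computation for group_a


@asset(op_tags={"concurrency_group": "group_b"})
def asset_group_b():
    ...  # computation for group_b
```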
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined.
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "concurrency_group"
        value: "group_a"
        limit: 1  # Only one run at a time for assets tagged with group_a
      - key: "concurrency_group"
        value: "group_b"
        limit: 2  # Up to two runs at a time for assets tagged with group_b
```
With this configuration, runs tagged with `concurrency_group: group_a` will be limited to one concurrent run, while runs tagged with `concurrency_group: group_b` will be allowed up to two concurrent runs.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on the tags on each run (for example, the tags set on the asset job or supplied at launch).
By using tags and configuring `tag_concurrency_limits` in the `dagster.yaml` file, you can control the concurrency of asset runs at a granular level. This setup allows different assets to have different concurrency limits based on their resource requirements or other considerations. Remember to always check the latest Dagster documentation for any updates or changes to how concurrency limits and tags are handled.
Marcel Steinbach
02/06/2024, 3:51 PM
Scout
02/06/2024, 3:52 PM
```python
from dagster import asset


@asset(op_tags={"concurrency_group": "group_a"})
def asset_group_a_partitioned():
    ...


@asset(op_tags={"concurrency_group": "group_b"})
def asset_group_b_partitioned():
    ...
```
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined. You can include limits for both the concurrency group and the partition tag.
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "concurrency_group"
        value: "group_a"
        limit: 1  # Only one run at a time for assets tagged with group_a
      - key: "concurrency_group"
        value: "group_b"
        limit: 2  # Up to two runs at a time for assets tagged with group_b
      - key: "dagster/partition"
        limit: 1
        value:
          applyLimitPerUniqueValue: true  # Only one run at a time per unique partition value
```
With this configuration, you enforce that only one run at a time can execute for each unique partition value due to the `dagster/partition` tag limit. Additionally, you enforce concurrency limits for assets in different concurrency groups.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on both the asset-specific tags and the partition tags.
By combining asset-specific tags with partition tags, you can control concurrency at two levels: ensuring that runs for the same partition do not overlap and that assets within the same concurrency group adhere to their specified limits.
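Dagster expects `applyLimitPerUniqueValue` to be nested under an entry's `value` key, and it is easy to put it one level too high. The hypothetical helper below (not part of Dagster) checks a parsed `tag_concurrency_limits` list against that shape: `key`, `limit`, and an optional `value` that is either a string or a mapping such as `{"applyLimitPerUniqueValue": True}`.

```python
from typing import Any, Dict, List

ALLOWED_FIELDS = {"key", "value", "limit"}


def check_tag_limits(entries: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found in a parsed tag_concurrency_limits list (hypothetical helper)."""
    problems = []
    for i, entry in enumerate(entries):
        extra = set(entry) - ALLOWED_FIELDS
        if "applyLimitPerUniqueValue" in extra:
            problems.append(f"entry {i}: applyLimitPerUniqueValue must be nested under 'value'")
            extra.discard("applyLimitPerUniqueValue")
        for name in sorted(extra):
            problems.append(f"entry {i}: unexpected field '{name}'")
        if not isinstance(entry.get("key"), str):
            problems.append(f"entry {i}: 'key' must be a string")
        if not isinstance(entry.get("limit"), int):
            problems.append(f"entry {i}: 'limit' must be an integer")
        value = entry.get("value")
        if value is not None and not isinstance(value, (str, dict)):
            problems.append(f"entry {i}: 'value' must be a string or a mapping")
    return problems


misplaced = [{"key": "dagster/partition", "limit": 1, "applyLimitPerUniqueValue": True}]
nested = [{"key": "dagster/partition", "limit": 1, "value": {"applyLimitPerUniqueValue": True}}]
print(check_tag_limits(misplaced))  # ["entry 0: applyLimitPerUniqueValue must be nested under 'value'"]
print(check_tag_limits(nested))  # []
```

In practice the list could come from loading `dagster.yaml` with PyYAML's `yaml.safe_load` and reading `["run_coordinator"]["config"]["tag_concurrency_limits"]`.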
Please note that the `dagster/partition` tag is automatically applied by Dagster when you launch partitioned runs. If you have custom partition tags or use a different tagging strategy, you'll need to adjust the tag keys in the `tag_concurrency_limits` configuration accordingly.
Always refer to the latest Dagster documentation to ensure that your configuration aligns with the current capabilities and syntax of the platform.
Marcel Steinbach
02/06/2024, 4:13 PM
Scout
02/06/2024, 4:13 PM
`tag_concurrency_limits` configuration in your `dagster.yaml` file. Here's how you can set this up:
1. Tag Your Assets: Define your assets with specific tags that identify their concurrency group. If two assets need different concurrency behavior, give each its own tag value. As with any run-queue limit, the same tags must also end up on the runs (for example through the asset job's `tags`), because `op_tags` alone are not run tags.
```python
from dagster import asset


@asset(op_tags={"concurrency_limit": "single_partition"})
def asset_single_partition():
    ...


@asset(op_tags={"concurrency_limit": "multiple_partitions"})
def asset_multiple_partitions():
    ...
```
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined. You can include limits for both the concurrency group and the partition tag.
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "concurrency_limit"
        value: "single_partition"
        limit: 1  # Only one run at a time for the asset tagged with single_partition
      - key: "concurrency_limit"
        value: "multiple_partitions"
        limit: 10  # Up to ten runs at a time for the asset tagged with multiple_partitions
      - key: "dagster/partition_set"
        limit: 1
        value:
          applyLimitPerUniqueValue: true  # Only one run at a time per unique partition set value
```
With this configuration, only one run at a time can execute for each unique value of the `dagster/partition_set` tag. Because runs launched from the same partition set share that value, this effectively runs them one after another. Additionally, you enforce concurrency limits for each asset based on its specific concurrency tag.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on both the asset-specific tags and the partition tags.
By using asset-specific tags and configuring `tag_concurrency_limits` in the `dagster.yaml` file, you can control concurrency at two levels: ensuring that runs from the same partition set do not overlap and that each asset adheres to its specified concurrency limit. To keep only runs for the same partition from overlapping, limit the `dagster/partition` key per unique value instead.
Please note that `dagster/partition_set` is used here as an example; check which tags Dagster actually applies to your partitioned runs (for instance, `dagster/partition` carries the partition key). If you have custom partition tags or use a different tagging strategy, you'll need to adjust the tag keys in the `tag_concurrency_limits` configuration accordingly.
Always refer to the latest Dagster documentation to ensure that your configuration aligns with the current capabilities and syn
Scout
02/06/2024, 4:14 PM
Marcel Steinbach
02/06/2024, 4:18 PM
"tax of the platform." @Scout 😂 what do you mean by that?
Scout
02/06/2024, 4:18 PM
Marcel Steinbach
02/06/2024, 4:20 PM
Scout
02/06/2024, 4:20 PM
`tag_concurrency_limits` configuration in your `dagster.yaml` file. Here's how you can set up independent scheduling behavior for each asset:
1. Define Your Assets with Tags: When defining your assets, use the `op_tags` parameter to apply a tag that represents the concurrency group, which will be unique to each asset. The partition does not need a tag of your own: Dagster adds the `dagster/partition` tag to partitioned runs automatically. Note that the run coordinator only sees run tags, so the concurrency group tag must also be set on the runs (for example through each asset job's `tags`), not only in `op_tags`.
```python
from dagster import asset, OpExecutionContext, DailyPartitionsDefinition

# start_date is required; "2024-01-01" is only a placeholder
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")


@asset(partitions_def=daily_partitions, op_tags={"concurrency_group": "asset_1"})
def asset_1(context: OpExecutionContext):
    date = context.partition_key  # e.g. "2024-01-15" for that day's partition
    # Your computation logic here


@asset(partitions_def=daily_partitions, op_tags={"concurrency_group": "asset_2"})
def asset_2(context: OpExecutionContext):
    date = context.partition_key
    # Your computation logic here
```
2. Configure Tag Concurrency Limits: In your `dagster.yaml` file, configure the `tag_concurrency_limits` under the `run_coordinator` section to apply different concurrency limits based on the tags you've defined.
```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "concurrency_group"
        value: "asset_1"
        limit: 1  # Only one run at a time for asset_1
      - key: "concurrency_group"
        value: "asset_2"
        limit: 10  # Up to ten runs at a time for asset_2
```
With this configuration, at most one run tagged `concurrency_group: asset_1` is in progress at any time, across all of its partitions, while up to ten runs tagged `concurrency_group: asset_2` (for different partitions) can run concurrently.
3. Launch Runs: When you launch runs for these assets, the run coordinator will enforce the concurrency limits based on the tags on each run. The concurrency group tag lets you set a different limit for each asset; if you also want to coordinate runs for the same partition, add a `dagster/partition` entry with `applyLimitPerUniqueValue`.
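To see what `limit: 1` on `concurrency_group: asset_1` means for a backfill, here is a small simulation under simplifying assumptions: every run takes exactly one tick, runs blocked by a limit stay queued while later runs may start, and no other limits apply. The function `simulate` and the 30-partition backfill are hypothetical illustrations, not Dagster behavior captured exactly.

```python
from collections import deque
from typing import Dict, List, Tuple


def simulate(queue: List[str], caps: Dict[str, int]) -> Tuple[int, Dict[str, int]]:
    """Simulate a run queue where each run takes one tick.

    `queue` lists the concurrency_group tag of each queued run, in order;
    `caps` maps every group to its limit (must be >= 1).
    Returns (ticks until the queue drains, peak concurrent runs per group).
    """
    if any(cap < 1 for cap in caps.values()):
        raise ValueError("every limit must be at least 1")
    pending = deque(queue)
    ticks = 0
    peak = {group: 0 for group in caps}
    while pending:
        started = {group: 0 for group in caps}
        still_queued = deque()
        for group in pending:
            if started[group] < caps[group]:
                started[group] += 1  # run starts this tick
            else:
                still_queued.append(group)  # blocked by its limit, retried next tick
        pending = still_queued
        for group, count in started.items():
            peak[group] = max(peak[group], count)
        ticks += 1
    return ticks, peak


# Hypothetical backfill: 30 daily partitions of each asset.
ticks, peak = simulate(["asset_1"] * 30 + ["asset_2"] * 30, {"asset_1": 1, "asset_2": 10})
print(ticks)  # 30: asset_1 runs its partitions one at a time
print(peak)  # {'asset_1': 1, 'asset_2': 10}
```

Here asset_2's 30 runs finish within the first 3 ticks while asset_1 needs all 30, which is why a limit without `applyLimitPerUniqueValue` acts as one shared cap across partitions rather than a per-partition cap.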
By using this approach, you can have two assets with the same partition scheme but with different concurrency behaviors, as enforced by the run coordinator based on the tags and the `tag_concurrency_limits` configuration.
Please ensure that the tags you use match the ones specified in the `tag_concurrency_limits` configuration and that your assets are correctly partitioned. Always refer to the latest Dagster documentation for the
Scout
02/06/2024, 4:21 PM