Hello Dagster, The tool is awesome and has allowed...
# ask-community
r
Hello Dagster, the tool is awesome and has allowed us to implement workflows according to our needs. One of our favorite features is the ability to limit concurrency using tags. In our use case, we would like to limit concurrency on ops/assets that interact with our external resources, to avoid exceeding their usage limits (API call limits). Since asset/op tags only limit concurrency within a given run, we have to set run tags through jobs or the reconciliation_sensor to ensure all automated executions are tagged according to the resources they use and per-resource concurrency limits are enforced.
• The first issue: manual materializations (from either the asset or asset group page) can run without run tags and thus not comply with our concurrency limits. This is especially troublesome when a large backfill is mistakenly launched without tags. How can we either forbid materialization outside of jobs/schedules/sensors, or enforce run tags even on manual ad hoc materializations?
• The second issue: auto-materialization seems to be slowly replacing the previous reconciliation sensor. We are only able to provide global tags to auto-materialize runs, so our current per-resource/job/sensor tag strategy does not seem possible with this new approach. Will the reconciliation sensor still be improved, or will auto-materialization replace it while allowing the "previous" features to be implemented?
c
Hi Lucas. These are all good points. I agree that limiting concurrency through tags is a powerful feature that currently doesn't translate well to executing assets, especially in the cases you mentioned, like backfills or manual materializations. I think ideally there would be a param on asset construction that accepts a dictionary of tags to apply to runs. Then every run that targets that asset would be tagged with all of the given tags. I think we intend to replace the asset reconciliation sensor with auto-materialize functionality, because auto-materialize offers key improvements such as executing within a daemon rather than in user code. But auto-materialization is still actively under development, and we recognize that limiting concurrency here is valuable. Here's a relevant issue: https://github.com/dagster-io/dagster/issues/14053 Feel free to comment with any thoughts, or create a new issue for enabling the ability to tag runs from the asset definition.
Another hacky workaround would be to add run tags from within the asset, which might help as a short-term solution. You can do this with:
context.instance.add_run_tags(...)
r
Hello Claire, thank you for the response. How does Dagster manage tag concurrency if the tags are added at runtime?
Hello Claire, I attempted to test the workaround, but it does not seem to work:
from dagster import asset, Definitions, OpExecutionContext, HourlyPartitionsDefinition
import time

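# Assets
@asset(partitions_def=HourlyPartitionsDefinition("2023-05-30-00:00"))
def test_asset(context: OpExecutionContext) -> None:
    # Simulate work so that several backfill runs overlap in time.
    time.sleep(20)
    # Workaround under test: tag the run from inside the asset,
    # i.e. only after the run has already started.
    context.instance.add_run_tags(context.run_id, {"foo": "bar"})


defs = Definitions(
    assets=[test_asset],
)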
A Dagster instance launching a large backfill of the asset above is not able to limit concurrency (which could be explained by the tag only being added after the asset materialization has started). Is there something I am missing in this workaround?
c
Hi Lucas--yeah... thinking about it more, I think the workaround doesn't work, for the reason you mentioned. I think you may just have to file an issue describing the functionality you're looking for.
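To illustrate the timing problem discussed in this thread, here is a toy model in plain Python. It is not Dagster's implementation: the `Run` class, the `dequeue` function, and the `limits` mapping are all hypothetical names made up for this sketch. It only assumes that a tag limit is checked against the tags a run carries at the moment it is taken off the queue.

```python
from dataclasses import dataclass, field


@dataclass
class Run:
    """Hypothetical stand-in for a queued run (not a Dagster class)."""
    run_id: str
    tags: dict = field(default_factory=dict)
    status: str = "QUEUED"


def dequeue(runs, limits):
    """Start queued runs in order unless a tag limit is already reached.

    `limits` maps a (key, value) tag pair to the maximum number of
    started runs allowed to carry that tag.
    """
    for run in runs:
        if run.status != "QUEUED":
            continue
        blocked = False
        for (key, value), limit in limits.items():
            # A limit only applies to runs that carry the tag right now.
            if run.tags.get(key) != value:
                continue
            active = sum(
                1 for r in runs
                if r.status == "STARTED" and r.tags.get(key) == value
            )
            if active >= limit:
                blocked = True
        if not blocked:
            run.status = "STARTED"
    return [r.run_id for r in runs if r.status == "STARTED"]


limits = {("foo", "bar"): 1}

# Case 1: tags are set at submission time, so the limit is enforced.
tagged = [Run(f"t{i}", {"foo": "bar"}) for i in range(3)]
started_tagged = dequeue(tagged, limits)

# Case 2: runs are submitted without tags (e.g. a manual backfill),
# so no limit matches and every run starts.
untagged = [Run(f"u{i}") for i in range(3)]
started_untagged = dequeue(untagged, limits)

# Tagging from inside the already-started runs comes too late:
# all three runs now carry the tag while running concurrently.
for run in untagged:
    run.tags["foo"] = "bar"
late_count = sum(
    1 for r in untagged
    if r.status == "STARTED" and r.tags.get("foo") == "bar"
)
```

In this model, only one of the pre-tagged runs starts, while all of the untagged runs start and end up carrying the limited tag at the same time. That matches the behaviour Lucas observed, under the stated assumption about when tags are checked.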