# ask-community
a
Hello - I am using Dagster 1.1.7 and creating assets using an asset factory approach. I need to limit the number of assets that are materialized at once. All assets are triggered by a reconciliation sensor, and each asset currently has a one-to-one relationship with its upstream dependency. I have set a run_coordinator in dagster.yaml with a tag_concurrency_limits limit of 1 and have tried all of the following, to no avail:
• Setting op_tags on the asset function in my asset factory
• Setting the run_tags of the reconciliation sensor
• Setting the tags in the config_schema of the asset function
In all instances, when the reconciliation sensor deems assets stale, a single job runs with all stale assets running concurrently. This risks putting too much strain on our upstream DB, so we need this to be limited. Can someone tell me what I might be missing here?
Hmm - This MR seems to suggest this issue was fixed back in Oct but, if so, I'm not seeing the proper behavior from the run_coordinator. Perhaps this is a run_coordinator issue....? I don't know.... My run_coordinator is as below:
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "db"
        value: "foo"
        limit: 1
d
Hey Adam - I think what you would want for that limit to apply would be a db tag on the dagster job or run. I’ll need to check if the run_tags bullet you mentioned above would work as I’m not yet deeply familiar with the asset reconciliation sensor, but that certainly looks the closest. What that queue config will do once it’s working is make it so that at most one run with that tag can be in progress at once.
a
Ah ok so I would set that in the define_asset_job fn in the repository?
d
That’s the one I’m confident should work, yeah
a
Fantastic! Thank you!
d
I would also expect run_tags on the reconciliation sensor to work though - since that applies tags to launched runs, which are what are checked here
The thing I would check to debug that is whether or not the launched run actually has those tags applied
a
So, the job is created with the run_tag but all stale assets are added to that one job and get run concurrently.
d
It sounds like you might be looking for something slightly different than what the QueuedRunCoordinator provides - that only offers concurrency at the run level
We actually just landed the ability to set tag_concurrency_limits as well within a single run - that's going live tomorrow
Would that be more in line with what you're looking for? That would let you say that for each run, at most one asset tagged with db:foo can execute at once (but crucially, it would not apply across multiple runs)
a
Hmm - What I'm looking for is to limit the strain put on our DB. Each asset being materialized is hitting the DB so, if I am understanding this new functionality correctly, I think that might get us what we need. Then we would just need to ensure that the jobs are also restricted to a single concurrency. Am I following?
d
Yeah, between those two settings (# of runs at once and # of assets at once within a single run) I think you could have some confidence in the amount of total assets with that tag running
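As a rough illustration of how the two settings combine, the worst case can be estimated with a little arithmetic. This is only a sketch: the function name is made up, and it assumes every run that touches the DB carries the run-level tag and every DB asset carries the tag used by the per-run limit.

```python
def max_tagged_assets_in_flight(
    max_concurrent_runs: int,
    tagged_run_limit: int,
    per_run_tag_limit: int,
) -> int:
    """Upper bound on tagged assets executing at once across all runs."""
    # The run queue admits at most this many tagged runs at the same time.
    runs_in_flight = min(max_concurrent_runs, tagged_run_limit)
    # Each admitted run executes at most per_run_tag_limit tagged steps at once.
    return runs_in_flight * per_run_tag_limit


# Values from the dagster.yaml above (25 runs overall, 1 tagged run),
# combined with a per-run limit of 1.
print(max_tagged_assets_in_flight(25, 1, 1))  # prints 1
```

With a per-run limit of 2 instead, the same queue config would allow at most 2 tagged assets to hit the DB at once.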
a
Wonderful! Do you have a version # for that release we can watch for?
d
1.1.8, planned to go live tomorrow
a
Excellent! Thank you, Daniel! Much appreciated
Sorry - One more question: Is there an issue or commit for this change? Hoping to point our team to that at standup today.
a
You rock! Thank you!
d
a
Hi @daniel - So, I see the new release and functionality this morning. What I can't figure out, however, is how to pass execution config to the underlying asset job that the build_asset_reconciliation_sensor function creates. I tried passing it in the run_tags of the reconciliation sensor, but to no avail. Any suggestions?
d
You can configure those on the default executor for your Definitions or repository. Something like:
from dagster import Definitions, multiprocess_executor

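defs = Definitions(
    ...,  # your assets, jobs, and sensors go here
    executor=multiprocess_executor.configured(
        {
            # Within each run, at most 2 steps tagged database=redshift execute at once.
            "tag_concurrency_limits": [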
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    ),
)
or if you're using a repository:
from dagster import multiprocess_executor, repository


@repository(
    default_executor_def=multiprocess_executor.configured(
        {
            "tag_concurrency_limits": [
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    )
)
def repo():
    ...
a
Ah! I knew there was a way. I just didn't know where to start. Thank you!
That did it! Man, big kudos to the team! This is fantastic!