#ask-community

Adam Ward

01/04/2023, 1:36 PM
Hello - I am using Dagster 1.1.7 and creating assets using an asset factory approach. I need to limit the number of assets that are materialized at once. All assets are triggered by a reconciliation sensor, and each asset currently has a one-to-one relationship with its upstream dependency. I have set a run_coordinator in dagster.yaml with tag_concurrency_limits set to 1 and have tried all of the following, to no avail:
• Setting the ops_tag on the asset function in my asset factory
• Setting the run_tags of the reconciliation sensor
• Setting the tags in the config_schema of the asset function
In all instances, when the reconciliation sensor deems the assets stale, a single job runs with all stale assets running concurrently. This risks putting too much strain on our upstream DB, so we need this to be limited. Can someone tell me what I might be missing here?
Hmm - This MR seems to suggest this issue was fixed back in Oct but, if so, I'm not seeing the proper behavior from the run_coordinator. Perhaps this is a run_coordinator issue? I don't know. My run_coordinator is as below:
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "db"
        value: "foo"
        limit: 1

daniel

01/04/2023, 2:43 PM
Hey Adam - I think what you would want for that limit to apply would be a db tag on the dagster job or run. I’ll need to check if the run_tags bullet you mentioned above would work as I’m not yet deeply familiar with the asset reconciliation sensor, but that certainly looks the closest. What that queue config will do once it’s working is make it so that at most one run with that tag can be in progress at once.
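The run-level behavior described here can be sketched in plain Python. This is only an illustrative model of the rule "at most `limit` runs with a given tag in progress at once", not Dagster's actual implementation; the function name `can_start_run` and the data shapes are hypothetical, and only the `key`/`value`/`limit` form of a limit from the config above is modeled.

```python
from typing import Dict, List

Tags = Dict[str, str]


def can_start_run(
    candidate_tags: Tags,
    in_progress: List[Tags],
    limits: List[dict],
    max_concurrent_runs: int,
) -> bool:
    """Return True if a queued run may start under the modeled limits."""
    # Global cap on runs in progress, regardless of tags.
    if len(in_progress) >= max_concurrent_runs:
        return False
    for limit in limits:
        key, value, cap = limit["key"], limit["value"], limit["limit"]
        # A limit only constrains runs that carry the matching tag.
        if candidate_tags.get(key) != value:
            continue
        active = sum(1 for tags in in_progress if tags.get(key) == value)
        if active >= cap:
            return False
    return True


limits = [{"key": "db", "value": "foo", "limit": 1}]
print(can_start_run({"db": "foo"}, [], limits, 25))               # True
print(can_start_run({"db": "foo"}, [{"db": "foo"}], limits, 25))  # False
print(can_start_run({}, [{"db": "foo"}], limits, 25))             # True
```

Note that the check looks only at tags on whole runs, so it says nothing about how many assets execute inside a single run.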

Adam Ward

01/04/2023, 2:44 PM
Ah ok so I would set that in the define_asset_job fn in the repository?

daniel

01/04/2023, 2:48 PM
That’s the one I’m confident should work, yeah

Adam Ward

01/04/2023, 2:48 PM
Fantastic! Thank you!

daniel

01/04/2023, 3:18 PM
I would also expect run_tags on the reconciliation sensor to work though - since that applies tags to launched runs, which are what are checked here
The thing I would check to debug that is whether or not the launched run actually has those tags applied

Adam Ward

01/04/2023, 3:20 PM
So, the job is created with the run_tag but all stale assets are added to that one job and get run concurrently.

daniel

01/04/2023, 3:20 PM
It sounds like you might be looking for something slightly different than what the QueuedRunCoordinator provides - that only offers concurrency at the run level
We actually just landed the ability to set tag_concurrency_limits as well within a single run - that's going live tomorrow
Would that be more in line with what you're looking for? That would let you say that for each run, at most one asset tagged with db:foo can execute at once (but crucially, it would not apply across multiple runs)
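As a rough illustration of the within-run limit, the sketch below models an executor deciding which ready steps may start while respecting a per-tag cap. It is a simplified stand-in written for this thread, not the executor's real scheduling code; `pick_startable` and the tuple/dict shapes are hypothetical names chosen for the example.

```python
from typing import Dict, List, Tuple

Tags = Dict[str, str]


def pick_startable(
    ready: List[Tuple[str, Tags]],
    running: List[Tags],
    limits: List[dict],
) -> List[str]:
    """Return names of ready steps that can start without exceeding any tag limit."""
    caps = {(lim["key"], lim["value"]): lim["limit"] for lim in limits}
    # Count steps already running in this run for each limited tag.
    counts = {
        kv: sum(1 for tags in running if tags.get(kv[0]) == kv[1]) for kv in caps
    }
    started = []
    for name, tags in ready:
        matched = [kv for kv in caps if tags.get(kv[0]) == kv[1]]
        if all(counts[kv] < caps[kv] for kv in matched):
            for kv in matched:
                counts[kv] += 1  # reserve a slot for this step
            started.append(name)
    return started


limits = [{"key": "db", "value": "foo", "limit": 1}]
ready = [("a", {"db": "foo"}), ("b", {"db": "foo"}), ("c", {})]
print(pick_startable(ready, [], limits))               # ['a', 'c']
print(pick_startable(ready, [{"db": "foo"}], limits))  # ['c']
```

Because the counts are kept per run, two separate runs would each be allowed one `db:foo` step, which is the "does not apply across multiple runs" caveat.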

Adam Ward

01/04/2023, 3:23 PM
Hmm - What I'm looking for is to limit the strain put on our DB. Each asset being materialized is hitting the DB so, if I am understanding this new functionality correctly, I think that might get us what we need. Then we would just need to ensure that the jobs are also restricted to a single concurrency. Am I following?

daniel

01/04/2023, 3:23 PM
Yeah, between those two settings (# of runs at once and # of assets at once within a single run) I think you could have some confidence in the total number of assets with that tag running at once
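The arithmetic behind combining the two settings is a simple product. The helper below is a hypothetical sketch, and it assumes every run that materializes tagged assets also carries the run-level tag; runs without that tag would not be counted by the run coordinator limit.

```python
def max_tagged_assets_in_flight(tagged_run_limit: int, per_run_tag_limit: int) -> int:
    """Upper bound on tagged assets executing at once across all tagged runs."""
    if tagged_run_limit < 0 or per_run_tag_limit < 0:
        raise ValueError("limits must be non-negative")
    # Each of the allowed runs can execute at most per_run_tag_limit tagged assets.
    return tagged_run_limit * per_run_tag_limit


print(max_tagged_assets_in_flight(1, 1))  # 1
print(max_tagged_assets_in_flight(1, 2))  # 2
```

With a run-level limit of 1 and a within-run limit of 1, at most one tagged asset would hit the database at any moment under that assumption.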

Adam Ward

01/04/2023, 3:24 PM
Wonderful! Do you have a version # for that release we can watch for?

daniel

01/04/2023, 3:24 PM
1.1.8, planned to go live tomorrow

Adam Ward

01/04/2023, 3:25 PM
Excellent! Thank you, Daniel! Much appreciated
Sorry - One more question: Is there an issue or commit for this change? Hoping to point our team to that at standup today.

Adam Ward

01/04/2023, 4:09 PM
You rock! Thank you!

daniel

01/04/2023, 4:10 PM

Adam Ward

01/06/2023, 3:36 PM
Hi @daniel - So, I see the new release and functionality this morning. What I can't figure out, however, is how to pass execution config to the underlying asset job that the build_reconciliation_sensor function creates. I tried passing it in the run_tags of the reconciliation sensor, but to no avail. Any suggestions?

daniel

01/06/2023, 3:47 PM
You can configure those on the default executor for your Definitions / repository. Something like:
from dagster import Definitions, multiprocess_executor

defs = Definitions(
    ...,
    executor=multiprocess_executor.configured(
        {
            "tag_concurrency_limits": [
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    ),
)
or if you're using a repository:
from dagster import multiprocess_executor, repository


@repository(
    default_executor_def=multiprocess_executor.configured(
        {
            "tag_concurrency_limits": [
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    )
)
def repo():
    ...

Adam Ward

01/06/2023, 4:40 PM
Ah! I knew there was a way. I just didn't know where to start. Thank you!
That did it! Man, big kudos to the team! This is fantastic!