Adam Ward
01/04/2023, 1:36 PM
ops_tag on the asset function in my asset factory
• Setting the run_tags of the reconciliation sensor
• Setting the tags in the config_schema of the asset function
In all instances, when the reconciliation sensor deems the assets stale, a single job runs with all stale assets executing concurrently. This risks putting too much strain on our upstream DB, so we need this concurrency to be limited.
Can someone tell me what I might be missing here?
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "db"
        value: "foo"
        limit: 1
daniel
01/04/2023, 2:43 PM
Adam Ward
01/04/2023, 2:44 PM
define_asset_job fn in the repository?
daniel
01/04/2023, 2:48 PM
Adam Ward
01/04/2023, 2:48 PM
daniel
01/04/2023, 3:18 PM
Adam Ward
01/04/2023, 3:20 PM
daniel
01/04/2023, 3:20 PM
Adam Ward
01/04/2023, 3:23 PM
daniel
01/04/2023, 3:23 PM
Adam Ward
01/04/2023, 3:24 PM
daniel
01/04/2023, 3:24 PM
Adam Ward
01/04/2023, 3:25 PM
daniel
01/04/2023, 4:09 PM
Adam Ward
01/04/2023, 4:09 PM
daniel
01/04/2023, 4:10 PM
Adam Ward
01/06/2023, 3:36 PM
build_reconciliation_sensor function creates. I tried passing it in the run_tags of the reconciliation sensor, but to no avail. Any suggestions?
daniel
01/06/2023, 3:47 PM
from dagster import Definitions, multiprocess_executor

defs = Definitions(
    ...,
    executor=multiprocess_executor.configured(
        {
            "tag_concurrency_limits": [
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    ),
)

from dagster import multiprocess_executor, repository

@repository(
    default_executor_def=multiprocess_executor.configured(
        {
            "tag_concurrency_limits": [
                {
                    "key": "database",
                    "value": "redshift",
                    "limit": 2,
                }
            ]
        }
    )
)
def repo():
    ...
Adam Ward
01/06/2023, 4:40 PM