# ask-community
c
For the experimental global op/asset concurrency limits, the main thing I'm looking for is avoiding concurrent execution of the same asset op (for reasons like conflicting attempts to refresh the same staged table, leading to duplicated records, etc.). I'm handling that right now with a Dask scheduler that manages semaphores, where the semaphore lease name is just
context.node_handle.to_string()
for unpartitioned assets, and
f"{context.<http://node_handle.to|node_handle.to>_string()}__{context.asset_partition_key_for_output()}"
if partitioned. I'm wondering if I'm missing something fundamental about the execution model here. It looks like I would need to add a unique concurrency limit for every asset name to achieve this in the experimental setup, which doesn't seem right (it's so surprising that Dagster doesn't do this already that I feel like I'm missing something basic).

The screenshot here shows what happens if I repeatedly click "Materialize" on the same asset: those runs and ops all start executing concurrently and lead to various messes in the target table depending on timing (e.g. one run drops the table while another is inserting into it).

It might make sense to make this asset-level op concurrency a sensible default when the global concurrency limits are enabled, so these asset ops would have queued instead, simply because they are the same asset op (not pooling for other reasons like resource utilization, just avoiding this kind of data processing error from concurrent execution of the same thing). My semaphore approach covers this situation in the meantime; a sketch of it is below.

Ideally this would be a default safeguard when the global limits are enabled, so all assets would avoid concurrent execution with themselves without needing to add those asset-level keys for all of them. It looks like it doesn't work that way yet, since this still happens with the experimental feature enabled, but maybe it could.
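For reference, here's a minimal sketch of that semaphore guard, assuming a Dask scheduler reachable from the op (the scheduler address and refresh_staged_table are hypothetical stand-ins):

```python
from dagster import OpExecutionContext, asset
from dask.distributed import Client, Semaphore


def _lease_name(context: OpExecutionContext) -> str:
    # Unpartitioned assets lock on the node handle alone; partitioned
    # assets append the partition key, so distinct partitions of the
    # same asset can still run concurrently.
    name = context.node_handle.to_string()
    if context.has_partition_key:
        name = f"{name}__{context.asset_partition_key_for_output()}"
    return name


@asset
def staged_table(context: OpExecutionContext) -> None:
    # Connect to the shared scheduler that owns the semaphores;
    # max_leases=1 makes each named semaphore act as a mutex, so a
    # second materialization of the same asset blocks until the
    # first releases its lease.
    Client("tcp://dask-scheduler:8786")  # hypothetical address
    with Semaphore(max_leases=1, name=_lease_name(context)):
        refresh_staged_table()  # hypothetical refresh logic
```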
s
@prha - mind taking a look at this one?
p
Yes, you’re correct… with the current implementation we need to set up a concurrency limit for every asset name. This is because we’re setting up limits at the instance level, not at the definitional level, which is dynamic. Is your proposal to set every limit to 1 by default?
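For reference, the wiring for the current experimental feature looks roughly like this: the op gets assigned to a concurrency group via the dagster/concurrency_key op tag, and the limit for each key is configured at the instance level (e.g. under Deployment → Concurrency in the UI), which is why every asset currently needs its own key and limit:

```python
from dagster import asset


# The dagster/concurrency_key op tag assigns this op to a named
# concurrency group; an instance-level limit of 1 on that key
# serializes materializations of this asset with themselves.
@asset(op_tags={"dagster/concurrency_key": "staged_table"})
def staged_table() -> None:
    ...
```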
👍 2
c
That's right: 1 as a safe default that people can set to 0 or a higher value if they know the asset isn't affected by problems with concurrent execution. I think of it as an inherent property of being an external asset, so maybe associated with the asset decorator and other methods of asset creation, rather than generalized to ops.
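For now, a small wrapper in the spirit of that default could tag every asset with its own name as its concurrency key; this is a hypothetical sketch (each key would still need its instance-level limit set to 1):

```python
from dagster import asset


def self_limited_asset(**asset_kwargs):
    # Hypothetical decorator factory: tags each asset with a
    # concurrency key equal to its function name, so an
    # instance-level limit of 1 per key keeps an asset from
    # materializing concurrently with itself.
    op_tags = dict(asset_kwargs.pop("op_tags", None) or {})

    def decorator(fn):
        tags = {**op_tags, "dagster/concurrency_key": fn.__name__}
        return asset(op_tags=tags, **asset_kwargs)(fn)

    return decorator


@self_limited_asset()
def staged_table() -> None:
    ...
```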
a
I am facing the same issue and have the same proposal.