# dagster-feedback
c
With software-defined assets, it was really surprising that there isn't anything that stops overlapping execution of the same asset op - for example, you can just click "materialize" a bunch of times on the same asset while Dagit shows it's already running. I found this out the hard way when I was trying observable source assets with auto-materializing downstream assets, which started many overlapping staging table refresh actions and my process broke down. This is all with non-argument dependencies, if that makes a difference. I'm scrambling now to figure out Celery or Dask to avoid this problem with multiple users potentially materializing assets on demand.
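For context, here's a minimal sketch of the kind of setup I mean (asset names are made up, and this assumes the observable source asset / auto-materialize APIs from a recent Dagster release):
```python
from dagster import (
    AutoMaterializePolicy,
    DataVersion,
    OpExecutionContext,
    asset,
    observable_source_asset,
)

@observable_source_asset
def raw_events():
    # Observation returns a data version for the external table; in practice
    # this would be a hash or last-updated timestamp from the source system.
    return DataVersion("2024-01-01T00:00:00")

@asset(
    non_argument_deps={"raw_events"},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def staging_table(context: OpExecutionContext) -> None:
    # Refreshes the staging table whenever raw_events is observed to change.
    # Nothing here prevents two materializations of this asset from overlapping.
    ...
```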
👍 1
👀 2
a
100% agree. I want to use auto-materializing assets so badly to avoid the scheduling I currently have, but they're either unpredictable or I don't fully get their execution.
v
I suspect you might be able to limit job concurrency using this approach: https://github.com/dagster-io/dagster/discussions/12251. The tag name is `auto_materialize`.
c
Sure, I'll give that auto_materialize tag limit a shot. It looks like using dagster-dask solved the problem of avoiding concurrent execution of the same asset materialization op (this is a more general problem that isn't specific to auto-materialization). Dask distributed's Locks and Semaphores might be useful here.
s
@prha has been working on adding functionality that can limit concurrency at the level of individual assets. I think that would help here? Here's the issue where we're tracking this: https://github.com/dagster-io/dagster/issues/12470
👍 1
> I found this out the hard way when I was trying observable source assets with auto-materializing downstream assets, which started many overlapping staging table refresh actions and my process broke down.
I believe this behavior is caused by a bug that is fixed in our latest release.
c
Following up on this one for @prha: yes, Dask's semaphores have been useful for me so far. I have this working with the Dask scheduler used only for central Semaphore coordination (no Dask workers), executing with the multiprocess executor. I had problems with dagster-dask not reloading my modules and not working with observable source assets. It's a simple approach:
```python
from dagster import OpExecutionContext, asset

def asset_lock(context: OpExecutionContext):
    """Uses a dask.distributed Semaphore to prevent concurrent execution of the same asset op."""
    from distributed import Client, Semaphore

    # Creating the Client registers it as the default client, so the Semaphore
    # attaches to the central scheduler (DASK_SCHEDULER is its address).
    dask_client = Client(address=DASK_SCHEDULER)
    lease_name = context.assets_def.asset_key.to_user_string()
    context.log.debug(f"Dask semaphore lease request: {lease_name}")
    return Semaphore(name=lease_name, max_leases=1)

@asset(...)
def asset_op(context: OpExecutionContext):
    with asset_lock(context):
        context.log.debug("Dask semaphore lease acquired")
        ...  # op starts
```
Using a semaphore with max_leases=1 gives useful default behaviour: active leases are heartbeated, which avoids stuck locks (I'm not sure yet whether Lock does the same thing). Dask cautions against long-running tasks that tie up the GIL and prevent heartbeating, so maybe this isn't valid for certain long-running ops. Redis could be another option. Sticking with Postgres, a similar approach using leases that require heartbeating, rather than row lock-for-update, might work - it avoids Postgres concurrent connection problems.
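For reference, here's a rough sketch of what the Redis variant could look like (untested on my end; the connection details, key prefix, and lock timeout are placeholders, and a long-running op would need to extend the lock before the timeout expires):
```python
import redis
from dagster import OpExecutionContext, asset

redis_client = redis.Redis(host="localhost", port=6379)

def asset_lock(context: OpExecutionContext):
    """Per-asset mutual exclusion via a Redis lock keyed on the asset key."""
    name = f"asset-lock/{context.assets_def.asset_key.to_user_string()}"
    context.log.debug(f"Redis lock request: {name}")
    # timeout auto-expires the lock if the holding process dies, playing the
    # same role as Dask's lease heartbeating; entering the context manager
    # blocks until the lock is acquired.
    return redis_client.lock(name, timeout=600)

@asset
def asset_op(context: OpExecutionContext):
    with asset_lock(context):
        context.log.debug("Redis lock acquired")
        ...  # op starts
```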
@sandy This Dask-scheduler semaphore setup also works with observable_source_asset and auto-materialization. Those observable source assets don't have context.assets_def, so I used node_handle instead, which works for both regular assets and observable source assets:
```python
lease_name = context.node_handle.to_string()
```
I haven't done anything with partitions here but hopefully this is helpful.