Chris Comeau
05/17/2023, 1:27 PM

Arthur
05/17/2023, 1:45 PM

Vitaly Markov
05/17/2023, 4:21 PM
auto_materialize

Chris Comeau
05/17/2023, 4:44 PM
I found this out the hard way when I was trying observable source assets with auto-materializing downstream assets, which started many overlapping staging table refresh actions and my process broke down.

sandy
05/17/2023, 8:10 PM
I believe this behavior is caused by a bug that is fixed in our latest release.
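(For reference, a minimal sketch of the setup described above — an observable source asset with an eagerly auto-materialized downstream asset. Asset names, the observe interval, and the observation logic are illustrative, and this assumes a recent Dagster release:)

    from dagster import (
        AutoMaterializePolicy,
        DataVersion,
        asset,
        observable_source_asset,
    )

    # Hypothetical upstream table, observed on a fixed interval.
    @observable_source_asset(auto_observe_interval_minutes=5)
    def source_table():
        # Illustrative: in practice, hash a row count or max(updated_at)
        # so the DataVersion changes whenever the upstream table changes.
        return DataVersion("2023-05-17T16:00:00")

    # Staging asset that auto-materializes whenever the observed
    # data version of source_table changes.
    @asset(
        deps=["source_table"],
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
    def staging_table():
        ...  # refresh the staging table from source_table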
Chris Comeau
05/24/2023, 8:32 PM
from dagster import OpExecutionContext, asset

DASK_SCHEDULER = "tcp://dask-scheduler:8786"  # placeholder: your Dask scheduler address

def asset_lock(context: OpExecutionContext):
    """Uses a dask.distributed Semaphore to prevent concurrent execution of the same asset op."""
    from distributed import Client, Semaphore

    # Connecting registers this client as the current default, so the
    # Semaphore below attaches to it.
    dask_client = Client(address=DASK_SCHEDULER)
    lease_name = context.assets_def.asset_key.to_user_string()
    context.log.debug(f"Dask semaphore lease request: {lease_name}")
    return Semaphore(name=lease_name, max_leases=1)

@asset(...)
def asset_op(context: OpExecutionContext):
    # Entering the context manager blocks until a lease is acquired;
    # the lease is released (or its heartbeat lapses) when the block exits.
    with asset_lock(context):
        context.log.debug("Dask semaphore lease acquired")
        ...  # op starts
Using a Semaphore with max_leases=1 gets useful default behaviour: active leases are heartbeated automatically, which avoids stuck locks (I'm not sure if Lock does the same thing yet).
Dask cautions against long-running tasks that tie up the GIL and prevent heartbeating, so maybe this isn't valid for certain long-running ops. Redis could be another option — see the sketch below.
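(A minimal sketch of the Redis option, assuming redis-py; the connection details and timeouts are illustrative. The lock's timeout gives it an expiry, which substitutes for a heartbeated lease if the holder dies:)

    import redis
    from dagster import OpExecutionContext, asset

    r = redis.Redis(host="localhost", port=6379)  # illustrative connection

    @asset
    def staging_table(context: OpExecutionContext):
        lease_name = context.assets_def.asset_key.to_user_string()
        # timeout=3600: the lock auto-expires after an hour if the holder
        # dies, so no stuck locks; blocking_timeout=600: wait up to ten
        # minutes to acquire, then raise LockError instead of blocking forever.
        with r.lock(lease_name, timeout=3600, blocking_timeout=600):
            context.log.debug("Redis lock acquired")
            ...  # op starts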
Sticking to postgres, a similar approach with leases that require heartbeating (instead of row lock-for-update) might work — it avoids postgres concurrent connection problems. An alternative lease name, one per op node:
lease_name = context.node_handle.to_string()
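(A rough sketch of that lease-table idea, assuming psycopg2 and a hypothetical asset_leases table; the TTL is illustrative. Acquiring means inserting a row or taking over one whose previous holder stopped heartbeating:)

    import psycopg2

    DDL = """
    CREATE TABLE IF NOT EXISTS asset_leases (
        lease_name text PRIMARY KEY,
        holder     text NOT NULL,
        expires_at timestamptz NOT NULL
    );
    """

    # Take the lease if it is free or its previous holder's lease expired.
    ACQUIRE = """
    INSERT INTO asset_leases (lease_name, holder, expires_at)
    VALUES (%(name)s, %(holder)s, now() + interval '60 seconds')
    ON CONFLICT (lease_name) DO UPDATE
        SET holder = EXCLUDED.holder, expires_at = EXCLUDED.expires_at
        WHERE asset_leases.expires_at < now()
    RETURNING lease_name;
    """

    # Heartbeat: the current holder pushes its expiry forward.
    HEARTBEAT = """
    UPDATE asset_leases
    SET expires_at = now() + interval '60 seconds'
    WHERE lease_name = %(name)s AND holder = %(holder)s;
    """

    def try_acquire(conn, name: str, holder: str) -> bool:
        with conn.cursor() as cur:
            cur.execute(ACQUIRE, {"name": name, "holder": holder})
            acquired = cur.fetchone() is not None
        conn.commit()
        return acquired

The op would then run HEARTBEAT periodically (from a background thread or between chunks of work) and either delete the row when done or simply let it expire.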
I haven't done anything with partitions here but hopefully this is helpful.