NawafSheikh
03/02/2021, 5:49 AM
alex
03/02/2021, 4:06 PM
The celery-k8s executor allows for different celery queues to enforce solid-level concurrency limits.
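(A minimal sketch of the queue-based routing alex describes, assuming dagster-celery-k8s is installed; the queue name "heavy" and the solid/pipeline names are illustrative, not from this thread. Solids opt into a queue via the dagster-celery/queue tag, and the number of workers serving that queue caps how many such solids run at once.)

from dagster import ModeDefinition, default_executors, pipeline, solid
from dagster_celery_k8s import celery_k8s_job_executor

# Route this solid to a dedicated "heavy" queue; running only a few workers on
# that queue limits how many heavy_solid steps execute concurrently.
@solid(tags={"dagster-celery/queue": "heavy"})
def heavy_solid(context):
    context.log.info("running on the 'heavy' celery queue")

@solid
def light_solid(context):
    context.log.info("running on the default 'dagster' celery queue")

@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])])
def queued_pipeline():
    heavy_solid()
    light_solid()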
NawafSheikh
03/02/2021, 4:32 PM
alex
03/02/2021, 4:39 PM
"information from the already executed pipeline could be sent to the next pipeline"
It depends what exact information you want to access, but IOManager is a pluggable part of the system that can be used to achieve this:
https://docs.dagster.io/overview/io-managers/io-managers
@sandy
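(A minimal sketch of the IOManager idea, assuming dagster 0.10-era APIs; the class name, resource key, and base_dir config are illustrative, not from this thread. Because file paths are keyed by step and output name rather than run id, a later pipeline configured with the same base_dir could load what an earlier pipeline wrote.)

import os
import pickle

from dagster import IOManager, io_manager

class SharedDirIOManager(IOManager):
    """Persists outputs to a shared directory so a later run or pipeline can read them."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, step_key, name):
        # Deterministic path (no run id) so another pipeline can find the file.
        return os.path.join(self.base_dir, step_key, name)

    def handle_output(self, context, obj):
        path = self._path(context.step_key, context.name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        upstream = context.upstream_output
        with open(self._path(upstream.step_key, upstream.name), "rb") as f:
            return pickle.load(f)

@io_manager(config_schema={"base_dir": str})
def shared_dir_io_manager(init_context):
    return SharedDirIOManager(init_context.resource_config["base_dir"])

In that era it would be wired in per mode, e.g. resource_defs={"io_manager": shared_dir_io_manager} on the pipeline's ModeDefinition.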
prha
03/02/2021, 4:51 PM
NawafSheikh
03/02/2021, 4:55 PM
prha
03/02/2021, 5:00 PM
from dagster import AssetKey, RunRequest, sensor

@sensor(pipeline_name="my_pipeline")
def toy_asset_sensor(context):
    # Look for materializations of "my_asset" newer than the last run this sensor launched.
    events = context.instance.events_for_asset_key(
        AssetKey(["my_asset"]), after_cursor=context.last_run_key, ascending=False, limit=1
    )
    if not events:
        return
    record_id, event = events[0]  # take the most recent materialization
    from_pipeline = event.pipeline_name  # pipeline that emitted the materialization
    yield RunRequest(run_key=str(record_id), run_config={})
sandy
03/02/2021, 5:02 PM
NawafSheikh
03/03/2021, 3:41 AM
paul.q
03/11/2021, 2:51 AM
Dagit seems to turn an asset key like keys-featureA into something that looks like keys/featureA, and the sensor doesn't seem to pick it up. I was looking to follow an approach where we create pipeline dependencies using a pure semaphore approach based on asset events. So I'd like to have asset keys like `semaphores-pipelineA`, `semaphores-pipelineB`, etc. Dagit would then just represent them in the Assets catalog under `semaphores/` and not clutter up the view. But the sensors don't detect these asset keys. Is this expected behaviour?
prha
03/11/2021, 4:42 PM
events_for_asset_key does not match based on prefix - if that’s the functionality you’re looking for, can you file an issue?
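(A sketch to illustrate the exact-match behaviour prha describes, under the assumption that the stored key ends up as the structured AssetKey(["sems", "A"]) rather than AssetKey(["sems-A"]); the helper name is illustrative, not from this thread.)

from dagster import AssetKey

def latest_materialization(instance, key_path):
    # events_for_asset_key matches the full key only: ["sems", "A"] and ["sems-A"] are
    # different keys, and a prefix such as ["sems"] returns nothing.
    events = instance.events_for_asset_key(AssetKey(key_path), ascending=False, limit=1)
    return events[0] if events else None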
paul.q
03/12/2021, 2:27 AM
import dagster

@dagster.solid()
def solid_a(context) -> dagster.Nothing:
    context.log.info("Starting solid_a")
    yield dagster.AssetMaterialization(asset_key="sems-A", description="marker left for pipeline choreography")

@dagster.solid()
def solid_b(context):
    context.log.info("Starting solid_b")

@dagster.pipeline()
def pipeline_a():
    solid_a()

@dagster.pipeline()
def pipeline_b():
    solid_b()

@dagster.sensor(pipeline_name="pipeline_b")
def semaphore_sensor(context):
    events = context.instance.events_for_asset_key(
        dagster.AssetKey(["sems-A"])
    )
    if not events:
        return
    record_id, event = events[0]  # take the most recent materialization
    from_pipeline = event.pipeline_name
    yield dagster.RunRequest(run_key=str(record_id), run_config={})

def get_sensors():
    return [semaphore_sensor]

@dagster.repository()
def test_semaphores():
    return [pipeline_a, pipeline_b] + get_sensors()
I should also point out that we can see the asset key in the catalog after running solid_a; using this example it appears as sems/A.