#announcements

NawafSheikh

03/02/2021, 5:49 AM
Hi, I wanted to ask two questions:
1. Is it possible to run a pipeline when another pipeline finishes its task or run?
2. Is it possible for a single solid in a pipeline to have tag-based run concurrency, so that if the concurrency is set to 4 and 10 pipelines are running concurrently, then once all 10 reach that specific solid, 6 of them wait for the 4 solids in other instances of the pipeline to complete?

alex

03/02/2021, 4:06 PM
1. The current way this can be done is with sensors: https://docs.dagster.io/overview/schedules-sensors/sensors#main
2. The only solid-level concurrency control we have at this time is our `celery-k8s` executor, which allows different celery queues to enforce solid-level concurrency limits.
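To make question 2 concrete, here is a minimal sketch in plain Python of the behaviour being asked for: many runs proceed freely until they reach one step, where at most `limit` of them may execute at once and the rest wait. This is not Dagster or celery code. The semaphore stands in for a capacity-limited queue, and `run_pipelines`, `limit`, and `step_seconds` are hypothetical names chosen for illustration.

```python
import threading
import time


def run_pipelines(n_runs=10, limit=4, step_seconds=0.05):
    """Start n_runs threads; at most `limit` may be inside the gated step at once."""
    gate = threading.Semaphore(limit)  # stands in for a capacity-limited queue
    lock = threading.Lock()
    state = {"active": 0, "peak": 0, "completed": 0}

    def pipeline_run():
        # Unrestricted upstream steps would run here.
        with gate:  # runs beyond `limit` block here until a slot frees up
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(step_seconds)  # the bottlenecked step, e.g. a DB write
            with lock:
                state["active"] -= 1
                state["completed"] += 1

    threads = [threading.Thread(target=pipeline_run) for _ in range(n_runs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["completed"], state["peak"]


if __name__ == "__main__":
    completed, peak = run_pipelines()
    print(f"completed={completed}, peak concurrency in gated step={peak}")
```

With the defaults, all 10 runs finish, but the gated step never has more than 4 of them inside it at the same moment.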

NawafSheikh

03/02/2021, 4:32 PM
But in sensors, how can we detect that a specific pipeline run has just ended, and can information from the already executed pipeline be sent to the next pipeline?

alex

03/02/2021, 4:39 PM
This is still a pretty new feature so it's not simple to do, but it is possible. @prha do we have a pipeline sensor example on hand?
"information from the already executed pipeline could be sent to the next pipeline"
It depends on what exact information you want to access, but `IOManager` is a pluggable part of the system that can be used to achieve this: https://docs.dagster.io/overview/io-managers/io-managers @sandy

prha

03/02/2021, 4:51 PM
We don’t have a pipeline sensor example handy - we may need to make some changes to better support cursor-based run queries …. @NawafSheikh what kind of information are you looking to extract from the pipeline?
We’ve seen people gravitating a lot more to asset-based sensors, which could fit your needs as long as your target pipeline emits an asset materialization at its end state

NawafSheikh

03/02/2021, 4:55 PM
Basically I want to transfer a result computed in a solid to the other pipeline somehow, and have that trigger it. But now I'm looking at asset materializations, which could work. But how does a sensor get triggered when a new asset is created?
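from dagster import AssetKey, RunRequest, sensor


@sensor(pipeline_name="my_pipeline")
def toy_asset_sensor(context):
    # Fetch only the newest materialization of the asset recorded after the last run key.
    events = context.instance.events_for_asset_key(
        AssetKey(["my_asset"]), after_cursor=context.last_run_key, ascending=False, limit=1
    )

    if not events:
        return

    record_id, event = events[0]  # take the most recent materialization
    from_pipeline = event.pipeline_name  # name of the pipeline that emitted the asset

    # Using the record id as the run key means each materialization triggers at most one run.
    yield RunRequest(run_key=str(record_id), run_config={})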
I’m making a note to add a documentation section for asset sensors specifically
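A rough model of what the sensor above does on each evaluation, written in plain Python so that it runs without a Dagster instance. The event log as a list of `(record_id, event)` tuples, the dict-shaped events, and the `evaluate_sensor` helper are all assumptions made for illustration; the real `events_for_asset_key` call is replaced here by simple list filtering.

```python
from typing import Dict, List, Optional, Tuple

Event = Dict[str, str]
EventLog = List[Tuple[int, Event]]


def evaluate_sensor(event_log: EventLog, last_run_key: Optional[str]) -> Optional[Dict[str, str]]:
    """Return a run request for the newest event after the cursor, or None."""
    # The previous run key doubles as the cursor: it is the last record id acted on.
    cursor = int(last_run_key) if last_run_key is not None else 0
    newer = [(record_id, event) for record_id, event in event_log if record_id > cursor]
    if not newer:
        return None  # nothing new since the last run, so no run is requested
    record_id, event = max(newer, key=lambda pair: pair[0])  # most recent materialization
    return {"run_key": str(record_id), "from_pipeline": event["pipeline_name"]}


if __name__ == "__main__":
    log: EventLog = [(1, {"pipeline_name": "pipeline_a"}), (2, {"pipeline_name": "pipeline_a"})]
    first = evaluate_sensor(log, None)
    print(first)
    print(evaluate_sensor(log, first["run_key"]))  # no new events after the cursor
```

The event that comes back also carries metadata such as the emitting pipeline's name, which is one way a small amount of information can travel from the upstream run to the sensor.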

sandy

03/02/2021, 5:02 PM
@NawafSheikh - out of curiosity, why not combine the two pipelines into a single pipeline? Is it that you want them to execute independently sometimes? Do they have conflicting dependencies that mean they can't live in the same repository? Does it feel too unwieldy to have one massive pipeline?

NawafSheikh

03/03/2021, 3:41 AM
Actually, the problem with combining the two pipelines into one is that the first pipeline should be able to run multiple instances concurrently, based on the resources of the system Dagster is running on, while the second pipeline has a DB bottleneck at the end that needs to be controlled at the Dagster level, so only a limited number of its runs should be created at any given time. That is why I asked earlier whether it was possible to limit concurrency based on tags at the solid level: a single pipeline could then have multiple instances based on the resources of Dagster’s system, but they would be queued at the last solid.
Thanks @prha. I guess this would work.

paul.q

03/11/2021, 2:51 AM
When using asset-based sensors, it seems that "flat" asset keys work, but "hierarchical" ones don't. By this I mean that if I want to exploit dagit's ability to folderize asset keys like `keys-featureA` into something that looks like `keys/featureA`, the sensor doesn't seem to pick it up. I was looking to follow an approach where we create pipeline dependencies by using a pure semaphore approach based on asset events. So I'd like to have asset keys like "`semaphores-pipelineA`", "`semaphores-pipelineB`", etc. Dagit would just represent them in the Assets catalog as "`semaphores/`" and not clutter up the view. But the sensors don't detect these asset keys. Is this expected behaviour?

prha

03/11/2021, 4:42 PM
Hi @paul.q… this is not expected. Do you mind sharing a snippet of code from your asset sensor? I am able to successfully detect events for hierarchical asset keys. However, `events_for_asset_key` does not match based on prefix - if that’s the functionality you’re looking for, can you file an issue?
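The difference between exact and prefix matching can be illustrated in plain Python, assuming asset keys are modelled as tuples of path components. `exact_match` and `prefix_match` are hypothetical helpers and not Dagster functions; the sketch only shows why a lookup for a parent "folder" finds nothing when matching is exact.

```python
from typing import List, Sequence, Tuple

Path = Tuple[str, ...]


def exact_match(keys: List[Path], wanted: Sequence[str]) -> List[Path]:
    """Keep only keys whose full path equals `wanted`."""
    target = tuple(wanted)
    return [key for key in keys if key == target]


def prefix_match(keys: List[Path], prefix: Sequence[str]) -> List[Path]:
    """Keep keys whose path starts with `prefix` (the 'folder' behaviour)."""
    head = tuple(prefix)
    return [key for key in keys if key[: len(head)] == head]


if __name__ == "__main__":
    keys: List[Path] = [
        ("semaphores", "pipelineA"),
        ("semaphores", "pipelineB"),
        ("other",),
    ]
    print(exact_match(keys, ["semaphores"]))               # no key is exactly ("semaphores",)
    print(exact_match(keys, ["semaphores", "pipelineA"]))  # the full path matches one key
    print(prefix_match(keys, ["semaphores"]))              # both keys under the prefix
```

Under exact matching, a sensor has to ask for the full path of each semaphore it cares about.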

paul.q

03/12/2021, 2:27 AM
Hi @prha, hopefully this snippet is good enough to demonstrate. Here, the sensor does not trigger, but if we changed the asset_key to something that doesn't folderize, e.g. "sems_A", it works. Also tried "sems/A" without success.
import dagster


@dagster.solid()
def solid_a(context) -> dagster.Nothing:
    context.log.info("Starting solid_a")
    # Emit the asset materialization that the sensor below is meant to detect.
    yield dagster.AssetMaterialization(
        asset_key="sems-A", description="marker left for pipeline choreography"
    )


@dagster.solid()
def solid_b(context):
    context.log.info("Starting solid_b")


@dagster.pipeline()
def pipeline_a():
    solid_a()


@dagster.pipeline()
def pipeline_b():
    solid_b()


@dagster.sensor(pipeline_name="pipeline_b")
def semaphore_sensor(context):
    events = context.instance.events_for_asset_key(
        dagster.AssetKey(["sems-A"])
    )
    if not events:
        return
    record_id, event = events[0]  # take the most recent materialization
    from_pipeline = event.pipeline_name
    yield dagster.RunRequest(run_key=str(record_id), run_config={})


def get_sensors():
    return [semaphore_sensor]


@dagster.repository()
def test_semaphores():
    return [pipeline_a, pipeline_b] + get_sensors()
I should also point out that we can see the asset key in the catalog after running solid_a; using this example, it appears as `sems/A`.