
Arun Kumar

07/29/2021, 2:32 AM
Hi team, whenever a backfill is triggered on an upstream pipeline, our sensors automatically trigger all the downstream pipelines based on the assets produced by the upstream pipeline. But currently these downstream pipeline runs are triggered as normal runs. Is there any way I can trigger them as backfill runs? How would I identify if the assets were produced by backfill runs or normal runs?

prha

07/29/2021, 3:13 AM
hmm, this isn’t exactly easy, but the event_log_entry from your asset sensor should have a run_id property on it that you can use to look up the pipeline run tags
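A minimal, untested sketch of that lookup (the sensor name, asset key, and pipeline name here are placeholders; the "dagster/backfill" tag key is the one discussed below):

from dagster import AssetKey, SkipReason, asset_sensor

@asset_sensor(asset_key=AssetKey("my_asset"), pipeline_name="my_downstream_pipeline")
def check_upstream_run(context, asset_event):
    # the event log entry records which run produced the materialization
    upstream_run = context.instance.get_run_by_id(asset_event.run_id)
    # "dagster/backfill" is the tag key set on backfill-initiated runs
    if "dagster/backfill" in upstream_run.tags:
        yield SkipReason("upstream materialization came from a backfill run")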

Arun Kumar

07/29/2021, 3:18 AM
Got it. Thanks Phil. Would there be any issue or conflict if I manually set the dagster/backfill tag from the sensor? Not sure what backfill id I would fill in though, but for now I just want to differentiate between normal vs backfill runs in my solid.
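(As a rough, untested sketch of that check inside a solid, assuming the run tags are reachable through the solid context:)

from dagster import solid

@solid
def my_solid(context):
    # the tags of the current run live on the pipeline run attached to the solid context
    if "dagster/backfill" in context.pipeline_run.tags:
        context.log.info("this run was launched as part of a backfill")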

prha

07/29/2021, 3:23 AM
hmm, I’m not sure if I’m following. the tags are just to distinguish backfill-initiated runs from normal runs. Is this to populate partition information on the runs?
I originally thought from your question that you were trying to detect within the sensor whether the asset materialization itself was coming from a backfill run

Arun Kumar

07/29/2021, 3:26 AM
Yes, within the sensor I would detect if an asset from an upstream pipeline was from a backfill run or a normal run. If it was from a backfill run, I would want to yield a RunRequest for the downstream pipelines with backfill tags.

prha

07/29/2021, 3:28 AM
you could do something like this (untested):
from dagster import RunRequest, asset_sensor

# "dagster/backfill" is the tag key Dagster sets on backfill-initiated runs
BACKFILL_ID_TAG = "dagster/backfill"

@asset_sensor(asset_key=MY_ASSET_KEY, pipeline_name=MY_PIPELINE_NAME)
def my_sensor(context, asset_event):
    run_id = asset_event.run_id
    # look up the run that produced the asset materialization
    run = context.instance.get_run_by_id(run_id)
    is_backfill_event = BACKFILL_ID_TAG in run.tags
    if is_backfill_event:
        return
    else:
        yield RunRequest(run_key=MY_RUN_KEY, run_config=MY_RUN_CONFIG)
the run request can have whatever tags you want on it
BTW, not sure if you’re using the newest example for asset sensors, or one of the older ones
but the idea is basically the same

Arun Kumar

07/29/2021, 3:35 AM
Thanks a lot for sharing the code. I am trying to do the exact same thing, but the only difference is that when it is a backfill event I still yield a run request on the pipeline with a backfill tag. Something like this:
@asset_sensor(asset_key=MY_ASSET_KEY, pipeline_name=MY_DOWNSTREAM_PIPELINE_NAME)
def my_sensor(context, asset_event):
    partition_set = ...
    partition = ...
    run_id = asset_event.run_id
    run = context.instance.get_run_by_id(run_id)
    is_backfill_event = BACKFILL_ID_TAG in run.tags
    materialization = asset_event.dagster_event.step_materialization_data.materialization
    partition_date = materialization.partition

    if is_backfill_event:
        yield RunRequest(
            run_key=MY_RUN_KEY,
            run_config=partition_set.run_config_for_partition(partition),
            tags={BACKFILL_ID_TAG: "true", **partition_set.tags_for_partition(partition)},
        )
    else:
        yield RunRequest(
            run_key=MY_RUN_KEY,
            run_config=partition_set.run_config_for_partition(partition),
            tags={**partition_set.tags_for_partition(partition)},
        )

prha

07/29/2021, 3:37 AM
yep, I think that should work….
is there a reason you want to use the BACKFILL_ID_TAG? Trying to think of any custom behavior we use for it and I can’t really think of anything.

Arun Kumar

07/29/2021, 3:43 AM
That was my concern too initially. There are two reasons why I am using the same BACKFILL_ID_TAG:
1. All my solids already check for this tag and change their behavior if it is a backfill run. If I add a new custom tag, I need to change that code to look for the new tag too.
2. I was hoping that if I use the same BACKFILL_ID_TAG as Dagster, maybe Dagit can identify these runs as backfill runs and I can view them in the Backfill view. Not sure if this is true?
I can also use a custom tag if needed and change the solids code accordingly, if you feel this might cause some issues in the future. Thanks a lot for all the help 🙂

prha

07/29/2021, 3:48 AM
I’m not sure, but I think that using the same backfill id tag value will throw off the counts in the backfill progress (we use the created runs vs the requested runs to track progress)
e.g. if you create a backfill requesting 10 runs, it kicks off 10 runs, and then 10 downstream runs are created, you might get 200% progress

Arun Kumar

07/29/2021, 3:54 AM
Makes sense. Cool, I will use a custom tag here. Thanks

prha

07/29/2021, 3:56 AM
you could also use the same BACKFILL_ID_TAG key and a different (made-up) value, and then the counts wouldn’t get messed up
e.g.
is_backfill_event = BACKFILL_ID_TAG in run.tags
backfill_id = run.tags.get(BACKFILL_ID_TAG)
new_backfill_id = f"{backfill_id}_downstream"
new_tags = {BACKFILL_ID_TAG: new_backfill_id, **partition_set.tags_for_partition(partition)}
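Putting that together with the earlier snippet, the backfill branch might look roughly like this (untested sketch; partition_set, partition, run, MY_RUN_KEY, and BACKFILL_ID_TAG are the same placeholders as above):

if is_backfill_event:
    backfill_id = run.tags.get(BACKFILL_ID_TAG)
    # a derived value keeps these downstream runs out of the original backfill's progress counts
    new_tags = {
        BACKFILL_ID_TAG: f"{backfill_id}_downstream",
        **partition_set.tags_for_partition(partition),
    }
    yield RunRequest(
        run_key=MY_RUN_KEY,
        run_config=partition_set.run_config_for_partition(partition),
        tags=new_tags,
    )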

Arun Kumar

07/29/2021, 4:00 AM
Sure, let me try that and see if it works 👍