https://dagster.io/ logo
#ask-community
Title
# ask-community
a

Alec Ryan

05/01/2022, 12:16 AM
Hey all, when I run a backfill on a partitioned_job for more than 1 day, a downstream job that triggers on a sensor does not run. Any thoughts on how to run a sensor for each partition backfill run?
Here is a screen shot for context of my issue.
p

prha

05/02/2022, 12:26 AM
Hey Alec… are you using an asset sensor to trigger the second run? And if so, are you triggering the second run based on the partition on the asset materialization? You may be running into an issue with the current implementation of the asset sensor where it only returns the most recent materialization, instead of all materializations for a given interval
a

Alec Ryan

05/02/2022, 1:03 PM
Hey @Prateek Agarwal, I believe my sensor is configured correctly
Copy code
from dagster import AssetKey, asset_sensor, RunRequest, get_dagster_logger
from datetime import datetime

def create_snowflake_asset_sensor(key, snowflake_job):

    @asset_sensor(
        asset_key=AssetKey(key),
        job=snowflake_job
    )
    def snowflake_sensor(context, asset_event):
        asset_partition = asset_event.dagster_event.partition
        yield RunRequest(
            run_key=context.cursor,
            run_config={
                "resources": {
                    "snowflake": {
                        "config": {
                            'account': ACCOUNT,
                            'user':USER, 
                            'password':PASSWORD,
                            'database': DB,
                            'warehouse': 'NHL_ANALYTICS'
                            }
                        },
                    "run_parameters" : {
                        "config": {
                            "run_date" : asset_partition
                            }
                        }
                    }
                }
            )

    return snowflake_sensor
for each materialization of the asset key, a run request should be yielded
So if I backfill for 10 days, that should materialize an asset 10x and trigger the sensor 10x, right?
p

prha

05/02/2022, 3:41 PM
I think that’s what you want, yes, but that’s not how the default implementation of the asset_sensor works. Here’s a Github issue describing the problem: https://github.com/dagster-io/dagster/issues/5699 FWIW, I’m planning on tackling this issue this week, but you can also look at the current implementation of
asset_sensor
to create a custom sensor that gets the behavior you want.
❤️ 1
a

Alec Ryan

05/07/2022, 5:36 PM
Sorry for the delayed response but thank you.
@prha Any code examples of how to do this?
also, any idea if this will go out in an upcoming release? @prha
p

prha

05/09/2022, 6:50 PM
Copy code
from dagster import sensor, AssetKey, DagsterEventType, EventRecordsFilter, RunRequest

@sensor(job=my_job)
def my_asset_sensor(context):
    records_cursor = int(context.cursor) if context.cursor else None
    asset_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("table_a"),
            after_cursor=records_cursor,
        ),
        ascending=False,
    )

    if not asset_records:
        return

    for record in asset_records:
        yield RunRequest(
            run_key=str(record.record_id),
            run_config={},
        )

    last_asset_record = asset_records[-1]
    context.update_cursor(str(last_asset_record.record_id))
a

Alec Ryan

05/10/2022, 12:25 AM
ah I see
Thank you this is helpful!
p

prha

05/10/2022, 12:26 AM
🙏