Spencer Nelson

03/29/2023, 11:22 PM
I’d like to launch a job when any partitions of 3 different assets are materialized. These assets are sometimes materialized in bulk by big backfill jobs (across several thousand partitions), and sometimes by scheduled nightly runs (which update just one partition, or perhaps 2 in rare cases). @multi_asset_sensor is obviously the tool for the job, so far so good. But my job is computationally very expensive. In a big backfill, I don’t want to kick off the job a few thousand times. Ideally, I’d wait for an entire backfill to complete, or trigger off the scheduled runs, but I’m open to other approaches. What are my options for managing this?
One specific question: Can I make a sensor that triggers on backfill completions?
Another specific question: Can I tell whether an EventLogRecord is related to a backfill?
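One possible angle on the EventLogRecord question, sketched here as an assumption rather than anything confirmed in this thread: a record's event_log_entry.run_id identifies the run that emitted the materialization, and fetching that run (for example with context.instance.get_run_by_id(run_id)) exposes its tags. Assuming runs launched by a backfill carry a "dagster/backfill" system tag whose value is the backfill id (verify this against your Dagster version), the check reduces to a tag lookup. The helper names below are hypothetical.

```python
from typing import Mapping, Optional

# Assumption: backfill-launched runs carry this system tag, valued with the backfill id.
BACKFILL_TAG = "dagster/backfill"


def backfill_id_for_run(run_tags: Mapping[str, str]) -> Optional[str]:
    """Return the backfill id if the run's tags mark it as part of a backfill, else None."""
    return run_tags.get(BACKFILL_TAG)


def is_backfill_run(run_tags: Mapping[str, str]) -> bool:
    """True when the run's tags indicate it was launched by a backfill."""
    return backfill_id_for_run(run_tags) is not None
```

Grouping new materializations by backfill id this way would also let a sensor wait until every run sharing that id has finished, rather than reacting to each partition.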
One thing I am considering is a stateful, cursor-based approach that launches the job only conditionally. The cursor would hold the sensor’s state, which is one of two values:
• WAITING: no new materialization of any of the assets, so there is nothing to do
• LAUNCHING: an asset has been materialized, but let’s wait 1 hour for the others to materialize before launching a job
The cursor would also hold the start timestamp of the last sensor-launched run. The sensor’s body would branch on the cursor’s state. In the WAITING state, call
context.instance.get_latest_materialization_event(…)
for each of the three assets to check for new materializations. A “new” materialization is one that happened after the last run’s start timestamp. If there are any new ones, go to the LAUNCHING state, recording the latest materialization event’s time. In the LAUNCHING state, check whether at least 1 hour has elapsed since that triggering event. If so, launch the job and go back to WAITING; otherwise, stay in LAUNCHING.
This feels pretty complicated! Is there a better way?
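The two-state cursor described above can be sketched in plain Python, kept independent of Dagster so the transition logic is unit-testable. This is a hypothetical sketch: evaluate, CursorState and SETTLE_SECONDS are illustrative names, and the current evaluation time stands in for the launched run's start timestamp. In a plain @sensor you might pass in context.cursor, the .timestamp of each result of context.instance.get_latest_materialization_event(...) (None if an asset was never materialized), and time.time(), then store the returned cursor with context.update_cursor(...) and yield a RunRequest when it returns True.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional, Sequence, Tuple

WAITING = "WAITING"
LAUNCHING = "LAUNCHING"
SETTLE_SECONDS = 60 * 60  # wait 1 hour after the triggering materialization


@dataclass
class CursorState:
    state: str = WAITING
    last_run_start: float = 0.0           # start time of the last sensor-launched run
    trigger_time: Optional[float] = None  # latest materialization seen when entering LAUNCHING

    def to_cursor(self) -> str:
        return json.dumps(asdict(self))

    @staticmethod
    def from_cursor(cursor: Optional[str]) -> "CursorState":
        # An empty cursor (first evaluation) starts in WAITING.
        return CursorState(**json.loads(cursor)) if cursor else CursorState()


def evaluate(
    cursor: Optional[str],
    latest_materializations: Sequence[Optional[float]],
    now: float,
) -> Tuple[bool, str]:
    """Return (should_launch, new_cursor) for one sensor tick."""
    s = CursorState.from_cursor(cursor)
    if s.state == WAITING:
        # "New" means materialized after the last sensor-launched run started.
        new = [t for t in latest_materializations if t is not None and t > s.last_run_start]
        if new:
            s = CursorState(LAUNCHING, s.last_run_start, max(new))
        return False, s.to_cursor()
    # LAUNCHING: fire once the settle window since the trigger has elapsed.
    if now - s.trigger_time >= SETTLE_SECONDS:
        return True, CursorState(WAITING, now, None).to_cursor()
    return False, s.to_cursor()
```

As described, trigger_time is set once on entering LAUNCHING. A debounce variant would refresh it on every new materialization, so that a backfill running longer than an hour launches the job only after it goes quiet.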

Zach

03/30/2023, 12:37 AM
I haven't really used this feature yet, but it seems like something that freshness-based scheduling might be able to handle?

Spencer Nelson

03/30/2023, 1:04 AM
Freshness policy sensors can’t yield RunRequests, unfortunately. So that would only work if I used the asset reconciliation sensor, which I don’t want to use for several reasons:
• no real way to test it at this time
• doesn’t work with SourceAssets
• some bad experiences with empty error messages; it just seems too immature to use (https://dagster.slack.com/archives/C01U954MEER/p1679947164841339)