
Mark Kudryk

04/08/2021, 10:38 PM
Is it possible to configure a sensor to detect when a backfill is complete? I can find out when the individual partitions (PipelineRuns) are complete, but I’d like to know when they are all complete (i.e. the backfill is complete, along with the list of executed partitions associated with the backfill). While a PipelineRun does have a backfill tag containing the backfill hash, my sensor doesn’t know how many PipelineRuns to expect for a backfill, which is why it would be nice for a “BackfillRun” to provide that list of PipelineRuns.

schrockn

04/08/2021, 11:06 PM
This is an interesting use case! I’ll let @prha speak to the feasibility of this.

Mark Kudryk

04/08/2021, 11:09 PM
We’re using partitions in a bit of an unconventional way: all partitions process the same data set, but each partition processes the data with its own set of rules and outputs a data set that conforms to those rules. In a subsequent pipeline, which I’d like to have triggered by a “backfill complete” sensor, I bring all of these data sets together for the final output.

prha

04/09/2021, 1:39 AM
Yeah, this is super interesting. It’s very feasible, but it would rely on a number of pseudo-private APIs (they will probably remain stable, but stability is not guaranteed between major releases).
Here’s roughly what it would look like (I haven’t tested it):
from dagster import RunRequest, sensor

# these two imports are the pseudo-private APIs mentioned above; their
# locations may shift between major releases
from dagster.core.execution.backfill import BulkActionStatus
from dagster.core.storage.pipeline_run import PipelineRunsFilter


@sensor(pipeline_name="my_cool_pipeline")
def backfill_alert_sensor(context):
    # not ideal: this fetches the full set of known backfills on every tick.
    # maybe you can do some filtering by time; otherwise we'd have to provide
    # an `after_cursor` parameter or something
    backfills = context.instance.get_backfills(status=BulkActionStatus.COMPLETED)

    for backfill in backfills:
        backfill_id = backfill.backfill_id
        backfill_run_filter = PipelineRunsFilter.for_backfill(backfill_id)
        backfill_runs = context.instance.get_runs(filters=backfill_run_filter)
        run_ids = [run.run_id for run in backfill_runs]
        yield RunRequest(
            run_key=backfill_id,  # one downstream run per completed backfill
            run_config=_my_cool_pipeline_run_config_based_on_run_ids(run_ids),
        )
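(Editor’s note, not from the thread: because run_key is set to the backfill_id, the sensor framework deduplicates on it, so each completed backfill triggers at most one downstream run even though the sensor re-fetches all completed backfills on every tick. The _my_cool_pipeline_run_config_based_on_run_ids helper is a placeholder; a hypothetical sketch of its shape follows, where the solid name and config keys are made up for illustration.)

# Hypothetical sketch of the placeholder helper used in the sensor above.
# "gather_rule_outputs" and "upstream_run_ids" are made-up names; substitute
# whatever solids/config the downstream pipeline actually defines.
def _my_cool_pipeline_run_config_based_on_run_ids(run_ids):
    return {
        "solids": {
            "gather_rule_outputs": {
                "config": {"upstream_run_ids": run_ids},
            }
        }
    }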

Mark Kudryk

04/09/2021, 4:43 PM
Thanks @prha. I’ve tried this out and it’s not working. Even with status=BulkActionStatus.COMPLETED removed, the get_backfills() method returns nothing after backfills have completed. When I look in the bulk_actions table, where get_backfills() retrieves its data from, the table is empty; perhaps I’ve missed a configuration here (I’m using the default dagster.yaml configuration as suggested by the tutorial). I’m using Dagster 0.10.7.

With what I’ve tried and what you’ve suggested above, sensors are essentially polling for changes and then need to filter for the desired backfill/pipeline. I tried to set a tag on a PipelineRun to mark it as having been dealt with, so the sensor knows to ignore that run the next time it fires; but the tag doesn’t persist – subsequent calls do not include the added tag. It would be nice if a sensor worked as an event listener, so that when a backfill completes it emits an event indicating this, freeing the listener from having to retrieve backfills/runs and filter for the latest instance (I couldn’t find where one can set a filter on time).
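(Editor’s note, not from the thread: one likely reason a tag set this way does not persist is that the PipelineRun objects returned by get_runs() are in-memory snapshots, so modifying them never touches run storage. A minimal sketch of persisting a marker through the instance instead, assuming DagsterInstance.add_run_tags is available in this Dagster version; the tag name is an assumption for illustration.)

# Hypothetical sketch: persist a "processed" marker via run storage rather than
# by mutating the PipelineRun object. The tag name is made up for illustration.
PROCESSED_TAG = "my_team/backfill_processed"

def mark_backfill_runs_processed(instance, backfill_runs):
    """Tag each run so a later sensor tick can skip runs already handled."""
    for run in backfill_runs:
        if PROCESSED_TAG not in run.tags:
            # add_run_tags writes through to run storage, so the tag is visible
            # to subsequent get_runs() calls
            instance.add_run_tags(run.run_id, {PROCESSED_TAG: "true"})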

prha

04/09/2021, 4:46 PM
Ah okay, can you check to see if you have a backfill entry in your dagster.yaml?
Starting in 0.10.7 we enabled daemon-based persisted backfill entries, but it has to be configured in dagster.yaml: https://github.com/dagster-io/dagster/releases/tag/0.10.7
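(Editor’s note, not from the thread: the dagster.yaml entry being discussed looked roughly like the following on the 0.10.x line; treat the exact key as an assumption and verify it against the linked 0.10.7 release notes.)

# dagster.yaml -- opt in to daemon-based, persisted backfills (0.10.x only;
# verify the exact key against the 0.10.7 release notes linked above)
backfill:
  daemon_enabled: true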

Mark Kudryk

04/09/2021, 4:59 PM
Thanks for that @prha. I did not have that set in the config file. I’m now getting data back. 🙂

prha

04/09/2021, 5:06 PM
Great! I should note that starting with 0.11.0 we made daemon-based backfills the default, so you can remove that config entry from your dagster.yaml.

Mark Kudryk

04/09/2021, 5:12 PM
I think there is a bug in the code that updates the runs table. The created_timestamp column values are saved in UTC, but the update_timestamp column values are saved in local time. I’m in Mountain Time, and looking at the runs, the records appear to have been updated before they were created.

prha

04/09/2021, 5:26 PM
Yes, that looks like a bug… Do you mind filing a GitHub issue? In practice, though, I don’t think there’s anything that reads that value…

Mark Kudryk

04/09/2021, 5:26 PM
I will.