Mark Kudryk
04/08/2021, 10:38 PMschrockn
04/08/2021, 11:06 PMMark Kudryk
04/08/2021, 11:09 PMprha
04/09/2021, 1:39 AM@sensor(pipeline="my_cool_pipeline")
def backfill_alert_sensor(context):
from dagster.core.execution.backfill import BulkActionStatus
from dagster.core.storage.pipeline_run import PipelineRunsFilter
# not ideal, fetches the full set of known backfills each time, maybe you can do some
# filtering by time, o.w. we'd have to provide some way to provide an `after_cursor`
# parameter or something
backfills = context.instance.get_backfills(status=BulkActionStatus.COMPLETED)
for backfill in backfills:
backfill_id = backfill.backfill_id
backfill_run_filter = PipelineRunsFilter.for_backfill(backfill_id)
backfill_runs = context.instance.get_runs(filters=backfill_run_filter)
run_ids = [run.run_id for run in backfill_runs]
yield RunRequest(
run_key=backfill_id,
run_config=_my_cool_pipeline_run_config_based_on_run_ids(run_ids)
)
Mark Kudryk
04/09/2021, 4:43 PMstatus=BulkActionStatus.COMPLETED
the get_backfills()
method is returning nothing after backfills have completed. When I look in the bulk_actions
table where get_backfills()
retrieves its data from, that table is empty; perhaps I’ve missed a configuration here (I’m using the default configuration of dagster.yaml
as suggested by the tutorial. I’m using Dagster 0.10.7.
Sensors are essentially polling for changes in what I’ve tried to do and you’ve suggested above, and then needs to filter for the desired backfill/pipeline. I tried to set a tag a PipelineRun to mark it as having been dealt with so I know to ignore that instance in the future when the sensor runs again; but doing so doesn’t persist the tag – subsequent calls does not include the added tag. It would be nice if a sensor worked as an event listener, such that when a backfill is completed, it would emit an event indicating this, freeing the listener from having to retrieve backfills/runs and filtering for the latest instance (I couldn’t find where one can set a filter for time).prha
04/09/2021, 4:46 PMbackfill
entry in your dagster.yaml
?0.10.7
we enabled daemon-based persisted backfill entries, but it had to be configured in dagster.yaml
: https://github.com/dagster-io/dagster/releases/tag/0.10.7Mark Kudryk
04/09/2021, 4:59 PMprha
04/09/2021, 5:06 PM0.11.0
, we made the daemon-backfill default, so you would remove that config entry in your dagster.yaml
Mark Kudryk
04/09/2021, 5:12 PMruns
table. The created_timestamp
column values are saved in UTC, but the update_timestamp
column values are saved in local time. I’m in Mountain, and looking at the runs the record was updated before it was created.prha
04/09/2021, 5:26 PMMark Kudryk
04/09/2021, 5:26 PM