Yevhen Samoilenko
08/25/2022, 4:42 PMdef load_input(self, context: InputContext) -> str:
materialized = context.upstream_output.materialized
if materialized:
return 'materialized'
return ''
Saul Burgos
08/25/2022, 9:02 PMSaul Burgos
08/25/2022, 9:02 PMfrom dagster.core.storage.pipeline_run import RunsFilter
@sensor(job=do_the_second_thing)
def thing_sensor(context):
run_records = context.instance.get_run_records(
filters=RunsFilter(
job_name="do_the_first_thing",
statuses=[PipelineRunStatus.SUCCESS],
updated_after=..., # can also filter by timestamp to do more efficient fetching
),
order_by="update_timestamp",
ascending=False,
)
for run_record in run_records:
yield RunRequest(
run_key=run_record.run_id, # avoid double firing for the same run
)
chris
08/25/2022, 9:08 PMload_input
method. Something like:
def load_input(self, context: InputContext):
for record in context.instance.get_asset_records(my_asset_key):
if record.asset_entry.last_run_id == context.run_id:
# this means that the asset was materialized in the current run
chris
08/25/2022, 9:09 PMYevhen Samoilenko
08/26/2022, 7:59 AM