Josh Lloyd
09/23/2021, 10:49 PM@sensor(pipeline_name="collective_pipeline", mode="stage_coll")
def collective_sensor(context):
""" Workaround for wildcard matching on asset_keys. From <https://github.com/dagster-io/dagster/issues/4582> """
after_cursor = int(context.cursor) if context.cursor else 0
event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
after_cursor=after_cursor,
),
ascending=True,
)
for event_record in event_records:
...
if len(event_records) > 0:
context.update_cursor(str(event_records[0].storage_id))
else:
yield SkipReason(f"No Asset materialization event records found. Cursor: {after_cursor}")
The sensor will, every 30 seconds, reach the last line and produce a skip result saying No Asset materialization event records found. Cursor: 0
I can see all the asset materializations in the Asset tab just fine.
my dev environment uses the DockerRunLauncher
and LocalComputeLogManager
while my stage uses the EcsRunLauncher
and S3ComputeLogManager
respectively. Other than that, the two environments should be pretty much the same.… else None
while in stage it was ... else 0
. Gotta love miniscule differences making huge impacts … 😒igh: