Eric Larson
12/16/2022, 9:22 PMbuild_asset_reconciliation_sensor
with some DBT assets but get an error every time. Basic definition:
survey_asset_sensor = build_asset_reconciliation_sensor(
asset_selection=AssetSelection.groups('raw_data'),
name="survey_asset_reconciliation",
minimum_interval_seconds=60*2,
description='Keep survey assets up to date.',
default_status=DefaultSensorStatus.STOPPED,
)
the asset key it is complaining about in the error is a DBT source table AssetKey(['pollfish', 'pollfish_qa'])
here is error message
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor survey_asset_reconciliation
File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
KeyError: AssetKey(['pollfish', 'pollfish_qa'])
File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 199, in user_code_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 425, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 589, in _wrapped_fn
result = fn(context)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 890, in sensor_fn
run_requests, updated_cursor = reconcile(
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 726, in reconcile
) = determine_asset_partitions_to_reconcile(
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 383, in determine_asset_partitions_to_reconcile
all(
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 396, in <genexpr>
instance_queryer.is_reconciled(
File "/usr/local/lib/python3.8/site-packages/dagster/_utils/cached_method.py", line 59, in helper
result = method(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster/_utils/caching_instance_queryer.py", line 247, in is_reconciled
for parent in asset_graph.get_parents_partitions(
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_graph.py", line 221, in get_parents_partitions
for parent_asset_key in self.get_parents(asset_key):
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_graph.py", line 151, in get_parents
return self.asset_dep_graph["upstream"][asset_key]
sandy
12/19/2022, 5:57 PMEric Larson
12/19/2022, 6:03 PMops
and a asset materliaztion event
is logged inside the op. Do I need to convert it to a SDA to get it to work?sandy
12/19/2022, 11:44 PMMikeVL
01/10/2023, 11:28 PMsandy
01/11/2023, 4:20 PMMikeVL
01/12/2023, 12:11 AM