Jakub Hettler
02/25/2023, 8:03 AM
Jakub Hettler
02/27/2023, 9:18 AM
from dagster import asset, AssetSelection, define_asset_job, AssetKey, SourceAsset, build_asset_reconciliation_sensor
# NOTE(review): bare attribute access whose result is discarded — presumably a
# leftover from editing; confirm it can be dropped.
AssetSelection.downstream
# Source asset registered under the key "a".
a = SourceAsset(key=AssetKey("a"))
@asset
def ranger2(a):
    """Asset that takes ``a`` as its input and prints a marker message."""
    print("I am a ranger 2")
@asset
def ranger1(ranger2):
    """Asset that takes ``ranger2`` as its input and prints a marker message."""
    print("I am a ranger 1")
# Reconciliation sensor named "update_sensor", built over AssetSelection.all().
update_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.all(),
    name="update_sensor",
)

# Asset job named "update_jobX"; no explicit selection is passed.
update_job = define_asset_job(name="update_jobX")
But if I check the logs of the update sensor, there is an error:
KeyError: AssetKey(['a'])
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
yield
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 328, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 428, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 593, in _wrapped_fn
result = fn(context)
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 954, in _sensor
run_requests, updated_cursor = reconcile(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 769, in reconcile
) = determine_asset_partitions_to_reconcile(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 446, in determine_asset_partitions_to_reconcile
to_reconcile = asset_graph.bfs_filter_asset_partitions(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 381, in bfs_filter_asset_partitions
if condition_fn(candidates_unit, result):
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 439, in should_reconcile
return all(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 440, in <genexpr>
parents_will_be_reconciled(candidate, to_reconcile) for candidate in candidates_unit
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 406, in parents_will_be_reconciled
return all(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 416, in <genexpr>
or (instance_queryer.is_reconciled(asset_partition=parent, asset_graph=asset_graph))
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 72, in helper
result = method(self, *args, **kwargs)
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_utils/caching_instance_queryer.py", line 354, in is_reconciled
for parent in asset_graph.get_parents_partitions(
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 235, in get_parents_partitions
for parent_asset_key in self.get_parents(asset_key):
File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 154, in get_parents
return self._asset_dep_graph["upstream"][asset_key]
It looks like the reconciliation sensor can’t handle assets created with the `SourceAsset` class.