#ask-community

Jakub Hettler

02/25/2023, 8:03 AM
Hi everyone, is it possible to use reconciliation sensors across code locations? I have tried the following setup, with a reconciliation sensor defined in both code locations. If I clear asset A, all the assets in definition.py (b, c, d) get materialized, but the assets in definition2.py switch to a stale state and never get materialized. If I clear the ranger2 asset, ranger1 is materialized as expected. Should reconciliation sensors work across code locations? Thanks!
The code in definition2.py looks like this:
from dagster import asset, AssetSelection, define_asset_job, AssetKey, SourceAsset, build_asset_reconciliation_sensor

AssetSelection.downstream

a = SourceAsset(key=AssetKey("a"))

@asset
def ranger2(a):
    print("I am a ranger 2")

@asset
def ranger1(ranger2):
    print("I am a ranger 1")

# add a reconciliation sensor
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)


update_job = define_asset_job(name="update_jobX")
But if I check the logs of the update sensor, there is an error:
KeyError: AssetKey(['a'])
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
    yield
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 328, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 428, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 593, in _wrapped_fn
    result = fn(context)
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 954, in _sensor
    run_requests, updated_cursor = reconcile(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 769, in reconcile
    ) = determine_asset_partitions_to_reconcile(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 446, in determine_asset_partitions_to_reconcile
    to_reconcile = asset_graph.bfs_filter_asset_partitions(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 381, in bfs_filter_asset_partitions
    if condition_fn(candidates_unit, result):
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 439, in should_reconcile
    return all(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 440, in <genexpr>
    parents_will_be_reconciled(candidate, to_reconcile) for candidate in candidates_unit
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 406, in parents_will_be_reconciled
    return all(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 416, in <genexpr>
    or (instance_queryer.is_reconciled(asset_partition=parent, asset_graph=asset_graph))
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 72, in helper
    result = method(self, *args, **kwargs)
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_utils/caching_instance_queryer.py", line 354, in is_reconciled
    for parent in asset_graph.get_parents_partitions(
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 235, in get_parents_partitions
    for parent_asset_key in self.get_parents(asset_key):
  File "/Users/hettlerj/GIT/bi-monorepo/.venv/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 154, in get_parents
    return self._asset_dep_graph["upstream"][asset_key]
It looks like the reconciliation sensor can’t handle assets declared with the SourceAsset class.
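To illustrate what I think is happening in the last frame of the traceback, here is a simplified sketch in plain Python. It is not Dagster's actual implementation: the `upstream` dict, `get_parents`, and `get_parents_safe` are hypothetical stand-ins, and I am assuming the upstream map only has entries for assets defined in this code location, so the source asset `a` has no entry of its own.

```python
# Simplified illustration only; NOT Dagster's real code.
# Assumption: the upstream map has entries only for assets defined in this
# code location, so the source asset "a" appears as a parent but not as a key.
upstream = {
    "ranger2": {"a"},
    "ranger1": {"ranger2"},
}


def get_parents(asset_key):
    # Same shape as the failing lookup in the traceback: a plain dict index.
    return upstream[asset_key]


def get_parents_safe(asset_key):
    # Hypothetical defensive variant: unknown keys are treated as having no parents.
    return upstream.get(asset_key, set())


try:
    get_parents("a")
    lookup_failed = False
except KeyError:
    # Asking for the parents of the source asset fails, as in the sensor log.
    lookup_failed = True
```

If this guess is right, the error comes from walking up to the parents of `a` itself, not from `ranger2` or `ranger1`.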