https://dagster.io/ logo
#dagster-support
Title
# dagster-support
e

Eric Larson

12/16/2022, 9:22 PM
Tried adding the new
build_asset_reconciliation_sensor
with some DBT assets but get an error every time. Basic definition:
Copy code
survey_asset_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.groups('raw_data'),
    name="survey_asset_reconciliation",
    minimum_interval_seconds=60*2,
    description='Keep survey assets up to date.',
    default_status=DefaultSensorStatus.STOPPED,
)
the asset key it is complaining about in the error is a DBT source table
AssetKey(['pollfish', 'pollfish_qa'])
here is error message
Copy code
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor survey_asset_reconciliation

  File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
KeyError: AssetKey(['pollfish', 'pollfish_qa'])

  File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 199, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 425, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 589, in _wrapped_fn
    result = fn(context)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 890, in sensor_fn
    run_requests, updated_cursor = reconcile(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 726, in reconcile
    ) = determine_asset_partitions_to_reconcile(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 383, in determine_asset_partitions_to_reconcile
    all(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 396, in <genexpr>
    instance_queryer.is_reconciled(
  File "/usr/local/lib/python3.8/site-packages/dagster/_utils/cached_method.py", line 59, in helper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dagster/_utils/caching_instance_queryer.py", line 247, in is_reconciled
    for parent in asset_graph.get_parents_partitions(
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_graph.py", line 221, in get_parents_partitions
    for parent_asset_key in self.get_parents(asset_key):
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/asset_graph.py", line 151, in get_parents
    return self.asset_dep_graph["upstream"][asset_key]
message has been deleted
s

sandy

12/19/2022, 5:57 PM
Hi @Eric Larson - is pollfish_qa a dbt source? Does it correspond to an asset in a different Dagster repository?
e

Eric Larson

12/19/2022, 6:03 PM
It a dbt source table, which currently is generated by a dagster job that uses
ops
and a
asset materliaztion event
is logged inside the op. Do I need to convert it to a SDA to get it to work?
s

sandy

12/19/2022, 11:44 PM
Ah - this is something we need to fix, but I believe you can get it working in the mean time by creating a "dummy" asset to represent it. You don't need to run it, but it will make it appear in the asset graph that the reconciliation sensor understands so it knows how to handle
m

MikeVL

01/10/2023, 11:28 PM
Hi @sandy, I created "dummy" asset like you suggested and it runs fine with manual materialization. I'm trying to use the reconciliation sensor to run the dbt models hourly however, since the dummy asset (represents a BQ table that has data streaming into it) doesn't materialize the models won't run because it thinks there's no new dara. Would the only work around be adding a sensor to the dummy asset just to mimic that new data has been added or is this where airbyte would come into play?
s

sandy

01/11/2023, 4:20 PM
We actually just merged a change that should fix this dbt source reconciliation issue in tomorrow's release. If you set a freshness policy on your dbt asset, then it will be updated according to the cadence of the freshness policy. If you don't set a freshness policy, then yes, you'd need to somehow indicate that an upstream change has happened by recording a materialization.
m

MikeVL

01/12/2023, 12:11 AM
Great! Thanks for the heads up.
50 Views