Using `build_asset_reconciliation_sensor` and get ...
# ask-community
a
Using
build_asset_reconciliation_sensor
and get this error:
Copy code
dagster._check.CheckError: Failure condition: 
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.10/dist-packages/dagster/_grpc/impl.py", line 328, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/sensor_definition.py", line 428, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/sensor_definition.py", line 593, in _wrapped_fn
    result = fn(context)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 954, in _sensor
    run_requests, updated_cursor = reconcile(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 769, in reconcile
    ) = determine_asset_partitions_to_reconcile(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 393, in determine_asset_partitions_to_reconcile
    stale_candidates, latest_storage_id = find_parent_materialized_asset_partitions(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 244, in find_parent_materialized_asset_partitions
    if child.asset_key in target_asset_keys and not instance_queryer.is_asset_in_run(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/caching_instance_queryer.py", line 148, in is_asset_in_run
    check.failed("")
  File "/usr/local/lib/python3.10/dist-packages/dagster/_check/__init__.py", line 1699, in failed
    raise CheckError(f"Failure condition: {desc}")
The three assets directly referenced with it are time partitioned (daily, monthly, monthly). There are downstream assets not directly referenced. Any idea on the issue?
o
hi @Alec Koumjian -- what version of dagster are you on currently? This stack trace seems to indicate an earlier version -- I'm not confident if an update would resolve the issue but I wouldn't be surprised. Just for some more info, did you happen to delete any runs recently before this error showed up?
a
Yeah, this is on
1.1.21
. I did delete some runs, but I can't remember now. I had a large set of assets in here before and I'm been slimming it down to try to reduce the complexity of what's being reconciled. I'll deploy recent version to my test instance and see what happens.
Most recent version (
1.2.3
) appears to produce an identical result. Worth noting, the list of Jobs seems incorrect, referencing all sorts of downstream and upstream assets. Is it possible that the definition is not being updated after changing the list of assets defined in the sensor job?
I am able to produce the failure with specifying just two assets. The first has the following asset definition:
Copy code
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2018-06-01"),
    code_version="0.0.2",
    retry_policy=RetryPolicy(
        max_retries=6,
        delay=10.0,  # seconds
        backoff=Backoff.EXPONENTIAL,  # 10, 20, 40, 80, 160s, 320s of delay
        jitter=Jitter.PLUS_MINUS,
    ),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=5),
)
def extract_method()...
The second has this definition:
Copy code
@asset(
    partitions_def=MonthlyPartitionsDefinition(start_date="2018-06-01"),
    code_version="1",
    ins={"extract_method": AssetIn(metadata={"allow_missing_partitions": True})},
)
def transform_method(context, extract_method) -> Output[DataFrame[ADAMETLSourceSchema]]:
If I only include the extract asset, it's content. Adding the transform asset, I get the above error. I tried removing the
allow_missing_partitions
and it still didn't resolve. Is it possible it is failing simply because the transform step is mapping monthly partitions onto daily extract partitions?
The daily -> monthly mapping also does not appear to be the source of the issue.
Okay, it looks like the error occured simply because there was more than a single missing partition in the extract asset. I can maybe understand not queueing up all missing partitions automatically, but I didn't get any kind of helpful error message.
o
hi @Alec Koumjian ! do you mind sharing the stack trace that you're seeing in
1.2.3
? I'm haven't been able to replicate this
a
I attempt to go back to that implementation and now can no longer replicate it. I am having some issues with sensors in general, but I or my colleague may bring that up separately.
blob salute 1