Alec Koumjian
03/27/2023, 7:59 PMbuild_asset_reconciliation_sensor
and get this error:
dagster._check.CheckError: Failure condition:
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
yield
File "/usr/local/lib/python3.10/dist-packages/dagster/_grpc/impl.py", line 328, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/sensor_definition.py", line 428, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/sensor_definition.py", line 593, in _wrapped_fn
result = fn(context)
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 954, in _sensor
run_requests, updated_cursor = reconcile(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 769, in reconcile
) = determine_asset_partitions_to_reconcile(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 393, in determine_asset_partitions_to_reconcile
stale_candidates, latest_storage_id = find_parent_materialized_asset_partitions(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 244, in find_parent_materialized_asset_partitions
if child.asset_key in target_asset_keys and not instance_queryer.is_asset_in_run(
File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/caching_instance_queryer.py", line 148, in is_asset_in_run
check.failed("")
File "/usr/local/lib/python3.10/dist-packages/dagster/_check/__init__.py", line 1699, in failed
raise CheckError(f"Failure condition: {desc}")
The three assets directly referenced with it are time partitioned (daily, monthly, monthly). There are downstream assets not directly referenced. Any idea on the issue?owen
03/27/2023, 8:05 PMAlec Koumjian
03/27/2023, 8:07 PM1.1.21
. I did delete some runs, but I can't remember now. I had a large set of assets in here before and I'm been slimming it down to try to reduce the complexity of what's being reconciled.
I'll deploy recent version to my test instance and see what happens.1.2.3
) appears to produce an identical result. Worth noting, the list of Jobs seems incorrect, referencing all sorts of downstream and upstream assets. Is it possible that the definition is not being updated after changing the list of assets defined in the sensor job?@asset(
partitions_def=DailyPartitionsDefinition(start_date="2018-06-01"),
code_version="0.0.2",
retry_policy=RetryPolicy(
max_retries=6,
delay=10.0, # seconds
backoff=Backoff.EXPONENTIAL, # 10, 20, 40, 80, 160s, 320s of delay
jitter=Jitter.PLUS_MINUS,
),
freshness_policy=FreshnessPolicy(maximum_lag_minutes=5),
)
def extract_method()...
The second has this definition:
@asset(
partitions_def=MonthlyPartitionsDefinition(start_date="2018-06-01"),
code_version="1",
ins={"extract_method": AssetIn(metadata={"allow_missing_partitions": True})},
)
def transform_method(context, extract_method) -> Output[DataFrame[ADAMETLSourceSchema]]:
If I only include the extract asset, it's content. Adding the transform asset, I get the above error. I tried removing the allow_missing_partitions
and it still didn't resolve.
Is it possible it is failing simply because the transform step is mapping monthly partitions onto daily extract partitions?owen
03/28/2023, 4:42 PM1.2.3
? I'm haven't been able to replicate thisAlec Koumjian
03/28/2023, 8:04 PM