Daniel Galea
04/03/2023, 10:13 AM"All assets must be partitioned and share the same partitions definition"
. However, I'm not sure why I get this error as all my assets have the same type of partition definition and they are all partitioned. I am using this partition type DailyPartitionsDefinition
, the difference between some of the assets is the hour and minute offset, and the start date. Do they need to have the exact same hour and minute offset, and start date too? I am creating a new DailyPartitionsDefinition
object per asset, do all assets need to share the same definition type object too?
This is how I generate a multi-asset sensor:
@multi_asset_sensor(
monitored_assets=asset_keys,
job=downstream_job,
name=sensor_name,
default_status=DefaultSensorStatus.RUNNING,
)
def trigger_daily_asset_if_both_upstream_partitions_materialized(
context: MultiAssetSensorEvaluationContext,
):
run_requests = []
for (
partition,
materializations_by_asset,
) in context.latest_materialization_records_by_partition_and_asset().items():
if set(materializations_by_asset.keys()) == set(context.asset_keys):
run_requests.append(downstream_job.run_request_for_partition(partition))
for asset_key, materialization in materializations_by_asset.items():
context.advance_cursor({asset_key: materialization})
return run_requests
I am getting the error when running this line of code:
context.latest_materialization_records_by_partition_and_asset().items()
sandy
04/03/2023, 4:11 PMDaniel Galea
04/11/2023, 12:24 PM