# ask-community
h
Hey all! I am wondering: is there any way to attach metadata to specific partitions of a partitioned asset when backfilling multiple partitions in a single run? I tried AssetObservation and manual AssetMaterialization, which works, but sadly the actual materialization (and its metadata) ends up being the most current metadata for all partitions processed in that run, and hence the metadata that shows up in the plots when viewing the asset or asset details. And a second question: how do I start a backfill run for a partition range programmatically via the Python API? materialize only works with a single partition key as far as I can see. Thank you very much in advance!
c
Hi Heiner. You could add logic within your asset that determines the metadata. Something like:
Copy code
from dagster import Output, asset

@asset
def my_asset(context):
    # derive per-partition metadata from the partition key of the current run
    partition_key = context.partition_key
    metadata = my_partition_metadata(partition_key)
    return Output(..., metadata=metadata)
This would attach specific metadata to the partitioned materialization whenever you execute the asset (in a backfill or ad-hoc materialization)
In order to execute a run across a range of partitions, you can attach the necessary tags:
Copy code
my_repo.get_job("partitioned_asset_job").execute_in_process(
    instance=instance,
    tags={
        ASSET_PARTITION_RANGE_START_TAG: start_partition_key,
        ASSET_PARTITION_RANGE_END_TAG: end_partition_key,
    },
)
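To kick this off directly from Python (your second question), I believe you can pass the same tags to materialize, which otherwise only accepts a single partition_key. A minimal sketch - assuming the tag constants can be imported from dagster._core.storage.tags (an internal module that may move between versions), and that the asset reads context.asset_partition_key_range rather than context.partition_key so it can handle the whole range in one run:
Copy code
from dagster import DagsterInstance, materialize

# Internal module - the literal tag values are
# "dagster/asset_partition_range_start" and "dagster/asset_partition_range_end"
from dagster._core.storage.tags import (
    ASSET_PARTITION_RANGE_END_TAG,
    ASSET_PARTITION_RANGE_START_TAG,
)

instance = DagsterInstance.get()

# single run that covers the whole partition range
materialize(
    [my_asset],
    instance=instance,
    tags={
        ASSET_PARTITION_RANGE_START_TAG: "2023-06-15",
        ASSET_PARTITION_RANGE_END_TAG: "2023-06-30",
    },
)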
h
Hi @claire! Thank you very much for your response! Regarding the metadata issue: that is pretty much what I tried, but the result is not what I want. I tried something like:
Copy code
from dagster import AssetObservation, Output, asset

@asset
def my_asset(context):
    p_start = context.asset_partition_key_range.start
    p_end = context.asset_partition_key_range.end

    # create the asset for all partitions in the partition range
    metadata_export = export_data_from_db_to_gcs(p_start, p_end)
    metadata_import = import_data_from_gcs_to_bq()

    # gather partition-wise metadata and log it as observations
    partition_info = read_partition_metadata_from_information_schema(p_start, p_end)
    for partition in partition_info:
        context.log_event(AssetObservation(
            f'{DBT_BQ_PROJECT_NAME}/{trg_table_name}',
            partition=partition.day,
            metadata={
                'bytes': partition.total_billable_bytes,
                'rows': partition.total_rows,
            }))

    # return a None Output since the asset has been materialized within the asset itself;
    # the metadata may or may not be empty - it makes no difference for the described problem
    metadata = metadata_export.copy()
    metadata.update(metadata_import)
    return Output(None, metadata=metadata)
With this solution, the metadata attached to the Output object ends up being the most recent metadata for all partitions of that asset. The metadata attached to each partition via AssetObservation is older, since it is logged beforehand, and hence the metadata plots in Dagit do not display the intended per-partition metadata. I loaded the partitions from 2023-06-15 to 2023-06-30 in a single backfill with no metadata attached to the Output object, and the partitions from 2023-07-01 to today with rows = total rows imported attached in that single backfill run. In both cases the plots do not show the actual metadata of the partitions: in the first case no values are shown, and in the latter all partitions show the total number of rows of the second run (2023-07-01 - 2023-07-19). I think the best solution would be to introduce a metadata_by_partition_key-like feature.
c
Ah I see--it seems like the issue is that when you backfill across a partition range, you can't assign specific metadata per partition. We have an issue tracking this: https://github.com/dagster-io/dagster/issues/12498
If you execute backfills where each run corresponds to a single partition, I think you would be able to see per-partition metadata
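For example, a rough sketch - assuming a daily-partitioned asset and a hypothetical my_partition_metadata helper - that loops over the partition keys so each run covers exactly one partition and attaches its own metadata:
Copy code
from dagster import (
    DagsterInstance,
    DailyPartitionsDefinition,
    Output,
    asset,
    materialize,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-06-15")

@asset(partitions_def=partitions_def)
def my_asset(context):
    # single-partition run, so context.partition_key is available
    metadata = my_partition_metadata(context.partition_key)  # hypothetical helper
    return Output(None, metadata=metadata)

instance = DagsterInstance.get()

# one run per partition -> per-partition metadata on each materialization
for partition_key in partitions_def.get_partition_keys():
    materialize([my_asset], instance=instance, partition_key=partition_key)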
h
YES! That issue describes what I need! And single-partition backfills work, too.