Stephen Bailey
01/09/2023, 2:16 PM
`this_asset` to `that_asset`
2. Change repository name, but not the AssetKey, from `this_repo` to `that_repo`.
In Terraform, you can run a command like `terraform state mv ADDRESS_A ADDRESS_B`, and I think something similar here would be ideal. Otherwise, renaming an asset requires a backfill of the partition sets, as far as I can tell.
Adam Bloom
01/09/2023, 2:40 PMStephen Bailey
01/09/2023, 3:42 PMAdam Bloom
01/09/2023, 3:48 PM
from dagster import (
AssetKey,
AssetMaterialization,
DagsterEventType,
DagsterRunStatus,
EventRecordsFilter,
Field,
OpExecutionContext,
RunsFilter,
op,
)
@op(
    config_schema={
        "asset_key": Field(
            config=str,
            description="The name of the asset that the job materializes",
        ),
        "job_name": Field(
            config=str,
            description="Name of the job (in this repo) to perform a backfill for",
        ),
        "repository_name": Field(
            config=str,
            description="Name of the dagster repository",
        ),
    },
)
def partitioned_asset_backfill_op(context: OpExecutionContext):
    """Log asset materializations for partitions a job already completed.

    Reads ``asset_key``, ``job_name`` and ``repository_name`` from the op
    config, then:

    1. Fetches the ASSET_MATERIALIZATION event records for the asset key.
    2. Fetches run partition data for successful runs of the job that are
       tagged with the configured repository name.
    3. For every partition that appears in (2) but not in (1), logs an
       ``AssetMaterialization`` event for the asset key with that partition.

    Args:
        context: Execution context supplying the op config, the Dagster
            instance to query, and ``log_event`` for emitting the events.
    """
    settings = context.op_config
    instance = context.instance
    target_key = AssetKey.from_user_string(settings["asset_key"])

    # Query 1: every materialization event already recorded for the asset.
    materialization_filter = EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=target_key,
    )
    recorded_events = instance.get_event_records(materialization_filter)

    # Query 2: partition data of successful runs of the job in the repository.
    successful_runs_filter = RunsFilter(
        job_name=settings["job_name"],
        tags={".dagster/repository": settings["repository_name"]},
        statuses=[DagsterRunStatus.SUCCESS],
    )
    partition_runs = instance.get_run_partition_data(runs_filter=successful_runs_filter)

    already_materialized = {
        record.event_log_entry.dagster_event.partition for record in recorded_events
    }
    succeeded_in_job = {entry.partition for entry in partition_runs}

    # Only partitions the job completed but the asset has no record for.
    for partition in succeeded_in_job - already_materialized:
        event = AssetMaterialization(
            asset_key=target_key,
            partition=partition,
            description="Asset previously materialized as a job/op",
        )
        context.log_event(event)
Stephen Bailey
01/09/2023, 4:17 PMsandy
01/09/2023, 6:27 PM