Is there a way to migrate Asset (or partition_set)...
# dagster-feedback
s
Is there a way to migrate Asset (or partition_set) state / events from one address to another? I've run into this in two situations:
1. Change the AssetKey directly, from this_asset to that_asset
2. Change the repository name, but not the AssetKey, from this_repo to that_repo
In Terraform you can run a command like terraform state mv ADDRESS_A ADDRESS_B, and I think something similar here would be ideal. Otherwise, renaming an asset requires a backfill of the partition sets AFAICT.
🤖 1
a
I asked about this a while ago - there wasn’t anything available at the time. The suggestion was to write a little job that created asset materializations for asset A for any materializations that already existed for asset B. It worked just fine. It just links the materialization to the fake backfill instead of the original run, but saves a lot of time
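A minimal sketch of that approach for the straight rename case (copying the old key's materialization history over to the new key), assuming placeholder asset keys this_asset and that_asset and using the same instance APIs as the script shared below; this is not the script from the thread:

from dagster import (
    AssetKey,
    AssetMaterialization,
    DagsterEventType,
    EventRecordsFilter,
    OpExecutionContext,
    op,
)


@op
def copy_asset_history_op(context: OpExecutionContext):
    # Hypothetical old/new keys for the rename; adjust to your asset keys.
    old_key = AssetKey("this_asset")
    new_key = AssetKey("that_asset")

    # Every materialization event recorded under the old key
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=old_key,
        )
    )

    # Re-log each one under the new key, carrying over the partition
    for record in records:
        context.log_event(
            AssetMaterialization(
                asset_key=new_key,
                partition=record.event_log_entry.dagster_event.partition,
                description="Copied from old asset key",
            )
        )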
s
any chance you have that script 😉
a
Yup! I’ll put it here when I have a chance today
❤️ 1
🌈 2
Ok, looking at this again, my use case might have been slightly different. I was transitioning a job that didn’t use assets to one using assets, and wanted to backfill the asset history. Can’t think of any reason why this wouldn’t also work for your use case though. You just may need to make some tweaks. Here’s the op. I have a job that I can manually invoke that just runs this op.
from dagster import (
    AssetKey,
    AssetMaterialization,
    DagsterEventType,
    DagsterRunStatus,
    EventRecordsFilter,
    Field,
    OpExecutionContext,
    RunsFilter,
    op,
)


@op(
    config_schema={
        "asset_key": Field(
            config=str,
            description="The name of the asset that the job materializes",
        ),
        "job_name": Field(
            config=str,
            description="Name of the job (in this repo) to perform a backfill for",
        ),
        "repository_name": Field(
            config=str,
            description="Name of the dagster repository",
        ),
    },
)
def partitioned_asset_backfill_op(context: OpExecutionContext):
    asset_key = AssetKey.from_user_string(context.op_config["asset_key"])

    # Materialization events already recorded for this asset key
    materializations = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
        ),
    )

    # Partition data for every successful run of the original (pre-asset) job
    job_partition_history = context.instance.get_run_partition_data(
        runs_filter=RunsFilter(
            job_name=context.op_config["job_name"],
            tags={".dagster/repository": context.op_config["repository_name"]},
            statuses=[DagsterRunStatus.SUCCESS],
        )
    )

    asset_materialized_partitions = [
        materialization.event_log_entry.dagster_event.partition
        for materialization in materializations
    ]

    # Log a materialization for every partition the job succeeded on
    # that doesn't already have one recorded against the asset
    job_success_partitions = [run.partition for run in job_partition_history]
    missing_partitions = set(job_success_partitions) - set(asset_materialized_partitions)
    for partition in missing_partitions:
        context.log_event(
            AssetMaterialization(
                asset_key=asset_key,
                partition=partition,
                description="Asset previously materialized as a job/op",
            )
        )
Well that pasted a bit weird, but hopefully that helps
🙌 1
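For anyone adapting this later, a minimal sketch of wrapping the op in a manually invokable job, as mentioned above; the job name and config values below are placeholder assumptions, not something from the thread:

from dagster import DagsterInstance, job


@job
def partitioned_asset_backfill_job():
    partitioned_asset_backfill_op()


if __name__ == "__main__":
    # Execute against the persistent instance so the op can see existing
    # run and event history; the config values below are placeholders.
    partitioned_asset_backfill_job.execute_in_process(
        instance=DagsterInstance.get(),
        run_config={
            "ops": {
                "partitioned_asset_backfill_op": {
                    "config": {
                        "asset_key": "that_asset",
                        "job_name": "my_partitioned_job",
                        "repository_name": "that_repo",
                    }
                }
            }
        },
    )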
s
wow, this is 1000, exactly what I was looking for
s
btw, here's an issue where we're tracking related functionality: https://github.com/dagster-io/dagster/issues/9512