Hey guys :wave: Is it possible to add upstream de...
# ask-community
f
Hey guys 👋 Is it possible to add upstream dependencies to an asset that has already been created? For example, how can I add such a dependency to an asset created by
load_assets_from_dbt_manifest
?
I tried something like this but got refreshed by a frozendict 😄
Copy code
def add_upstream_dependency(downstream: AssetsDefinition, upstream: AssetsDefinition):
    downstream.op.input_dict["extra"] = In(Nothing)
    downstream.keys_by_input_name["extra"] = upstream.asset_key
    downstream.asset_deps[downstream.asset_key] = downstream.asset_deps[downstream.asset_key] | {
        upstream.asset_key
    }
Got it working with this, but, is there a better way ?
Copy code
@asset
def a(context: OpExecutionContext):
    <http://context.log.info|context.log.info>("Producing asset A")


@asset
def b(context: OpExecutionContext):
    <http://context.log.info|context.log.info>("Producting asset B")


def add_inputs(op: OpDefinition, ins: Mapping[str, In]) -> OpDefinition:
    return OpDefinition(
        name=op.name,
        compute_fn=op.compute_fn,
        config_schema=op.config_schema,
        description=op.description,
        ins=op.ins | ins,
        outs=op.outs,
        required_resource_keys=op.required_resource_keys,
        retry_policy=op.retry_policy,
        tags=op.tags,
        version=op.version,
    )


def add_upstream_dependency(
    asset: AssetsDefinition, dependency: AssetKey
) -> AssetsDefinition:
    input_name = "_".join(dependency.path)

    op = add_inputs(asset.op, {input_name: In(Nothing)})
    keys_by_input_name = asset.keys_by_input_name | {input_name: dependency}
    asset_deps = copy.deepcopy(asset.asset_deps)
    asset_deps[asset.asset_key] |= {dependency}

    return AssetsDefinition(
        keys_by_input_name=keys_by_input_name,
        keys_by_output_name=asset.keys_by_output_name,
        asset_deps=asset_deps,
        can_subset=asset.can_subset,
        group_names_by_key=asset.group_names_by_key,
        metadata_by_key=asset.metadata_by_key,
        node_def=op,
        partition_mappings=asset._partition_mappings,
        partitions_def=asset.partitions_def,
        resource_defs=asset.resource_defs,
        selected_asset_keys=asset._selected_asset_keys,
    )


new_b = add_upstream_dependency(b, a.asset_key)


@repository
def repo():
    return [a, new_b]
c
No better way to do this - in general I think we would tend to discourage this type of pattern because it leads to having multiple asset definition objects flying around for a given asset key, which increases the likelihood of some mistakes happening
👍 1