Flavien
08/19/2022, 12:45 PM load_assets_from_dbt_manifest
def add_upstream_dependency(downstream: AssetsDefinition, upstream: AssetsDefinition):
    """Make ``downstream`` depend on ``upstream`` by mutating ``downstream`` in place.

    Three things are updated on ``downstream``: a ``Nothing``-typed input is
    registered on its op, that input name is mapped to the upstream asset
    key, and the upstream asset key is added to the dependency set stored
    under the downstream asset's own key.

    Args:
        downstream: Asset definition that gains the dependency (modified in
            place; nothing is returned).
        upstream: Asset definition whose asset key becomes the new
            dependency.
    """
    upstream_key = upstream.asset_key
    # Derive the input name from the upstream key path instead of a fixed
    # "extra": with a fixed name, a second call would overwrite the input
    # registration and key mapping written by the first call.
    input_name = "_".join(upstream_key.path)
    # NOTE(review): these writes assume the three containers below are
    # mutable and are the live state of the definition, not copies — confirm.
    downstream.op.input_dict[input_name] = In(Nothing)
    downstream.keys_by_input_name[input_name] = upstream_key
    own_key = downstream.asset_key
    downstream.asset_deps[own_key] = downstream.asset_deps[own_key] | {upstream_key}
# Example asset "a": declares no upstream inputs and only logs that it ran.
# (Documented with comments so the function's __doc__ stays unset.)
@asset
def a(context: OpExecutionContext):
    # The chat export had wrapped this call in Slack auto-link markup,
    # which is not valid Python; this is the plain logger call.
    context.log.info("Producing asset A")
# Example asset "b": declares no upstream inputs and only logs that it ran.
# (Documented with comments so the function's __doc__ stays unset.)
@asset
def b(context: OpExecutionContext):
    # The chat export had wrapped this call in Slack auto-link markup,
    # which is not valid Python; this is the plain logger call. The log
    # message's spelling ("Producting") is corrected as well.
    context.log.info("Producing asset B")
def add_inputs(op: OpDefinition, ins: Mapping[str, In]) -> OpDefinition:
    """Build a new ``OpDefinition`` from ``op`` with additional inputs.

    The new definition is constructed from ``op``'s name, compute function,
    config schema, description, outputs, required resource keys, retry
    policy, tags and version. Its inputs are ``op.ins`` merged with ``ins``;
    on a name clash the entry from ``ins`` wins. ``op`` itself is not
    modified.

    Args:
        op: Op definition to copy.
        ins: Extra inputs to add, keyed by input name.

    Returns:
        A new ``OpDefinition`` carrying the merged inputs.
    """
    # Dict unpacking accepts any Mapping on either side, which matches the
    # annotation; `op.ins | ins` raises TypeError when `ins` is a Mapping
    # that does not implement the `|` operator.
    merged_ins = {**op.ins, **ins}
    # NOTE(review): only the attributes listed below are carried over —
    # confirm no other OpDefinition fields need to be forwarded.
    return OpDefinition(
        name=op.name,
        compute_fn=op.compute_fn,
        config_schema=op.config_schema,
        description=op.description,
        ins=merged_ins,
        outs=op.outs,
        required_resource_keys=op.required_resource_keys,
        retry_policy=op.retry_policy,
        tags=op.tags,
        version=op.version,
    )
def add_upstream_dependency(
    asset: AssetsDefinition, dependency: AssetKey
) -> AssetsDefinition:
    """Return a copy of ``asset`` that additionally depends on ``dependency``.

    The input ``asset`` is left untouched. A new ``AssetsDefinition`` is
    assembled whose op carries one more ``Nothing``-typed input, named after
    the dependency's key path joined with underscores, and whose input-key
    mapping and dependency set both include ``dependency``.

    Args:
        asset: Asset definition to extend.
        dependency: Asset key of the upstream asset to depend on.

    Returns:
        A new ``AssetsDefinition`` with the extra upstream dependency.
    """
    # Input name is the key path joined with "_".
    dep_input_name = "_".join(dependency.path)

    # Rebuild the op with one more Nothing-typed input for the dependency.
    extra_ins = {dep_input_name: In(Nothing)}
    extended_op = add_inputs(asset.op, extra_ins)

    # Map the new input name to the dependency's asset key.
    input_keys = asset.keys_by_input_name | {dep_input_name: dependency}

    # Work on a deep copy so the original definition's dependency sets are
    # never touched, then record the new dependency under the asset's key.
    deps_by_key = copy.deepcopy(asset.asset_deps)
    deps_by_key[asset.asset_key] |= {dependency}

    return AssetsDefinition(
        keys_by_input_name=input_keys,
        keys_by_output_name=asset.keys_by_output_name,
        asset_deps=deps_by_key,
        can_subset=asset.can_subset,
        group_names_by_key=asset.group_names_by_key,
        metadata_by_key=asset.metadata_by_key,
        node_def=extended_op,
        # Read from underscore-prefixed attributes of the original.
        partition_mappings=asset._partition_mappings,
        partitions_def=asset.partitions_def,
        resource_defs=asset.resource_defs,
        selected_asset_keys=asset._selected_asset_keys,
    )
# Derive a variant of asset "b" that lists asset "a" as an upstream dependency.
new_b = add_upstream_dependency(b, a.asset_key)


# Repository returning asset "a" together with the rewired variant of "b".
@repository
def repo():
    definitions = [a, new_b]
    return definitions
chris
08/19/2022, 5:37 PM