William
08/10/2022, 2:58 PM
```
               Fan out
                       ┌────────────────┐     ┌──────────────────┐
              ┌───────►│ derived file1  ├────►│ folder1 of files │
              │        └────────────────┘     └──────────────────┘
┌─────────┐   │        ┌────────────────┐     ┌──────────────────┐
│ OneFile ├───┼───────►│ derived file2  ├────►│ folder2 of files │
└─────────┘   │        └────────────────┘     └──────────────────┘
              │        ┌────────────────┐     ┌──────────────────┐
              └───────►│ derived file3  ├────►│ folder3 of files │
                       └────────────────┘     └──────────────────┘
```
The `OneFile`, `derived file1/2/3`, and `folder1/2/3 of files` are all parametrized: different run configs could be provided to `OneFile` to generate `OneFile` assets with different AssetKeys, and each `OneFile` could then fan out to hundreds or thousands of derived files and folders as downstream assets.
claire
08/10/2022, 5:13 PM
William
08/11/2022, 1:05 AM
`persist_to_storage(df)`?
William
08/11/2022, 2:07 PM
yield asset materializations in `@op` or `@job`. However, is it possible to get the lineage of those dynamically yielded `asset`s?
claire
08/11/2022, 8:58 PM
`AssetsDefinitions` depending on the upstream assets dynamically. For example, for the structure you mentioned above, you could do something like:
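```python
from dagster import AssetIn, AssetKey, asset, repository


def create_asset(asset_key, upstream_asset):
    """Build one asset named `asset_key`, wired to `upstream_asset` if one is given."""
    if upstream_asset:
        # The input name "upstream" must match the function parameter below.
        @asset(name=asset_key, ins={"upstream": AssetIn(key=AssetKey(upstream_asset))})
        def _generated_asset(context, upstream):
            return 1

    else:
        # Root asset: no upstream dependency.
        @asset(name=asset_key)
        def _generated_asset(context):
            return 1

    return _generated_asset


# Nested dict: each key is an asset; its value holds the assets downstream of it.
struct = {"asset_1": {"asset_2": {"asset_3": {}}, "asset_4": {}}}


def generate_asset_graph(struct, upstream=None):
    """Recursively create assets, passing each key down as the next level's upstream."""
    assets = []
    for key, value in struct.items():
        assets.append(create_asset(key, upstream))
        if isinstance(value, dict):
            assets.extend(generate_asset_graph(value, key))
    return assets


@repository
def my_repo():
    return [*generate_asset_graph(struct)]
```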
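To check which upstream each generated asset would get, without loading Dagster, the same recursive traversal can be run on plain tuples instead of `@asset` definitions. This is a minimal sketch, not part of the Dagster API: `lineage_edges` and the tuple-based `Key` type are hypothetical, and the `prefix` argument is an assumption standing in for a per-run-config key prefix, so that each parametrized `OneFile` config gets its own copy of the graph.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical representation: an asset key is a tuple of path components.
Key = Tuple[str, ...]


def lineage_edges(
    struct: Dict[str, dict], upstream: Optional[Key] = None, prefix: Key = ()
) -> List[Tuple[Key, Optional[Key]]]:
    """Return (asset, upstream) pairs in the order generate_asset_graph visits them.

    `prefix` is an assumption for illustration: it stands in for a per-run-config
    key prefix, so each config gets its own copy of the graph.
    """
    edges: List[Tuple[Key, Optional[Key]]] = []
    for name, children in struct.items():
        key = prefix + (name,)
        edges.append((key, upstream))
        if isinstance(children, dict):
            # Each child level uses the current asset as its upstream.
            edges.extend(lineage_edges(children, key, prefix))
    return edges


struct = {"asset_1": {"asset_2": {"asset_3": {}}, "asset_4": {}}}

# Prints one "upstream -> asset" line per generated asset (None for the root).
for asset_key, parent in lineage_edges(struct):
    print(parent, "->", asset_key)

# One copy of the graph per (hypothetical) run config name.
for config in ["config_a", "config_b"]:
    print(len(lineage_edges(struct, prefix=(config,))), "assets for", config)
```

Each `(asset, upstream)` pair corresponds to one `create_asset(key, upstream)` call in the Dagster version; how the prefixed tuples would map onto real `AssetKey` paths in a repository is left as an assumption here.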