Alexis Eutrope
08/10/2023, 3:58 PM
dagster._check.CheckError: Invariant failed. Description: The set of input names keys specified in the keys_by_input_name argument must equal the set of asset keys inputted by 'merged_df'.
keys_by_input_name keys: {'source_asset__1', 'source_asset__2', 'source_asset__3', 'source_asset__4', 'source_asset__5', 'source_asset__6'}
expected keys: []
Am I doing something wrong? Is there any way to pass a variable number of dynamically created SourceAssets to a graph_asset, without having to write every asset explicitly in the function's args?
Thank you
yuhan
08/10/2023, 8:44 PM
Alexis Eutrope
08/10/2023, 9:35 PM
from typing import List

from dagster import AssetIn, AssetKey, SourceAsset, graph_asset, op


def generate_assets(source_names=["a1", "a2", "a3"]):
    # Build one SourceAsset per source name.
    source_assets = []
    for source in source_names:
        source_assets.append(
            SourceAsset(
                key=AssetKey(f"source_{source}"),
                io_manager_key="legacy_s3_pickle_pandas",
                group_name="test",
                metadata={"source_name": source},
            )
        )

    @op
    def gather(assets_names: List[str]):
        return ",".join(assets_names)

    # Map each source asset to an input named "source_asset__<source_name>".
    @graph_asset(
        group_name="test",
        ins={
            f"source_asset__{source_asset.metadata.get('source_name').text}": AssetIn(
                source_asset.key
            )
            for source_asset in source_assets
        },
        name="merged_test",
    )
    def merge(**kwargs):
        # Keep only the source-asset inputs (prefix matches the keys in `ins`).
        arg_assets = {k: v for k, v in kwargs.items() if k.startswith("source_asset__")}
        return gather(list(arg_assets.keys()))

    return [merge, *source_assets]
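One possible reading of `expected keys: []`, sketched in plain Python without Dagster: if the expected input names are taken from the decorated function's explicitly named parameters (an assumption inferred from the error message, not confirmed in this thread), then a function that only accepts `**kwargs` exposes no names at all, so nothing can match the keys given in `ins`. The helpers `named_params`, `merge_kwargs`, and `merge_named` are hypothetical names used only for illustration.

```python
import inspect


def named_params(fn):
    """Return the explicitly named parameters of fn, skipping *args and **kwargs."""
    named_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return [
        p.name
        for p in inspect.signature(fn).parameters.values()
        if p.kind in named_kinds
    ]


def merge_kwargs(**kwargs):
    # Same shape as the `merge` function above: no named inputs.
    return kwargs


def merge_named(source_asset__a1, source_asset__a2, source_asset__a3):
    # Every input is spelled out, so the names can be compared with the `ins` keys.
    return [source_asset__a1, source_asset__a2, source_asset__a3]


# The keys the `ins` dict would contain for source_names=["a1", "a2", "a3"].
ins_keys = {f"source_asset__{name}" for name in ["a1", "a2", "a3"]}

print(named_params(merge_kwargs))
print(set(named_params(merge_named)) == ins_keys)
```

Under that assumption, the mismatch comes from the `**kwargs` signature rather than from the `ins` dictionary itself.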
Alexis Eutrope
08/10/2023, 9:37 PM