#ask-community

Alexis Eutrope

08/10/2023, 3:58 PM
Hi, I have an asset that is defined dynamically at runtime (as are its SourceAsset dependencies). I set the asset decorator's ins with a dict comprehension and use **kwargs in the function definition. That works well. However, if I change it from asset to graph_asset, Dagster fails to load the definitions and reports that the function expects [] (an empty list of inputs) while I gave it multiple inputs:
dagster._check.CheckError: Invariant failed. Description: The set of input names keys specified in the keys_by_input_name argument must equal the set of asset keys inputted by 'merged_df'. 
keys_by_input_name keys: {'source_asset__1', 'source_asset__2', 'source_asset__3', 'source_asset__4', 'source_asset__5', 'source_asset__6'} 
expected keys: []
Am I doing something wrong? Is there any way to pass a variable number of dynamically created SourceAssets to a graph_asset (without having to explicitly write all the assets in the args)? Thank you

yuhan

08/10/2023, 8:44 PM
do you mind sharing some code snippets of the assets you’re defining?

Alexis Eutrope

08/10/2023, 9:35 PM
Hi @yuhan, sure, here is what I'm trying to do:
from typing import List

from dagster import AssetIn, AssetKey, SourceAsset, graph_asset, op


def generate_assets(source_names=("a1", "a2", "a3")):
    source_assets = []
    for source in source_names:
        source_assets.append(
            SourceAsset(
                key=AssetKey(f"source_{source}"),
                io_manager_key="legacy_s3_pickle_pandas",
                group_name="test",
                metadata={"source_name": source},
            )
        )

    @op
    def gather(assets_names: List[str]):
        return ",".join(assets_names)

    @graph_asset(
        group_name="test",
        ins={
            f"source_asset__{source_asset.metadata.get('source_name').text}": AssetIn(
                source_asset.key
            )
            for source_asset in source_assets
        },
        name="merged_test",
    )
    def merge(**kwargs):
        # The prefix must match the "source_asset__" keys built in `ins` above.
        arg_assets = {k: v for k, v in kwargs.items() if k.startswith("source_asset__")}
        return gather(list(arg_assets.keys()))

    return [merge, *source_assets]
So the graph_asset merge should be generated at runtime with a1, a2, a3 as source assets, and it should contain one op that joins the asset names.
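
The "expected keys: []" in the error is consistent with the graph inputs being read from the named parameters of the decorated function, and a function that only takes **kwargs has none. That reading of Dagster's behaviour is an assumption, not something confirmed in this thread. Below is a minimal, pure-Python sketch of that idea using only the standard library: it shows that a **kwargs-only function exposes no named parameters, and how a wrapper could advertise one named parameter per dynamically generated input. The helper `with_named_inputs` is hypothetical (it is not part of Dagster), and whether graph_asset accepts such a wrapper has not been verified.

```python
import inspect


def named_inputs(fn):
    """List the explicitly named parameters of fn (ignores *args / **kwargs)."""
    return [
        p.name
        for p in inspect.signature(fn).parameters.values()
        if p.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
    ]


def with_named_inputs(fn, names):
    """Hypothetical helper: wrap fn so its signature lists one parameter per name."""
    sig = inspect.Signature(
        [inspect.Parameter(n, inspect.Parameter.POSITIONAL_OR_KEYWORD) for n in names]
    )

    def wrapper(*args, **kwargs):
        # Validate the call against the advertised signature, then forward as kwargs.
        bound = sig.bind(*args, **kwargs)
        return fn(**bound.arguments)

    wrapper.__name__ = fn.__name__
    wrapper.__signature__ = sig  # inspect.signature() honours this attribute
    return wrapper


def merge(**kwargs):
    # Stand-in for the graph body: just report which inputs were received.
    return sorted(kwargs)


input_names = [f"source_asset__{s}" for s in ["a1", "a2", "a3"]]
named_merge = with_named_inputs(merge, input_names)

print(named_inputs(merge))        # no named parameters, only **kwargs
print(named_inputs(named_merge))  # one named parameter per generated input
```

If the assumption holds, the wrapped function could be passed to graph_asset in place of the **kwargs-only one; generating the function source with explicit arguments would be an alternative with the same effect.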