https://dagster.io/ logo
#ask-community
Title
# ask-community
f

fahad

09/01/2022, 4:56 PM
Hey all, Might just be a python question, but I’m trying to create a wrapper around SDAs that let’s me abstract away some boilerplate. It looks something like this:
Copy code
def create_spark_processed_asset(
    asset_name: str,
    input_assets: list[AssetIn],  # applied in order
    script_name: str,
):
    @asset(
        name=asset_name,
        ins={
            f"upstream_{idx}": asset_input
            for idx, asset_input in enumerate(input_assets)
        },
        config_schema=override_spark_config_dagster_schema(),
        required_resource_keys={"executor", "spark_config", "data_config"}
    )
    async def processed_asset(context: OpExecutionContext, *inputs) -> str:
        do_spark_things(script_name, *inputs, ...)
However this doesn’t work because neither the upstream assets are named by key in the parameter list nor are the ins dict keys present in the parameter list -
Copy code
DagsterInvalidDefinitionError: Key 'upstream_0' in provided ins dict does not correspond to any of the names of the arguments to the decorated function
Is there any way to craft an asset factory that takes in upstream assets as inputs?
j

jamie

09/01/2022, 6:23 PM
Hi @fahad i haven't tried this myself yet, but our op factory example has the inputs as
**kwargs
so maybe try that?
Copy code
def create_spark_processed_asset(
    asset_name: str,
    input_assets: list[AssetIn],  # applied in order
    script_name: str,
):
    @asset(
        name=asset_name,
        ins={
            f"upstream_{idx}": asset_input
            for idx, asset_input in enumerate(input_assets)
        },
        config_schema=override_spark_config_dagster_schema(),
        required_resource_keys={"executor", "spark_config", "data_config"}
    )
    async def processed_asset(context: OpExecutionContext, **inputs) -> str: # change this line
        do_spark_things(script_name, *inputs, ...)
👀 1
f

fahad

09/01/2022, 7:11 PM
So this doesn’t throw any errors, but it also doesn’t show the asset on the asset graph
Unsure why, didn’t see any output either