fahad
09/01/2022, 4:56 PM
from dagster import AssetIn, OpExecutionContext, asset

def create_spark_processed_asset(
    asset_name: str,
    input_assets: list[AssetIn],  # applied in order
    script_name: str,
):
    @asset(
        name=asset_name,
        ins={
            f"upstream_{idx}": asset_input
            for idx, asset_input in enumerate(input_assets)
        },
        config_schema=override_spark_config_dagster_schema(),
        required_resource_keys={"executor", "spark_config", "data_config"},
    )
    async def processed_asset(context: OpExecutionContext, *inputs) -> str:
        do_spark_things(script_name, *inputs, ...)
However, this doesn't work, because none of the keys in the ins dict appear as named parameters of the decorated function:
DagsterInvalidDefinitionError: Key 'upstream_0' in provided ins dict does not correspond to any of the names of the arguments to the decorated function
Is there any way to craft an asset factory that takes in upstream assets as inputs?
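For context, @asset matches each key in ins to a parameter of the decorated function by name; a minimal sketch of the ordinary fixed-name case (the asset and input names here are made up for illustration):

from dagster import AssetIn, asset

@asset(ins={"upstream_0": AssetIn("raw_events")})
def processed(upstream_0):  # parameter name must match the ins key "upstream_0"
    return upstream_0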
jamie
09/01/2022, 6:23 PM
i think **kwargs will let the compute function accept arbitrarily named inputs, so maybe try that?
def create_spark_processed_asset(
    asset_name: str,
    input_assets: list[AssetIn],  # applied in order
    script_name: str,
):
    @asset(
        name=asset_name,
        ins={
            f"upstream_{idx}": asset_input
            for idx, asset_input in enumerate(input_assets)
        },
        config_schema=override_spark_config_dagster_schema(),
        required_resource_keys={"executor", "spark_config", "data_config"},
    )
    async def processed_asset(context: OpExecutionContext, **inputs) -> str:  # change this line
        # inputs is now a dict keyed "upstream_0", "upstream_1", ...;
        # pull the values back out in upstream order before handing them to Spark
        ordered_inputs = [inputs[f"upstream_{idx}"] for idx in range(len(inputs))]
        do_spark_things(script_name, *ordered_inputs, ...)

    return processed_asset
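With the factory returning the decorated asset, a minimal usage sketch (the upstream asset keys and script name below are hypothetical, not from the thread):

processed_events = create_spark_processed_asset(
    asset_name="processed_events",
    input_assets=[AssetIn("raw_events"), AssetIn("raw_users")],  # applied in order
    script_name="process_events.py",
)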
fahad
09/01/2022, 7:11 PM