Jacob Marcil
02/24/2023, 5:13 PM
Multi_asset configured with Snowflake as an IO_MANAGER.
I can’t find a way to make the io_manager write to the right schema.
Looking at the doc here, configuring key_prefix or metadata={"schema": "my_schema"} should do the trick.
But I’ve tried both configurations and it always tries to insert data into the default schema (PUBLIC).
I have the same configuration with key_prefix on an asset and it’s working as expected.
Is there something I’m missing?
@multi_asset(
    partitions_def=dar_hourly_partitions,
    op_tags={"kind": "s3"},
    required_resource_keys={"aws_secrets"},
    outs={
        "collect": AssetOut(
            metadata={"priority": "high"},
            io_manager_key="io_manager",
            key=AssetKey("dar_tokens_price"),
        ),
        "missing": AssetOut(
            is_required=False,
            metadata={"partition_expr": "requested_window_start", "schema": "MY_TEST_SCHEMA"},
            io_manager_key="warehouse_io_manager",
            key=AssetKey("tokens_not_found"),
            key_prefix=["MY_TEST_SCHEMA"],
            dagster_type=HourlyMissingDarTokensPricesDgType,
        ),
    },
    can_subset=False,
)
def dar_hourly_token_price_s3(context: OpExecutionContext):
    hour = context.asset_partition_key_for_output("collect")  # Since can_subset=False, this will always be provided
    json_response, missing_tokens_df = get_data(hour)
    yield Output(value=json_response, output_name="collect")
    if missing_tokens_df is not None:
        yield Output(value=missing_tokens_df, output_name="missing")
claire
02/24/2023, 9:17 PM
jamie
02/27/2023, 4:41 PM
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
    concatenation of the key_prefix and the asset's name. When using ``@multi_asset``, the
    asset name defaults to the key of the "outs" dictionary. Only one of the "key_prefix" and
    "key" arguments should be provided.
key (Optional[Union[str, Sequence[str], AssetKey]]): The asset's key. Only one of the
    "key_prefix" and "key" arguments should be provided.
So in your case you can fix this by directly putting the key_prefix value in the key, like this:
@multi_asset(
    partitions_def=dar_hourly_partitions,
    op_tags={"kind": "s3"},
    required_resource_keys={"aws_secrets"},
    outs={
        "collect": AssetOut(
            metadata={"priority": "high"},
            io_manager_key="io_manager",
            key=AssetKey("dar_tokens_price"),
        ),
        "missing": AssetOut(
            is_required=False,
            metadata={"partition_expr": "requested_window_start"},
            io_manager_key="warehouse_io_manager",
            key=AssetKey(["MY_TEST_SCHEMA", "tokens_not_found"]),
            dagster_type=HourlyMissingDarTokensPricesDgType,
        ),
    },
    can_subset=False,
)
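To make the difference between the two versions concrete, here is a rough plain-Python sketch of the rule in the docstring above. It is not Dagster code: resolve_asset_key and schema_and_table are hypothetical helpers. It assumes that an explicit key wins over key_prefix (which is what the behaviour in this thread suggests) and that the IO manager takes the second-to-last key component as the schema and the last one as the table, falling back to PUBLIC.

```python
from typing import List, Optional, Sequence, Tuple, Union

KeyLike = Union[str, Sequence[str]]


def _as_path(value: Optional[KeyLike]) -> List[str]:
    """Normalize a string or a sequence of strings into a list of key components."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def resolve_asset_key(
    out_name: str,
    key: Optional[KeyLike] = None,
    key_prefix: Optional[KeyLike] = None,
) -> List[str]:
    """Hypothetical helper: an explicit key is used as-is (assumption: the prefix
    is then ignored); otherwise the key is key_prefix + the name in "outs"."""
    if key is not None:
        return _as_path(key)
    return _as_path(key_prefix) + [out_name]


def schema_and_table(path: Sequence[str], default_schema: str = "PUBLIC") -> Tuple[str, str]:
    """Hypothetical helper: schema is the second-to-last component (assumption),
    table is the last one; a single-component key falls back to the default schema."""
    table = path[-1]
    schema = path[-2] if len(path) > 1 else default_schema
    return schema, table


# Original definition: both key and key_prefix are given, so the prefix is lost.
original = resolve_asset_key(
    "missing", key="tokens_not_found", key_prefix=["MY_TEST_SCHEMA"]
)
# Fixed definition: the schema is part of the key itself.
fixed = resolve_asset_key("missing", key=["MY_TEST_SCHEMA", "tokens_not_found"])

print(schema_and_table(original))  # ('PUBLIC', 'tokens_not_found')
print(schema_and_table(fixed))     # ('MY_TEST_SCHEMA', 'tokens_not_found')
```

Under those assumptions the original definition ends up with a one-component key, so the table lands in PUBLIC, while the fixed definition carries the schema inside the key.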
Jacob Marcil
02/27/2023, 4:57 PM