#ask-community

Jacob Marcil

02/24/2023, 5:13 PM
Hi, I have a multi_asset configured with Snowflake as an io_manager. I can’t find a way to make the io_manager write to the right schema. Looking at the doc here, configuring key_prefix or metadata={"schema": "my_schema"} should do the trick. But I’ve tried both configurations and it always tries to insert data into the default schema (PUBLIC). I have the same configuration with key_prefix on an asset and it’s working as expected. Is there something I’m missing?
@multi_asset(
    partitions_def=dar_hourly_partitions,
    op_tags={"kind": "s3"},
    required_resource_keys={"aws_secrets"},
    outs={
        "collect": AssetOut(
            metadata={"priority": "high"},
            io_manager_key="io_manager",
            key=AssetKey("dar_tokens_price"),
        ),
        "missing": AssetOut(
            is_required=False,
            metadata={"partition_expr": "requested_window_start", "schema": "MY_TEST_SCHEMA"},
            io_manager_key="warehouse_io_manager",
            key=AssetKey("tokens_not_found"),
            key_prefix=["MY_TEST_SCHEMA"],
            dagster_type=HourlyMissingDarTokensPricesDgType,
        ),
    },
    can_subset=False,
)
def dar_hourly_token_price_s3(context: OpExecutionContext):

    hour = context.asset_partition_key_for_output("collect")  # Since can_subset=False, this will always be provided

    json_response, missing_tokens_df = get_data(hour)

    yield Output(value=json_response, output_name="collect")

    if missing_tokens_df is not None:
        yield Output(value=missing_tokens_df, output_name="missing")

claire

02/24/2023, 9:17 PM
cc @jamie

jamie

02/27/2023, 4:41 PM
hey @Jacob Marcil this might be a bug. i’ll look into it
ok this is happening because key_prefix and key aren’t compatible in multi_assets. from the docs for AssetOut
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
    concatenation of the key_prefix and the asset's name. When using ``@multi_asset``, the
    asset name defaults to the key of the "outs" dictionary. Only one of the "key_prefix" and
    "key" arguments should be provided.
key (Optional[Union[str, Sequence[str], AssetKey]]): The asset's key. Only one of the
    "key_prefix" and "key" arguments should be provided.
So in your case you can fix this by directly putting the key_prefix value in the key, like this:
@multi_asset(
    partitions_def=dar_hourly_partitions,
    op_tags={"kind": "s3"},
    required_resource_keys={"aws_secrets"},
    outs={
        "collect": AssetOut(
            metadata={"priority": "high"},
            io_manager_key="io_manager",
            key=AssetKey("dar_tokens_price"),
        ),
        "missing": AssetOut(
            is_required=False,
            metadata={"partition_expr": "requested_window_start"},
            io_manager_key="warehouse_io_manager",
            key=AssetKey(["MY_TEST_SCHEMA", "tokens_not_found"]),
            dagster_type=HourlyMissingDarTokensPricesDgType,
        ),
    },
    can_subset=False,
)
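To see why moving the prefix into the key changes the target schema, here is a minimal plain-Python sketch. It does not use Dagster and is not the library's implementation. It assumes, based on the behaviour described in this thread, that the IO manager uses the last component of the asset key as the table name and the component before it as the schema, falling back to a default schema when the key has a single component. The helper names full_key_path and schema_and_table are hypothetical.

```python
from typing import List, Optional, Sequence, Tuple


def full_key_path(name: str, key_prefix: Optional[Sequence[str]] = None) -> List[str]:
    """Concatenate an optional prefix with the asset name (the rule the AssetOut docs describe)."""
    return [*(key_prefix or []), name]


def schema_and_table(
    key_path: Sequence[str], default_schema: str = "PUBLIC"
) -> Tuple[str, str]:
    """Assumed rule: last key component is the table, the one before it is the schema."""
    if not key_path:
        raise ValueError("key_path must contain at least one component")
    table = key_path[-1]
    # With a single-component key there is no schema to read, so use the default.
    schema = key_path[-2] if len(key_path) > 1 else default_schema
    return schema, table


# Key given without a prefix, as when key=AssetKey("tokens_not_found") wins over key_prefix.
print(schema_and_table(["tokens_not_found"]))
# Prefix placed directly in the key, as in the fix above.
print(schema_and_table(["MY_TEST_SCHEMA", "tokens_not_found"]))
# The same path built from a prefix plus a name.
print(schema_and_table(full_key_path("tokens_not_found", ["MY_TEST_SCHEMA"])))
```

Under these assumptions, the single-component key resolves to the default schema, which matches the PUBLIC behaviour reported in the question, while the two-component key resolves to MY_TEST_SCHEMA.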

Jacob Marcil

02/27/2023, 4:57 PM
Oh thank you, I didn’t see that part of the doc. Thank you very much 🙂