Hey all, I have a question about dbt and asset key...
# integration-snowflake
n
Hey all, I have a question about dbt and asset keys for Snowflake IO Manager. Specifically, if I want an asset to produce data for a source in dbt, I use asset keys for that, but then asset keys simultaneously are used by the snowflake IO manager to provide schema info. Right now, it looks like these two will conflict in my project and I'm not sure what to do about it
As an example, I have a dbt project with this source defined:
Copy code
sources:
  - name: stg_product
    database: STG
    schema: PRODUCT
    tables:
      - name: orders
So I want to load this source in an asset, and use the snowflake IO manager to do it:
Copy code
@asset(key_prefix=["stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
But then it loads the dataframe to a a schema called "stg_product". Of course, I could change the key_prefix, but then then it won't connect to the downstream dbt models. How can I get it to write to the schema defined in the source, or else write to a schema independent different from the key_prefix? I feel like I've gotta be missing something
j
hey @Neil - how are you setting the key_prefix for the dbt project? You may have already seen this, but the dbt tutorial uses the duckdb io manager (which inherits from the same base class as the snowflake io manager, so it functions in the same way just stores data in duckdb) https://docs.dagster.io/integrations/dbt/using-dbt-with-dagster/part-one it talks through getting the schema/key prefixes to be in line if you share how the schema is being set for the dbt project, and any key prefixes you’re adding i can help sort out how the other assets should be
n
Hi @jamie, thanks for your help! Yeah, I used that tutorial as my starting point, but it looks like all operations are being done on the same
jaffle_shop
schema, and moreover that it's not specifying the schema to be different than the name of the source in that example. I'm not 100% sure what you mean by how the schema is being set, but for models in dbt, we set the schema on a per-folder basis (so models in the
/{project}/dwh
path are assigned to the
DWH
schema in the
dbt_project.yml
). The big issue, I believe, is the sources though, which I posted above where I have to manually specify the database and schema since we're reading in data from a bunch of different sources and organize them into a separate database
Oh, and for the
key_prefix
, I basically have it in the code snipper above. The source has the name
stg_product
and so I set the key_prefix to be
stg_product
, and make the function name the same as the source table name. Is there another way to hook up that dagster function to that dbt source?
j
ok so i think i see the problem. I’m not super deeply familiar with our dbt integration, so i made an incorrect assumption. the dbt project is looking for source data in the
PRODUCT.orders
table. The snowflake IO manager determines the schema for a table from the key_prefix of the corresponding asset, so in order to store/load the source data in the
PRODUCT
schema, the asset will need to look like this
Copy code
@asset(
    key_prefix=["PRODUCT"]
)
def orders() -> pd.DataFrame:
    .....
but based on your message, the downstream dbt assets are looking for upstream assets called
["stg_product", "orders"]
. I think this is because the
load_assets_from…
function is determining the key_prefix of sources from the source name, not the schema name. There’s a
source_key_prefix
parameter that you could try setting to
PRODUCT
and that might make it so that the dbt assets are looking for ``["PRODUCT", "orders"]` instead. If that doesn’t work you can set a function to determine what the full asset key should be, but i’ll need to get some help from a coworker to figure out what that function should be. so try
source_key_prefix
first and if that doesn’t work let me know and i’ll look into the function
n
Sweet, I'll try that out and let you know! Thanks again for helping with this
j
yeah no worries. sorry the interface between these two integrations is less than smooth. i’m taking some notes to try and address some of the sticking points in the future
n
Ah, sadly didn't work 😕 I set the source_key_prefix like so:
Copy code
dbt_assets = load_assets_from_dbt_manifest(
        json.load(manifest_file),
        ...
        source_key_prefix=["product"],
    )
and in the asset itself like so:
Copy code
@asset(key_prefix=["product", "stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
but it still wrote to
stg_product
schema. I believe the snowflake IO Manager uses the last key as the schema name I should also mention that we have sources in multiple different schemas, so I suppose that source_key_prefix would have needed to be dynamic in some way as well
If it's not too much trouble, would love to hear about the function you mentioned
j
if you define the asset like
Copy code
@asset(key_prefix=["product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
does that work? for the function, basically you define a function that takes dbt metadata and returns the desired asset key. https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.load_assets_from_dbt_project the parameter is
node_info_to_asset_key
. i haven’t been able to find a good example of its usage in our code base. so i would recommend doing something like this
Copy code
def my_dbt_node_to_asset_key_function(dbt_metadata: Dict[str, Any]):
      print(dbt_metadata)
      return AssetKey(dbt_metadata["unique_id"].split(".")) # this is approximately what we do internally, just put it here as a starting point
and then based on what the dbt_metadata dictionary looks like you can put together the asset key that makes sense.
your load_assets function would become
load_assets_from…(<other_params>, node_info_to_asset_key=my_dbt_node_to_asset_key)
n
If I define the asset like that, it will load to the right schema but it doesn't recognize any dbt tasks to be downstream of it. I'll play around with that function and see if I can get it working that way
j
ah i see, so the
source_asset_key
doesn’t behave as i expected it to. in that case, then the function will probably be your best bet
n
Aha! Got it working with that. Made a function
_get_node_asset_key
which is basically the same as Dagster dbt's version with some slight tweaks:
Copy code
def _get_node_asset_key(node_info) -> AssetKey:
    if node_info["resource_type"] == "source":
        configured_database = node_info["database"]
        configured_schema = node_info["schema"]
        if configured_schema is not None and configured_database is not None:
            components = [configured_database, configured_schema, node_info["name"]]
        else:
            components = [node_info["source_name"], node_info["name"]]
    else:
        ... # Same as original
and then passed that in to `load_assets_from_`:
Copy code
load_assets_from_dbt_manifest(
        ...,
        source_key_prefix=["dbt_source"],
        node_info_to_asset_key=_get_node_asset_key
    )
and it works like a charm. Have this as my Asset now, for reference (with the additional
source_key_prefix
just for organization):
Copy code
@asset(key_prefix=["dbt_source", "STG", "PRODUCT"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
Thanks so much for your help!
j
amazing! so glad that worked for you!