
Neil

04/07/2023, 9:15 PM
Hey all, I have a question about dbt and asset keys for the Snowflake IO manager. Specifically, if I want an asset to produce data for a source in dbt, I use asset keys for that, but asset keys are simultaneously used by the Snowflake IO manager to provide schema info. Right now, it looks like these two will conflict in my project and I'm not sure what to do about it.
As an example, I have a dbt project with this source defined:
sources:
  - name: stg_product
    database: STG
    schema: PRODUCT
    tables:
      - name: orders
So I want to load this source in an asset, and use the snowflake IO manager to do it:
@asset(key_prefix=["stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
But then it loads the dataframe to a schema called "stg_product". Of course, I could change the key_prefix, but then it won't connect to the downstream dbt models. How can I get it to write to the schema defined in the source, or else write to a schema that is independent of the key_prefix? I feel like I've gotta be missing something.

jamie

04/10/2023, 2:59 PM
hey @Neil - how are you setting the key_prefix for the dbt project? You may have already seen this, but the dbt tutorial uses the duckdb io manager (which inherits from the same base class as the snowflake io manager, so it functions in the same way, it just stores data in duckdb): https://docs.dagster.io/integrations/dbt/using-dbt-with-dagster/part-one. it talks through getting the schema/key prefixes to be in line. if you share how the schema is being set for the dbt project, and any key prefixes you’re adding, i can help sort out how the other assets should be

Neil

04/10/2023, 3:09 PM
Hi @jamie, thanks for your help! Yeah, I used that tutorial as my starting point, but it looks like all operations are being done on the same `jaffle_shop` schema, and moreover that it's not specifying the schema to be different from the name of the source in that example. I'm not 100% sure what you mean by how the schema is being set, but for models in dbt, we set the schema on a per-folder basis (so models in the `/{project}/dwh` path are assigned to the `DWH` schema in the `dbt_project.yml`). The big issue, I believe, is the sources though, which I posted above, where I have to manually specify the database and schema since we're reading in data from a bunch of different sources and organizing them into a separate database.
Oh, and for the `key_prefix`, I basically have it in the code snippet above. The source has the name `stg_product`, so I set the key_prefix to be `stg_product` and make the function name the same as the source table name. Is there another way to hook up that dagster function to that dbt source?

jamie

04/10/2023, 3:25 PM
ok so i think i see the problem. I’m not super deeply familiar with our dbt integration, so i made an incorrect assumption. the dbt project is looking for source data in the `PRODUCT.orders` table. The snowflake IO manager determines the schema for a table from the key_prefix of the corresponding asset, so in order to store/load the source data in the `PRODUCT` schema, the asset will need to look like this:
@asset(
    key_prefix=["PRODUCT"]
)
def orders() -> pd.DataFrame:
    .....
but based on your message, the downstream dbt assets are looking for upstream assets called `["stg_product", "orders"]`. I think this is because the `load_assets_from…` function is determining the key_prefix of sources from the source name, not the schema name. There’s a `source_key_prefix` parameter that you could try setting to `PRODUCT`, and that might make it so that the dbt assets are looking for `["PRODUCT", "orders"]` instead. If that doesn’t work you can set a function to determine what the full asset key should be, but i’ll need to get some help from a coworker to figure out what that function should be. so try `source_key_prefix` first and if that doesn’t work let me know and i’ll look into the function

Neil

04/10/2023, 3:27 PM
Sweet, I'll try that out and let you know! Thanks again for helping with this

jamie

04/10/2023, 3:28 PM
yeah no worries. sorry the interface between these two integrations is less than smooth. i’m taking some notes to try and address some of the sticking points in the future

Neil

04/10/2023, 3:53 PM
Ah, sadly didn't work 😕 I set the source_key_prefix like so:
dbt_assets = load_assets_from_dbt_manifest(
    json.load(manifest_file),
    ...
    source_key_prefix=["product"],
)
and in the asset itself like so:
@asset(key_prefix=["product", "stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
but it still wrote to the `stg_product` schema. I believe the snowflake IO Manager uses the last key as the schema name. I should also mention that we have sources in multiple different schemas, so I suppose that source_key_prefix would have needed to be dynamic in some way as well.
If it's not too much trouble, would love to hear about the function you mentioned
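To make the behaviour described so far concrete, here is a minimal sketch of how a key-driven IO manager could pick the schema and table from an asset key. It only illustrates what the thread reports (table = last key component, schema = the component right before it) and is not the Snowflake IO manager's actual code; `schema_and_table` and the `PUBLIC` fallback are hypothetical names chosen for this example.

```python
from typing import Sequence, Tuple


def schema_and_table(
    asset_key_path: Sequence[str], default_schema: str = "PUBLIC"
) -> Tuple[str, str]:
    """Illustrative only: derive (schema, table) from a full asset key path.

    Assumption: the table is the last component and the schema is the
    component immediately before it; `default_schema` is a made-up fallback
    for keys that have no prefix at all.
    """
    if not asset_key_path:
        raise ValueError("asset key path must have at least one component")
    table = asset_key_path[-1]
    schema = asset_key_path[-2] if len(asset_key_path) > 1 else default_schema
    return schema, table


# key_prefix=["product", "stg_product"] plus the asset name "orders"
print(schema_and_table(["product", "stg_product", "orders"]))
# key_prefix=["PRODUCT"] plus the asset name "orders"
print(schema_and_table(["PRODUCT", "orders"]))
```

Under that assumption, the first key resolves to the `stg_product` schema (matching what Neil saw) and the second to `PRODUCT`, which is why the prefix that dbt expects and the schema the IO manager writes to end up coupled.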

jamie

04/10/2023, 4:30 PM
if you define the asset like
@asset(key_prefix=["product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
does that work? for the function, basically you define a function that takes dbt metadata and returns the desired asset key. the parameter is `node_info_to_asset_key` (docs: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.load_assets_from_dbt_project). i haven’t been able to find a good example of its usage in our code base, so i would recommend doing something like this:
from typing import Any, Dict
from dagster import AssetKey

def my_dbt_node_to_asset_key_function(dbt_metadata: Dict[str, Any]) -> AssetKey:
    print(dbt_metadata)  # inspect which fields are available
    return AssetKey(dbt_metadata["unique_id"].split("."))  # this is approximately what we do internally, just put it here as a starting point
and then based on what the dbt_metadata dictionary looks like you can put together the asset key that makes sense.
your load_assets function would become
load_assets_from…(<other_params>, node_info_to_asset_key=my_dbt_node_to_asset_key_function)
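As a rough illustration of what that starting point would produce, the sketch below applies the same `unique_id` split to a hand-written metadata dictionary. The dictionary is a made-up stand-in for one dbt source entry (the project name `my_project` is hypothetical), and the helper returns a plain list rather than an `AssetKey` so it runs without Dagster installed.

```python
from typing import Any, Dict, List


def key_components_from_unique_id(dbt_metadata: Dict[str, Any]) -> List[str]:
    """Split a dbt unique_id into its dot-separated components."""
    return dbt_metadata["unique_id"].split(".")


# Hypothetical metadata for the `orders` table of the `stg_product` source.
sample_source = {
    "resource_type": "source",
    "unique_id": "source.my_project.stg_product.orders",
    "source_name": "stg_product",
    "name": "orders",
    "database": "STG",
    "schema": "PRODUCT",
}

print(key_components_from_unique_id(sample_source))
# prints ['source', 'my_project', 'stg_product', 'orders']
```

The raw split carries the resource type and project name along with the source and table names, so in practice you would pick the fields you care about (for example `schema` and `name`) once you have seen what the dictionary contains.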

Neil

04/10/2023, 5:02 PM
If I define the asset like that, it will load to the right schema but it doesn't recognize any dbt tasks to be downstream of it. I'll play around with that function and see if I can get it working that way

jamie

04/10/2023, 5:03 PM
ah i see, so the `source_key_prefix` doesn’t behave as i expected it to. in that case, the function will probably be your best bet

Neil

04/10/2023, 5:52 PM
Aha! Got it working with that. Made a function `_get_node_asset_key` which is basically the same as Dagster dbt's version with some slight tweaks:
def _get_node_asset_key(node_info) -> AssetKey:
    if node_info["resource_type"] == "source":
        configured_database = node_info["database"]
        configured_schema = node_info["schema"]
        if configured_schema is not None and configured_database is not None:
            components = [configured_database, configured_schema, node_info["name"]]
        else:
            components = [node_info["source_name"], node_info["name"]]
    else:
        ... # Same as original
and then passed that into `load_assets_from_dbt_manifest`:
load_assets_from_dbt_manifest(
    ...,
    source_key_prefix=["dbt_source"],
    node_info_to_asset_key=_get_node_asset_key,
)
and it works like a charm. Have this as my Asset now, for reference (with the additional `source_key_prefix` just for organization):
@asset(key_prefix=["dbt_source", "STG", "PRODUCT"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
Thanks so much for your help!
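For anyone reading along, here is a self-contained sketch of why the final setup lines up. It mirrors the source branch of Neil's function on plain lists (no Dagster import) and assumes, as his working configuration suggests, that `source_key_prefix` is prepended to whatever the key function returns for sources. The helper names and the simplified non-source branch are hypothetical stand-ins, not dagster-dbt's implementation.

```python
from typing import Any, List, Mapping, Sequence


def source_aware_key_components(node_info: Mapping[str, Any]) -> List[str]:
    """Build key components, preferring database/schema for dbt sources."""
    if node_info["resource_type"] == "source":
        database = node_info.get("database")
        schema = node_info.get("schema")
        if database is not None and schema is not None:
            return [database, schema, node_info["name"]]
        return [node_info["source_name"], node_info["name"]]
    # Simplified stand-in for the elided "same as original" model branch.
    return [node_info["name"]]


def with_source_prefix(
    node_info: Mapping[str, Any], source_key_prefix: Sequence[str]
) -> List[str]:
    """Assumption: the prefix is only prepended to keys of dbt sources."""
    components = source_aware_key_components(node_info)
    if node_info["resource_type"] == "source":
        return list(source_key_prefix) + components
    return components


orders_source = {
    "resource_type": "source",
    "source_name": "stg_product",
    "name": "orders",
    "database": "STG",
    "schema": "PRODUCT",
}

full_key = with_source_prefix(orders_source, ["dbt_source"])
print(full_key)
# prints ['dbt_source', 'STG', 'PRODUCT', 'orders']
```

That key matches an asset named `orders` with `key_prefix=["dbt_source", "STG", "PRODUCT"]`, and the component just before the table name is `PRODUCT`, which is the schema the source expects.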

jamie

04/10/2023, 5:53 PM
amazing! so glad that worked for you!