Neil
04/07/2023, 9:15 PM
```yaml
sources:
  - name: stg_product
    database: STG
    schema: PRODUCT
    tables:
      - name: orders
```
So I want to load this source in an asset, and use the snowflake IO manager to do it:
```python
@asset(key_prefix=["stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
```
But then it loads the dataframe into a schema called "stg_product". Of course, I could change the key_prefix, but then it won't connect to the downstream dbt models. How can I get it to write to the schema defined in the source, or else to a schema independent of the key_prefix? I feel like I've gotta be missing something.
jamie
04/10/2023, 2:59 PM
Neil
04/10/2023, 3:09 PM
`jaffle_shop` schema, and moreover that it's not specifying the schema to be different than the name of the source in that example.
I'm not 100% sure what you mean by how the schema is being set, but for models in dbt, we set the schema on a per-folder basis (so models in the `/{project}/dwh` path are assigned to the `DWH` schema in the `dbt_project.yml`). The big issue, I believe, is the sources, though, which I posted above: there I have to manually specify the database and schema, since we're reading in data from a bunch of different sources and organizing them into a separate database.
`key_prefix`, I basically have it in the code snippet above. The source has the name `stg_product`, so I set the `key_prefix` to `stg_product` and make the function name the same as the source table name. Is there another way to hook up that Dagster function to that dbt source?
jamie
04/10/2023, 3:25 PM
`PRODUCT.orders` table. The Snowflake IO manager determines the schema for a table from the `key_prefix` of the corresponding asset, so in order to store/load the source data in the `PRODUCT` schema, the asset will need to look like this:
```python
import pandas as pd
from dagster import asset


@asset(
    key_prefix=["PRODUCT"]
)
def orders() -> pd.DataFrame:
    ...
```
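To make the rule jamie describes concrete, here is a plain-Python sketch that maps an asset key path to the schema and table it would be written to. It is a model of the behavior reported in this thread (the table is the last key element and the schema is the last element of the `key_prefix`), not the dagster-snowflake source; the helper name `snowflake_target_for` and the `"public"` fallback for keys with no prefix are assumptions.

```python
from typing import Sequence, Tuple


def snowflake_target_for(asset_key_path: Sequence[str]) -> Tuple[str, str]:
    """Hypothetical helper: the (schema, table) an asset key would map to.

    Models the rule described in this thread: the table is the last element
    of the asset key and the schema is the element before it, i.e. the last
    element of the key_prefix. The "public" fallback for keys with no prefix
    is an assumption.
    """
    if not asset_key_path:
        raise ValueError("asset key path must not be empty")
    table = asset_key_path[-1]
    schema = asset_key_path[-2] if len(asset_key_path) > 1 else "public"
    return schema, table


# key_prefix=["stg_product"] on `orders`: lands in the stg_product schema.
print(snowflake_target_for(["stg_product", "orders"]))
# key_prefix=["PRODUCT"] on `orders`: lands in the PRODUCT schema.
print(snowflake_target_for(["PRODUCT", "orders"]))
# Extra leading prefix elements do not change the schema.
print(snowflake_target_for(["product", "stg_product", "orders"]))
```

Under this model only the last `key_prefix` element matters for the schema, which is why adding elements in front of `stg_product` does not move the table.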
But based on your message, the downstream dbt assets are looking for upstream assets called `["stg_product", "orders"]`. I think this is because the `load_assets_from…` function is determining the key_prefix of sources from the source name, not the schema name. There's a `source_key_prefix` parameter that you could try setting to `PRODUCT`, and that might make it so that the dbt assets are looking for `["PRODUCT", "orders"]` instead. If that doesn't work, you can set a function to determine what the full asset key should be, but I'll need to get some help from a coworker to figure out what that function should be. So try `source_key_prefix` first, and if that doesn't work, let me know and I'll look into the function.
Neil
04/10/2023, 3:27 PM
jamie
04/10/2023, 3:28 PM
Neil
04/10/2023, 3:53 PM
```python
dbt_assets = load_assets_from_dbt_manifest(
    json.load(manifest_file),
    ...,  # other parameters elided
    source_key_prefix=["product"],
)
```
and in the asset itself like so:
```python
@asset(key_prefix=["product", "stg_product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
```
but it still wrote to the `stg_product` schema. I believe the Snowflake IO manager uses the last element of the key prefix as the schema name.
I should also mention that we have sources in multiple different schemas, so I suppose `source_key_prefix` would have needed to be dynamic in some way as well.
jamie
04/10/2023, 4:30 PM
```python
@asset(key_prefix=["product"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
```
does that work?
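The default source key jamie described earlier, and the key Neil's `source_key_prefix=["product"]` attempt produces, can be modeled in plain Python: the dbt source's key is the source name plus the table name, with `source_key_prefix` prepended, so the schema (`PRODUCT`) never appears in it. This is a sketch consistent with what the thread reports (Neil's final setup shows the prefix being prepended to the key), not dagster-dbt's code; it uses lists instead of `AssetKey`s so it runs without Dagster, and both helper names are hypothetical.

```python
from typing import Any, Dict, List, Optional


def default_source_key(
    node_info: Dict[str, Any], source_key_prefix: Optional[List[str]] = None
) -> List[str]:
    """Hypothetical model of the default key for a dbt source node:
    [*source_key_prefix, source_name, table name]. The schema is not used."""
    return list(source_key_prefix or []) + [node_info["source_name"], node_info["name"]]


def asset_key_for(key_prefix: List[str], function_name: str) -> List[str]:
    """An @asset's key is its key_prefix followed by the function name."""
    return list(key_prefix) + [function_name]


# The `orders` table of the `stg_product` source from the top of the thread.
orders_source = {"source_name": "stg_product", "name": "orders", "schema": "PRODUCT"}

# Default: dbt models look for ["stg_product", "orders"].
print(default_source_key(orders_source))

# With source_key_prefix=["product"], the prefix is prepended to that key,
# which is the key of Neil's asset with key_prefix=["product", "stg_product"].
with_prefix = default_source_key(orders_source, ["product"])
print(with_prefix)
assert with_prefix == asset_key_for(["product", "stg_product"], "orders")
```

Because one static prefix is applied to every source, it cannot carry a different schema per source, which matches Neil's point about sources living in multiple schemas.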
For the function, basically you define a function that takes dbt metadata and returns the desired asset key (docs: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.load_assets_from_dbt_project); the parameter is `node_info_to_asset_key`. I haven't been able to find a good example of its usage in our code base, so I would recommend doing something like this:
```python
from typing import Any, Dict
from dagster import AssetKey

def my_dbt_node_to_asset_key_function(dbt_metadata: Dict[str, Any]) -> AssetKey:
    print(dbt_metadata)
    return AssetKey(dbt_metadata["unique_id"].split("."))  # this is approximately what we do internally, just put it here as a starting point
```
And then, based on what the `dbt_metadata` dictionary looks like, you can put together the asset key that makes sense:
`load_assets_from…(<other_params>, node_info_to_asset_key=my_dbt_node_to_asset_key_function)`
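jamie's starting point can be tried without a dbt project by feeding it a hand-written node. In the sketch below, the sample `node_info` dict is hypothetical: its values come from the source YAML at the top of the thread, and the project name `my_project` inside `unique_id` is made up (dbt source IDs follow the `source.<project>.<source_name>.<table>` pattern). To run without Dagster, the function returns the key components as a plain list instead of an `AssetKey`.

```python
import pprint
from typing import Any, Dict, List


def my_dbt_node_to_asset_key_function(dbt_metadata: Dict[str, Any]) -> List[str]:
    # Dump the node so you can see which fields are available to build a key from.
    pprint.pprint(dbt_metadata)
    # Starting point from the thread: split dbt's unique_id on "." characters.
    # In real code, wrap the result in dagster.AssetKey(...).
    return dbt_metadata["unique_id"].split(".")


# Hypothetical manifest node for the `orders` table of the `stg_product` source.
# Values come from the source YAML in this thread; "my_project" is made up.
sample_source_node = {
    "unique_id": "source.my_project.stg_product.orders",
    "resource_type": "source",
    "source_name": "stg_product",
    "name": "orders",
    "database": "STG",
    "schema": "PRODUCT",
}

key = my_dbt_node_to_asset_key_function(sample_source_node)
print(key)
```

The split key starts with `source` and the project name and never mentions `PRODUCT`; the printed node does contain `database` and `schema` fields, which is what Neil's `_get_node_asset_key` later builds the key from.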
Neil
04/10/2023, 5:02 PM
jamie
04/10/2023, 5:03 PM
`source_asset_key` doesn't behave as I expected it to. In that case, the function will probably be your best bet.
Neil
04/10/2023, 5:52 PM
`_get_node_asset_key`, which is basically the same as Dagster dbt's version with some slight tweaks:
```python
from dagster import AssetKey


def _get_node_asset_key(node_info) -> AssetKey:
    if node_info["resource_type"] == "source":
        configured_database = node_info["database"]
        configured_schema = node_info["schema"]
        if configured_schema is not None and configured_database is not None:
            # Key sources by where they actually live: database, schema, table.
            components = [configured_database, configured_schema, node_info["name"]]
        else:
            components = [node_info["source_name"], node_info["name"]]
    else:
        ...  # Same as original (builds `components` for non-source nodes)
    return AssetKey(components)
```
and then passed that in to `load_assets_from_`:
```python
load_assets_from_dbt_manifest(
    ...,
    source_key_prefix=["dbt_source"],
    node_info_to_asset_key=_get_node_asset_key,
)
```
and it works like a charm. Here's my asset now, for reference (with the additional `source_key_prefix` just for organization):
```python
@asset(key_prefix=["dbt_source", "STG", "PRODUCT"], io_manager_key="io_manager_snowflake")
def orders():
    ...
    return some_panda_dataframe
```
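To check that the pieces of Neil's final setup line up, here is a plain-Python model: the source branch of `_get_node_asset_key`, the `dbt_source` prefix prepended via `source_key_prefix`, the asset's own key (`key_prefix` plus the function name), and the schema the IO manager would pick under the "last `key_prefix` element is the schema" rule discussed in this thread. It returns lists instead of `AssetKey`s so it runs without Dagster; the helper names are hypothetical and only the source branch is modeled.

```python
from typing import Any, Dict, List


def source_components(node_info: Dict[str, Any]) -> List[str]:
    """Source branch of Neil's _get_node_asset_key, as plain strings."""
    database = node_info["database"]
    schema = node_info["schema"]
    if schema is not None and database is not None:
        return [database, schema, node_info["name"]]
    return [node_info["source_name"], node_info["name"]]


def expected_source_key(node_info: Dict[str, Any], source_key_prefix: List[str]) -> List[str]:
    """Key the dbt models depend on: source_key_prefix + the custom components."""
    return list(source_key_prefix) + source_components(node_info)


def asset_key_for(key_prefix: List[str], function_name: str) -> List[str]:
    """An @asset's key is its key_prefix followed by the function name."""
    return list(key_prefix) + [function_name]


orders_source = {
    "resource_type": "source",
    "source_name": "stg_product",
    "name": "orders",
    "database": "STG",
    "schema": "PRODUCT",
}

dbt_side = expected_source_key(orders_source, ["dbt_source"])
dagster_side = asset_key_for(["dbt_source", "STG", "PRODUCT"], "orders")
assert dbt_side == dagster_side  # the dbt models now depend on the `orders` asset

# Under the "last key_prefix element is the schema" rule, the table lands in PRODUCT.
schema, table = dagster_side[-2], dagster_side[-1]
print(f"{schema}.{table}")
```

Because the database and schema are read from each node, sources in different schemas get different keys without needing a per-source `source_key_prefix`.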
jamie
04/10/2023, 5:53 PM