# ask-community
s
Hello Team. @Gary Nicholson @Sean Lopp I am doing a POC with dbt + BigQuery + Dagster. I am using the Dagster starter project with dbt, and for the IO manager I am using this: https://github.com/dagster-io/quickstart-gcp/blob/main/quickstart_gcp/io_managers.py

Currently I only have 2 assets: orders (which is a CSV) and orders_cleaned (which is a dbt model built from orders). When I try to Materialize all, it works for orders, but it creates the table with the prefix on it, e.g. raw_data__orders. Then the dbt asset orders_cleaned fails because it is trying to read from a table called orders, which does not exist. I have tried removing the prefix, but then I lose the lineage between the assets (see image attached). Here is the code:
```python
import pandas as pd

from dagster import asset


@asset(compute_kind="random", io_manager_key="io_manager_raw")
def orders() -> pd.DataFrame:
    # load the raw orders CSV into a DataFrame
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data
```

```python
from dagster import Definitions, fs_io_manager, load_assets_from_package_module
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

# assumed to be in scope from the project: raw_data (the assets package),
# bigquery_pandas_io_manager (from the quickstart's io_managers.py),
# DBT_PROJECT_DIR, and DBT_PROFILES_DIR

dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
)

raw_data_assets = load_assets_from_package_module(
    raw_data,
    group_name="raw_data",
    # all of these assets are stored under the raw_data key prefix
    key_prefix=["raw_data"],
)

resources = {
    "io_manager": bigquery_pandas_io_manager.configured(
        {
            "credentials": {"env": "BIGQUERY_SERVICE_ACCOUNT_CREDENTIALS"},
            "project_id": {"env": "BIGQUERY_PROJECT_ID"},
            "dataset_id": "analytics",
        }
    ),
    # this io_manager is responsible for storing/loading our pickled machine learning model
    "model_io_manager": fs_io_manager,
    # this resource is used to execute dbt cli commands
    "dbt": dbt_cli_resource.configured(
        {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
    ),
}

defs = Definitions(
    assets=[*dbt_assets, *raw_data_assets],
    resources=resources,
)
```
I am also trying to write data to BigQuery in 2 different datasets: raw_data in one dataset, and the dbt-generated data in a different dataset called analytics. Thank you in advance.
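A minimal sketch of that two-dataset setup, assuming the quickstart's bigquery_pandas_io_manager is importable from the project's io_managers module (the import path here is a guess) and accepts the same config fields shown above: bind a second configured instance to the io_manager_raw key that the orders asset already references.

```python
# hypothetical import path; adjust to wherever the quickstart's IO manager lives
from quickstart_gcp.io_managers import bigquery_pandas_io_manager

resources = {
    # dbt-built assets (e.g. orders_cleaned) land in the analytics dataset
    "io_manager": bigquery_pandas_io_manager.configured(
        {
            "credentials": {"env": "BIGQUERY_SERVICE_ACCOUNT_CREDENTIALS"},
            "project_id": {"env": "BIGQUERY_PROJECT_ID"},
            "dataset_id": "analytics",
        }
    ),
    # assets that declare io_manager_key="io_manager_raw" land in raw_data
    "io_manager_raw": bigquery_pandas_io_manager.configured(
        {
            "credentials": {"env": "BIGQUERY_SERVICE_ACCOUNT_CREDENTIALS"},
            "project_id": {"env": "BIGQUERY_PROJECT_ID"},
            "dataset_id": "raw_data",
        }
    ),
}
```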
s
What does your `sources.yaml` file look like for the dbt project? In that example, the BQ IO manager assumes a single dataset and then it names the table with the entire asset key (prefix + asset name): https://github.com/dagster-io/quickstart-gcp/blob/main/quickstart_gcp/io_managers.py#L34-L38 So you could potentially do a few things:
• update sources.yaml to tell it to read from raw_data__orders
• remove the key prefix altogether
• tweak the IO manager behavior; for example, if you want to use only the asset name for the table name, you'd do something like the sketch below: https://github.com/slopp/dagster-conditional-etl-gcp-demo/blob/main/dagster_project/resources.py#L80-L81
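A hedged sketch of that last option: a tiny helper (the name table_name_for is invented here) that keeps only the final component of the asset key, which the IO manager's handle_output/load_input could call instead of joining the whole key path into the table name.

```python
from dagster import AssetKey


def table_name_for(asset_key: AssetKey) -> str:
    """Use only the final component of the asset key as the BQ table name."""
    # AssetKey(["raw_data", "orders"]) -> "orders" instead of "raw_data__orders"
    return asset_key.path[-1]


assert table_name_for(AssetKey(["raw_data", "orders"])) == "orders"
```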
s
I tried the first suggestion. This is my original sources.yml file:
```yaml
version: 2

sources:
  - name: raw_data
    tables:
      - name: orders
      - name: users
  - name: forecasting
    tables:
      - name: predicted_orders
```

I changed it to:

```yaml
sources:
  - name: raw_data
    tables:
      - name: raw_data__orders
      - name: users
  - name: forecasting
    tables:
      - name: predicted_orders
```

but this is what happens:
If I remove the key prefix, I have something similar.
I will try to modify the IO manager.
s
Ah gotcha, yeah, the first result makes sense and that was a poor suggestion on my part. If you remove the prefix altogether, what happened? FWIW, @jamie is working on an official BQ IO manager and it will likely tackle many of these challenges. Other IO managers treat the key prefix and database schema as synonyms, but because BQ does not have a schema concept, we'll likely adjust the quickstart once the official IO manager is out.
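For illustration, a minimal sketch (not the official IO manager; the class name and constructor fields here are invented) of how a BQ IO manager could treat the key prefix as the dataset instead of folding it into the table name, assuming pandas-gbq is installed:

```python
import pandas as pd

from dagster import InputContext, IOManager, OutputContext


class DatasetAwareBigQueryIOManager(IOManager):
    """Hypothetical sketch: map the asset key prefix to a BigQuery dataset
    and the final key component to the table name."""

    def __init__(self, project_id: str, default_dataset: str):
        self._project_id = project_id
        self._default_dataset = default_dataset

    def _destination(self, asset_key) -> str:
        # AssetKey(["raw_data", "orders"]) -> "raw_data.orders"
        # AssetKey(["orders_cleaned"])     -> "<default_dataset>.orders_cleaned"
        path = asset_key.path
        dataset = path[0] if len(path) > 1 else self._default_dataset
        return f"{dataset}.{path[-1]}"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        # write the DataFrame to "<dataset>.<table>" via pandas-gbq
        obj.to_gbq(
            self._destination(context.asset_key),
            project_id=self._project_id,
            if_exists="replace",
        )

    def load_input(self, context: InputContext) -> pd.DataFrame:
        table = self._destination(context.asset_key)
        return pd.read_gbq(
            f"SELECT * FROM `{self._project_id}.{table}`",
            project_id=self._project_id,
        )
```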
s
Yes, it seems like the solution is to modify the IO manager. And agreed, I prefer 3 levels of hierarchy like Snowflake and Databricks (with data catalogs), but BigQuery needs a different approach. Is there any way to follow Jamie's work?
j
I don’t have a PR for the BQ IO manager work yet, but I can post it here when it exists.
s
Awesome, thank you!!!
m
`node_info_to_asset_key` will remove the prefix from the dbt source table:

```python
from dagster import AssetKey
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    # use only the node name (no key prefix) as the asset key
    node_info_to_asset_key=lambda node_info: AssetKey(node_info["name"]),
)
```
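With that mapping, the dbt source's asset key becomes just orders, so it should line up with the unprefixed raw asset and the original sources.yml, presumably keeping the lineage intact without needing raw_data__-prefixed table names.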