# integration-dbt
s
Hi guys, could you please give me a hand here? I am using Dagster + dbt + BigQuery, and for some reason my asset in Dagster can't read the object produced by dbt. Not sure what I am missing. Here are my Dagster project defs:
```python
DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

ASSETS = [*dbt_assets, *raw_assets]

defs = Definitions(
    assets=ASSETS,
    resources={
        "db_io_manager": bigquery_pandas_io_manager.configured(
            {
                "project": {"env": "PROJECT_ID"},
                "dataset": "analytics",
            }
        ),
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILES_DIR,
                "target": "bigquery_stage",
            }
        ),
    },
)
```
Dagster assets:
```python
from dagster import AssetIn, asset
import pandas as pd

@asset(
    ins={"my_model": AssetIn(key_prefix=["analytics"])},
)
def dagster_asset(my_model: pd.DataFrame):
    return my_model
```
```python
from dagster import load_assets_from_package_module, file_relative_path
from dagster_dbt import load_assets_from_dbt_project
from . import raw

DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

# all assets live in the default dbt_schema
dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
    key_prefix=["analytics"],
)

raw_assets = load_assets_from_package_module(
    package_module=raw,
    group_name="external",
    key_prefix=["analytics"],
)
```
and my dbt model is as simple as:
```sql
select 1 as id, 'sebastian' as nombre
```
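For reference, once loading works the downstream asset should receive that model as a one-row pandas DataFrame. A purely illustrative sketch of the expected shape (the IO manager performs this conversion behind the scenes):

```python
import pandas as pd

# What `my_model` should look like when the BigQuery pandas IO manager
# hands it to `dagster_asset`: one row, columns `id` and `nombre`.
my_model = pd.DataFrame({"id": [1], "nombre": ["sebastian"]})
assert my_model.shape == (1, 2)
```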
My dbt model is materialised in BigQuery, but the dependent asset is failing with the following error. Not sure why it is looking for the file inside my local directory when it should look for it in BigQuery. (BTW, I am using the new BigQuery IO manager.)
```
FileNotFoundError: [Errno 2] No such file or directory: '/Users/charrier/Repos/my_dagster_dbt/tmpw386n_pf/storage/analytics/my_model'
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 835, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 150, in load_input
    return self._load_single_input(path, context)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 108, in _load_single_input
    obj = self.load_from_path(context=context, path=path)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/fs_io_manager.py", line 173, in load_from_path
    with path.open("rb") as file:
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/pathlib.py", line 1119, in open
    return self._accessor.open(self, mode, buffering, encoding, errors,
```
r
You need to specify the IO manager on your assets:
```python
dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
    key_prefix=["analytics"],
    io_manager_key="db_io_manager",
)
```
s
Hi rex, yes, I have tried that as well... It is very strange, because I previously created a Dagster asset that feeds dbt, and that works well. It is when dbt feeds back into a Dagster asset that it fails.
r
What’s the error that you get when you specify `io_manager_key="db_io_manager"`? @jamie this might be another example of the issue that you encountered the other day, where the IO manager is looking for a table that doesn’t correspond to the alias that’s configured in the dbt project.
s
Same error. I am trying with DuckDB now to verify that it is not a BigQuery-specific problem, but I have the same result.
image.png
These are my asset definitions.
j
I think you also need to specify `db_io_manager` as the IO manager for the dbt assets. Behind the scenes, when we load an input, we use the IO manager of the asset corresponding to that input, not the IO manager of the asset that’s being run. You should be able to pass `io_manager_key="db_io_manager"` as a parameter to `load_assets_from_dbt_project`.
Example of how we determine the IO manager to use:
```python
@asset(
    io_manager_key="io_manager_1"
)
def asset_1():
    return 1  # this output is stored using "io_manager_1"

@asset(
    io_manager_key="io_manager_2"
)
def asset_2(asset_1):
    # asset_1 is loaded using "io_manager_1"
    return asset_1 + 1  # this output is stored using "io_manager_2"
```
🎉 1
🙌 1
s
Amazing, that was the problem. Thanks a lot. In my mind I thought the "dbt" IO manager was enough. So it means every asset referencing an upstream dbt model will need to use the same IO manager I have defined in `load_assets_from_dbt_project`?
j
> so it means every asset referencing an upstream dbt model will need to use the same IO manager I have defined in `load_assets_from_dbt_project`

No, the IO manager you attach to `load_assets_from_dbt_project` will be used to load any of the dbt assets in any downstream assets. The IO manager on the downstream asset will be used to store that asset and load it in any further downstream assets.