Sebastian Charrier
03/08/2023, 7:04 PM

from dagster import Definitions, file_relative_path
from dagster_dbt import dbt_cli_resource
from dagster_gcp_pandas import bigquery_pandas_io_manager

# dbt_assets and raw_assets come from the assets module shown below (import path assumed)
from .assets import dbt_assets, raw_assets

DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

ASSETS = [*dbt_assets, *raw_assets]

defs = Definitions(
    assets=ASSETS,
    resources={
        "db_io_manager": bigquery_pandas_io_manager.configured(
            {
                "project": {"env": "PROJECT_ID"},
                "dataset": "analytics",
            }
        ),
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILES_DIR,
                "target": "bigquery_stage",
            }
        ),
    },
)
Dagster assets:

from dagster import AssetIn, asset
import pandas as pd


@asset(
    ins={"my_model": AssetIn(key_prefix=["analytics"])},
)
def dagster_asset(my_model: pd.DataFrame):
    return my_model
from dagster import load_assets_from_package_module, file_relative_path
from dagster_dbt import load_assets_from_dbt_project

from . import raw

DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

# all assets live in the default dbt_schema
dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
    key_prefix=["analytics"],
)

raw_assets = load_assets_from_package_module(
    package_module=raw,
    group_name="external",
    key_prefix=["analytics"],
)
and my dbt model is as simple as:
select 1 as id, 'sebastian' as nombre
My dbt model is materialised in BigQuery, but the dependent asset fails with the following error. I'm not sure why it is looking for the file in my local directory when it should be reading it from BigQuery. (BTW, I am using the new BigQuery IO manager.)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/charrier/Repos/my_dagster_dbt/tmpw386n_pf/storage/analytics/my_model'
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
yield
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 835, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 150, in load_input
return self._load_single_input(path, context)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 108, in _load_single_input
obj = self.load_from_path(context=context, path=path)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/dagster/_core/storage/fs_io_manager.py", line 173, in load_from_path
with path.open("rb") as file:
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/pathlib.py", line 1119, in open
return self._accessor.open(self, mode, buffering, encoding, errors,
rex
03/08/2023, 11:17 PM

dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
    key_prefix=["analytics"],
    io_manager_key="db_io_manager",
)
Sebastian Charrier
03/08/2023, 11:19 PM

rex
03/08/2023, 11:24 PM

io_manager_key="db_io_manager"
?

@jamie might be another example of the issue that you encountered the other day, where the IO manager is looking for a table that doesn't correspond to the alias that's configured in the dbt project

Sebastian Charrier
03/08/2023, 11:28 PM

jamie
03/08/2023, 11:33 PM

You'll want to use db_io_manager as the io manager for the dbt assets. Behind the scenes, when we load an input, we use the io manager for the asset corresponding to the input, not the io manager for the asset that's being run. You should be able to pass io_manager_key="db_io_manager" as a parameter to load_assets_from_dbt_project.

Example of how we determine which io manager to use:
from dagster import asset


@asset(
    io_manager_key="io_manager_1"
)
def asset_1():
    return 1  # this output is stored using "io_manager_1"


@asset(
    io_manager_key="io_manager_2"
)
def asset_2(asset_1):
    # asset_1 is loaded using "io_manager_1"
    return asset_1 + 1  # this output is stored using "io_manager_2"
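Applied to the setup above, a minimal sketch (assuming the same resource keys and asset code posted earlier in the thread): with db_io_manager attached to the dbt assets, the my_model input of the downstream asset is loaded from BigQuery rather than from local filesystem storage.

import pandas as pd
from dagster import AssetIn, asset, file_relative_path
from dagster_dbt import load_assets_from_dbt_project

DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

# the dbt assets declare db_io_manager, so any downstream asset that takes
# a dbt model as an input will load it with db_io_manager (BigQuery)
dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_DIR,
    DBT_PROFILES_DIR,
    key_prefix=["analytics"],
    io_manager_key="db_io_manager",
)


@asset(ins={"my_model": AssetIn(key_prefix=["analytics"])})
def dagster_asset(my_model: pd.DataFrame) -> pd.DataFrame:
    # my_model is now loaded by db_io_manager instead of the default fs_io_manager
    return my_model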
Sebastian Charrier
03/08/2023, 11:43 PM

So it means every asset referencing an upstream dbt model will need to use the same io manager I have defined in load_assets_from_dbt_project?

jamie
03/09/2023, 3:33 PM

No, the IO manager you attach to load_assets_from_dbt_project will be used to load any of the dbt assets in any downstream assets. The IO manager on the downstream asset will be used to store that asset and load it in any further downstream assets.
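To make that load/store split concrete, here is a minimal sketch (the asset names other than the dbt model are hypothetical; "io_manager" is Dagster's default filesystem IO manager key): the dbt model is loaded with db_io_manager because the dbt assets declare it, while each downstream asset's own output is stored with whatever IO manager key that asset declares.

import pandas as pd
from dagster import AssetIn, asset


@asset(
    ins={"my_model": AssetIn(key_prefix=["analytics"])},
    io_manager_key="io_manager",  # default filesystem IO manager
)
def my_report(my_model: pd.DataFrame) -> pd.DataFrame:
    # hypothetical downstream asset:
    # my_model is loaded with db_io_manager (declared on the dbt assets);
    # this asset's return value is stored with "io_manager"
    return my_model.assign(id_plus_one=my_model["id"] + 1)


@asset(io_manager_key="db_io_manager")
def my_summary(my_report: pd.DataFrame) -> pd.DataFrame:
    # hypothetical further-downstream asset:
    # my_report is loaded with "io_manager" (the key declared on my_report),
    # and this output is stored back to BigQuery via db_io_manager
    return my_report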