marcos
05/16/2022, 2:09 PMload_assets_from_dbt_project
, but running into an issue due to my dbt project using packages.
_“dbt found 2 package(s) specified in packages.yml, but only 0 package(s) installed in dbt_packages. Run “dbt deps” to install package dependencies.”_
When I was using ops, I’d use a dbt resource key and call dbt.cli
(example). With assets it is unclear how to handle this. More details in thread.import os
from dagster import (
AssetGroup,
asset)
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
@asset(
name="dbt_deps",
required_resource_keys={"dbt"},
compute_kind="python")
def dbt_deps(context):
context.resources.dbt.cli(command="deps")
dbt_assets = load_assets_from_dbt_project(
project_dir=os.getenv("DBT_PROJECT_DIR"),
profiles_dir=os.getenv("DBT_PROFILES_DIR"),
select="tag:powerschool"
)
asset_dev_group = AssetGroup(
[dbt_deps] + dbt_assets,
resource_defs={
"dbt": dbt_cli_resource.configured({
"project_dir": os.getenv("DBT_PROJECT_DIR"),
"profiles_dir": os.getenv("DBT_PROFILES_DIR"),
"target": "dev"
})
}
)
asset_dev_job = asset_dev_group.build_job(name="powerschool_assets")
owen
05/16/2022, 4:49 PMdbt run
command also fire off a dbt deps
command right before that. Unfortunately, that's a little tricky as you're importing this directly from the library, so you don't have all that much flexibility there. We're planning on making this integration more flexible in the future (so that it's easier to swap in different computations if necessary), but for now a hacky solution might be to replace the dbt_cli_resource with one whose run
function will run dbt deps
before dbt run
. something like
from dagster import ResourceDefinition
from dagster_dbt.cli.resources import DbtCliResource
class HackedDbtResource(DbtCliResource):
def __init__(self, project_dir: str, profiles_dir: str, target: str):
self._default_flags = default_flags
self._executable = executable
self._warn_error = warn_error
self._ignore_handled_error = ignore_handled_error
self._target_path = target_path
super().__init__(
executable="dbt",
default_flags={"project-dir": project_dir, "profiles-dir": profiles_dir, "target": target},
warn_error=False,
ignore_handled_error=False,
target_path="target",
)
def run(self, **kwargs):
self.cli("deps")
super().run(**kwargs)
my_dbt_resource = ResourceDefinition.hardcoded_resource(HackedDbtResource(project_dir=..., profiles_dir=..., target=...))