Yuval Goldberg
08/30/2023, 12:38 PM
With the load_assets_from_dbt_manifest API, we used to load multiple dbt projects as follows, parsing each project directory dynamically in code and creating a manifest.json file for each:
dbt_resources = {
    f"dbt_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    ) for dbt_project in DBT_PROJECTS
}
_dbt_manifests = {}
_dbt_resource_lock = threading.Lock()

def load_dbt_assets():
    assets = []
    for key, dbt_resource in dbt_resources.items():
        if key not in _dbt_manifests:
            # Reuse an existing manifest.json if one is already under target/.
            matches = glob(f"{dbt_resource.project_dir}/target/**/manifest.json", recursive=True)
            if matches:
                _dbt_manifests[key] = Path(matches[0])
            else:
                # Otherwise generate one with dbt deps + dbt parse.
                with _dbt_resource_lock:
                    dbt_resource.cli(["deps"], manifest={}, raise_on_error=True).wait()
                    dbt_parse_invocation = dbt_resource.cli(["parse"], manifest={}, raise_on_error=True).wait()
                    _dbt_manifests[key] = dbt_parse_invocation.target_path.joinpath("manifest.json")
        # extend (not append(*...)) so this also works if more than one definition is returned
        assets.extend(
            load_assets_from_dbt_manifest(
                manifest=_dbt_manifests[key],
                use_build_command=False,  # TODO: Change to load seeds, run, and test
                dbt_resource_key=key
            )
        )
    return assets
It works fine, but dbt seed can't run as part of the asset (use_build_command only chooses between the dbt run and dbt build commands).
I'd like to either run dbt seed or avoid running dbt test.
I noticed that with the newer API, via the @dbt_assets decorator, I'd have more control over what runs for each asset.
The question is: is there a way to build dbt assets dynamically with the dbt_assets decorator, so that I get a dynamic number of assets depending on a DBT_PROJECTS env var? I wasn't able to find documentation for this.
Thanks!
rex
08/30/2023, 1:42 PM
Yuval Goldberg
08/30/2023, 2:04 PM
def dbt_asset_compute(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()

dbt_asset_compute.__name__ = f"dbt_assets_{key}"
assets.append(dbt_assets(
    manifest=_dbt_manifests[key],
)(dbt_asset_compute))
Maybe I'm missing this part: how do I specify which resource is used? Notice that in the previous snippet I used dbt_resource_key=key to pass the predefined dbt resource.
Is it possible not to use the default dbt resource (so that every project uses a different dbt resource)?
rex
08/30/2023, 2:23 PM
with_resources? https://docs.dagster.io/concepts/resources
dbt_assets(
    manifest=_dbt_manifests[key],
    dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(
        asset_key_prefix=[TENANT, basename(dbt_resource.project_dir)],
        source_asset_key_prefix=[TENANT]
    ),
)(dbt_asset_compute).with_resources({"dbt": dbt_resource})
Yuval Goldberg
08/30/2023, 2:46 PM
resource with key 'dbt' required by op 'dbt_assets_dbt_my_proj', but received <class 'dagster_dbt.core.resources_v2.DbtCliResource'>
Does the key have to be dbt? With the previous code I was able to define custom keys.
rex
08/30/2023, 2:50 PM
It doesn't have to be dbt. The key is the name of the DbtCliResource parameter in your dbt_asset_compute function. E.g. the name of the resource here would be my_custom_dbt_name:
def dbt_asset_compute(context: OpExecutionContext, my_custom_dbt_name: DbtCliResource):
    ...
But since you're generating these compute functions dynamically, it would be a bit more involved to make the parameter name dynamic as well. That is why I would recommend sticking with dbt as the key and using with_resources. If you want custom keys, you would have to change the parameter name dynamically.
Yuval Goldberg
08/30/2023, 2:55 PM
rex
08/30/2023, 3:09 PM
my_dbt_assets = dbt_assets(
    manifest=_dbt_manifests[key],
    dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(
        asset_key_prefix=[TENANT, basename(dbt_resource.project_dir)],
        source_asset_key_prefix=[TENANT]
    ),
)(dbt_asset_compute)
my_dbt_assets = with_resources(
    [my_dbt_assets],
    {"dbt": dbt_resource}
)
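rex's earlier remark that the resource key is just the name of the DbtCliResource parameter suggests one way to get custom keys with generated functions: build each function so that its parameter is literally named after the key. The following is a minimal plain-Python sketch of that idea and uses no Dagster at all. `make_compute` and `required_keys` are hypothetical helpers; `required_keys` only imitates a name-based lookup via `inspect.signature`, and whether Dagster accepts a function generated this way is not confirmed in this thread.

```python
import inspect
import keyword


def make_compute(resource_key: str):
    """Build a function whose second parameter is literally named `resource_key`."""
    if not resource_key.isidentifier() or keyword.iskeyword(resource_key):
        raise ValueError(f"{resource_key!r} is not a valid parameter name")
    source = (
        f"def compute(context, {resource_key}):\n"
        f"    # Delegate to whatever object was injected under this name.\n"
        f"    return {resource_key}(context)\n"
    )
    namespace = {}
    exec(source, namespace)  # the key was validated as a plain identifier above
    fn = namespace["compute"]
    fn.__name__ = f"dbt_assets_{resource_key}"
    return fn


def required_keys(fn):
    """Hypothetical stand-in for a framework reading resource keys from parameter names."""
    return [name for name in inspect.signature(fn).parameters if name != "context"]


compute = make_compute("dbt_my_proj")
print(required_keys(compute))  # ['dbt_my_proj']
```

Generating source with `exec` is blunt but produces a genuine parameter name, which is why the key is validated as an identifier first.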
Yuval Goldberg
08/30/2023, 3:41 PM
Conflicting versions of resource with key 'dbt' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
FYI, it seems I found a solution:
dbt_resources = {
    f"dbt_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    ) for dbt_project in DBT_PROJECTS
}
...
def dbt_asset_compute(context: OpExecutionContext, **kwargs):
    # Pick the injected resource whose key contains "dbt_".
    dbt_key = next((k for k in kwargs.keys() if 'dbt_' in k), None)
    dbt: DbtCliResource = kwargs[dbt_key]
    yield from dbt.cli(["run"], context=context).stream()

dbt_asset_compute.__name__ = f"dbt_assets_{key}"
dbt_asset = dbt_assets(
    manifest=_dbt_manifests[key],
)(dbt_asset_compute)
assets += with_resources([dbt_asset], {key: dbt_resource})
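The error quoted above says that resources sharing a key must be the very same object. The sketch below is a rough plain-Python imitation of such a check, not Dagster's actual code; `FakeResource` and `merge_resources` are hypothetical names. It illustrates why two projects both bound to "dbt" collide, while per-project keys (as in the solution above) do not.

```python
class FakeResource:
    """Hypothetical stand-in for a per-project resource object."""

    def __init__(self, project_dir: str):
        self.project_dir = project_dir


def merge_resources(per_asset_resources):
    """Merge per-asset resource dicts, rejecting different objects under one key."""
    merged = {}
    for resources in per_asset_resources:
        for key, resource in resources.items():
            # Identity check (`is`), mirroring the "reference equality" wording.
            if key in merged and merged[key] is not resource:
                raise ValueError(f"Conflicting versions of resource with key {key!r}")
            merged[key] = resource
    return merged


proj_a = FakeResource("projects/a")
proj_b = FakeResource("projects/b")

# Distinct keys: both resources coexist.
ok = merge_resources([{"dbt_a": proj_a}, {"dbt_b": proj_b}])

# Shared key, different objects: rejected.
try:
    merge_resources([{"dbt": proj_a}, {"dbt": proj_b}])
    conflict = None
except ValueError as exc:
    conflict = str(exc)
```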
Yuval Goldberg
08/31/2023, 11:30 AM
def dbt_asset_compute(context: OpExecutionContext, **kwargs):
    dbt_key = next((k for k in kwargs.keys() if 'dbt_' in k), None)
    dbt: DbtCliResource = kwargs[dbt_key]
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()

# Bind this project's resource directly instead of using with_resources.
dbt_asset_compute_wrapper = partial(dbt_asset_compute, **{key: dbt_resource})
dbt_asset_compute_wrapper.__name__ = f"dbt_assets_{key}"
dbt_asset = dbt_assets(
    manifest=_dbt_manifests[key],
)(dbt_asset_compute_wrapper)
assets.append(dbt_asset)
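The partial call above fixes each project's resource at the moment the loop body runs. A small plain-Python sketch of that binding follows, with hypothetical names (`run_project`, `resources`) and strings standing in for real resources. For contrast it also builds plain lambdas in the same loop, which look the loop variables up only when called and therefore all see the last project.

```python
from functools import partial


def run_project(context, **kwargs):
    # Same lookup as in the snippet above: pick the keyword whose name contains "dbt_".
    dbt_key = next((k for k in kwargs if "dbt_" in k), None)
    return f"{context}:{kwargs[dbt_key]}"


resources = {"dbt_a": "resource-a", "dbt_b": "resource-b"}  # placeholder objects

bound = []
late = []
for key, resource in resources.items():
    wrapper = partial(run_project, **{key: resource})  # bound now, per iteration
    wrapper.__name__ = f"dbt_assets_{key}"  # partial objects accept new attributes
    bound.append(wrapper)
    # The lambda reads `key` and `resource` at call time, after the loop has ended.
    late.append(lambda context: run_project(context, **{key: resource}))

print([f("ctx") for f in bound])  # ['ctx:resource-a', 'ctx:resource-b']
print([f("ctx") for f in late])   # ['ctx:resource-b', 'ctx:resource-b']
```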
Sergio Pintaldi
09/28/2023, 10:15 AM
Sergio Pintaldi
10/18/2023, 10:38 AM
Is dbt_resource in **{key: dbt_resource} the type DbtCliResource, or the actual definition DbtCliResource(....) as defined in the resources (Definitions(... resources={...}))?
Yuval Goldberg
10/18/2023, 11:13 AM
dbt_resource is an instance of type DbtCliResource.
My definition looks like the following:
dbt_resources = {
    f"dbt_{TENANT}_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    )
    for dbt_project in DBT_PROJECTS
}
...
defs = Definitions(
    ...
    resources={**dbt_resources},
)
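The dictionary above is keyed per project, and the thread mentions that the project list comes from a DBT_PROJECTS env var without showing how it is parsed. A minimal sketch of deriving such keys, assuming (hypothetically) a comma-separated value; `project_keys` is an illustrative helper and not part of the code in this thread.

```python
import os
from pathlib import Path


def project_keys(projects_dir: Path, env=os.environ):
    """Map 'dbt_<project name>' to the project directory for each listed project."""
    raw = env.get("DBT_PROJECTS", "")  # assumed format: "proj_a,proj_b"
    names = [name.strip() for name in raw.split(",") if name.strip()]
    return {f"dbt_{(projects_dir / name).name}": projects_dir / name for name in names}


# Usage with an explicit mapping instead of the real environment:
keys = project_keys(Path("projects"), {"DBT_PROJECTS": "sales, finance"})
```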
Sergio Pintaldi
10/19/2023, 1:05 AM
dbt_asset_compute factory function runs (so at Dagster initialisation), therefore I cannot change the resource configuration per job. Also, the resource is not visible in the UI. Lastly, I lose all the dbt model descriptions, as the partial overwrites them with its docstring. I am happy to name the dbt resource argument dbt, but then I want to be able to configure the resource for a selected group of dbt models.
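On the docstring point: a functools.partial object carries the generic docstring of partial itself, not that of the wrapped function. The plain-Python sketch below shows that behaviour and how functools.update_wrapper copies the original metadata onto the partial. The function body and the "resource" string are placeholders, and whether this restores the descriptions shown in the Dagster UI is not tested here.

```python
from functools import partial, update_wrapper


def dbt_asset_compute(context, **kwargs):
    """Run seed and run for one dbt project."""
    return context, kwargs


# A bare partial: its __doc__ is the generic functools.partial docstring.
bare = partial(dbt_asset_compute, dbt_my_proj="resource")

# Copy __name__, __doc__, __module__, etc. from the original function.
wrapped = partial(dbt_asset_compute, dbt_my_proj="resource")
update_wrapper(wrapped, dbt_asset_compute)
wrapped.__name__ = "dbt_assets_dbt_my_proj"  # then apply the per-project name
```

This addresses only the metadata; the resource is still bound when the partial is created, so the per-job configuration concern remains.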