# integration-dbt
y
Hey there, dynamic dbt_asset question here 🙂 Using the load_assets_from_dbt_manifest API, we used to load multiple dbt projects like the following, parsing each project directory dynamically within the code by creating a manifest.json file for each:
Copy code
dbt_resources = {
    f"dbt_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    ) for dbt_project in DBT_PROJECTS
}

_dbt_manifests = {}
_dbt_resource_lock = threading.Lock()

def load_dbt_assets():
    assets = []

    for key, dbt_resource in dbt_resources.items():
        if key not in _dbt_manifests:
            matches = glob(f"{dbt_resource.project_dir}/target/**/manifest.json", recursive=True)
            if matches:
                _dbt_manifests[key] = Path(matches[0])
            else:
                with _dbt_resource_lock:
                    dbt_resource.cli(["deps"], manifest={}, raise_on_error=True).wait()
                    dbt_parse_invocation = dbt_resource.cli(["parse"], manifest={}, raise_on_error=True).wait()
                    _dbt_manifests[key] = dbt_parse_invocation.target_path.joinpath("manifest.json")

        # extend, not append(*...): append takes exactly one argument, so
        # unpacking a multi-asset list into it raises a TypeError
        assets.extend(
            load_assets_from_dbt_manifest(
                manifest=_dbt_manifests[key],
                use_build_command=False,  # TODO: change to load seeds, run, and test
                dbt_resource_key=key,
            )
        )

    return assets
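One caveat in the snippet above: the key not in _dbt_manifests check happens outside the lock, so two threads that miss at the same time can both run the expensive dbt parse. A double-checked pattern re-checks under the lock (a stdlib-only sketch with hypothetical names, not code from the thread):

```python
import threading

_cache = {}
_lock = threading.Lock()

def get_or_build(key, build):
    # Fast path: no lock needed once the value exists.
    if key in _cache:
        return _cache[key]
    with _lock:
        # Re-check under the lock so two threads that both missed the
        # fast path don't run the expensive build (e.g. dbt parse) twice.
        if key not in _cache:
            _cache[key] = build(key)
    return _cache[key]
```

Applied to the original code, this would mean moving the glob/parse logic into the build callable and re-checking _dbt_manifests inside the with block.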
The code above works fine, but dbt seed can't be run as part of the asset (use_build_command only chooses between the dbt run and dbt build commands). I'd like to either run dbt seed or avoid running dbt test. I noticed that with the newer API, via the @dbt_assets decorator, I'd have more control over what runs for every asset. The question is: is there a way to build dbt assets dynamically with the dbt_assets decorator, so I'd have a dynamic number of assets depending on a DBT_PROJECTS env var? I wasn't able to find documentation for that. Thanks!
r
Here's a similar question, with an answer: https://dagster.slack.com/archives/C04CW71AGBW/p1692019552965309
y
Hey @rex, thank you for the quick response 🙂 So IIUC I can do something like this:
Copy code
def dbt_asset_compute(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()

dbt_asset_compute.__name__ = f"dbt_assets_{key}"

assets.append(dbt_assets(
    manifest=_dbt_manifests[key],
)(dbt_asset_compute))
Maybe I'm missing this part - how do I specify which resource to use? Notice that in the previous snippet I used dbt_resource_key=key to pass the predefined dbt resource. Is it possible not to use the default dbt resource, so that every project uses a different dbt resource?
r
Use
with_resources
? https://docs.dagster.io/concepts/resources
Copy code
dbt_assets(
    manifest=_dbt_manifests[key],
    dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(
        asset_key_prefix=[TENANT, basename(dbt_resource.project_dir)],
        source_asset_key_prefix=[TENANT]
    ),
)(dbt_asset_compute)
.with_resources({"dbt": dbt_resource})
y
I get the following error
Copy code
resource with key 'dbt' required by op 'dbt_assets_dbt_my_proj', but received <class 'dagster_dbt.core.resources_v2.DbtCliResource'>
Does the key have to be dbt? With the previous code I was able to define customized keys.
r
The key doesn't have to be dbt. The key is the name of the DbtCliResource parameter in your dbt_asset_compute function. E.g. the name of the resource here would be my_custom_dbt_name.
Copy code
def dbt_asset_compute(context: OpExecutionContext, my_custom_dbt_name: DbtCliResource):
    ...
But since you're generating these compute functions dynamically, it would be a bit more involved to make the parameter name dynamic as well. That's why I would recommend sticking with dbt as the key and using with_resources. But if you do want custom keys, then you should change the parameter name dynamically.
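For completeness, plain Python does allow a generated function to advertise a dynamic parameter name by overriding __signature__. The sketch below uses a made-up make_compute factory; whether Dagster's decorator honors __signature__ here is an assumption, so treat this as the generic Python mechanism rather than a verified Dagster recipe:

```python
import inspect

def make_compute(resource_name):
    def compute(context, **kwargs):
        # The resource arrives under the dynamic name.
        return kwargs[resource_name]

    # Advertise a signature whose second parameter carries the dynamic
    # name, so signature-inspecting frameworks see it as declared.
    compute.__signature__ = inspect.Signature([
        inspect.Parameter("context", inspect.Parameter.POSITIONAL_OR_KEYWORD),
        inspect.Parameter(resource_name, inspect.Parameter.KEYWORD_ONLY),
    ])
    compute.__name__ = f"dbt_assets_{resource_name}"
    return compute
```

The function still accepts **kwargs at call time, so invoking it with the dynamic keyword works regardless of the advertised signature.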
y
Ah I see, didn't know that 🙂 I don't care about the naming, as long as the asset uses the proper resource and not a different one. Do you understand why I got the above error with your suggested code?
r
ah, use this:
Copy code
my_dbt_assets = dbt_assets(
    manifest=_dbt_manifests[key],
    dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(
        asset_key_prefix=[TENANT, basename(dbt_resource.project_dir)],
        source_asset_key_prefix=[TENANT]
    ),
)(dbt_asset_compute)

my_dbt_assets = with_resources(
    [my_dbt_assets],
    {"dbt": dbt_resource}
)
y
Thanks rex, it seemed to resolve the previous issue but created a different one:
Copy code
Conflicting versions of resource with key 'dbt' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
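This error is about object identity, not configuration: two DbtCliResource instances built from identical arguments are still distinct objects, and the merge check compares them by reference. A toy version of that check (a hypothetical merge_resource_defs, not Dagster's actual code):

```python
def merge_resource_defs(per_asset_defs):
    """Merge per-asset resource dicts, insisting that every asset
    supplies the *same object* for a given key (reference equality),
    not merely an equal-looking one."""
    merged = {}
    for defs in per_asset_defs:
        for key, res in defs.items():
            if key in merged and merged[key] is not res:
                raise ValueError(
                    f"Conflicting versions of resource with key '{key}'"
                )
            merged[key] = res
    return merged
```

The practical fix is to reuse one resource instance per key across all assets, or give each project's resource its own distinct key.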
FYI, it seems like I found a solution :dagster-yay:
Copy code
dbt_resources = {
    f"dbt_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    ) for dbt_project in DBT_PROJECTS
}

...

        def dbt_asset_compute(context: OpExecutionContext, **kwargs):
            dbt_key = next((k for k in kwargs.keys() if 'dbt_' in k), None)
            dbt: DbtCliResource = kwargs[dbt_key]
            yield from dbt.cli(["run"], context=context).stream()

        dbt_asset_compute.__name__ = f"dbt_assets_{key}"

        dbt_asset = dbt_assets(
            manifest=_dbt_manifests[key],
        )(dbt_asset_compute)
        assets += with_resources([dbt_asset], {key: dbt_resource})
For the record, the above code didn't work either - although compilation succeeded, no resource was provided to the asset function (in kwargs). To resolve it, I eventually used the following code:
Copy code
def dbt_asset_compute(context: OpExecutionContext, **kwargs):
    dbt_key = next((k for k in kwargs.keys() if 'dbt_' in k), None)
    dbt: DbtCliResource = kwargs[dbt_key]
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()

dbt_asset_compute_wrapper = partial(dbt_asset_compute, **{key: dbt_resource})
dbt_asset_compute_wrapper.__name__ = f"dbt_assets_{key}"

dbt_asset = dbt_assets(
    manifest=_dbt_manifests[key],
)(dbt_asset_compute_wrapper)
assets.append(dbt_asset)
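A plausible explanation for why the with_resources variant left kwargs empty while the partial variant works (an illustrative model, not Dagster's actual binding code): a framework that binds resources by declared parameter name finds nothing to match against a bare **kwargs, whereas partial bakes the argument in before the framework ever calls the function.

```python
import inspect
from functools import partial

def bind_declared_resources(fn, resources):
    # Toy model of name-based binding: pass only resources whose
    # names appear as declared parameters in fn's signature.
    params = inspect.signature(fn).parameters
    matched = {name: value for name, value in resources.items() if name in params}
    return fn("ctx", **matched)

def compute_kwargs(context, **kwargs):
    return kwargs  # declares no names, so nothing gets bound

def compute_named(context, dbt_my_proj=None):
    return dbt_my_proj

# partial sidesteps the matching entirely: the argument is already bound
# by the time the framework invokes the callable.
bound = partial(compute_kwargs, dbt_my_proj="res")
```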
s
Yeah, I tried that but it did not work. I'll give it another go, thx tho.
@Yuval Goldberg, is dbt_resource in **{key: dbt_resource} the type DbtCliResource, or the actual definition DbtCliResource(....) as registered in the resources (Definitions(..., resources={...}))?
y
Hi @Sergio Pintaldi, in my code dbt_resource is an instance of type DbtCliResource. My definition looks like the following:
Copy code
dbt_resources = {
    f"dbt_{TENANT}_{(DBT_PROJECTS_DIR / dbt_project).name}": DbtCliResource(
        project_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profiles_dir=str(DBT_PROJECTS_DIR / dbt_project),
        profile=DBT_PROFILE,
        target=DBT_TARGET,
    )
    for dbt_project in DBT_PROJECTS
} 

...

defs = Definitions(
...
    resources={**dbt_resources},
)
s
Thanks for this. It worked, but there are a few drawbacks: the resource is initialised when the dbt_asset_compute factory function runs (i.e. at Dagster initialisation), so I cannot change the resource configuration per job. Also, the resource is not visible in the UI. Lastly, I lose all the dbt model descriptions, as the partial overwrites them with its docstring. I'm happy to call the dbt resource argument dbt, but then I want to be able to configure the resource for a bunch of selected dbt models.
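On the lost docstrings: functools.partial objects carry no __name__ or __doc__ of their own, but functools.update_wrapper can copy the wrapped function's metadata onto the partial before the name is overridden. A sketch with hypothetical names; whether Dagster then surfaces the docstring as the model description is an assumption:

```python
from functools import partial, update_wrapper

def dbt_asset_compute(context, **kwargs):
    """Docstring that downstream tooling may surface as a description."""
    return kwargs["dbt_my_proj"]

wrapper = partial(dbt_asset_compute, dbt_my_proj="resource")
# Copy __name__, __doc__, etc. from the wrapped function onto the
# partial, then override the name per project as before.
update_wrapper(wrapper, dbt_asset_compute)
wrapper.__name__ = "dbt_assets_dbt_my_proj"
```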