Hey team! is there a way to `define` an asset as a...
# integration-dbt
d
Hey team! Is there a way to `define` an asset as a subset of a dbt asset loaded from a manifest?
o
hi @Dennis Gera! not 100% sure I'm following -- do you mean that you'd want multiple dbt models to correspond to a single dbt asset? If so, this is not supported -- mind sharing a bit more about the use case?
d
Sure! Currently our dbt project loads dbt assets using `load_assets_from_dbt_project`. Initially, we did not know the best way to use these assets, so we ended up defining multiple dbt assets using logic like:
example_dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR,
    profiles_dir=DBT_PROJECT_DIR + "/profile",
    select="tag:example",
    use_build_command=True,
    key_prefix="analytics",
    source_key_prefix=["analytics", "example"],
    node_info_to_group_fn=lambda _: "example",
)
Consequently, we had a bunch of asset groups defined by dbt’s `tag` select statement. We then proceeded to define jobs like this:
example_assets_job = define_asset_job(
    name="example_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("example"),
)
Since our project has grown significantly, we are migrating to the `load_assets_from_dbt_manifest` function. I also wanted to improve the way we define our assets so that we wouldn’t have to keep loading assets from the manifest in various parts of our code. Ideally, I want to load all of our dbt assets under `dbt_assets = load_assets_from_dbt_manifest(...)` and then define assets from that load. I tried using a `node_info_to_group_fn` that would select the schema or the node’s first tag as the grouping criterion, but this is not the same as defining an asset with the select `tag` statement (if a model has many tags, grouping by tag doesn’t guarantee that the model lands in the group I want). I’m wondering if this approach is feasible/recommended or if I’m better off just defining multiple `load_assets_from_dbt_manifest` calls in my project.
o
ah I see -- I think node_info_to_group_fn + a single load_assets_from_dbt_manifest is the right direction here. it seems like you can get by with maybe a slightly more complex node_info_to_group_fn, i.e.
def my_group_fn(node_info):
    # if the model has a tag that I want to associate with a group, use that
    for tag in ["tags", "that", "are", "groups"]:
        if tag in node_info["tags"]:
            return tag
    # otherwise, use schema
    return node_info["schema"]

dbt_assets = load_assets_from_dbt_manifest(..., node_info_to_group_fn=my_group_fn)
d
This could work, but if I have a model that has more than 1 “groupable” tag (ex: “staging”, “operations”), I wouldn’t have that model selected in both
stg_assets_job = define_asset_job(
    name="stg_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("staging"),
)
and
operations_assets_job = define_asset_job(
    name="operations_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("operations"),
)
o
ah, each asset belongs to exactly one group, and so having an asset appear in multiple distinct groups is unsupported
essentially, dagster does not currently have any concept that maps on to the dbt "tags" concept
if you want to select assets based off of dbt tag information, you can just directly use the DbtManifestAssetSelection, which lets you directly use dbt syntax, rather than having to translate dbt concepts into Dagster land 🙂
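To make the one-group-per-asset point concrete, here is a minimal pure-Python sketch (no Dagster imports). The `GROUP_TAGS` list and the `node` dictionary are made up for illustration; only the shape of the group function follows the snippet earlier in the thread.

```python
# Hypothetical priority list of tags that double as group names.
# The first entry that matches wins.
GROUP_TAGS = ["staging", "operations"]


def my_group_fn(node_info):
    """Return exactly one group name for a dbt node."""
    for tag in GROUP_TAGS:
        if tag in node_info["tags"]:
            return tag
    # Fall back to the schema when no groupable tag is present.
    return node_info["schema"]


# Made-up node that carries both groupable tags.
node = {"name": "stg_orders", "schema": "analytics", "tags": ["operations", "staging"]}

group = my_group_fn(node)

# A job selecting by group only sees the model in the single group it landed in.
in_staging_job = group == "staging"
in_operations_job = group == "operations"
```

Because the function can only return one name, the model ends up in the `staging` group and a job built from the `operations` group would not include it, which is the limitation raised above.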
d
haha But how is that different from using multiple `load_assets_from_dbt_manifest` calls?
o
with multiple calls to load_assets_from_dbt_manifest, you run the risk of accidentally selecting overlapping subsets of your dbt graph in subsequent calls, which would likely result in errors. I.e. if you have a model with tags "foo" and "bar" and you call `load_assets_from_dbt_manifest(select="tag:foo")` and `load_assets_from_dbt_manifest(select="tag:bar")`, then that model would be double-represented, which should cause an error when building your repository
by loading everything at once, you eliminate that risk (and make it possible to handle models with multiple tags that have semantic meaning in the orchestration layer)
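The overlap risk can be illustrated with a small pure-Python sketch. It does not call Dagster; the model names, tags, and the helper functions `select_by_tag` and `find_double_loaded` are all hypothetical stand-ins for what two separate tag-based loads would pick up.

```python
from collections import Counter

# Made-up dbt models and their tags.
MODEL_TAGS = {
    "orders": {"foo", "bar"},
    "customers": {"foo"},
    "payments": {"bar"},
}


def select_by_tag(tag):
    """Return the set of model names carrying the given tag."""
    return {name for name, tags in MODEL_TAGS.items() if tag in tags}


def find_double_loaded(*selections):
    """Return models that appear in more than one selection."""
    counts = Counter(name for selection in selections for name in selection)
    return sorted(name for name, n in counts.items() if n > 1)


# Two separate "loads", one per tag, as in the example above.
overlap = find_double_loaded(select_by_tag("foo"), select_by_tag("bar"))
```

Here `orders` shows up in both selections, which is the situation that would produce duplicate asset definitions if each selection were loaded separately.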
d
so `DbtManifestAssetSelection` doesn’t load the assets from my manifest file, it just selects based on it, while `load_assets_from_dbt_manifest` loads everything once?
So if I understood correctly, our best approach would be to have a single `dbt_assets = load_assets_from_dbt_manifest(…)` with multiple `DbtManifestAssetSelection`s
o
yep exactly -- `load_assets_from..` defines the assets that you want to exist ("I want to create assets for every dbt model with the tag 'bar', and put them in a group called 'bar'"), whereas the `DbtManifestAssetSelection` says "of the assets that exist, I want the ones that correspond to dbt models with the tag 'bar'"
d
Awesome Owen! Thanks for the huge help here
o
no problem
d
Hey @owen! I used `DbtManifestAssetSelection` to select my dbt assets
example_dbt_assets = DbtManifestAssetSelection(
    manifest_json=json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    select="tag:example,tag:sources",
)
and then created a job using
example_assets_job = define_asset_job(
    name="example_assets_job",
    executor_def=in_process_executor,
    selection=example_dbt_assets,
)
But in the UI I’m seeing an empty job pipeline. Any idea on why this is? cc: @Tim Castillo
I’ve defined my dbt_assets with `load_assets_from_dbt_manifest` in my `assets/__init__.py` file and each `DbtManifestAssetSelection` is in an `assets/example/dbt.py` file. The jobs are then defined outside my `assets` folder. Not sure if this influences anything
o
ah I think this is happening on account of a bit of subtlety in the dbt syntax. The `,` indicates the intersection between the two clauses, meaning `"tag:example,tag:sources"` means "give me the intersection between the set of models with tag 'example' and the set of models with tag 'sources'". If no model has both tags, this set will be empty. I think you want `"tag:example tag:sources"`, which should give you the union between those sets
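The comma-versus-space distinction can be sketched with plain Python sets. This is a toy evaluator that only understands `tag:` clauses, not dbt's real selector parser; the model names and the helpers `models_with_tag` and `select` are assumptions for illustration.

```python
# Made-up models and their tags.
MODEL_TAGS = {
    "model_a": {"example"},
    "model_b": {"sources"},
    "model_c": {"example", "sources"},
}


def models_with_tag(tag):
    """Return the set of model names carrying the given tag."""
    return {name for name, tags in MODEL_TAGS.items() if tag in tags}


def select(selector):
    """Toy evaluator: spaces separate unioned clauses, commas intersect within a clause."""
    selected = set()
    for clause in selector.split():
        tag_sets = [models_with_tag(part.split(":", 1)[1]) for part in clause.split(",")]
        selected |= set.intersection(*tag_sets)
    return selected


intersection = select("tag:example,tag:sources")  # models with both tags
union = select("tag:example tag:sources")         # models with either tag
```

With this data the comma form keeps only `model_c`, while the space form keeps all three models. If no model carried both tags, the comma form would be empty.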
d
I have other instances where I just have a `tag:example` and I got the same thing
o
hmm interesting -- how many of your models have this example tag set?
d
it varies depending on the specific tag, some are just 3, others more than 10
o
gotcha -- and can you share the code snippet where you load the dbt assets?
d
dbt_assets = load_assets_from_dbt_manifest(
    json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    io_manager_key="io_manager",
    key_prefix=["analytics"],
    source_key_prefix=["analytics"],
    node_info_to_group_fn=node_info_to_group_fn,
)
I know the dbt assets loaded because they appear in my UI asset groups
o
ahh I think I get what's going on. the dbt manifest asset selection is trying to convert its dbt selection to dagster asset keys, but it's missing the context that there's supposed to be a key prefix there. this is something that should be supported better, but the hacky fix would be
from dagster import AssetKey
from dagster_dbt.asset_utils import default_asset_key_fn

def hacky_asset_key_fn(node_info):
    orig_asset_key = default_asset_key_fn(node_info)
    return AssetKey(["analytics"] + orig_asset_key.path)

example_dbt_assets = DbtManifestAssetSelection(
    manifest_json=json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    select="tag:example,tag:sources",
    node_info_to_asset_key=hacky_asset_key_fn
)
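Why the prefix matters can be shown with a pure-Python sketch that models asset keys as tuples. It is a simulation only and does not use Dagster's `AssetKey`; the model names and the helper `keys_for_selection` are hypothetical.

```python
# Keys as they would exist after loading with key_prefix=["analytics"]
# (model names are made up).
loaded_keys = {("analytics", "orders"), ("analytics", "customers")}


def keys_for_selection(model_names, prefix=()):
    """Translate selected dbt model names into key tuples, optionally prefixed."""
    return {tuple(prefix) + (name,) for name in model_names}


selected_models = ["orders", "customers"]

# Without the prefix, none of the translated keys match a loaded asset.
without_prefix = keys_for_selection(selected_models) & loaded_keys

# With the same prefix the loader used, every selected model matches.
with_prefix = keys_for_selection(selected_models, prefix=["analytics"]) & loaded_keys
```

The empty `without_prefix` set corresponds to the empty job seen in the UI; applying the same prefix on the selection side makes the keys line up again.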
d
nicee! That worked. Thanks again Owen!
o
great!
d
hey @owen! by using `load_assets_from_dbt_manifest` and several `DbtManifestAssetSelection`s, I was able to successfully load the dagster repository and run jobs locally. However, when merged to production, I started getting the following error in job runs. All dbt assets appear to have loaded properly, but this issue appeared specifically when a job was run. Also, I was not able to reproduce this error when running this job locally. Any ideas on what could be the issue here?
o
hi! it looks like the `manifest.json` file is not a part of your docker image -- generally, we recommend putting a `dbt compile` inside your dagster_cloud_post_install.sh script so that your docker image will contain an up-to-date copy of your manifest.json
d
Thanks @owen! But is it possible to have my `manifest.json` read from S3 instead of my docker image? cc: @Gabriel Montañola
o
the recommended way to accomplish this would be to copy the file from s3 into your docker image when building it -- you could imagine putting some code to automatically pull that file from s3 to your local system, but the problem with that approach is that all subprocesses (aka every op) will need to run this same code to get the most up-to-date version of your manifest.json file, even if they have nothing to do with dbt. this also makes it possible for subprocesses to be out of sync in terms of what manifest version they're working with, which can cause strange errors which are difficult to debug
g
thanks a lot @owen. that makes total sense and we will incorporate this in our cd pipeline!
d
Hey @owen, we're still getting this error message when running dbt-related jobs. We have copied the `manifest.json` to our docker image in the dbt directory -> dbt/manifest.json.
g
FYI - we're using define_asset_job() to create the jobs.
o
hm in theory, this error message would go away if you copied things into `.../dbt/target/manifest.json` instead of `/dbt/manifest.json`.
g
yeap, ill move the file!
o
copying it into this precise path ideally shouldn't be necessary, but I think I see why it's working that way at the moment
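One way to catch a misplaced manifest early is a small check at load time. The sketch below is an assumption about how such a check could look, not something Dagster provides: `load_manifest` is a hypothetical helper, and the temporary directory stands in for the dbt project directory inside the image.

```python
import json
import os
import tempfile


def load_manifest(project_dir):
    """Load <project_dir>/target/manifest.json, failing with a clear message if absent."""
    manifest_path = os.path.join(project_dir, "target", "manifest.json")
    if not os.path.isfile(manifest_path):
        raise FileNotFoundError(
            f"Expected dbt manifest at {manifest_path}; "
            "copy it there or run `dbt compile` when building the image."
        )
    with open(manifest_path, encoding="utf-8") as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as project_dir:
    # Manifest placed directly in the project directory: not found by the check.
    with open(os.path.join(project_dir, "manifest.json"), "w", encoding="utf-8") as f:
        json.dump({"nodes": {}}, f)
    try:
        load_manifest(project_dir)
        found_in_wrong_place = True
    except FileNotFoundError:
        found_in_wrong_place = False

    # Manifest placed under target/: found.
    os.makedirs(os.path.join(project_dir, "target"))
    with open(os.path.join(project_dir, "target", "manifest.json"), "w", encoding="utf-8") as f:
        json.dump({"nodes": {}}, f)
    manifest = load_manifest(project_dir)
```

Running a check like this when the code location loads would surface a wrong path at deploy time rather than during a job run.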