https://dagster.io/ logo
Title
d

Dennis Gera

04/17/2023, 12:23 PM
Hey team! is there a way to
define
an asset as a subset of a dbt loaded from manifest asset?
o

owen

04/17/2023, 9:35 PM
hi @Dennis Gera! not 100% sure I'm following -- do you mean that you'd want multiple dbt models to correspond to a single dbt asset? If so, this is not supported -- mind sharing a bit more about the use case?
d

Dennis Gera

04/17/2023, 10:23 PM
Sure! Currently our dbt project loads dbt assets using the
load_assets_from_dbt_project
. Initially, we did not know the best way to use these assets, so we ended up defining multiple dbt assets using logic like:
example_dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR,
    profiles_dir=DBT_PROJECT_DIR + "/profile",
    select="tag:example",
    use_build_command=True,
    key_prefix="analytics",
    source_key_prefix=["analytics", "example"],
    node_info_to_group_fn=lambda _: "example",
)
Consequently, we had a bunch of asset groups defined by dbt’s
tag
select statement. We then proceed to define jobs like this
example_assets_job = define_asset_job(
    name="example_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("example"),
)
Since our project has grown significantly, we are migrating to use the
load_assets_from_dbt_manifest
function. I also wanted to improve the way we define our assets so that we wouldn’t have to keep loading assets from manifest in various parts of our code. Ideally, I want to load all of our dbt assets under
dbt_assets = load_assets_from_dbt_manifest(...)
and then define assets from that load. I tried using a
node_info_to_group_fn
that would select the schema or the node’s first tag as a grouping criteria, but this is not the same as defining an asset with the select
tag
statement (using the model’s tag as a group_fn doesn’t guarantee which tag it’s grouping to as the one I want it to if the model has many tags). I’m wondering if this approach is feasible/recommended or if I’m better off just defining multiple
load_assets_from_dbt_manifest
in my project.
:rainbow-daggy: 1
o

owen

04/17/2023, 10:27 PM
ah I see -- I think node_info_to_group_fn + a single load_assets_from_dbt_manifest is the right direction here. it seems like you can get by with maybe a slightly more complex node_info_to_group_fn, i.e.
def my_group_fn(node_info):
    # if the model has a tag that I want to associate with a group, use that
    for tag in ["tags", "that", "are", "groups"]:
        if tag in node_info["tags"]:
            return tag
    # otherwise, use schema
    return node_info["schema"]

dbt_assets = load_assets_from_dbt_manifest(..., node_info_to_group_fn=my_group_fn)
d

Dennis Gera

04/17/2023, 10:31 PM
This could work, but if I have a model that has more than 1 “groupable” tag (ex: “staging”, “operations”), I wouldn’t have that model selected in both
stg_assets_job = define_asset_job(
    name="stg_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("staging"),
)
and
operations_assets_job = define_asset_job(
    name="operations_assets_job",
    executor_def=in_process_executor,
    selection=AssetSelection.groups("operations"),
)
o

owen

04/17/2023, 10:32 PM
ah, groups <> assets are a 1-1 mapping, and so having an asset appear in multiple distinct groups is unsupported
essentially, dagster does not currently have any concept that maps on to the dbt "tags" concept
if you want to select assets based off of dbt tag information, you can just directly use the DbtManifestAssetSelection, which lets you directly use dbt syntax, rather than having to translate dbt concepts into Dagster land 🙂
d

Dennis Gera

04/17/2023, 10:35 PM
haha But how is that different from using multiple
load_assets_from_dbt_manifest
?
o

owen

04/17/2023, 10:36 PM
with multiple calls to load_assets_from_dbt_manifest, you run the risk of accidentally selecting overlapping subsets of your dbt graph in subsequent calls, which would likely result in errors. I.e. if you have an asset with tags "foo" and "bar", calling
load_assets_from_dbt_manifest(select="tag:foo")
and
load_assets_from_dbt_manifest(select="tag:bar")
, then that model would be double-represented, which should cause an error when building your repository
by loading everything at once, you eliminate that risk (and make it possible to handle models with multiple tags that have semantic meaning in the orchestration layer)
d

Dennis Gera

04/17/2023, 10:38 PM
so
DbtManifestAssetSelection
doesn’t load the assets from my manifest file, it just selects based on it while
load_assets_from_dbt_manifest
loads everything once?
So if I understood correctly, our best approach would be to have a single
dbt_assets = load_assets_from_dbt_manifest(…)
with multiple
DbtManifestAssetSelection
o

owen

04/17/2023, 10:40 PM
yep exactly --
load_assets_from..
defines the assets that you want to exist ("I want to create assets for every dbt model with the tag 'bar', and put them in a group called 'bar'"), whereas the DbtManifestSelection selection says "of the assets that exist, I want the ones that correspond to dbt models with the tag 'bar'"
d

Dennis Gera

04/17/2023, 10:41 PM
Awesome Owen! Thanks for the huge help here
o

owen

04/17/2023, 10:41 PM
no problem :blob_salute:
d

Dennis Gera

04/18/2023, 2:51 PM
Hey @owen! I used
DbtManifestAssetSelection
to select my dbt assets
example_dbt_assets = DbtManifestAssetSelection(
    manifest_json=json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    select="tag:example,tag:sources",
)
and then created a job using
example_assets_job = define_asset_job(
    name="example_assets_job",
    executor_def=in_process_executor,
    selection=example_dbt_assets,
)
But in the UI I’m seeing an empty job pipeline. Any idea on why this is? cc: @Tim Castillo
I’ve defined my dbt_assets with load_from manifest in my
assets/__init__.py
file and each
DbtManifestAssetSelection
is in a
assets/example/dbt.py
file. The jobs are then defined outside my
assets
folder. Not sure if this influences in anything
o

owen

04/18/2023, 5:30 PM
ah I think this is happening on account of a bit of subtlety in the dbt syntax. The
,
indicates the instersection between the two clauses, meaning
"tag:example,tag:sources"
means "give me the intersection between the set of models with tag "example" and the set of models with tag "sources". If no model has both tags, this set will be empty. I think you want
"tag:example tag:sources"
, which should give you the union between those sets
d

Dennis Gera

04/18/2023, 5:31 PM
I have other instances where I just have a
tag:example
and I got the same thing :blob_sad:
o

owen

04/18/2023, 5:34 PM
hmm interesting -- how many of your models have this example tag set?
d

Dennis Gera

04/18/2023, 5:37 PM
it varies depending on the specific tag, some are just 3, others more than 10
o

owen

04/18/2023, 5:38 PM
gotcha -- and can you share the code snippet where you load the dbt assets?
d

Dennis Gera

04/18/2023, 5:39 PM
dbt_assets = load_assets_from_dbt_manifest(
    json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    io_manager_key="io_manager",
    key_prefix=["analytics"],
    source_key_prefix=["analytics"],
    node_info_to_group_fn=node_info_to_group_fn,
)
I know the dbt assets loaded because they appear in my UI asset groups
o

owen

04/18/2023, 5:47 PM
ahh I think I get what's going on. the dbt manifest asset selection is trying to convert its dbt selection to dagster asset keys, but it's missing the context that there's supposed to be a key prefix there. this is something that should be supported better, but the hacky fix would be
from dagster_dbt.asset_utils import default_asset_key_fn

def hacky_asset_key_fn(node_info):
    orig_asset_key = default_asset_key_fn(node_info)
    return AssetKey(["analytics"] + orig_asset_key.path)

example_dbt_assets = DbtManifestAssetSelection(
    manifest_json=json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8")),
    select="tag:example,tag:sources",
    node_info_to_asset_key=hacky_asset_key_fn
)
d

Dennis Gera

04/18/2023, 5:50 PM
nicee! That worked. Thanks again Owen!
:rainbow-daggy: 1
o

owen

04/18/2023, 6:13 PM
great!
d

Dennis Gera

05/08/2023, 4:37 PM
hey @owen! by using the
load_assets_from_dbt_manifest
and several
DbtManifestAssetSelection
, I was able to successfully load the dagster repository and run jobs locally. However, when merged to production, I started getting the following error in job runs. All dbt assets appear to have loaded properly, buy this issue appeared specifically when a job was ran. Also, I was not able to reproduce this error when running this job locally. Any ideas on what could be the issue here?
o

owen

05/08/2023, 7:30 PM
hi! it looks like the
manifest.json
file is not a part of your docker image -- generally, we recommend putting a
dbt compile
inside your dagster_cloud_post_install.sh script so that your docker image will contain an up-to-date copy of your manifest.json
d

Dennis Gera

05/09/2023, 12:35 PM
Thanks @owen ! But is it possible to have my
manifest.json
read from S3 instead of my docker image? cc: @Gabriel Montañola
o

owen

05/09/2023, 6:23 PM
the recommended way to accomplish this would be to copy the file from s3 into your docker image when building it -- you could imagine putting some code to automatically pull that file from s3 to your local system, but the problem with that approach is that all subprocesses (aka every op) will need to run this same code to get the most up-to-date version of your manifest.json file, even if they have nothing to do with dbt. this also makes it possible for subprocesses to be out of sync in terms of what manifest version they're working with, which can cause strange errors which are difficult to debug
:keanu-thanks: 1
❤️ 1
g

Gabriel Montañola

05/12/2023, 2:09 PM
thanks a lot @owen. that makes total sense and we will incorporate this in our cd pipeline!
:rainbow-daggy: 1
d

Dennis Gera

05/15/2023, 9:29 PM
Hey @owen, we're still getting this error message when running dbt-related jobs. We have copied the
manifest.json
to our docker image in the dbt directory -> dbt/manifest.json.
g

Gabriel Montañola

05/15/2023, 9:36 PM
FYI - we're using define_asset_job() to create the jobs.
o

owen

05/15/2023, 9:40 PM
hm in theory, this error message would go away if you instead copied things into
.../dbt/target/manifest.json
instead of
/dbt/manifest.json
.
g

Gabriel Montañola

05/15/2023, 9:40 PM
yeap, ill move the file!
o

owen

05/15/2023, 9:41 PM
copying it into this precise path ideally shouldn't be necessary, but I think I see why it's working that way at the moment
:dagster: 1