# integration-dbt
Eric Coleman
Hello all, I was hoping for some guidance. I am trying to POC Dagster as a way to orchestrate multiple disparate dbt projects which have dependencies between them. I have loaded both projects as dbt asset collections, and I can materialize different groups or selections independently, but I have not been able to build an asset or graph object that can tie two simple dbt object definitions together. The `ins` of the asset decorator does not appear to recognize a dbt asset definition. An op doesn't make sense to use since I already have hundreds of dbt assets defined, with proper prefix keys and groups. How does one build a graph of disparate dbt objects with properly defined dependencies?
s
following
Paul Burns
Hey @Eric Coleman, this looks similar to what I was asking about here; hope it helps! I was able to get my two dbt projects to have dependency links between them in Dagster by setting up source assets. It's based entirely on the keys aligning between the dbt projects, which can be annoying, but it's all we have as far as I know.
A way to be more flexible than strictly aligning the keys may be to adapt the example provided here, but I haven't played around with it.
Eric Coleman
Thank you for the quick reply, Paul. The link you provided seems to go to your profile? I would be very interested in getting to that example.
Eric Coleman
Thank you Paul, this looks very close to our use case. We have different projects that operate as layers in our warehouse.
Paul Burns
We should collaborate as we go down this journey! Let me know if you come up with anything.
I'll do the same, of course.
Eric Coleman
Another idea I had was to manually build a graph object of dbt asset collections: https://docs.dagster.io/concepts/assets/graph-backed-assets. I am just not sure how to define the graph object, as there are very few examples I could find, and the dbt assets loaded from the project do not have functions defined.
rex
Jonathan Neo has an example of this in their project: https://github.com/jonathanneo/data-aware-orchestration. You define sources in your dbt projects so that models in `dbt_project_1` are defined as sources in `dbt_project_2`. Furthermore, you should ensure that the asset keys for the models in `dbt_project_1` match the asset keys defined for the sources in `dbt_project_2`.
Jonathan Neo
Thanks for the mention @rex. Yep, the easiest way to set it up is what @Paul Burns mentioned above: set up your dbt sources to reference another project. My toy project repo illustrates an example where Project 2 consumes a model from Project 1 via dbt sources. Here are the key callouts on how to set this up:
1. Code to import `dbt_project_1` as Dagster assets.
2. Code to import `dbt_project_2` as Dagster assets.
3. Important: Project 2's `sources.yml` file references the database schema in Project 1.
   a. The name in Project 2's sources (i.e. `mart`) has to match exactly the name used in Project 1's `dbt_project.yml` (`mart`), as in the sketch below.
   b. This is so that Dagster is able to infer the dependencies between projects. There are other ways to work around this, but I won't get into it here.
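A minimal sketch of what that wiring can look like on the Dagster side, assuming hypothetical manifest paths, project directories, and resource names (the real repo layout will differ): each project is loaded with `@dbt_assets`, and the matching source/model keys are what give Dagster the cross-project edges.
```python
from pathlib import Path

from dagster import Definitions, OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Hypothetical manifest paths; point these at each project's compiled manifest.
PROJECT_1_MANIFEST = Path("dbt_project_1/target/manifest.json")
PROJECT_2_MANIFEST = Path("dbt_project_2/target/manifest.json")

@dbt_assets(manifest=PROJECT_1_MANIFEST)
def dbt_project_1_assets(context: OpExecutionContext, dbt_1: DbtCliResource):
    yield from dbt_1.cli(["build"], context=context).stream()

@dbt_assets(manifest=PROJECT_2_MANIFEST)
def dbt_project_2_assets(context: OpExecutionContext, dbt_2: DbtCliResource):
    yield from dbt_2.cli(["build"], context=context).stream()

# Because project 2 declares project 1's `mart` models as sources (with
# matching names), the source asset keys in project 2 resolve to the model
# asset keys in project 1, and Dagster infers the cross-project edges.
defs = Definitions(
    assets=[dbt_project_1_assets, dbt_project_2_assets],
    resources={
        "dbt_1": DbtCliResource(project_dir="dbt_project_1"),
        "dbt_2": DbtCliResource(project_dir="dbt_project_2"),
    },
)
```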
Paul Burns
@Jonathan Neo Could you get into the workarounds, actually? We are in our design phase of using Dagster and I would love to hear what other options are out there. I've seen a "dbt translator" but haven't heard of anything else.
Jonathan Neo
Hey @Paul Burns 👋 we're in the early phases of Dagster too (just did a hackathon on it last week at my company). Yep, the `dagster_dbt_translator` seems to be it, although I haven't had time to try it out yet! Here's what I can tell so far:
• The default source-to-asset-key mapping is performed through a method called `default_asset_key_fn` here.
• There is a class called `DagsterDbtTranslator` that implements a method called `get_asset_key()`. By default, `get_asset_key()` calls `default_asset_key_fn`.
• However, you can override the default behaviour by creating a custom class and overriding the existing `get_asset_key()` method. There's an incomplete example of how to do it here.
I'd imagine a full, complete example of the last point above would look something like:
```python
from typing import Any, Mapping

from dagster import AssetKey, OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_asset_key(cls, node_info: Mapping[str, Any]) -> AssetKey:
        # Honor an explicit key set in the node's meta config, if present.
        dagster_metadata = node_info.get("meta", {}).get("dagster", {})
        asset_key_config = dagster_metadata.get("asset_key", [])
        if asset_key_config:
            return AssetKey(asset_key_config)

        if node_info["resource_type"] == "source":
            # CUSTOMIZE YOUR ASSET KEY HERE FOR DBT SOURCES
            components = ["CUSTOMIZE YOUR KEY HERE", node_info["source_name"], node_info["name"]]
        else:
            # Fall back to the model's configured schema (if any) plus its name.
            configured_schema = node_info["config"].get("schema")
            if configured_schema is not None:
                components = [configured_schema, node_info["name"]]
            else:
                components = [node_info["name"]]
        return AssetKey(components)

# MANIFEST_PATH is defined elsewhere (path to the project's manifest.json).
@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```
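One nice property of the sketch above: a model or source can pin its own Dagster asset key from the dbt side by setting `meta: {dagster: {asset_key: [...]}}` in its `.yml`, which the first branch of `get_asset_key()` picks up before any of the fallback logic runs.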
Paul Burns
Thanks a ton!
Eric Coleman
Another challenge I have with this pattern is that we have Python scripts that populate hundreds of tables in our landing zone using parallel thread executors against an API. How would I define 150+ source table assets that become the source for the first layer of our dbt project transformations? It seems like I might need an abstraction at the execution layer to define my assets, rather than the actual database objects themselves.
Jonathan Neo
@Eric Coleman you can use dbt-codegen to generate a `sources.yml` for the 150+ existing sources in your database. Generating it is as simple as running this in your command line:
```bash
dbt run-operation generate_source --args 'schema_name: raw_jaffle_shop'
```
Eric Coleman
Thank you Jonathan, that is a very helpful tip. We do have all the sources defined already in our dbt project. My challenge is how to add the upstream Python script as a dependency. It is not one script that generates a single output; based on the source, a single ingestion execution can update 220+ tables. Would each need to be defined as a separate function in an op?
Timothee Vandeput
Hello all, I just wanted to let you know that we implemented something similar (multiple code locations, with a dbt project per code location). We used what @Jonathan Neo described above, using a custom `DagsterDbtTranslator` to map the asset keys between dbt projects. @Eric Coleman we also have upstream assets (Python scripts that invoke an API) feeding our dbt assets. In our case we build those assets from a YAML config file using the asset factory pattern. Those assets are `@multi_asset`, so one call to the API generates multiple asset materializations. As long as your keys match, Dagster will infer the dependencies between those assets.
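A minimal sketch of that factory pattern, with a hypothetical config shape, key scheme (`landing` prefix), and extraction helper standing in for the real ingestion logic: one `@multi_asset` per API source, emitting one output per table it lands, with keys chosen to match the downstream dbt sources.
```python
from dagster import AssetKey, AssetOut, Output, multi_asset

# Hypothetical config; in practice this would be loaded from a YAML file.
INGESTION_CONFIG = {
    "jaffle_api": ["customers", "orders", "payments"],
}

def build_ingestion_asset(source_name: str, tables: list[str]):
    @multi_asset(
        name=f"ingest_{source_name}",
        outs={
            # Each table becomes its own asset; the key must match the
            # corresponding dbt source, e.g. ["landing", "customers"].
            table: AssetOut(key=AssetKey(["landing", table]), group_name="ingestion")
            for table in tables
        },
    )
    def _ingest():
        # One API call can land every table for this source in parallel;
        # run_extraction is a hypothetical stand-in for that logic.
        # run_extraction(source_name, tables)
        for table in tables:
            yield Output(value=None, output_name=table)

    return _ingest

ingestion_assets = [
    build_ingestion_asset(source, tables)
    for source, tables in INGESTION_CONFIG.items()
]
```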
D 1
Jonathan Neo
That's cool @Timothee Vandeput! I didn't know about the `@multi_asset` decorator! Was sitting here thinking to myself, "does Dagster even have an interface for 1 function producing many Dagster assets?"
Timothee Vandeput
One thing to keep in mind: you won't be able to define jobs across code locations. In our case, we used run sensors to cascade jobs across code locations, as in the sketch below.
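A minimal sketch of that cascade, assuming hypothetical job and group names (`dbt_project_1_job` upstream, a downstream job in this code location): a run-status sensor watches for successful runs across all code locations and kicks off the downstream job.
```python
from dagster import (
    AssetSelection,
    DagsterRunStatus,
    RunRequest,
    SkipReason,
    define_asset_job,
    run_status_sensor,
)

# Downstream job in this code location (group name is hypothetical).
dbt_project_2_job = define_asset_job(
    name="dbt_project_2_job",
    selection=AssetSelection.groups("mart"),
)

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitor_all_code_locations=True,  # also watch runs from other code locations
    request_job=dbt_project_2_job,
)
def trigger_project_2_on_project_1(context):
    # Only cascade when the upstream project's job succeeded.
    if context.dagster_run.job_name == "dbt_project_1_job":
        return RunRequest(run_key=context.dagster_run.run_id)
    return SkipReason("not a dbt_project_1_job run")
```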
> That's cool @Timothee Vandeput! I didn't know about the `@multi_asset` decorator! Was sitting here thinking to myself, "does Dagster even have an interface for 1 function producing many Dagster assets?"
It does have fan-out 🙂 And we also use the inputs on assets to do fan-in.
Eric Coleman
Great suggestions. I am digging into these now. Thank you!
Paul Burns
Fantastic thread
Eric Coleman
Quick update: with the `@multi_asset` decorator, I was able to create multiple table outputs to feed one executable slice through the source ingestion and several dbt projects. By aligning the keys between the source names and model names, the key-chaining process seems to work. I have not implemented the actual code yet in the `@multi_asset`, which isn't necessary to instantiate the asset and execute the function. By assigning the `@multi_asset` asset keys to the correct group, and assigning the dbt models to the correct group in the `dbt_project.yml` file, I was able to use `AssetSelection.groups()` to create the definition of the job (sketched below). This pattern seems to work and is reasonably scalable. Thanks to all!
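For reference, a minimal sketch of that job definition, with hypothetical group names standing in for the real ingestion and dbt groups:
```python
from dagster import AssetSelection, define_asset_job

# Group names are placeholders; they correspond to the group assigned to the
# @multi_asset ingestion assets and the groups set in dbt_project.yml.
vertical_slice_job = define_asset_job(
    name="vertical_slice_job",
    selection=AssetSelection.groups("ingestion", "staging", "mart"),
)
```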
👌 2