# integration-dbt
t
Selecting upstream assets with a dbt job using the new API
Hi, two questions relating to the new dbt API.
QUESTION 1: The dbt tutorial using the new dbt API states that if we want to run the dbt assets plus the upstream assets, we do this from the `default` asset group. However, this is a contrived example that doesn’t really apply to the “real world”, where we will likely have multiple dbt jobs. So the question is: how do we select specific dbt assets plus other assets and combine them into a job? For example, we can easily select all of the required dbt assets using a tag, but how can we then additively include other assets, e.g. using a group?
from dagster import AssetSelection, define_asset_job
from dagster_dbt import build_dbt_asset_selection

dbt_assets = build_dbt_asset_selection([my_dbt_assets], dbt_select="tag:dbt_group_1")
other_assets_we_want_to_run_in_the_job = AssetSelection.groups("dbt_group_1")

define_asset_job(
    name="dbt_group_1",
    # selection=dbt_assets,  # This works fine on its own
    # selection=other_assets_we_want_to_run_in_the_job,  # This works fine on its own
    selection=dbt_assets + other_assets_we_want_to_run_in_the_job,  # How do we combine them? This raises the TypeError
)
In the last line above, doing a “dumb” addition (`+`) yields an error, and I can’t see how to coerce these two types into a common sequence.
TypeError: unsupported operand type(s) for +: 'KeysAssetSelection' and 'GroupsAssetSelection'
QUESTION 2: Before updating to the new API, the dbt + Fivetran assets were mapped as dependencies using the asset name and prefix. However, this is now broken. How do we set the upstream Fivetran assets so that the Fivetran + dbt assets can be materialized in the same job?
r
Re: question 1 — https://docs.dagster.io/_apidocs/assets#dagster.AssetSelection
You can use the “|”, “&”, and “-” operators to create unions, intersections, and differences of asset selections, respectively.
So you’ll want `dbt_assets | other_assets`.
Re question 2 — you have to ensure that the asset keys for your fivetran assets match the asset keys for your dbt sources. Have you seen https://docs.dagster.io/integrations/dbt/reference#upstream-dependencies?
t
Oh, I totally missed that… Thanks @rex. I’m not sure I like the idea of having to set these dependencies in the actual `dbt` repo/code. I feel like this logic should live in Dagster. But that’s just an initial thought / feeling.
r
Got it. If you don’t want to set it in your dbt project, then you can set it using the `DagsterDbtTranslator`: https://docs.dagster.io/integrations/dbt/reference#customizing-asset-keys
t
Fancy! Thanks, Rex.
r
Similar to https://dagster.slack.com/archives/C04CW71AGBW/p1692019584171889?thread_ts=1692017983.884979&cid=C04CW71AGBW: You would need to parse `dbt_resource_props` so that only your Fivetran sources are selected.
t
Hi @rex, in a related vein, is there an example available for overriding the dbt auto-materialization metadata? The example from the docs (replicated below) isn’t very descriptive.
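from typing import Any, Mapping

from dagster import MetadataValue
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        # Surface the dbt node's "meta" block as JSON metadata on the asset.
        return {
            "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", {}))
        }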
Essentially, I am trying to override the auto-mat policies on a per-dbt-tag basis. I’ve tried almost every combination known to man as the return value / dict, but I cannot get it to work. Below is an example of what I am trying to achieve:
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        if "my-dbt-tag" in dbt_resource_props["config"]["tags"]:
            logger.warning("Implementing custom user_data metadata")  # THIS LOGS, SO A GOOD START
            meta = {
                "auto_materialize_policy": {"type": "eager"},
                "freshness_policy": {
                    "maximum_lag_minutes": 60,
                    "cron_schedule": "0 * * * *",
                    "cron_schedule_timezone": "UTC",
                },
            }
            return {
                "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", meta))
            }
        return super().get_metadata(dbt_resource_props)
I’ve tried every combination I can think of for the return value (vanilla dict, the above etc), but nothing works (and I suspect this returned value is the issue). FYI, if I add the auto-mat policies to the dbt schema yaml file, then the auto-mat kicks in. But when I remove it and try to use this custom translator, I can’t get it to work. Any pointers?
r
`get_metadata` affects the metadata of the AssetsDefinition. It’s similar to https://docs.dagster.io/concepts/ops-jobs-graphs/metadata-tags. Dagster metadata is for display purposes in the UI. It doesn’t override the dbt metadata.
What you are looking for is something like `get_freshness_policy` or `get_automaterialize_policy` as methods on `DagsterDbtTranslator`. This is not implemented yet. Feel free to add a feature request.
t
Ah ok. Thanks @rex. I mistook the dbt metadata for the Dagster metadata. Hmmm, this will require some extra work then, as I’m aiming for a per-tag materialization policy, and that would require restructuring our dbt schema files. So it’d be much better if it were a feature in Dagster. I’ll raise this as a request. In the meantime, any suggestions for how I could potentially handle this in Dagster? I mean, I could define a macro in dbt and apply it to each model impacted, but that’s very fragile. Otherwise, refactoring the schema files is the only thing that springs to mind.
r
t
Cheers 👍
r
Let me know when you’ve filed the feature request — can probably get to it this week so that it’s in the release
t
Morning @rex, I’ve filed the feature request. https://github.com/dagster-io/dagster/issues/15846
r
t
Absolutely legendary @rex !