Todd de Quincey
08/14/2023, 1:27 PM
Todd de Quincey
08/14/2023, 1:27 PM
default asset group. However, this is obviously a contrived example and doesn’t really apply to the “real world”, where we will likely have multiple dbt jobs.
So the question is, how do we select specific dbt assets + other assets and combine them into a job?
For example, we can easily select all of the required dbt assets using a tag, but how can we then additively include other assets e.g. using a group?
    from dagster import AssetSelection, define_asset_job
    from dagster_dbt import build_dbt_asset_selection

    # my_dbt_assets is the dbt assets definition, defined elsewhere
    dbt_assets = build_dbt_asset_selection([my_dbt_assets], dbt_select="tag:dbt_group_1")
    # Other assets we want to run in the job
    other_assets = AssetSelection.groups("dbt_group_1")

    define_asset_job(
        name="dbt_group_1",
        # selection=dbt_assets,    # This will work fine on its own
        # selection=other_assets,  # This will work fine on its own
        selection=dbt_assets + other_assets,  # How do we combine them?
    )
In the last line above, doing a “dumb” addition (+) will yield an error. And I can’t see how we can coerce these two types into a common sequence.
TypeError: unsupported operand type(s) for +: 'KeysAssetSelection' and 'GroupsAssetSelection'
Todd de Quincey
08/14/2023, 1:29 PM
rex
08/14/2023, 1:36 PM
You can use the “|”, “&”, and “-” operators to create unions, intersections, and differences of asset selections, respectively. So you’ll want
dbt_assets | other_assets
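To make the operator semantics concrete, here is a minimal pure-Python sketch. `ToySelection`, `by_tag`, `by_group`, and the sample asset tuples are all hypothetical stand-ins invented for illustration; this is not Dagster's `AssetSelection` implementation, only a toy model of how set-style operators behave and why `+` fails on a type that does not define it.

```python
from dataclasses import dataclass
from typing import Callable, FrozenSet, Iterable, Set, Tuple

# Hypothetical stand-in for an asset: (name, group, tags).
Asset = Tuple[str, str, FrozenSet[str]]


@dataclass(frozen=True)
class ToySelection:
    """Toy model of a selection as a predicate; NOT Dagster's AssetSelection."""

    matches: Callable[[Asset], bool]

    def __or__(self, other: "ToySelection") -> "ToySelection":
        # Union: keep assets matched by either side.
        return ToySelection(lambda a: self.matches(a) or other.matches(a))

    def __and__(self, other: "ToySelection") -> "ToySelection":
        # Intersection: keep assets matched by both sides.
        return ToySelection(lambda a: self.matches(a) and other.matches(a))

    def __sub__(self, other: "ToySelection") -> "ToySelection":
        # Difference: keep assets matched by the left side only.
        return ToySelection(lambda a: self.matches(a) and not other.matches(a))

    def resolve(self, assets: Iterable[Asset]) -> Set[str]:
        return {a[0] for a in assets if self.matches(a)}


def by_tag(tag: str) -> ToySelection:
    return ToySelection(lambda a: tag in a[2])


def by_group(group: str) -> ToySelection:
    return ToySelection(lambda a: a[1] == group)


ASSETS = [
    ("stg_orders", "default", frozenset({"dbt_group_1"})),
    ("orders", "dbt_group_1", frozenset({"dbt_group_1"})),
    ("export_orders", "dbt_group_1", frozenset()),
    ("unrelated", "other", frozenset()),
]

dbt_assets = by_tag("dbt_group_1")
other_assets = by_group("dbt_group_1")

union = (dbt_assets | other_assets).resolve(ASSETS)
intersection = (dbt_assets & other_assets).resolve(ASSETS)
difference = (dbt_assets - other_assets).resolve(ASSETS)

# No __add__ is defined, so "+" raises TypeError, as in the traceback above.
try:
    dbt_assets + other_assets
    plus_raised = False
except TypeError:
    plus_raised = True
```

In this toy data the union picks up both the tagged dbt models and the untagged `export_orders` asset from the group, which is the "additive" behaviour asked about.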
rex
08/14/2023, 1:37 PM
Todd de Quincey
08/14/2023, 1:39 PM
dbt repo/code. I feel like this logic should live in Dagster. But that’s just an initial thought / feeling
rex
08/14/2023, 1:41 PM
Todd de Quincey
08/14/2023, 1:42 PM
rex
08/14/2023, 1:42 PM
dbt_resource_props so that only your fivetran sources are selected
Todd de Quincey
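08/14/2023, 4:23 PM
    from typing import Any, Mapping

    from dagster import MetadataValue
    from dagster_dbt import DagsterDbtTranslator

    class CustomDagsterDbtTranslator(DagsterDbtTranslator):
        def get_metadata(
            self, dbt_resource_props: Mapping[str, Any]
        ) -> Mapping[str, Any]:
            # Expose the dbt node's `meta` block as JSON metadata on the asset
            return {
                "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", {}))
            }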
Essentially, I am trying to override the auto-materialize policies on a per-dbt-tag basis. I’ve tried almost every combination known to man as the return value / dict, but I cannot get it to work.
Below is an example of what I am trying to achieve:
    class CustomDagsterDbtTranslator(DagsterDbtTranslator):
        def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
            if "my-dbt-tag" in dbt_resource_props["config"]["tags"]:
                logger.warning("Implementing custom user_data metadata")  # THIS LOGS, SO A GOOD START
                meta = {
                    "auto_materialize_policy": {"type": "eager"},
                    "freshness_policy": {
                        "maximum_lag_minutes": 60,
                        "cron_schedule": "0 * * * *",
                        "cron_schedule_timezone": "UTC",
                    },
                }
                return {
                    "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", meta))
                }
            return super().get_metadata(dbt_resource_props)
I’ve tried every combination I can think of for the return value (vanilla dict, the above etc), but nothing works (and I suspect this returned value is the issue).
FYI, if I add the auto-mat policies to the dbt schema yaml file, then the auto-mat kicks in. But when I remove it and try to use this custom translator, I can’t get it to work.
Any pointers?
rex
08/14/2023, 4:26 PM
get_metadata affects the metadata of the AssetsDefinition. It’s similar to https://docs.dagster.io/concepts/ops-jobs-graphs/metadata-tags.
Dagster metadata is for display purposes in the UI. It doesn’t override the dbt metadata.
rex
08/14/2023, 4:27 PM
get_freshness_policy or get_automaterialize_policy as methods on DagsterDbtTranslator. This is not implemented yet. Feel free to add a feature request.
Todd de Quincey
08/14/2023, 4:46 PM
rex
08/14/2023, 4:47 PM
Todd de Quincey
08/14/2023, 4:48 PM
rex
08/14/2023, 4:49 PM
Todd de Quincey
08/15/2023, 8:24 AM
rex
08/16/2023, 2:09 AM
Todd de Quincey
08/16/2023, 3:38 AM
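A side note on the second translator snippet in this thread: `dbt_resource_props.get("meta", meta)` only falls back to `meta` when the `"meta"` key is missing entirely. The sketch below uses plain Python dicts to show that behaviour and one possible merge instead. The sample `dbt_resource_props`, the `has_tag` helper, and the assumption that a dbt node carries an empty `"meta"` key are all hypothetical and not confirmed in the thread. Per the reply above, `get_metadata` is display-only, so even the merged value would not change any policy.

```python
from typing import Any, Dict, Mapping

# Hypothetical dbt node properties; assumes the node has an empty "meta" key.
dbt_resource_props: Dict[str, Any] = {
    "config": {"tags": ["my-dbt-tag"]},
    "meta": {},
}

custom_meta = {"auto_materialize_policy": {"type": "eager"}}

# dict.get falls back to the default only when the key is absent,
# so the existing (empty) "meta" wins and custom_meta is ignored.
picked = dbt_resource_props.get("meta", custom_meta)

# Merging applies the custom values while letting explicit dbt meta override them.
merged = {**custom_meta, **dbt_resource_props.get("meta", {})}


def has_tag(props: Mapping[str, Any], tag: str) -> bool:
    """Tag lookup that avoids a KeyError when "config" or "tags" is missing."""
    return tag in props.get("config", {}).get("tags", [])
```

With the sample data, `picked` is the empty dict while `merged` carries the custom values, which may be part of why the custom values never appeared even in the displayed metadata.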