Eric Coleman
07/24/2023, 6:12 PM
Sung Won Chung
07/24/2023, 6:26 PM
Paul Burns
07/24/2023, 6:49 PM
Paul Burns
07/24/2023, 6:51 PM
Eric Coleman
07/24/2023, 6:57 PM
Paul Burns
07/24/2023, 7:01 PM
Eric Coleman
07/24/2023, 7:05 PM
Paul Burns
07/24/2023, 7:49 PM
Paul Burns
07/24/2023, 7:49 PM
Eric Coleman
07/24/2023, 8:01 PM
rex
07/24/2023, 10:47 PM
dbt_project_1 are defined as sources in dbt_project_2. Furthermore, you should ensure that the asset keys for the models in dbt_project_1 match the asset keys defined for the sources in dbt_project_2.
Jonathan Neo
07/25/2023, 8:28 AM
mart) has to match exactly the name used in project 1's dbt_project.yml (mart)
b. This is so that Dagster is able to infer the dependencies between projects. There are other ways to work around this, but I won't get into them here.
Paul Burns
07/25/2023, 1:10 PM
Jonathan Neo
07/25/2023, 1:30 PM
dagster_dbt_translator seems to be it, although I haven’t had time to try it out yet!
Here’s what I can tell so far:
• The default source to asset_key mapping is performed by a function called default_asset_key_fn here.
• There is a class called DagsterDbtTranslator that implements a method called get_asset_key(). By default, get_asset_key() calls default_asset_key_fn.
• However, you can override the default behaviour by creating a custom subclass and overriding the existing get_asset_key() method. There’s an incomplete example of how to do it here.
I’d imagine a complete example of the last point above would look something like:
from typing import Any, Mapping

from dagster import AssetKey, OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

# MANIFEST_PATH is assumed to be defined elsewhere and to point at the dbt project's manifest.json.


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_asset_key(cls, node_info: Mapping[str, Any]) -> AssetKey:
        # An asset key configured under meta.dagster.asset_key takes precedence.
        dagster_metadata = node_info.get("meta", {}).get("dagster", {})
        asset_key_config = dagster_metadata.get("asset_key", [])
        if asset_key_config:
            return AssetKey(asset_key_config)
        if node_info["resource_type"] == "source":
            # CUSTOMIZE YOUR ASSET KEY HERE FOR DBT SOURCES
            components = ["CUSTOMIZE YOUR KEY HERE", node_info["source_name"], node_info["name"]]
        else:
            # Models: prefix the key with the configured schema, if there is one.
            configured_schema = node_info["config"].get("schema")
            if configured_schema is not None:
                components = [configured_schema, node_info["name"]]
            else:
                components = [node_info["name"]]
        return AssetKey(components)


@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
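To see why the key shape matters across two projects, here is a dependency-free sketch of the same key derivation, with no Dagster import, so it only mirrors the logic discussed in this thread. It assumes the unmodified default for sources is [source_name, name]; the node dictionaries and the table name "orders" are hypothetical stand-ins for manifest entries.

```python
from typing import Any, List, Mapping


def asset_key_components(node_info: Mapping[str, Any]) -> List[str]:
    """Plain-Python stand-in for the key logic above (assumed default behaviour)."""
    # An explicit meta.dagster.asset_key always wins.
    configured = node_info.get("meta", {}).get("dagster", {}).get("asset_key", [])
    if configured:
        return list(configured)
    if node_info["resource_type"] == "source":
        # Sources: source name, then table name.
        return [node_info["source_name"], node_info["name"]]
    # Models: configured schema (if any), then model name.
    schema = node_info.get("config", {}).get("schema")
    return [schema, node_info["name"]] if schema is not None else [node_info["name"]]


# Hypothetical model in project 1, built into the custom schema "mart".
project_1_model = {"resource_type": "model", "name": "orders", "config": {"schema": "mart"}}
# Hypothetical source in project 2 whose source name is also "mart".
project_2_source = {"resource_type": "source", "source_name": "mart", "name": "orders"}
# The same table declared under a different source name.
mismatched_source = {"resource_type": "source", "source_name": "marts", "name": "orders"}

model_key = asset_key_components(project_1_model)
print(model_key == asset_key_components(project_2_source))   # keys line up
print(model_key == asset_key_components(mismatched_source))  # keys differ
```

When the two lists are equal, the model in project 1 and the source in project 2 resolve to the same asset key, which is the condition for the dependency to be inferred. When they differ, either rename the source or override get_asset_key() as in the translator above.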
Paul Burns
07/25/2023, 1:37 PM
Eric Coleman
07/25/2023, 1:38 PM
Jonathan Neo
07/25/2023, 1:40 PM
sources.yml for the 150+ existing sources in your database.
Generating it is as simple as running this in your command line:
dbt run-operation generate_source --args 'schema_name: raw_jaffle_shop'
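For orientation, the generate_source macro ships with the dbt-codegen package and prints YAML that you paste into a sources file. The sketch below is not that macro: it is a hand-rolled Python stand-in that only illustrates the rough shape of the output, and the table names are hypothetical (the real macro reads them from the database).

```python
from typing import List


def render_sources_yaml(schema_name: str, tables: List[str]) -> str:
    """Build a minimal sources.yml body for one schema (illustrative only)."""
    lines = ["version: 2", "", "sources:", f"  - name: {schema_name}", "    tables:"]
    # One entry per table found in the schema.
    lines += [f"      - name: {table}" for table in tables]
    return "\n".join(lines) + "\n"


# Hypothetical table names for the raw_jaffle_shop schema.
print(render_sources_yaml("raw_jaffle_shop", ["customers", "orders", "payments"]))
```

The source name in this file is what ends up as the first component of the asset key for each source table, so it is the part that has to line up with the upstream asset keys.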
Eric Coleman
07/25/2023, 2:03 PM
Timothee Vandeput
07/25/2023, 2:05 PM
DagsterDbtTranslator to map the asset keys between dbt projects.
@Eric Coleman we also have upstream assets (a Python script that invokes an API) feeding our dbt assets. In our case we build those assets from a YAML config file using the asset factory pattern. Those assets are defined with @multi_asset, so one call to the API generates multiple asset materializations. As long as your keys match, Dagster will infer the dependencies between those assets.
Jonathan Neo
07/25/2023, 2:07 PM
That’s cool @Timothee Vandeput! I didn’t know about the @multi_asset decorator! Was sitting here thinking to myself, “does dagster even have an interface for 1 function producing many dagster assets?”
Timothee Vandeput
07/25/2023, 2:07 PM
Timothee Vandeput
07/25/2023, 2:10 PM
It does have fan out 🙂 And we also use the inputs on assets to do fan in.
Eric Coleman
07/25/2023, 2:10 PM
Paul Burns
07/25/2023, 2:48 PM
Eric Coleman
07/25/2023, 6:18 PM