Alec Ryan
04/15/2022, 3:27 PM
sandy
04/15/2022, 4:31 PM
owen
04/15/2022, 4:38 PM
Alec Ryan
04/15/2022, 4:40 PM
owen
04/15/2022, 4:40 PM
```python
import subprocess
from typing import Dict, Any
from dagster import (
solid,
OutputDefinition,
InputDefinition,
Nothing,
AssetKey,
CompositeSolidDefinition,
DependencyDefinition,
MultiDependencyDefinition,
OutputMapping,
InputMapping,
)
from dagster.core.definitions.output import OutputPointer
from dagster.core.definitions.input import InputPointer
import json
def _dbt_ls(project_dir: str, profiles_dir: str, select: str):
    """Run ``dbt ls`` for models and return the parsed JSON output.

    The command is invoked with ``--resource-type model --output json``; each
    non-blank line that dbt writes to stdout is decoded as one JSON document.

    Args:
        project_dir: Value passed to ``--project-dir``.
        profiles_dir: Value passed to ``--profiles-dir``.
        select: Value passed to ``--select``.

    Returns:
        A list with one decoded JSON value per non-blank stdout line.

    Raises:
        subprocess.CalledProcessError: If dbt exits with a non-zero status.
        json.JSONDecodeError: If a non-blank stdout line is not valid JSON.
    """
    command_list = [
        "dbt",
        "ls",
        "--project-dir",
        project_dir,
        "--profiles-dir",
        profiles_dir,
        "--select",
        select,
        "--resource-type",
        "model",
        "--output",
        "json",
    ]
    # run() waits for dbt to finish and closes the pipe, so no child process or
    # file descriptor is left behind; check=True surfaces a failed dbt
    # invocation instead of silently returning partial output.
    completed = subprocess.run(command_list, stdout=subprocess.PIPE, check=True)

    parsed = []
    # Split on bytes so only real line terminators separate the documents.
    for raw_line in completed.stdout.splitlines():
        text = raw_line.decode("utf-8").strip()
        if not text:
            # Blank lines carry no JSON document; json.loads("") would raise.
            continue
        parsed.append(json.loads(text))
    return parsed
def get_node_solid(node_info: Dict[str, Any]):
    """Build the Dagster solid that runs a single dbt node.

    Args:
        node_info: One decoded ``dbt ls`` entry. The keys read here (directly
            or through ``_get_node_name``) are ``resource_type``,
            ``package_name``, ``name`` and ``depends_on["nodes"]``.

    Returns:
        A solid named after the last dot-separated segment of the node name.
        It requires the ``dbt`` resource, declares one ``Nothing`` input per
        entry of ``depends_on["nodes"]`` (or a single ``start_after`` input
        when that list is empty) and one ``Nothing`` output whose asset key is
        ``"dbt"`` followed by every node-name segment after the first.
    """
    node_name = _get_node_name(node_info)
    solid_name = _get_dagster_name(node_name)
    upstream = node_info["depends_on"]["nodes"]

    if len(upstream) == 0:
        # No parents: expose one ordering-only input so the solid can still be
        # sequenced after something else.
        inputs = [InputDefinition(name="start_after", dagster_type=Nothing)]
    else:
        inputs = [
            InputDefinition(name=_get_dagster_name(parent), dagster_type=Nothing)
            for parent in upstream
        ]

    asset_path = ["dbt"] + node_name.split(".")[1:]
    outputs = [OutputDefinition(asset_key=AssetKey(asset_path), dagster_type=Nothing)]

    # Intentionally no docstring on the inner function: the original has none.
    # NOTE(review): Dagster may use a solid function's docstring as the solid's
    # description, so adding one here could change behavior — TODO confirm.
    @solid(
        name=solid_name,
        required_resource_keys={"dbt"},
        input_defs=inputs,
        output_defs=outputs,
        tags={"kind": "dbt"},
    )
    def _solid(context):
        # Drops the first len("model.") characters of the node name and passes
        # the remainder to the dbt resource's run() as the "models" flag.
        selector = node_name[len("model.") :]
        context.resources.dbt.run(extra_flags={"models": selector})

    return _solid
def _get_dagster_name(node_name: str):
    """Return the text after the last ``.`` in ``node_name`` (all of it if there is no dot)."""
    _, _, last_segment = node_name.rpartition(".")
    return last_segment
def _get_node_name(node_info: Dict[str, Any]):
    """Join the node's ``resource_type``, ``package_name`` and ``name`` with dots."""
    parts = (node_info[key] for key in ("resource_type", "package_name", "name"))
    return ".".join(parts)
def get_dag_solid(
    project_dir: str, profiles_dir: str = None, select: str = "*", solid_name: str = "dbt_dag"
):
    """Build a composite solid containing one solid per dbt model returned by ``dbt ls``.

    Args:
        project_dir: dbt project directory, forwarded to ``_dbt_ls``.
        profiles_dir: dbt profiles directory; when ``None`` it defaults to
            ``project_dir + "/config"``.
        select: dbt selection string, forwarded to ``_dbt_ls``.
        solid_name: Name given to the resulting composite solid.

    Returns:
        A ``CompositeSolidDefinition`` tagged ``{"kind": "dbt"}`` with one
        output mapping per node that no other listed node depends on and a
        single ``start_after`` input mapping.
    """
    if profiles_dir is None:
        profiles_dir = project_dir + "/config"

    dbt_nodes = _dbt_ls(project_dir, profiles_dir, select)

    # Full node name -> list of the node names it depends on.
    node_to_deps = {}
    for node_info in dbt_nodes:
        full_name = _get_node_name(node_info)
        node_to_deps[full_name] = node_info["depends_on"]["nodes"]

    solid_defs = [get_node_solid(node_info) for node_info in dbt_nodes]

    def fan_in(parents):
        # Nodes without parents depend on the "run_dbt" solid instead.
        # NOTE(review): no solid named "run_dbt" is created in this function —
        # confirm where it is expected to come from.
        if len(parents) == 0:
            upstream_solids = ["run_dbt"]
        else:
            upstream_solids = [_get_dagster_name(parent) for parent in parents]
        return MultiDependencyDefinition(
            [DependencyDefinition(upstream, "result") for upstream in upstream_solids]
        )

    # Translate the dependency information from dbt ls into Dagster's format.
    # NOTE(review): every dependency is attached to an input named "log_stream",
    # whereas get_node_solid declares inputs named after each parent (or
    # "start_after") — confirm these are meant to line up.
    dependencies = {}
    for node_name, parents in node_to_deps.items():
        dependencies[_get_dagster_name(node_name)] = {"log_stream": fan_in(parents)}

    # Leaves are the nodes that never appear as somebody else's parent.
    leaf_names = set(node_to_deps.keys())
    for parents in node_to_deps.values():
        leaf_names.difference_update(parents)

    # Expose one composite output per leaf node.
    output_mappings = []
    for node_name in leaf_names:
        leaf_solid = _get_dagster_name(node_name)
        output_mappings.append(
            OutputMapping(
                OutputDefinition(name=leaf_solid),
                maps_from=OutputPointer(leaf_solid),
            )
        )

    # Single ordering-only input, routed to the "run_dbt" solid.
    start_after_mapping = InputMapping(
        InputDefinition(name="start_after", dagster_type=Nothing),
        maps_to=InputPointer("run_dbt", "start_after"),
    )
    input_mappings = [start_after_mapping]

    return CompositeSolidDefinition(
        name=solid_name,
        solid_defs=solid_defs,
        dependencies=dependencies,
        input_mappings=input_mappings,
        output_mappings=output_mappings,
        tags={"kind": "dbt"},
    )
```
Alec Ryan
04/15/2022, 4:40 PM
owen
04/15/2022, 4:41 PM
Alec Ryan
04/15/2022, 6:26 PM
owen
04/15/2022, 6:26 PM