Any thoughts on how to parse the dbt manifest to c...
# ask-community
a
Any thoughts on how to parse the dbt manifest to create ops dynamically?
s
cc @owen
o
Hi @Alec Ryan, right now we do have some functionality that parses the dbt manifest, but we don't create multiple ops from it. Instead, we create a single op that backs multiple interconnected software-defined assets: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-dbt/dagster_dbt/asset_defs.py?L58. Depending on your use case, this might be what you want.
Just for posterity, I did track down some old code I wrote to do the op-per-model thing you're describing. It still uses a bunch of legacy interfaces, but if you're curious what the op-based solution would look like (rather than the asset-based solution above), here it is:
a
Thanks @owen
o
```python
import subprocess
from typing import Dict, Any
from dagster import (
    solid,
    OutputDefinition,
    InputDefinition,
    Nothing,
    AssetKey,
    CompositeSolidDefinition,
    DependencyDefinition,
    MultiDependencyDefinition,
    OutputMapping,
    InputMapping,
)
from dagster.core.definitions.output import OutputPointer
from dagster.core.definitions.input import InputPointer
import json


def _dbt_ls(project_dir: str, profiles_dir: str, select: str):
    """Run ``dbt ls`` for models and return one parsed JSON object per output line.

    Args:
        project_dir: Value passed to dbt's ``--project-dir`` flag.
        profiles_dir: Value passed to dbt's ``--profiles-dir`` flag.
        select: Value passed to dbt's ``--select`` flag.

    Returns:
        A list with the result of ``json.loads`` for every non-blank line that
        dbt wrote to stdout, in output order.

    Raises:
        subprocess.CalledProcessError: If the dbt process exits with a non-zero
            status.
        json.JSONDecodeError: If a non-blank stdout line is not valid JSON.
    """
    command_list = [
        "dbt",
        "ls",
        "--project-dir",
        project_dir,
        "--profiles-dir",
        profiles_dir,
        "--select",
        select,
        "--resource-type",
        "model",
        "--output",
        "json",
    ]
    # ``run`` waits for dbt to finish and closes the pipe, so the child process
    # is always reaped; ``check=True`` surfaces a failed dbt invocation instead
    # of silently returning a partial (or empty) listing.
    completed = subprocess.run(command_list, stdout=subprocess.PIPE, check=True)

    parsed_nodes = []
    for raw_line in completed.stdout.splitlines():
        text = raw_line.decode("utf-8").strip()
        # Blank lines carry no node description; skipping them avoids a
        # JSONDecodeError on otherwise valid output.
        if text:
            parsed_nodes.append(json.loads(text))
    return parsed_nodes


def get_node_solid(node_info: Dict[str, Any]):
    """Build a solid that runs the single dbt model described by ``node_info``.

    The solid is named after the last dotted segment of the node's full name,
    requires a ``dbt`` resource, and is tagged ``{"kind": "dbt"}``. It has one
    ``Nothing`` input per entry in ``node_info["depends_on"]["nodes"]`` (named
    after that entry's last dotted segment), or a single ``Nothing`` input
    called ``start_after`` when that list is empty. Its single ``Nothing``
    output carries an asset key of ``"dbt"`` followed by every dotted segment
    of the node name after the first.

    Args:
        node_info: One node description; the keys read here are
            ``resource_type``, ``package_name``, ``name`` and
            ``depends_on.nodes``.

    Returns:
        The solid produced by the ``@solid`` decorator.
    """
    node_name = _get_node_name(node_info)
    upstream_nodes = node_info["depends_on"]["nodes"]

    if upstream_nodes:
        inputs = [
            InputDefinition(name=_get_dagster_name(upstream), dagster_type=Nothing)
            for upstream in upstream_nodes
        ]
    else:
        # Nodes without parents still get an input so they can be sequenced.
        inputs = [InputDefinition(name="start_after", dagster_type=Nothing)]

    asset_path = ["dbt"] + node_name.split(".")[1:]
    outputs = [OutputDefinition(asset_key=AssetKey(asset_path), dagster_type=Nothing)]

    @solid(
        name=_get_dagster_name(node_name),
        required_resource_keys={"dbt"},
        input_defs=inputs,
        output_defs=outputs,
        tags={"kind": "dbt"},
    )
    def _solid(context):
        # Drops the first len("model.") characters of the node name before
        # handing it to the dbt resource as the "models" flag.
        # NOTE(review): presumably every node name starts with "model." -- confirm.
        context.resources.dbt.run(extra_flags={"models": node_name[len("model.") :]})

    return _solid


def _get_dagster_name(node_name: str):
    """Return the text after the last ``.`` in ``node_name`` (all of it if there is no dot)."""
    _, _, last_segment = node_name.rpartition(".")
    return last_segment


def _get_node_name(node_info: Dict[str, Any]):
    """Return ``"<resource_type>.<package_name>.<name>"`` built from ``node_info``."""
    name_fields = ("resource_type", "package_name", "name")
    return ".".join(node_info[field] for field in name_fields)


def get_dag_solid(
    project_dir: str, profiles_dir: str = None, select: str = "*", solid_name: str = "dbt_dag"
):
    """Build a composite solid containing one inner solid per listed dbt model.

    Args:
        project_dir: dbt project directory, forwarded to ``_dbt_ls``.
        profiles_dir: dbt profiles directory; when ``None`` it becomes
            ``project_dir + "/config"``.
        select: dbt selection string, forwarded to ``_dbt_ls``.
        solid_name: Name given to the returned composite solid.

    Returns:
        A ``CompositeSolidDefinition`` tagged ``{"kind": "dbt"}`` with one
        output per listed node that is not a parent of another listed node,
        and a single ``start_after`` input.
    """
    if profiles_dir is None:
        profiles_dir = project_dir + "/config"

    dbt_nodes = _dbt_ls(project_dir, profiles_dir, select)

    # Full node name -> list of the node names it depends on.
    node_to_deps = {}
    for info in dbt_nodes:
        full_name = _get_node_name(info)
        node_to_deps[full_name] = info["depends_on"]["nodes"]

    solid_defs = list(map(get_node_solid, dbt_nodes))

    def _fan_in(parent_names):
        # A node without parents depends on the "result" output of "run_dbt".
        # NOTE(review): no solid named "run_dbt" is defined in the visible
        # snippet -- confirm it is supplied elsewhere.
        upstream_solids = [_get_dagster_name(parent) for parent in parent_names] or ["run_dbt"]
        return MultiDependencyDefinition(
            [DependencyDefinition(upstream, "result") for upstream in upstream_solids]
        )

    # Translate the dependency information from dbt ls into dagster's format.
    # NOTE(review): the solids built by get_node_solid declare inputs named
    # after their parents (or "start_after"), not "log_stream" -- confirm this
    # input name is what the target dagster version expects.
    dependencies = {
        _get_dagster_name(full_name): {"log_stream": _fan_in(parent_names)}
        for full_name, parent_names in node_to_deps.items()
    }

    # Leaves are the listed nodes that never appear as somebody's parent.
    leaf_names = set(node_to_deps)
    for parent_names in node_to_deps.values():
        leaf_names.difference_update(parent_names)

    # Expose one composite output per leaf node.
    output_mappings = []
    for leaf in leaf_names:
        output_name = _get_dagster_name(leaf)
        output_mappings.append(
            OutputMapping(
                OutputDefinition(name=output_name),
                maps_from=OutputPointer(output_name),
            )
        )

    # Single composite input, forwarded to the "start_after" input of "run_dbt".
    start_after_input = InputDefinition(name="start_after", dagster_type=Nothing)
    input_mappings = [
        InputMapping(start_after_input, maps_to=InputPointer("run_dbt", "start_after"))
    ]

    return CompositeSolidDefinition(
        name=solid_name,
        solid_defs=solid_defs,
        dependencies=dependencies,
        input_mappings=input_mappings,
        output_mappings=output_mappings,
        tags={"kind": "dbt"},
    )
```
a
My thought is that it would be great to visualize all of the dbt models in my manifest
and replay the failures (if any)
o
I think the asset-based solution is likely what you want. It doesn't currently support retries from failure (you either run the entire thing or nothing), but that is a feature we're targeting for the 0.15.0 release.
a
Awesome, excited for that release lol
thanks again
o
we're excited for it too 😄 -- and no problem