# ask-community
k
Hey, I'm currently trying out a POC of dagster and comparing it to Luigi and Airflow, where it's really simple to chain tasks dynamically based on the input. Is it possible to do something similar in dagster? I want to do something like:
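def run_node(node):
    # Recurse into the children first; `yield from` re-yields their results
    # (a bare `yield run_node(...)` would yield the generator object itself).
    if node.has_child_nodes():
        for child_node in node.child_nodes():
            yield from run_node(child_node)

    yield {"res": my_func(node)}


def my_job():
    graph = load_graph()
    for node in graph:
        # Generators are lazy, so consume them to actually run the nodes.
        list(run_node(node))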
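For reference, a self-contained plain-Python version of that traversal (no dagster involved), assuming a hypothetical `Node` dataclass and a hypothetical `my_func` in place of whatever `load_graph()` and `my_func` really return. It shows the intended order: every child subtree runs before its parent.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Node:
    # Hypothetical stand-in for the nodes returned by load_graph().
    name: str
    children: List["Node"] = field(default_factory=list)


def my_func(node: Node) -> str:
    # Hypothetical per-node work; here it just returns the node name.
    return node.name


def run_node(node: Node) -> Iterator[dict]:
    # Post-order: run every child subtree, re-yielding its results, then the node.
    for child in node.children:
        yield from run_node(child)
    yield {"res": my_func(node)}


def my_job(graph: List[Node]) -> List[dict]:
    results = []
    for node in graph:
        # A generator does nothing until iterated, so collect its results.
        results.extend(run_node(node))
    return results


graph = [Node("a1", [Node("a1b1"), Node("a1b2")]), Node("b1", [Node("b1b1")])]
print([r["res"] for r in my_job(graph)])
# prints ['a1b1', 'a1b2', 'a1', 'b1b1', 'b1']
```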
t
+1 I also would really like to do something like this.
j
You can use dynamic outputs for this: https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs
Happy to help answer any clarifying questions you may have!
k
Thanks @jamie, are there any runnable examples of using dynamic graphs? Unfortunately, none of the snippets in the docs above can be run as-is. Also, is it possible to combine this with `AssetsDefinition.from_graph` to create asset lineage? It looks like dagster-dbt is doing something similar to what I want: loading some input/config and building a runnable lineage in dagster from it. I tried reproducing the dagster-dbt approach, but I'm not sure how to use `AssetsDefinition` and `AssetKey` correctly. The following minimal example just raises a `KeyError` when creating the `AssetsDefinition` for the `keys_by_output_name`:
from typing import Dict, Tuple
from dagster import (
    AssetKey,
    AssetsDefinition,
    In,
    Nothing,
    Out,
    op,
)

def get_op(op_name, k, ins, outs):
    @op(name=op_name)
    def my_op():
        print(k)

    return my_op


def get_assets() -> AssetsDefinition:

    asset_ins: Dict[AssetKey, Tuple[str, In]] = {}
    asset_outs: Dict[AssetKey, Tuple[str, Out]] = {}

    asset_ins[AssetKey("foo")] = ("some_input", In(Nothing))
    asset_outs[AssetKey("bar")] = ("some_output", Out())

    my_op = get_op(
        op_name="some_output",
        k="foo",
        ins=dict(asset_ins.values()),
        outs=dict(asset_outs.values()),
    )

    return AssetsDefinition(
        keys_by_input_name={
            input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
        },
        keys_by_output_name={
            output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
        },
        node_def=my_op,
    )

get_assets()
I also tried doing it like this:
from dagster import AssetIn, asset


def load_graph():
    return {
        "a1": {
            "children": [
                {"a1b1": {"children": [], "op": None}},
                {"a1b2": {"children": [], "op": None}},
            ],
            "op": None,
        },
        "b1": {
            "children": [
                {"b1b1": {"children": [], "op": None}},
                {"b1b2": {"children": [], "op": None}},
            ],
            "op": None,
        },
    }


def get_asset(name):
    @asset(name=f"{name}__input")
    def input_asset():
        return "input"

    @asset(name=name, ins={"parents_asset": AssetIn(f"{name}__input")})
    def generic_asset(parents_asset):
        print(parents_asset)

    return [input_asset, generic_asset]


def get_assets():
    loaded_assets = []
    for key in load_graph():  # iterate over the top-level node names
        loaded_assets = loaded_assets + get_asset(key)

    return loaded_assets


multi_assets = get_assets()
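Iterating over a dict yields only its keys, so a loop of the form `for key, value in load_graph():` unpacks each two-character key such as `"a1"` into `"a"` and `"1"`; iterating over the keys (or over `.items()`) avoids that. The loop also never visits `children`. A plain-Python sketch of flattening the nested config into a dependency map and a valid build order, assuming each parent should run after its children (as in the first pseudocode); `flatten` and `build_order` are hypothetical helpers, and `graphlib` requires Python 3.9+.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Optional, Set


def load_graph():
    # Same nested config shape as in the snippet above.
    return {
        "a1": {
            "children": [
                {"a1b1": {"children": [], "op": None}},
                {"a1b2": {"children": [], "op": None}},
            ],
            "op": None,
        },
        "b1": {
            "children": [
                {"b1b1": {"children": [], "op": None}},
                {"b1b2": {"children": [], "op": None}},
            ],
            "op": None,
        },
    }


def flatten(tree: dict, deps: Optional[Dict[str, Set[str]]] = None) -> Dict[str, Set[str]]:
    # Hypothetical helper: map every node name to the names of its direct
    # children, recursing so that any nesting depth is handled.
    if deps is None:
        deps = {}
    for name, spec in tree.items():
        children = deps.setdefault(name, set())
        for child in spec["children"]:
            # Each child entry is a one-key dict: {child_name: child_spec}.
            children.update(child.keys())
            flatten(child, deps)
    return deps


def build_order(tree: dict) -> List[str]:
    # TopologicalSorter treats each mapped set as predecessors, so every
    # child is emitted before its parent.
    return list(TopologicalSorter(flatten(tree)).static_order())


print(build_order(load_graph()))
```

The resulting map could then drive an asset factory like `get_asset`, one asset per node.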
This seems to kind of work: the graph is created and looks correct in dagit. However, I get GraphQL errors:
Operation name: AssetGraphLiveQuery

Message: maximum recursion depth exceeded while calling a Python object

Path: ["assetNodes",2,"projectedLogicalVersion"]

Locations: [{"line":44,"column":3}]
I managed to materialize them in the correct order as well, so it seems to work even though GraphQL is throwing errors?
j
Your code snippets look good! Sorry, I misunderstood your original request. Dynamic outputs are for when you have a graph of ops where some top-level op fans out into an unknown number of downstream ops. What you want to do (from what I gather, dynamically constructing a set of assets from some external config; correct me if I'm wrong) is done just like your code snippets. The GraphQL error you're seeing is unrelated to your code; it's a bug that was introduced in a recent version. If you upgrade to the latest version of dagster, I believe it will go away.
k
Ah thanks for the clarification! I can confirm that the code (my last snippet) works now without any error after upgrading to dagster & dagit 1.1.7. Thanks for the help!