https://dagster.io/
#ask-community

Anoop Sharma

05/12/2022, 7:11 PM
I am trying to set up a very basic workflow but am getting an error. I have three ops: `a`, `b`, and `c`. `a` is a predecessor of both `b` and `c`. `b` has a data dependency on `a` (it takes `a`'s outputs as inputs), while `c` has an ordering-only dependency on `a`. `a` has two outputs, `x` and `y`, and `b` uses both of them. When I run the code using `dagit`, I get the following error:
```
Error loading repository location test.py:dagster.core.errors.DagsterInvalidDefinitionError: Invalid dependencies: node "a" does not have output "result". Listed as dependency for node "c input "start"
```
Here's my code snippet.
```python
from dagster import DependencyDefinition, GraphDefinition, op, job, Out, In, Nothing


@op(out={"x": Out(), "y": Out()})
def a():
    print("a done!")
    return 4, 5

@op(ins={"p": In(), "q": In()})
def b(p, q):
    print("b done!")

@op(ins={"start": In(Nothing)})
def c():
    print("c done!")


_ = GraphDefinition(
    name="test",
    node_defs=[a, b, c],
    dependencies={
        "a": {},
        "b": {
            "p": DependencyDefinition("a", "x"),
            "q": DependencyDefinition("a", "y")
        },
        "c": {
            "start": DependencyDefinition("a")
        }
    }
).to_job()
```
A workaround I've found is to map `start` of `c` to `x` or `y` of `a`, but that's a workaround I'd rather not use unless there is no better way.
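The error comes from the default output name: `DependencyDefinition("a")` names no output, so the dependency refers to the default output `"result"`, which `a` does not declare (it only has `x` and `y`). The check below is a toy model in plain Python, not Dagster's own validation code: `(node, output)` tuples stand in for `DependencyDefinition`, and `find_missing_outputs` is a hypothetical helper. It shows why the original dependencies fail and why pointing `c` at each named output of `a` passes.

```python
from typing import Dict, List, Optional, Set, Tuple

DEFAULT_OUTPUT = "result"  # output name assumed when a dependency names none

# Toy stand-in for DependencyDefinition: (upstream node, output name or None).
Dep = Tuple[str, Optional[str]]


def find_missing_outputs(
    outputs_by_node: Dict[str, Set[str]],
    dependencies: Dict[str, Dict[str, Dep]],
) -> List[str]:
    """Hypothetical helper: list every dependency that points at an undeclared output."""
    problems = []
    for node, inputs in dependencies.items():
        for input_name, (upstream, output) in inputs.items():
            resolved = output if output is not None else DEFAULT_OUTPUT
            if resolved not in outputs_by_node[upstream]:
                problems.append(
                    f'node "{upstream}" has no output "{resolved}" '
                    f'(needed by input "{input_name}" of node "{node}")'
                )
    return problems


# Declared outputs in the question: a has x and y; b and c use the default.
OUTPUTS = {"a": {"x", "y"}, "b": {"result"}, "c": {"result"}}

ORIGINAL = {
    "a": {},
    "b": {"p": ("a", "x"), "q": ("a", "y")},
    "c": {"start": ("a", None)},  # like DependencyDefinition("a")
}

# One ordering-only input on c per output of a, as suggested later in the thread.
PER_OUTPUT = {
    "a": {},
    "b": {"p": ("a", "x"), "q": ("a", "y")},
    "c": {"start_x": ("a", "x"), "start_y": ("a", "y")},
}

print(find_missing_outputs(OUTPUTS, ORIGINAL))
print(find_missing_outputs(OUTPUTS, PER_OUTPUT))
```

The first call reports the missing `"result"` output for `c`'s `start` input; the second reports nothing.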

yuhan

05/12/2022, 7:40 PM
Hi @Anoop Sharma, is there a reason you are using `GraphDefinition` instead of the `@graph` or `@job` decorator?
```python
from dagster import In, Nothing, Out, job, op


@op(out={"x": Out(), "y": Out()})
def a():
    print("a done!")
    return 4, 5


@op(ins={"p": In(), "q": In()})
def b(p, q):
    print("b done!")


@op(ins={"start_x": In(Nothing), "start_y": In(Nothing)})
def c():
    print("c done!")


@job
def my_job():
    a_result = a()
    b(a_result.x, a_result.y)
    # c waits on both outputs of a, so it cannot start before a has emitted x and y
    c(start_x=a_result.x, start_y=a_result.y)
```
This should work, or:
```python
@job
def my_job():
    x, y = a()
    b(x, y)
    c(start_x=x, start_y=y)
```
You can also make `c` depend only on `x` (the first output of `a`):
```python
@op(ins={"start": In(Nothing)})
def c():
    print("c done!")


@job
def my_job():
    x, y = a()
    b(x, y)
    c(start=x)
```
This is possible because Dagster can continue downstream of any one of an op's several outputs. The catch is timing: if op `a` yields `x` before it has produced `y`, and `c` is waiting only on `x`, then `c` and `a` could be running at the same time (while `y` has not yet been output). So, assuming you want `c` to wait for `a` to finish completely, I'd recommend explicitly setting two `Nothing` dependencies on `c`.
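A toy simulation of the timing argument above, in plain Python rather than Dagster's executor: the hypothetical `when_c_is_ready` treats each output of `a` as available the moment it is emitted and records when all of `c`'s inputs are satisfied. An op that yields `x` and then `y` lets a `c` that waits only on `x` become ready before `y` exists; for contrast, an op that returns both outputs in one `return` makes them available together. It sketches the reasoning only and is not a model of how Dagster actually schedules steps.

```python
from typing import Iterable, List, Set, Tuple

Emission = List[Tuple[str, int]]  # outputs that appear together in one step


def a_yielding() -> Iterable[Emission]:
    # Like an op that does `yield Output(4, "x")` and later `yield Output(5, "y")`.
    yield [("x", 4)]
    yield [("y", 5)]


def a_returning() -> Iterable[Emission]:
    # Like `return 4, 5`: both outputs appear at the same moment.
    yield [("x", 4), ("y", 5)]


def when_c_is_ready(op_steps: Iterable[Emission], c_waits_for: Set[str]) -> List[str]:
    """Return an event log showing where c becomes runnable relative to a's outputs."""
    available: Set[str] = set()
    log: List[str] = []
    c_ready = False
    for emission in op_steps:
        for name, _value in emission:
            available.add(name)
            log.append(f"a emitted {name}")
        if not c_ready and c_waits_for <= available:
            c_ready = True
            log.append("c ready")
    log.append("a finished")
    return log


print(when_c_is_ready(a_yielding(), {"x"}))
print(when_c_is_ready(a_yielding(), {"x", "y"}))
print(when_c_is_ready(a_returning(), {"x"}))
```

Only the first case puts `c ready` before `y` is emitted, which is the overlap the two `Nothing` dependencies rule out.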

Anoop Sharma

05/12/2022, 8:06 PM
@yuhan I am actually writing a workflow where I need to create ops and graphs/jobs dynamically, which are later used to create repositories dynamically. This is why I am using `GraphDefinition` instead of the `@job` decorator. Regarding your suggestion to set two `Nothing` dependencies on `c`: how is such a scenario even possible when both outputs are returned in a single `return` statement? Also, this is still a workaround, since `c` does not really depend on `x` or `y`. Is there a better way to achieve this?

yuhan

05/12/2022, 8:10 PM
> Regarding your suggestion to set two `Nothing` dependencies on `c`: how is such a scenario even possible when both outputs are returned in a single `return` statement?
Right, in your case (the `return` case) it's not possible. But because Dagster also allows ops to `yield` outputs, the code has to be a bit more verbose: you'll need to explicitly set two `Nothing` dependencies on `c`.
> Also, this is still a workaround, since `c` does not really depend on `x` or `y`. Is there a better way to achieve this?
When you say `c` is not really dependent on `x` or `y`, do you mean that `c` depends on the completion of `a`?

antonl

05/12/2022, 8:14 PM
I also build these “compile-time” dynamic graphs, and I found that you can still use the `@graph` decorator. You just use the factory-function pattern: a function that defines a graph and returns it.

yuhan

05/12/2022, 8:15 PM
Yeah, you can put the decorated function inside the factory code.
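A minimal, Dagster-free sketch of the factory-function pattern described above: the factory takes the parameters that vary, applies a decorator to a function defined inside it, and returns the result, so each call produces a separately named definition. `graph_like` and `make_pipeline` are hypothetical stand-ins; with Dagster you would apply the real `@graph(name=...)` decorator in the same position.

```python
from typing import Callable, List


def graph_like(name: str) -> Callable[[Callable], Callable]:
    """Hypothetical decorator standing in for something like @graph(name=...)."""
    def decorator(fn: Callable) -> Callable:
        fn.definition_name = name  # record the name chosen at factory time
        return fn
    return decorator


def make_pipeline(name: str, steps: List[Callable[[int], int]]) -> Callable[[int], int]:
    """Factory: builds and returns a new decorated function for each configuration."""

    @graph_like(name=name)
    def inner(value: int) -> int:
        # `steps` is captured from the factory call, so each pipeline has its own list.
        for step in steps:
            value = step(value)
        return value

    return inner


double_then_inc = make_pipeline("double_then_inc", [lambda v: v * 2, lambda v: v + 1])
inc_only = make_pipeline("inc_only", [lambda v: v + 1])
print(double_then_inc.definition_name, double_then_inc(3))  # double_then_inc 7
print(inc_only.definition_name, inc_only(3))  # inc_only 4
```

Because the name and structure come from arguments, a loop over configurations can produce many definitions, which is the shape the dynamic-repository use case needs.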

Anoop Sharma

05/12/2022, 8:33 PM
@yuhan Yes, `c` depends only on the completion of `a`.
@antonl Do you have sample code I can look at for the factory-function pattern? That would be very helpful.
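Since `c` depends only on the completion of `a`, and `a`'s outputs are known when the graph is generated, one way to build that dependency dynamically is to give `c` one ordering-only input per output of `a`, as suggested earlier in the thread. The plain-Python sketch below only builds the specification: the hypothetical `completion_inputs` helper maps input names to `(node, output)` pairs. In Dagster, each pair would become `DependencyDefinition(node, output)` in the `dependencies` dict, and each input would be declared as `In(Nothing)` on `c`.

```python
from typing import Dict, List, Tuple


def completion_inputs(upstream: str, output_names: List[str]) -> Dict[str, Tuple[str, str]]:
    """Hypothetical helper: one ordering-only input per upstream output.

    Waiting on every output means the downstream node cannot start until the
    upstream node has emitted all of them.
    """
    return {
        f"after_{upstream}_{output}": (upstream, output)
        for output in output_names
    }


deps = {
    "a": {},
    "b": {"p": ("a", "x"), "q": ("a", "y")},
    "c": completion_inputs("a", ["x", "y"]),
}
print(deps["c"])  # {'after_a_x': ('a', 'x'), 'after_a_y': ('a', 'y')}
```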

antonl

05/12/2022, 9:06 PM
Oh, you’ve probably seen stuff like this in the Dagster docs. I don’t have a minimal example, but here’s a snippet:
```python
from typing import Dict, List, Optional, Union

from dagster import GraphIn, GraphOut, OpDefinition, graph

# _flatten_metric_outputs, _homogenize_op_outputs and collect_results are
# project-specific helpers that are not shown in this snippet.


def make_metrics_workflow(
    name: str,
    metrics: List[OpDefinition],
    column_groups: Dict[str, List[str]],
    groupby: Optional[Union[str, List[str]]] = None,
    featurewise: bool = True,
):
    """
    Define the workflow on data and twin samples.
    Args:
        name: name of workflow to use
        metrics: a list of metrics to use to compare the results of evaluating the endpoints
            on both branches
        column_groups: a map of group name to columns in the group to pass to metrics
        groupby: whether to group the results by a key
        featurewise: whether to evaluate the metric per-feature
    Returns:
        metrics_graph: a GraphDefinition that computes the passed metrics on the endpoints.
    Notes:
        Metric workflows must have unique names in a graph.
    """
    ins = {"data_result": GraphIn(), "twin_result": GraphIn()}
    out = {
        f"{output_name}": GraphOut()
        for output_name in _flatten_metric_outputs(metrics)
    }

    @graph(name=name, out=out, ins=ins)
    def inner(data_result, twin_result):
        # pylint: disable=too-many-locals
        result = {}
        common_config = {"groupby": groupby, "featurewise": featurewise}
        for group_name, columns in column_groups.items():
            config = {"columns": columns, **common_config}

            for m in metrics:
                alias = f"{group_name}_{m.name}"

                # compute metric
                metric_ = m.configured(config, name=alias)
                partial_result = metric_(data_result, twin_result)

                for output_ in _homogenize_op_outputs(partial_result):
                    result_key = f"{output_.output_name}"
                    store = result.setdefault(result_key, [])
                    store.append(output_)

        # postprocess results
        result = {
            k: collect_results(*v, name=f"{name}_{k}_collect_groups", axis=0)
            for k, v in result.items()
        }
        return result

    return inner
```
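The core bookkeeping in the snippet is grouping each metric's outputs by output name with `dict.setdefault`, so that every output name ends up with one list to collect. Stripped of Dagster, that step looks roughly like the sketch below; the metric functions, column groups, and the `group_outputs` helper are hypothetical, and each `(alias, value)` pair stands in for an op output handle.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical metric: maps (rows, columns) -> {output name: value}.
Metric = Callable[[List[Dict[str, float]], List[str]], Dict[str, float]]


def mean_metric(rows, columns):
    values = [r[c] for r in rows for c in columns]
    return {"mean": sum(values) / len(values)}


def max_metric(rows, columns):
    return {"max": max(r[c] for r in rows for c in columns)}


def group_outputs(
    column_groups: Dict[str, List[str]],
    metrics: Dict[str, Metric],
    rows: List[Dict[str, float]],
) -> Dict[str, List[Tuple[str, float]]]:
    """Run every metric on every column group and bucket the results by output name."""
    result: Dict[str, List[Tuple[str, float]]] = {}
    for group_name, columns in column_groups.items():
        for metric_name, metric in metrics.items():
            alias = f"{group_name}_{metric_name}"  # unique name per group/metric pair
            for output_name, value in metric(rows, columns).items():
                result.setdefault(output_name, []).append((alias, value))
    return result


rows = [{"a": 1.0, "b": 4.0}, {"a": 3.0, "b": 2.0}]
grouped = group_outputs({"g1": ["a"], "g2": ["a", "b"]}, {"avg": mean_metric, "top": max_metric}, rows)
print(grouped)
```

In the snippet, each of those per-output lists is then handed to `collect_results`, which is not shown.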

Anoop Sharma

05/13/2022, 6:02 AM
Thanks @antonl. Will go through this.