Anoop Sharma
05/12/2022, 7:11 PM
In dagit, I get the following error:
Error loading repository location test.py: dagster.core.errors.DagsterInvalidDefinitionError: Invalid dependencies: node "a" does not have output "result". Listed as dependency for node "c" input "start"
Here's my code snippet.
from dagster import DependencyDefinition, GraphDefinition, op, job, Out, In, Nothing

@op(out={"x": Out(), "y": Out()})
def a():
    print("a done!")
    return 4, 5

@op(ins={"p": In(), "q": In()})
def b(p, q):
    print("b done!")

@op(ins={"start": In(Nothing)})
def c():
    print("c done!")

_ = GraphDefinition(
    name="test",
    node_defs=[a, b, c],
    dependencies={
        "a": {},
        "b": {
            "p": DependencyDefinition("a", "x"),
            "q": DependencyDefinition("a", "y"),
        },
        "c": {
            "start": DependencyDefinition("a"),
        },
    },
).to_job()
A workaround that I've found is to map "start" of c to "x" or "y" of a, but again, that's a workaround which I don't really want to use unless there is no better way.
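Concretely, that workaround looks like the following (a sketch reusing the ops defined above; it loads without the error, but it makes c depend on the specific output "x" rather than on a as a whole):
_ = GraphDefinition(
    name="test",
    node_defs=[a, b, c],
    dependencies={
        "b": {
            "p": DependencyDefinition("a", "x"),
            "q": DependencyDefinition("a", "y"),
        },
        "c": {
            # workaround: tie the Nothing input "start" to one concrete output of a
            "start": DependencyDefinition("a", "x"),
        },
    },
).to_job()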
yuhan
05/12/2022, 7:40 PM
Could you use the @graph or @job decorator?

@op(out={"x": Out(), "y": Out()})
def a():
    print("a done!")
    return 4, 5

@op(ins={"p": In(), "q": In()})
def b(p, q):
    print("b done!")

@op(ins={"start_x": In(Nothing), "start_y": In(Nothing)})
def c():
    print("c done!")

@job
def my_job():
    a_result = a()
    b(a_result.x, a_result.y)
    c(start_x=a_result.x, start_y=a_result.y)
this should work, or
@job
def my_job():
    x, y = a()
    b(x, y)
    c(start_x=x, start_y=y)

# or, with a single Nothing input on c (c then only waits for "x"):
@op(ins={"start": In(Nothing)})
def c():
    print("c done!")

@job
def my_job():
    x, y = a()
    b(x, y)
    c(start=x)
because it’s possible to continue downstream of just one of many outputs. for example, in this case, if op “a” yields “x” before “y” completes and “c” is only waiting for “x”, then “c” and “a” (while “y” is not yet output) could be running at the same time. so, assuming you want “c” to wait for “a” to finish completely, i’d recommend explicitly setting two Nothing dependencies on “c”.
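For illustration, here is a minimal sketch of the yield case being described (a hypothetical body for op "a", not from the thread):
from dagster import Out, Output, op

@op(out={"x": Out(), "y": Out()})
def a():
    # "x" becomes available to downstream ops as soon as it is yielded,
    # so an op that only depends on "x" can start while "y" is still being computed.
    yield Output(4, output_name="x")
    # ... long-running work to produce "y" ...
    yield Output(5, output_name="y")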
Anoop Sharma
05/12/2022, 8:06 PM
I need to use GraphDefinition instead of the @job decorator. To your answer to set two Nothing dependencies on c: how is such a scenario even possible when both the outputs are getting returned in a single return statement? Also, this is still a workaround, since c is not really dependent on x or y. Is there a better way to achieve this?
yuhan
05/12/2022, 8:10 PM
> To your answer to set two Nothing dependencies on c: how is such a scenario even possible when both the outputs are getting returned in a single return statement?
right, in your case (the return case), it’s not possible. but the fact that dagster also allows the yield case makes your code a bit verbose - you’ll need to explicitly set two Nothing dependencies on “c”
> Also, this is still a workaround, since c is not really dependent on x or y. Is there a better way to achieve this?
when you say “c” is not really dependent on x or y, did you mean “c” is dependent on the completion of “a”?
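A minimal sketch of the same two-Nothing-dependencies idea expressed with GraphDefinition rather than the @job decorator (assuming the a and b ops from the original snippet, and c redefined with two Nothing inputs):
from dagster import DependencyDefinition, GraphDefinition, In, Nothing, op

@op(ins={"start_x": In(Nothing), "start_y": In(Nothing)})
def c():
    print("c done!")

test_job = GraphDefinition(
    name="test",
    node_defs=[a, b, c],
    dependencies={
        "b": {
            "p": DependencyDefinition("a", "x"),
            "q": DependencyDefinition("a", "y"),
        },
        "c": {
            # c starts only after both of a's outputs exist, i.e. after a completes
            "start_x": DependencyDefinition("a", "x"),
            "start_y": DependencyDefinition("a", "y"),
        },
    },
).to_job()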
antonl
05/12/2022, 8:14 PM
yuhan
05/12/2022, 8:15 PM
Anoop Sharma
05/12/2022, 8:33 PM
antonl
05/12/2022, 9:06 PM
from typing import Dict, List, Optional, Union

from dagster import GraphIn, GraphOut, OpDefinition, graph

# _flatten_metric_outputs, _homogenize_op_outputs, and collect_results are
# helpers defined elsewhere in this codebase.


def make_metrics_workflow(
    name: str,
    metrics: List[OpDefinition],
    column_groups: Dict[str, List[str]],
    groupby: Optional[Union[str, List[str]]] = None,
    featurewise: bool = True,
):
    """
    Define the workflow on data and twin samples.

    Args:
        name: name of workflow to use
        metrics: a list of metrics to use to compare the results of evaluating the endpoints
            on both branches
        column_groups: a map of group name to columns in the group to pass to metrics
        groupby: whether to group the results by a key
        featurewise: whether to evaluate the metric per-feature

    Returns:
        metrics_graph: a GraphDefinition that computes the passed metrics on the endpoints.

    Notes:
        Metric workflows must have unique names in a graph.
    """
    ins = {"data_result": GraphIn(), "twin_result": GraphIn()}
    out = {
        f"{output_name}": GraphOut()
        for output_name in _flatten_metric_outputs(metrics)
    }

    @graph(name=name, out=out, ins=ins)
    def inner(data_result, twin_result):
        # pylint: disable=too-many-locals
        result = {}
        common_config = {"groupby": groupby, "featurewise": featurewise}
        for group_name, columns in column_groups.items():
            config = {"columns": columns, **common_config}
            for m in metrics:
                alias = f"{group_name}_{m.name}"
                # compute metric
                metric_ = m.configured(config, name=alias)
                partial_result = metric_(data_result, twin_result)
                for output_ in _homogenize_op_outputs(partial_result):
                    result_key = f"{output_.output_name}"
                    store = result.setdefault(result_key, [])
                    store.append(output_)
        # postprocess results
        result = {
            k: collect_results(*v, name=f"{name}_{k}_collect_groups", axis=0)
            for k, v in result.items()
        }
        return result

    return inner
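A hypothetical usage sketch of this factory (the metric ops and column names below are illustrative only and not from the thread):
metrics_graph = make_metrics_workflow(
    name="data_vs_twin_metrics",
    metrics=[mean_difference_op, ks_statistic_op],  # hypothetical OpDefinitions
    column_groups={
        "numeric": ["age", "income"],
        "categorical": ["region"],
    },
    groupby="cohort",
    featurewise=True,
)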
Anoop Sharma
05/13/2022, 6:02 AM