Daniel Galea
12/21/2022, 9:00 AM
@graph decorator on a function and specifying the Ops inside that function. I am getting the following error due to the Op which has a hook. The error: AttributeError: 'PendingNodeInvocation' object has no attribute '__name__'. This happens because GraphDefinition expects node_defs to be a list of NodeDefinition and not PendingNodeInvocation. I tried to separate the hooks from the nodes and pass them to the repository, but a repository does not accept the HookDefinition class.
I want to apply certain hooks to specific Ops, so applying a hook to the entire job doesn't fit my use case. Is there a way to work around this, or a different way to create Ops dynamically like this? Below is an example of how I build the graph and job:
from dagster import (
    DependencyDefinition,
    GraphDefinition,
    OpExecutionContext,
    op,
    repository,
)

# my_hook_definition is a HookDefinition defined elsewhere (not shown).

nodes = list()


def get_template_node(x: int):
    @op(name=f"node{x}")
    def template_node(context: OpExecutionContext, some_value) -> str:
        return f"This is node {x}"

    return template_node


def get_downstream_node(x: int):
    @op(name=f"downstream_of_node{x}")
    def downstream_node(context: OpExecutionContext, some_value) -> str:
        return f"DOWNSTREAM GOT VALUE {some_value}"

    return downstream_node


for x in range(0, 5):
    nodes.append(get_template_node(x=x))
    # with_hooks() returns a PendingNodeInvocation, not a NodeDefinition;
    # this is the entry that triggers the AttributeError.
    nodes.append(get_downstream_node(x=x).with_hooks({my_hook_definition}))

# Wire each node's "some_value" input to the node before it in the list.
dependencies = dict()
for x in range(len(nodes) - 1, -1, -1):
    if x != 0:
        dependencies[nodes[x].name] = {
            "some_value": DependencyDefinition(nodes[x - 1].name)
        }

my_graph = GraphDefinition(
    name="dynamic_graph",
    description="dynamic graph desc",
    node_defs=nodes,
    dependencies=dependencies,
)

my_job = my_graph.to_job(name="dynamic_job")


@repository
def repo():
    return [my_job]