# ask-community
Hi everyone, I am trying to create a graph dynamically, meaning that the number of Ops in a graph is determined by an external configuration. The use case is that I want to run Spark jobs in steps, and one job might have 5 transformations while another might have 3. Each Op launches a templated Spark job on EMR. The output of one Op is not the input of the next. I simply want them chained in a certain order, and I don't want to hard-code the Ops by decorating a function with `@graph` and listing the Ops inside it.

I am getting the following error because of an Op that has a hook attached:

`AttributeError: 'PendingNodeInvocation' object has no attribute '__name__'`

This happens because `GraphDefinition` expects `node_defs` to be a list of `NodeDefinition` objects, not `PendingNodeInvocation` objects (which is what `.with_hooks(...)` returns). I tried separating the hooks from the nodes and passing them to the repository, but a repository does not accept `HookDefinition` objects. I want to apply certain hooks to specific Ops only, so applying a hook to the entire job doesn't fit my use case.

Is there a way to work around this, or a different way to create Ops dynamically like this? Below is an example of how I build the graph:
from dagster import (
    DependencyDefinition,
    GraphDefinition,
    HookContext,
    OpExecutionContext,
    op,
    repository,
    success_hook,
)

@success_hook
def my_hook_definition(context: HookContext):
    # Placeholder for the real hook, which the post doesn't show
    context.log.info("hook fired")

nodes = list()

def get_template_node(x: int):
    @op(name=f"node{x}")
    def template_node(context: OpExecutionContext, some_value) -> str:
        return f"This is node {x}"
    return template_node

def get_downstream_node(x: int):
    @op(name=f"downstream_of_node{x}")
    def downstream_node(context: OpExecutionContext, some_value) -> str:
        return f"DOWNSTREAM GOT VALUE {some_value}"
    return downstream_node

for x in range(0, 5):
    nodes.append(get_template_node(x=x))
    # .with_hooks() returns a PendingNodeInvocation, not a NodeDefinition
    nodes.append(get_downstream_node(x=x).with_hooks({my_hook_definition}))

dependencies = dict()
for x in range(len(nodes) - 1, -1, -1):
    # Chain each node to the previous one through its "some_value" input
    if x != 0:
        dependencies[nodes[x].name] = {
            "some_value": DependencyDefinition(nodes[x - 1].name)
        }

my_graph = GraphDefinition(
    name="dynamic_graph",
    description="dynamic graph desc",
    node_defs=nodes,
    dependencies=dependencies,
)

my_job = my_graph.to_job(name="dynamic_job")

@repository
def repo():
    return [my_job]