# ask-ai
b
how can I use reuse op graphs in multiple jobs?
s
To reuse op graphs in multiple jobs, you can define a graph and then include it in different job definitions. Here's a step-by-step guide based on the documentation:

1. Define your graph: Start by defining a graph that encapsulates the logic you want to reuse. A graph is a reusable, composable unit that groups together ops (operations).
```python
@graph
def my_reusable_graph(input1, input2):
    # Define the operations and logic within the graph
    op1_result = my_op1(input1)
    op2_result = my_op2(input2)
    final_result = my_op3(op1_result, op2_result)
    return final_result
```
2. Create job definitions: Once you have your graph defined, you can create multiple job definitions that include this graph. Each job can have its own configuration and can be tailored to different use cases or environments.
```python
@job
def job_a():
    my_reusable_graph()

@job
def job_b():
    my_reusable_graph()
```
3. Configure jobs: If your graph requires different configurations for different jobs, you can pass configuration values when constructing the jobs.
```python
@job(config=my_config_for_job_a)
def job_a():
    my_reusable_graph()

@job(config=my_config_for_job_b)
def job_b():
    my_reusable_graph()
```
4. Execute jobs: You can execute these jobs independently, and each will run the shared graph according to its specific configuration.

By following these steps, you can effectively reuse a graph in multiple jobs, keeping your code DRY (Don't Repeat Yourself) and making it easier to maintain and update your data pipelines. Remember that the graph itself is a static definition; it's the job configurations that provide the necessary flexibility for different scenarios.
b
@Scout can you give an example of using two graphs in one job? Where graph_1 is a dependency for graph_2?
s
Certainly! To use two graphs in one job where `graph_1` is a dependency for `graph_2`, you can define each graph separately and then compose them within a job. Here's an example:
```python
from dagster import graph, op, job

# Define the first graph
@op
def op_a():
    return "result_a"

@graph
def graph_1():
    return op_a()

# Define the second graph, which depends on the output of the first graph
@op
def op_b(input_from_a):
    return f"result_b with input: {input_from_a}"

@graph
def graph_2(input_from_a):
    return op_b(input_from_a)

# Compose both graphs in a job
@job
def my_composed_job():
    result_from_graph_1 = graph_1()
    graph_2(result_from_graph_1)
```
In this example, `graph_1` is executed first, and its output is passed as an input to `graph_2`. The `my_composed_job` job orchestrates the execution of both graphs, ensuring that `graph_1` runs before `graph_2` and that the data flows between them as expected. When you execute `my_composed_job`, Dagster will automatically resolve the dependencies and execute the ops in the correct order based on the data dependencies you've defined.
b
@Scout how can I pass the graph into the `out` argument of an op? I'm getting an error saying:

```
In @job build_database_clones, received invalid type <class 'NoneType'> for input "end" (passed by keyword) in op invocation "create_empty_database_clones_op". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
```

my job looks like below where `clone_dbt_models` is the graph

```python
create_empty_database_clones_op(start=drop_database_clones_op(), end=clone_dbt_models())
```
s
In Dagster, you cannot directly pass a graph into the `out` argument of an op because graphs do not produce outputs in the same way that ops do. Instead, graphs define a set of computations, and you can obtain outputs from the ops within the graph. If you want to use the output of a graph within an op, you need to ensure that the graph has an op that produces an output, and then you can use that output in subsequent ops or graphs. Here's an example of how you might structure your code to use the output of a graph within an op:
```python
from dagster import graph, op, job, Out

# Define your ops
@op
def drop_database_clones_op():
    # Logic to drop database clones
    pass

@op(out=Out())
def create_empty_database_clones_op(start, end):
    # Logic to create empty database clones
    # 'start' and 'end' are used to ensure execution order
    return "database_clone_result"

# Define your graph so that it ends with an op that produces an output
@op(out=Out())
def final_op_in_graph():
    # Final op logic that produces an output
    return "result_from_graph"

@graph
def clone_dbt_models():
    # Define the graph logic here
    # Ensure the final op in the graph is 'final_op_in_graph'
    return final_op_in_graph()

# Compose the job
@job
def build_database_clones():
    # First, execute the drop_database_clones_op
    start_result = drop_database_clones_op()

    # Then, execute the graph and get the output from its final op
    end_result = clone_dbt_models()

    # Finally, pass the results to create_empty_database_clones_op
    create_empty_database_clones_op(start=start_result, end=end_result)
```
In this example, `drop_database_clones_op` is executed first, and then the `clone_dbt_models` graph is executed. The output of the final op in the graph (`final_op_in_graph`) is captured as `end_result`. This output is then passed to `create_empty_database_clones_op` along with the result of `drop_database_clones_op` (`start_result`) to ensure the correct execution order.

Make sure that the final op in your graph (`final_op_in_graph` in the example) is actually producing an output that you can pass to the next op. If your graph does not have an op that produces an output, you will need to modify the graph to include such an op.