Brandon Peebles
03/06/2024, 6:06 PM

Scout
03/06/2024, 6:06 PM

1. Define a reusable graph: Use the `@graph` decorator to define the shared logic once.

```python
@graph
def my_reusable_graph(input1, input2):
    # Define the operations and logic within the graph
    op1_result = my_op1(input1)
    op2_result = my_op2(input2)
    final_result = my_op3(op1_result, op2_result)
    return final_result
```
2. Create job definitions: Once you have your graph defined, you can create multiple job definitions that include this graph. Each job can have its own configuration and can be tailored to different use cases or environments.

```python
@job
def job_a():
    my_reusable_graph()

@job
def job_b():
    my_reusable_graph()
```
3. Configure jobs: If your graph requires different configurations for different jobs, you can pass configuration values when constructing the jobs.

```python
@job(config=my_config_for_job_a)
def job_a():
    my_reusable_graph()

@job(config=my_config_for_job_b)
def job_b():
    my_reusable_graph()
```
4. Execute jobs: You can execute these jobs independently, and each will run the shared graph according to its specific configuration.

By following these steps, you can effectively reuse a graph in multiple jobs, keeping your code DRY (Don't Repeat Yourself) and making your data pipelines easier to maintain and update. Remember that the graph itself is a static definition; it is the job configurations that provide the necessary flexibility for different scenarios.

Brandon Peebles
03/06/2024, 6:19 PM

Scout
03/06/2024, 6:19 PM

If `graph_1` is a dependency for `graph_2`, you can define each graph separately and then compose them within a job. Here's an example:

```python
from dagster import graph, op, job

# Define the first graph
@op
def op_a():
    return "result_a"

@graph
def graph_1():
    return op_a()

# Define the second graph, which depends on the output of the first graph
@op
def op_b(input_from_a):
    return f"result_b with input: {input_from_a}"

@graph
def graph_2(input_from_a):
    return op_b(input_from_a)

# Compose both graphs in a job
@job
def my_composed_job():
    result_from_graph_1 = graph_1()
    graph_2(result_from_graph_1)
```
In this example, `graph_1` is executed first, and its output is passed as an input to `graph_2`. The `my_composed_job` job orchestrates the execution of both graphs, ensuring that `graph_1` runs before `graph_2` and that the data flows between them as expected. When you execute `my_composed_job`, Dagster will automatically resolve the dependencies and execute the ops in the correct order based on the data dependencies you've defined.

Brandon Peebles
03/06/2024, 8:33 PM

Can I pass a graph to the `out` argument of an op? I'm getting an error saying:

```
In @job build_database_clones, received invalid type <class 'NoneType'> for input "end" (passed by keyword) in op invocation "create_empty_database_clones_op". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
```

My job looks like below, where `clone_dbt_models` is the graph:

```python
create_empty_database_clones_op(start=drop_database_clones_op(), end=clone_dbt_models())
```
Scout
03/06/2024, 8:33 PM

You cannot pass a graph to the `out` argument of an op, because graphs do not produce outputs in the same way that ops do. Instead, graphs define a set of computations, and you can obtain outputs from the ops within the graph. If you want to use the output of a graph within an op, you need to ensure that the graph has an op that produces an output, and then you can use that output in subsequent ops or graphs. Here's an example of how you might structure your code to use the output of a graph within an op:

```python
from dagster import graph, op, job, Out

# Define your ops
@op
def drop_database_clones_op():
    # Logic to drop database clones
    pass

@op(out=Out())
def create_empty_database_clones_op(start, end):
    # Logic to create empty database clones
    # 'start' and 'end' are used to ensure execution order
    return "database_clone_result"

# Define your graph so that it ends with an op that produces an output
@op(out=Out())
def final_op_in_graph():
    # Final op logic that produces an output
    return "result_from_graph"

@graph
def clone_dbt_models():
    # Ensure the final op in the graph is 'final_op_in_graph'
    return final_op_in_graph()

# Compose the job
@job
def build_database_clones():
    # First, execute drop_database_clones_op
    start_result = drop_database_clones_op()
    # Then, execute the graph and get the output from its final op
    end_result = clone_dbt_models()
    # Finally, pass the results to create_empty_database_clones_op
    create_empty_database_clones_op(start=start_result, end=end_result)
```
In this example, `drop_database_clones_op` is executed first, and then the `clone_dbt_models` graph is executed. The output of the final op in the graph (`final_op_in_graph`) is captured as `end_result`. This output is then passed to `create_empty_database_clones_op` along with the result of `drop_database_clones_op` (`start_result`) to ensure the correct execution order. Make sure that the final op in your graph (`final_op_in_graph` in the example) actually produces an output that you can pass to the next op. If your graph does not have an op that produces an output, you will need to modify the graph to include such an op.