Lucas Humbert
02/21/2024, 10:36 AMScout
02/21/2024, 10:36 AM@op
decorator to define each op. 2. If an op requires configuration, you can define a config schema for that op using the config_schema
parameter. 3. In the graph or job definition, you would connect the ops by passing the output of one op as the input to another. 4. When executing the graph or job, you provide the config values that match the defined config schemas for the ops. Here's an example of how you might define two ops where one op's output is used as a parameter for another op: python from dagster import op, job, Out, In, ConfigSchema @op(config_schema={"param": str}) def op_with_config(context): # Use the config parameter in the op's logic param = context.op_config["param"] # ... perform some operation with param ... return result @op def downstream_op(context, input_param): # Use the input parameter in this op's logic # ... perform some operation with input_param ... @job def my_job(): downstream_op(op_with_config())
When executing my_job
, you would provide the config for `op_with_config`: python result = my_job.execute_in_process( run_config={ "ops": { "op_with_config": { "config": { "param": "value" } } } } )
In this example, op_with_config
has a config schema that expects a parameter named "param". When my_job
is executed, the run_config
provides the value for "param", which is then used within op_with_config
. The output of op_with_config
is passed as an input to downstream_op
. Remember that the actual invocation of ops and the passing of parameters are handled by Dagster's execution engine when you run the job or graph, not by directly calling one op from another in the compute function code.