Aki Iwa
03/05/2024, 11:32 PM
Scout
03/05/2024, 11:32 PM
Use the `.alias` method to give the op a new name within the job. This allows you to include the same op multiple times in a job with different configurations. Each aliased op can have its own set of tags and arguments.
Here's an example based on the documentation:
```python
from dagster import job, op


@op
def my_op(context):
    # Your op logic here
    pass


@job
def my_job():
    # .alias() renames this invocation; .tag() attaches tags to it.
    my_op.alias("my_op_with_tag_1").tag({"key1": "value1"})()
    my_op.alias("my_op_with_tag_2").tag({"key2": "value2"})()
```
In this example, `my_op` is aliased twice within `my_job`, each time with a different name and a different set of tags. When you execute `my_job`, it will include two instances of `my_op`, each with its own configuration.
Remember that the `.alias` method is used to create a new instance of an op with a different name, which allows you to configure it independently from other instances of the same op within the same job.
Aki Iwa
03/05/2024, 11:35 PM
Scout
03/05/2024, 11:38 PM
Use the `config` system in Dagster. Each op can have a `config_schema` that defines the structure and types of the configuration it accepts. When you execute a job, you can provide a `run_config` that specifies the configuration for each op. Here's an example of how you might define an op with a `config_schema` and then pass configuration to it within a job:
```python
from dagster import job, op


@op(config_schema={"arg1": str, "arg2": int})
def my_op(context):
    arg1 = context.op_config["arg1"]
    arg2 = context.op_config["arg2"]
    # Your op logic here using arg1 and arg2


@job
def my_job():
    my_op()
```
When you execute `my_job`, you can pass the configuration for `my_op` like this:
```python
result = my_job.execute_in_process(
    run_config={
        "ops": {
            "my_op": {
                "config": {
                    "arg1": "some_string",
                    "arg2": 123,
                }
            }
        }
    }
)
```
In the `run_config` for `my_job`, you specify the configuration for `my_op` under the `ops` key, using the name of the op as a sub-key. The `config` key within that contains the actual configuration values, which must match the `config_schema` defined in the op. If you have multiple instances of the same op in a job (using aliases as mentioned in the previous response), you would provide separate configurations for each aliased op in the `run_config`, keyed by the alias rather than the original op name. Each aliased op can have its own unique configuration.
Aki Iwa
03/05/2024, 11:44 PM
Scout
03/05/2024, 11:44 PM
To run `my_op` in parallel with different arguments, you would use dynamic mapping in Dagster. This involves using an op to yield `DynamicOutput` objects, which can then be mapped over with another op to execute in parallel. Each `DynamicOutput` can carry different arguments that will be passed to the mapped op. Here's an example of how you might set this up:
```python
from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def generate_dynamic_outputs(context):
    for i in range(3):
        # Generate different arguments for each invocation of my_op
        yield DynamicOutput(value={"arg1": f"string_{i}", "arg2": i}, mapping_key=str(i))


@op
def my_op(context, args: dict):
    # Each mapped invocation receives one DynamicOutput value as its input,
    # so the arguments arrive via `args` rather than via op config.
    arg1 = args["arg1"]
    arg2 = args["arg2"]
    # Your op logic here using arg1 and arg2


@job
def my_parallel_job():
    dynamic_outputs = generate_dynamic_outputs()
    dynamic_outputs.map(my_op)
```
In `my_parallel_job`, `generate_dynamic_outputs` yields three `DynamicOutput` objects, each with a different set of arguments. The `dynamic_outputs.map(my_op)` line tells Dagster to map `my_op` over each of these dynamic outputs, effectively running `my_op` once per output with the different arguments provided. When you execute `my_parallel_job`, Dagster will run three invocations of `my_op`, each with the `arg1` and `arg2` values specified in the `DynamicOutput` objects. Whether those invocations actually run concurrently depends on the executor: the default multiprocess executor can run them in parallel, while `execute_in_process` runs them one after another. You don't need to provide a separate `run_config` for each invocation because the arguments are being passed through the dynamic outputs. This approach allows you to execute the same op logic in parallel with different arguments without needing to manually alias the op or provide separate configurations for each instance.
Karthik Bhaskara
03/07/2024, 10:21 PM