#ask-ai

Aki Iwa

03/05/2024, 11:32 PM
How do I include the same op more than once in a job if I want different tags and separate arguments for each instance when executing the job?

Scout

03/05/2024, 11:32 PM
To specify the same op in a job with different tags and separate arguments, you can use the `.alias` method to give the op a new name within the job. This allows you to include the same op multiple times in a job with different configurations. Each aliased op can have its own set of tags and arguments. Here's an example based on the documentation:
```python
from dagster import job, op

@op
def my_op(context):
    # Your op logic here
    pass

@job
def my_job():
    # .alias() gives each invocation its own name; .tag() attaches op tags to it
    my_op.alias("my_op_with_tag_1").tag({"key1": "value1"})()
    my_op.alias("my_op_with_tag_2").tag({"key2": "value2"})()
```
In this example, `my_op` is aliased twice within `my_job`, each time with a different name and a different set of tags. Tags are attached with `.tag()`; arguments passed in the call itself are treated as the op's inputs, so they are not a way to set tags. When you execute `my_job`, it will include two instances of `my_op`, each with its own configuration. Remember that the `.alias` method creates a new invocation of an op under a different name, which allows you to configure it independently from other instances of the same op within the same job.

Aki Iwa

03/05/2024, 11:35 PM
If I want to pass arbitrary arguments to an op in a job, do I use config? @Scout

Scout

03/05/2024, 11:38 PM
Yes, if you want to pass arbitrary arguments to an op within a job, you would use the config system in Dagster. Each op can have a `config_schema` that defines the structure and type of the configuration it accepts. When you execute a job, you can provide a `run_config` that specifies the configuration for each op. Here's an example of how you might define an op with a `config_schema` and then pass configuration to it within a job:
```python
from dagster import job, op

@op(config_schema={"arg1": str, "arg2": int})
def my_op(context):
    arg1 = context.op_config["arg1"]
    arg2 = context.op_config["arg2"]
    # Your op logic here using arg1 and arg2

@job
def my_job():
    my_op()
```
When you execute `my_job`, you can pass the configuration for `my_op` like this:
```python
result = my_job.execute_in_process(
    run_config={
        "ops": {
            "my_op": {
                "config": {
                    "arg1": "some_string",
                    "arg2": 123,
                }
            }
        }
    }
)
```
In this `run_config`, you specify the configuration for `my_op` under the `ops` key, using the name of the op as a sub-key. The `config` key within that contains the actual configuration values that match the `config_schema` defined in the op. If you have multiple instances of the same op in a job (using aliases as mentioned in the previous response), you would provide a separate configuration for each aliased op in the `run_config`, keyed by its alias name rather than the original op name. Each aliased op can have its own unique configuration.

Aki Iwa

03/05/2024, 11:44 PM
For example, if I have a process similar to my_op and I want to run it 3 times in parallel, changing only the arguments, how should I set it up? @Scout

Scout

03/05/2024, 11:44 PM
To run a process similar to `my_op` in parallel with different arguments, you can use dynamic mapping in Dagster. This involves using an op to yield `DynamicOutput` objects, which can then be mapped over with another op so that each output gets its own invocation. Each `DynamicOutput` can carry different arguments that will be passed to the mapped op. Here's an example of how you might set this up:
```python
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def generate_dynamic_outputs(context):
    for i in range(3):
        # Generate different arguments for each invocation of my_op
        yield DynamicOutput(value={"arg1": f"string_{i}", "arg2": i}, mapping_key=str(i))

@op
def my_op(context, args: dict):
    # The arguments arrive as an input (the mapped DynamicOutput value), not as config
    arg1 = args["arg1"]
    arg2 = args["arg2"]
    # Your op logic here using arg1 and arg2

@job
def my_parallel_job():
    dynamic_outputs = generate_dynamic_outputs()
    dynamic_outputs.map(my_op)
```
In this setup, `generate_dynamic_outputs` yields three `DynamicOutput` objects, each with a different set of arguments. The `dynamic_outputs.map(my_op)` line tells Dagster to map `my_op` over each of these dynamic outputs, creating one invocation of `my_op` per output. When you execute `my_parallel_job`, Dagster will run three invocations of `my_op`, each receiving the `arg1` and `arg2` values from its `DynamicOutput` as an input. You don't need to provide a separate `run_config` for each invocation because the arguments are passed through the dynamic outputs as an input to the mapped op, rather than through op config. Whether the invocations actually run in parallel depends on the executor: Dagster's default multiprocess executor runs them in separate processes, while `execute_in_process` runs them one after another in the current process. This approach allows you to execute the same op logic with different arguments without needing to manually alias the op or provide separate configurations for each instance.

Karthik Bhaskara

03/07/2024, 10:21 PM
Just building on this - Can I define an asset that executes multiple ops in parallel? If so, how?