#ask-ai

Sandeep Mullangi

03/13/2024, 4:57 PM
@Scout How do I send multiple partition keys in a RunRequest, given that RunRequest accepts a partition_key parameter that is a single string?

Scout

03/13/2024, 4:57 PM
The `RunRequest` object in Dagster is designed to request a run for a specific partition when working with partitioned assets or jobs. As per the documentation, the `partition_key` parameter of a `RunRequest` is a string that specifies the key of the single partition for which you want to request a run. If you need to handle multiple partitions, you would typically iterate over the partition keys and create a separate `RunRequest` for each one. However, this approach launches one run per partition, so it may not be suitable if you want to process multiple partitions within a single run to reduce overhead.

The documentation does not explicitly provide a built-in method for submitting a single `RunRequest` for multiple partition keys. However, there are discussions and issues that suggest workarounds or feature requests related to handling multiple partitions. One such discussion suggests that you can use tags to include information about a range of partitions when creating a `RunRequest`. For example, you could use reserved tags like `ASSET_PARTITION_RANGE_START_TAG` and `ASSET_PARTITION_RANGE_END_TAG` to specify a range of partitions to process in a single run. Here's an example from the discussions:
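```python
from dagster import AssetSelection, RunRequest, SensorResult, sensor

# `my_asset` and `stations_partition` (a dynamic partitions definition) are defined elsewhere.
@sensor(asset_selection=AssetSelection.keys(my_asset.key))
def stations_sensor(context):
    stations = ["1", "2", "3"]
    return SensorResult(
        run_requests=[
            RunRequest(
                run_key=None,
                # String values of ASSET_PARTITION_RANGE_START_TAG / ASSET_PARTITION_RANGE_END_TAG
                tags={
                    "dagster/asset_partition_range_start": "1",
                    "dagster/asset_partition_range_end": "3",
                },
            )
        ],
        dynamic_partitions_requests=[stations_partition.build_add_request(stations)],
    )
```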
In this example, the tags indicate a range of partitions to be processed. `RunRequest` has no dedicated parameter for a partition range, so the code that runs must be written to handle every partition in the range rather than a single partition key. If you have a specific use case that requires processing multiple partitions in a single run and the current `RunRequest` functionality does not meet your needs, you may consider reaching out on the Dagster GitHub discussions or issues to inquire about potential features or best practices for such scenarios.

Sandeep Mullangi

03/13/2024, 5:31 PM
@Scout I have a schedule that triggers a job, and the schedule returns a RunRequest with parameters. How do I capture these in the job and pass them to an op?

Scout

03/13/2024, 5:31 PM
To capture parameters from a `RunRequest` in a job and pass them to an op, you can use the `run_config` field of the `RunRequest` to specify the configuration for the job. This configuration can include parameters that your ops need to execute. Here's a step-by-step guide on how to do this:
1. Define your op with the required configuration schema. The op should accept a context parameter from which you can access the configuration values.
2. Create a job that includes this op.
3. Define a schedule that returns a `RunRequest` with the `run_config` field set. The `run_config` should match the structure expected by the job and its ops.
4. When the schedule triggers a run, Dagster will use the `run_config` provided in the `RunRequest` to configure the job and its ops.

Here's an example to illustrate this process:
```python
from dagster import job, op, schedule, RunRequest


# Define an op that requires configuration
@op(config_schema={"param": str})
def my_op(context):
    # Access the configuration parameter
    param = context.op_config["param"]
    context.log.info(f"Parameter value: {param}")


# Create a job that includes the op
@job
def my_job():
    my_op()


# Define a schedule that returns a RunRequest with parameters
@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    # Define the run configuration with the parameter you want to pass
    run_config = {
        "ops": {
            "my_op": {
                "config": {
                    "param": "value_from_schedule"
                }
            }
        }
    }

    # Return a RunRequest with the run configuration
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
        run_config=run_config,
    )

# When the schedule triggers a run, it will use the run_config provided in the RunRequest
# to configure the job and pass the parameter to the op.
```
In this example, the `my_schedule` schedule creates a `RunRequest` with a `run_config` that specifies the configuration for `my_op`. When the schedule triggers a run, the `param` value will be passed to `my_op`, and the op will log the parameter value. Make sure that the structure of the `run_config` in the `RunRequest` matches the expected configuration schema of your job and ops. This way, the parameters will be correctly passed through when the job is executed.
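If several schedules or sensors pass parameters to the same op, a small helper can build the nested `run_config` so its shape stays consistent. The sketch below is plain Python and only illustrates the `ops` / op name / `config` nesting used above; `build_op_run_config` is a hypothetical name, not part of Dagster.

```python
def build_op_run_config(op_name, **params):
    """Return run_config in the {"ops": {<op name>: {"config": {...}}}} layout.

    Hypothetical helper for illustration; it does not validate params
    against the op's config_schema.
    """
    return {"ops": {op_name: {"config": dict(params)}}}


run_config = build_op_run_config("my_op", param="value_from_schedule")
```

The resulting dict can be passed as the `run_config` argument when constructing the `RunRequest`.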

Sandeep Mullangi

03/14/2024, 12:18 PM
@Scout Is this the right syntax for the op decorator? @op(required_resource_keys={"dbt"}, config_schema={"param": list, "model_name": str}, out={"result": Out()})

Scout

03/14/2024, 12:18 PM
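The syntax you've provided for the `@op` decorator is valid: `required_resource_keys` takes a set of strings, `config_schema` takes a dictionary of field types, and `out` takes a dictionary that maps output names to `Out` definitions. Because an op has a single output named `result` by default, the `out` argument can be dropped here without changing the behavior. Here's the simplified version: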
```python
from dagster import op


# Define an op with required resources, a config schema, and the default output
@op(required_resource_keys={"dbt"}, config_schema={"param": list, "model_name": str})
def my_op(context):
    # Access resources and config
    dbt = context.resources.dbt
    param = context.op_config["param"]
    model_name = context.op_config["model_name"]

    # Your logic here...

    # Return a result
    return "result"
```
In this version:
- The `config_schema` is defined correctly with the types for each config field.
- The `required_resource_keys` is a set of strings that specify the resource handles required by this op.
- The `out` parameter is not needed in this case because by default, an op has a single output named `result`. If you want to specify the output definition explicitly, you can use the `Out` class, but it's not necessary unless you need to customize the output behavior.

`Out` belongs in the decorator, where it describes an output. To emit the output explicitly from the body of the op, yield an `Output` object instead, like this:

```python
from dagster import op, Output


@op(required_resource_keys={"dbt"}, config_schema={"param": list, "model_name": str})
def my_op(context):
    # Access resources and config
    dbt = context.resources.dbt
    param = context.op_config["param"]
    model_name = context.op_config["model_name"]

    # Your logic here...

    # Emit the value using the Output class (output_name defaults to "result")
    yield Output("result")
```
However, if you're using the `return` statement, you don't need to define `out` explicitly unless you have multiple outputs or need to customize the output. If you do have multiple outputs, you would define them in a dictionary passed to the `out` parameter, and use `yield` with `Output` objects to emit each one:

```python
from dagster import op, Out, Output


@op(
    required_resource_keys={"dbt"},
    config_schema={"param": list, "model_name": str},
    out={"result": Out(), "another_result": Out()},
)
def my_op(context):
    # Access resources and config
    dbt = context.resources.dbt
    param = context.op_config["param"]
    model_name = context.op_config["model_name"]

    # Your logic here...

    # Yield multiple results, matching each value to a declared output name
    yield Output("first_result", output_name="result")
    yield Output("second_result", output_name="another_result")
```
Remember to adjust your op's logic to match the configuration and resource requirements you've specified.