# ask-ai
**User:** I have a scheduled job which is a partitioned job. Downstream, I have another job which is triggered via a `run_status_sensor` once the partitioned job has successfully run. How can I access the partition key in the sensor to pass this into the `RunRequest`?
**Scout:** To access the partition key in a `run_status_sensor` and pass it into a `RunRequest`, you can use the `tags` attribute of the `DagsterRun` object associated with the run that the sensor is monitoring. When a partitioned job runs, Dagster stores the partition key as a tag under the key `dagster/partition`. Here's an example of how you can access the partition key within a `run_status_sensor`:
```python
from dagster import run_status_sensor, RunRequest, DagsterRunStatus

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[my_partitioned_job],
    request_job=my_downstream_job,  # the job that the RunRequest will launch
)
def my_run_status_sensor(context):
    # The partition key of the completed run is stored in its tags.
    partition_key = context.dagster_run.tags.get("dagster/partition")
    if partition_key:
        return RunRequest(
            run_key=context.dagster_run.run_id,  # or some other run key logic
            run_config={},  # your run config here
            tags={"dagster/partition": partition_key},  # pass the partition key as a tag
        )
    else:
        # Handle the case where the partition key is not found
        ...
```
In this example, `my_partitioned_job` is the partitioned job that the sensor is monitoring, and `my_downstream_job` (a placeholder for your own downstream job) is passed as the sensor's `request_job`, which a `run_status_sensor` needs in order to know which job its `RunRequest` should launch. When the monitored job completes successfully, the sensor function `my_run_status_sensor` is invoked. Inside the function, `context.dagster_run.tags.get("dagster/partition")` retrieves the partition key from the tags of the run that just completed. That partition key is then used to build a `RunRequest`, which triggers the downstream job with the same partition context. Note that the `run_config` and any additional logic for handling cases where the partition key is not found would need to be implemented according to your specific requirements.
**User:** @Scout Is it possible to trigger a partitioned job in a sensor that monitors a non-partitioned job?
**Scout:** Yes, it is possible to trigger a partitioned job in a sensor that monitors a non-partitioned job. You can use a sensor to monitor the status of any job and, upon a certain condition (like successful completion), trigger another job, including a partitioned one.

However, since the originating job is not partitioned, you won't have a partition key to pass directly. Instead, you would need to determine the appropriate partition key for the downstream partitioned job based on your business logic. This could be based on the current date, the data processed by the non-partitioned job, or any other relevant criteria. Here's a conceptual example of how you might trigger a partitioned job from a sensor monitoring a non-partitioned job:
```python
from dagster import run_status_sensor, RunRequest, DagsterRunStatus

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[my_non_partitioned_job],  # the job being monitored
    request_job=my_partitioned_job,           # the partitioned job to trigger
)
def my_sensor(context):
    # Logic to determine the partition key for the downstream job.
    # This could be based on time, data outputs, or other factors.
    partition_key = determine_partition_key_based_on_logic()

    # Create a RunRequest for the partitioned job with the determined partition key.
    return RunRequest(
        run_key=f"run-for-partition-{partition_key}",  # unique identifier for the run
        run_config={},  # your run config for the partitioned job
        tags={"dagster/partition": partition_key},  # pass the partition key as a tag
    )

def determine_partition_key_based_on_logic():
    # Implement your logic here to determine the partition key
    ...
```
In this example, `my_non_partitioned_job` is the job being monitored by the sensor, and `my_partitioned_job` is the downstream partitioned job passed as `request_job` so the `RunRequest` launches it. The `determine_partition_key_based_on_logic` function is a placeholder for your logic to compute the partition key for the downstream partitioned job. Once you have the partition key, you create a `RunRequest` for the partitioned job, including the partition key in the tags. Remember that the actual implementation of `determine_partition_key_based_on_logic` will depend on how your partitions are defined and what criteria you use to map runs of the non-partitioned job to partitions of the downstream job.
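For instance, if the downstream job is partitioned by day, the helper might simply compute yesterday's date in the default daily key format. This is a hypothetical sketch; the returned string must match however your partitions definition formats its keys:

```python
from datetime import datetime, timedelta, timezone

def determine_partition_key_based_on_logic():
    # Hypothetical: target yesterday's partition of a daily-partitioned job.
    # Daily partitions are keyed as "YYYY-MM-DD" by default.
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return yesterday.strftime("%Y-%m-%d")
```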