Todd de Quincey
03/06/2024, 4:23 PM
Scout
03/06/2024, 4:23 PM
To access the partition key in a `run_status_sensor` and pass it into a `RunRequest`, you can use the `tags` attribute of the `DagsterRun` object associated with the run that the sensor is monitoring. The partition key is stored as a tag with the key `dagster/partition`
when a partitioned job is run. Here's an example of how you can access the partition key within a `run_status_sensor`:

```python
from dagster import DagsterRunStatus, RunRequest, run_status_sensor


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[my_partitioned_job],
    # Placeholder name: the job that the returned RunRequest should launch.
    # A run status sensor needs a target job before it can return a RunRequest.
    request_job=my_downstream_job,
)
def my_run_status_sensor(context):
    # The partition key of the monitored run is stored in its tags
    partition_key = context.dagster_run.tags.get("dagster/partition")
    if partition_key:
        return RunRequest(
            run_key=context.dagster_run.run_id,  # or some other run key logic
            run_config={},  # your run config here
            tags={"dagster/partition": partition_key},  # pass the partition key as a tag
        )
    else:
        # Handle the case where the partition key is not found
        ...
```
In this example, `my_partitioned_job` is the partitioned job that the sensor is monitoring. When the job completes successfully, the sensor function `my_run_status_sensor` is invoked. Inside the function, `context.dagster_run.tags.get("dagster/partition")` retrieves the partition key from the tags of the run that just completed. This partition key is then used to create a `RunRequest` object, which can be used to trigger another job with the same partition context. Please note that the `run_config` and any additional logic for handling cases where the partition key is not found would need to be implemented according to your specific requirements.
Todd de Quincey
03/06/2024, 4:40 PM
Scout
03/06/2024, 4:40 PM
```python
from dagster import RunRequest, sensor


# `job` is the job this sensor launches (the downstream partitioned job),
# not a job that it monitors.
@sensor(job=my_partitioned_job)
def my_sensor(context):
    # Logic to determine the partition key for the downstream job
    # This could be based on time, data outputs, or other factors
    partition_key = determine_partition_key_based_on_logic()

    # Create a RunRequest for the partitioned job with the determined partition key
    return RunRequest(
        run_key=f"run-for-partition-{partition_key}",  # Unique identifier for the run
        run_config={},  # Your run config for the partitioned job
        tags={"dagster/partition": partition_key},  # Pass the partition key as a tag
    )


def determine_partition_key_based_on_logic():
    # Implement your logic here to determine the partition key
    ...
```
In this example, `my_partitioned_job` is the downstream partitioned job that the sensor launches. The `job` argument of `@sensor` names the job to run, not a job being monitored, so runs of the non-partitioned job have to be detected inside the sensor body (or with a `run_status_sensor`, as in the earlier example). The `determine_partition_key_based_on_logic`
function is a placeholder for your logic to compute the partition key for the downstream partitioned job. Once you have the partition key, you create a `RunRequest` for the partitioned job, including the partition key in the tags. Remember that the actual implementation of `determine_partition_key_based_on_logic` will depend on how your partitions are defined and what criteria you use to map runs of the non-partitioned job to partitions of the downstream job.
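
As one possible shape for `determine_partition_key_based_on_logic`, here is a minimal sketch. It assumes the downstream job uses daily partitions keyed as `YYYY-MM-DD` in UTC, and that each upstream run maps to the previous day's partition. Both assumptions are illustrative and do not come from this thread, and the optional `now` parameter is a hypothetical addition that makes the mapping easy to test.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def determine_partition_key_based_on_logic(now: Optional[datetime] = None) -> str:
    """Return the key of the most recently completed daily partition.

    Assumption: partition keys are UTC dates formatted as YYYY-MM-DD.
    """
    # `now` is a hypothetical parameter so the mapping can be checked deterministically
    current = now if now is not None else datetime.now(timezone.utc)
    # Treat the previous calendar day as the latest partition with complete data
    previous_day = current.date() - timedelta(days=1)
    return previous_day.isoformat()
```

The sensor body can call the function without arguments, since `now` falls back to the current UTC time. If your partitions use a different granularity or key format, the date arithmetic and formatting would need to change to match.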