Sbrisny
02/09/2024, 3:29 PM
Scout
02/09/2024, 3:29 PM

```python
from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


@sensor(job=my_job)
def my_s3_sensor(context):
    # Resume from the last processed key, if any
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests
```
In this example, `my_s3_sensor` checks for new files in the specified S3 bucket (`"my_s3_bucket"`). It uses the `get_s3_keys` function to retrieve the list of S3 keys added since the last key that was processed (tracked by `context.cursor`). If new files are found, it creates a `RunRequest` for each new file, which can trigger a job run; if none are found, it returns a `SkipReason`. The sensor then updates the cursor to the last processed key so that subsequent evaluations only consider new files and runs are not requested twice.
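The cursor mechanics can be sketched in plain Python, without Dagster or S3. Here `new_keys_since` is a hypothetical stand-in for `get_s3_keys` that treats the cursor as "the last key seen" and returns only keys that sort after it (an illustrative simplification; the real function's ordering comes from the S3 listing):

```python
def new_keys_since(all_keys, since_key=None):
    # Hypothetical stand-in for get_s3_keys: return only the keys that
    # sort after the cursor. With no cursor, every key is "new".
    keys = sorted(all_keys)
    if since_key is None:
        return keys
    return [k for k in keys if k > since_key]


cursor = None
keys_in_bucket = ["2024-01.csv", "2024-02.csv"]

new = new_keys_since(keys_in_bucket, cursor)  # first evaluation: both keys are new
cursor = new[-1]                              # like context.update_cursor(last_key)

keys_in_bucket.append("2024-03.csv")
new = new_keys_since(keys_in_bucket, cursor)  # later evaluation: only the new key
```

Because the cursor advances to the last key after each evaluation, a key that was already turned into a `RunRequest` is never returned again.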
Remember to replace `my_job` with the actual job you want to trigger with this sensor.
Sbrisny
02/09/2024, 3:36 PM
Scout
02/09/2024, 3:36 PM
A `RunRequest` is a request to launch a run for a job. If a run launched from a `RunRequest` fails, or if you want to reprocess data, you might want to rerun it. However, rerunning a `RunRequest` from a sensor isn't as straightforward as rerunning a job directly, because sensors are designed to trigger runs in response to external changes or events, such as the appearance of new files in an S3 bucket.
If you want to rerun a job that was previously triggered by a sensor (due to a failure or another reason), you would typically do this by manually triggering the job from the Dagit UI or via the Dagster API, making sure the job's run config is set up to process the desired data.
If you want to automate reruns in response to job failures, you could consider a `run_failure_sensor`. This type of sensor listens for job failures and can react to them. However, as of the last update from the documentation, run failure sensors do not support yielding `RunRequest` objects directly; instead, you might need to use the Dagster API to create a new run programmatically within the sensor.
Here's a conceptual example based on the documentation's guidance on how you might set up a sensor to trigger a rerun:
```python
from dagster import DagsterRunStatus, RunRequest, RunsFilter, SkipReason, sensor


@sensor(job=my_job)
def rerun_failed_s3_sensor(context):
    # Get the most recent failed run for the job (newest first)
    failed_runs = context.instance.get_runs(
        filters=RunsFilter(
            statuses=[DagsterRunStatus.FAILURE],
            tags={"s3-sensor": "true"},  # assumes runs from the S3 sensor are tagged
        ),
        limit=1,
    )
    if not failed_runs:
        return SkipReason("No failed run to retry.")

    failed_run = failed_runs[0]

    # Check if the failed run was already retried
    if "retry_number" in failed_run.tags:
        return SkipReason("The failed run was already retried.")

    # Create a new RunRequest to retry the failed run
    return RunRequest(
        run_key=failed_run.run_id,         # use the failed run ID as the run key for idempotence
        run_config=failed_run.run_config,  # reuse the failed run's config
        tags={"retry_number": "1"},        # tag the new run as a retry
    )
```
In this example, `rerun_failed_s3_sensor` checks for the most recent failed run of a job that was tagged as triggered by the S3 sensor. If it finds one, it creates a `RunRequest` to retry the run with the same run config, and adds a tag to mark the new run as a retry.
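The sensor's guard logic (skip when there is nothing to retry, skip when the failed run was already retried once, otherwise build a retry request) can be sketched without Dagster. Here `pick_retry` is a hypothetical helper and the dicts stand in for the run records returned by `instance.get_runs`:

```python
def pick_retry(failed_runs):
    # failed_runs: newest-first list of dicts with "run_id", "run_config",
    # and "tags", standing in for Dagster run records. Returns the retry
    # request to make, or None (the sensor would return a SkipReason).
    if not failed_runs:
        return None  # no failed run to retry
    latest = failed_runs[0]
    if "retry_number" in latest["tags"]:
        return None  # already retried once; avoid retry loops
    return {
        "run_key": latest["run_id"],         # idempotence key
        "run_config": latest["run_config"],  # reuse the failed config
        "tags": {"retry_number": "1"},       # mark as a retry
    }
```

Because the retry carries the `retry_number` tag, a second failure of the retried run is itself filtered out on the next evaluation, so each failed run is retried at most once.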
Please note that this is a conceptual example and may require adjustments to fit your specific use case and Dagster version. Always refer to the documentation.
Scout
02/09/2024, 3:37 PM