https://dagster.io/ logo
#ask-ai
Title
# ask-ai
s

Sbrisny

02/09/2024, 3:29 PM
@Scout it is possible to Build a Sensor on a aws s3 bucket? Show me an example?
s

Scout

02/09/2024, 3:29 PM
Yes, it is possible to build a sensor on an AWS S3 bucket. Here's an example from the documentation:
Copy code
python from dagster_aws.s3.sensor import get_s3_keys  @sensor(job=my_job) def my_s3_sensor(context):     since_key = context.cursor or None     new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)     if not new_s3_keys:         return SkipReason("No new s3 files found for bucket my_s3_bucket.")     last_key = new_s3_keys[-1]     run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]     context.update_cursor(last_key)     return run_requests
In this example,
my_s3_sensor
is a sensor that polls an S3 bucket named
my_s3_bucket
for new files. If new files are found, it creates a
RunRequest
for each new file, which can trigger a job run. The
since_key
is used as a cursor to keep track of the last processed S3 key, so that each sensor evaluation only considers new files. If no new files are found, the sensor returns a
SkipReason
. Remember to replace
my_job
with the actual job you want to trigger with this sensor.
Yes, it is possible to build a sensor on an AWS S3 bucket in Dagster. Here's an example from the documentation:
Copy code
python
from dagster_aws.s3.sensor import get_s3_keys

@sensor(job=my_job)
def my_s3_sensor(context):
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests
In this example,
my_s3_sensor
is a sensor that checks for new files in the specified S3 bucket (
"my_s3_bucket"
). It uses the
get_s3_keys
function to retrieve a list of new S3 keys since the last key that was processed (tracked by
context.cursor
). If new files are found, it creates a
RunRequest
for each new file, which can trigger a job run. The sensor then updates the cursor to the last processed key to ensure idempotence in subsequent sensor evaluations. Remember to replace
my_job
with the actual job you want to trigger with this sensor. Sensors | Dagster Docs button Dagster Example for Demonstrating the In... button Thank you for your feedback!
s

Sbrisny

02/09/2024, 3:36 PM
@Scout could i rerun a s3 Sensor runrequest?
s

Scout

02/09/2024, 3:36 PM
In Dagster, a sensor's
RunRequest
is a request to launch a run for a job. If a
RunRequest
fails or if you want to reprocess data, you might want to rerun it. However, rerunning a
RunRequest
from a sensor isn't as straightforward as rerunning a job directly because sensors are designed to trigger runs based on external changes or events, such as the appearance of new files in an S3 bucket. If you want to rerun a job that was previously triggered by a sensor (due to a failure or other reason), you would typically do this by manually triggering the job from the Dagit UI or via the Dagster API, ensuring that the job's run config is set up to process the desired data. If you want to automate the rerun process in response to a job failure, you could consider using a
run_failure_sensor
. This type of sensor can be set up to listen for job failures and trigger a new run. However, as of the last update from the documentation, run failure sensors do not support yielding
RunRequest
objects directly. Instead, you might need to use the Dagster API to create a new run programmatically within the sensor. Here's a conceptual example based on the documentation's guidance on how you might set up a sensor to trigger a rerun:
Copy code
python
from dagster import sensor, RunRequest, SkipReason
from dagster.core.definitions.run_request import JobType
from dagster.core.storage.pipeline_run import PipelineRunsFilter, RunStatus

@sensor(job=my_job)
def rerun_failed_s3_sensor(context):
    # Get the last failed run for the job
    failed_runs = context.instance.get_runs(
        filters=PipelineRunsFilter(
            statuses=[RunStatus.FAILURE],
            tags={"s3-sensor": "true"},  # Assuming you tag your runs from the S3 sensor
        ),
        limit=1,
    )

    if not failed_runs:
        return SkipReason("No failed run to retry.")

    failed_run = failed_runs[0]

    # Check if the failed run was already retried
    if "retry_number" in failed_run.tags:
        return SkipReason("The failed run was already retried.")

    # Create a new RunRequest to retry the failed run
    return RunRequest(
        run_key=failed_run.run_id,  # Use the failed run ID as the run key for idempotence
        run_config=failed_run.run_config,  # Use the same run config as the failed run
        tags={"retry_number": "1"},  # Tag the new run as a retry
    )
In this example,
rerun_failed_s3_sensor
checks for the last failed run of a job that was tagged as triggered by the S3 sensor. If it finds such a run, it creates a
RunRequest
to retry the run with the same run config. It also adds a tag to indicate that this is a retry. Please note that this is a conceptual example and may require adjustments to fit your specific use case and Dagster version. Always refer to th
e latest Dagster documentation for the most accurate and up-to-date information on implementing sensors and handling retries.
7 Views