https://dagster.io/ logo
#ask-community
Title
# ask-community
i

iridium

02/16/2024, 1:16 PM
Copy code
@job
def my_job():
    process_file()
    print("My job is running!")



@sensor(job=my_job, minimum_interval_seconds=10)
def my_s3_sensor(context):
    context.log.debug("Checking for new s3 files.")
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("jli-backup", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests