Steven Schmatz
02/22/2024, 11:23 PM

Scout
02/22/2024, 11:23 PM
```python
from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


@sensor(job=log_file_job)  # log_file_job is assumed to be defined elsewhere
def my_s3_sensor(context):
    # Resume from the last key seen; None on the sensor's first tick.
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_rfp_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_rfp_bucket.")
    last_key = new_s3_keys[-1]
    # One run per new key; run_key deduplicates requests across ticks.
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests
```
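For context, any object that lands in the bucket is returned by `get_s3_keys` on the sensor's next tick. Here is a minimal sketch of such an upload with boto3; the key name and file contents are made up for illustration:

```python
import boto3

# Hypothetical upload that my_s3_sensor above would pick up on its next tick.
s3 = boto3.client("s3")
s3.put_object(
    Bucket="my_s3_rfp_bucket",   # bucket watched by the sensor
    Key="rfps/example.pdf",      # becomes the RunRequest's run_key
    Body=b"example file contents",
)
```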
For monitoring job progress, you can use Dagster's built-in UI, Dagit, which provides visibility into job runs, including their status and logs, and serves as a central place for teams to monitor and interact with their data pipelines. If a team needs additional functionality, such as uploading files to trigger runs, Dagster can be paired with other tools: for example, a Retool interface could upload files to S3, with a sensor like the one above detecting those files and launching the appropriate jobs.

For more advanced monitoring or custom interactions, teams can also use Dagster's GraphQL API to interact with a Dagster instance programmatically, for example to build custom dashboards or integrate with other monitoring tools. Ultimately, the choice of tools and approaches depends on the team's requirements, existing infrastructure, and preferences; Dagster is designed to be flexible and extensible, so it can integrate with a wide range of tools and workflows.
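As one illustration of the GraphQL route, the `dagster-graphql` package ships a Python client. A minimal sketch, assuming a Dagster webserver reachable on localhost:3000 and reusing the `log_file_job` name from the sensor example (both are assumptions, not details from this thread):

```python
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError

# Assumes a Dagster webserver running at localhost:3000.
client = DagsterGraphQLClient("localhost", port_number=3000)

try:
    # Launch a run of the (assumed) log_file_job, then poll its status.
    run_id = client.submit_job_execution("log_file_job", run_config={})
    status = client.get_run_status(run_id)  # returns a DagsterRunStatus enum
    print(f"Run {run_id} is {status}")
except DagsterGraphQLClientError as exc:
    print(f"GraphQL request failed: {exc}")
```

The same queries can be issued from any GraphQL client, which is what makes this a reasonable foundation for custom dashboards or external monitoring integrations.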