How to create a sensor for Azure Blob Storage that...
# ask-ai
c
How to create a sensor for Azure Blob Storage that checks a specific container and folder?
s
To create a sensor for Azure Blob Storage that checks a specific container and folder, you would typically use the `@sensor` decorator to define a sensor function that polls Azure Blob Storage for changes. The documentation does not provide a specific example for Azure Blob Storage sensors, but based on the general pattern for writing sensors in Dagster, you can create one by following these steps:

1. Define a sensor function using the `@sensor` decorator.
2. Within the sensor function, use the Azure Blob Storage SDK to list blobs in a specific container and folder.
3. Yield a `RunRequest` for each new or updated blob that you want to trigger a job run for.

Here's a conceptual example based on the pattern provided in the documentation for creating sensors:
```python
from dagster import sensor, RunRequest
from azure.storage.blob import BlobServiceClient

# Initialize the BlobServiceClient with your Azure Storage Account
blob_service_client = BlobServiceClient.from_connection_string("your_connection_string")

@sensor(job=your_dagster_job)
def azure_blob_storage_sensor(context):
    container_client = blob_service_client.get_container_client("your_container_name")
    blobs_list = container_client.list_blobs(name_starts_with="your_folder_name/")

    for blob in blobs_list:
        # Implement your logic to check for new or updated blobs.
        # For example, compare the blob's last modified time with a stored timestamp.
        if <condition_to_trigger_run>:
            yield RunRequest(
                run_key=blob.name,
                run_config={
                    # Your run config here
                },
            )
```
In this example, replace `"your_connection_string"`, `"your_container_name"`, and `"your_folder_name/"` with your actual Azure Blob Storage connection string, container name, and folder path, respectively. Also, replace `your_dagster_job` with the actual job you want to trigger. The `<condition_to_trigger_run>` placeholder should be replaced with the logic you want to use to determine whether to trigger a run, such as checking whether the blob's last modified time is more recent than the last time the sensor ran. Please note that this is a conceptual example and not a direct code snippet from the documentation. You will need to adapt this pattern to fit your specific use case and handle any Azure-specific logic for interacting with Blob Storage.
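For the sensor to take effect, it also needs to be loaded by Dagster. Here is a minimal sketch of that wiring, assuming the placeholder job and sensor names used above (not names from the documentation):

```python
from dagster import Definitions

# Expose the job and sensor so the Dagster sensor daemon can evaluate
# azure_blob_storage_sensor on its polling interval
defs = Definitions(
    jobs=[your_dagster_job],
    sensors=[azure_blob_storage_sensor],
)
```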
c
@Scout Implement the `if` logic using the last modified time and provide a `run_config` for the `RunRequest` for `your_dagster_job`.
s
As a friendly reminder, the following is a conceptual example and not a direct code snippet from the documentation. You will need to adapt this pattern to fit your specific use case, including the implementation details for interacting with Azure Blob Storage and the specifics of your Dagster job's run configuration. Here's how you might implement the sensor logic to check for blobs that have been modified since the last time the sensor ran, and provide a `run_config` for the `RunRequest`:
```python
from dagster import sensor, RunRequest
from azure.storage.blob import BlobServiceClient
from datetime import datetime, timezone

# Initialize the BlobServiceClient with your Azure Storage Account
blob_service_client = BlobServiceClient.from_connection_string("your_connection_string")

@sensor(job=your_dagster_job)
def azure_blob_storage_sensor(context):
    container_client = blob_service_client.get_container_client("your_container_name")
    blobs_list = container_client.list_blobs(name_starts_with="your_folder_name/")

    # Retrieve the last recorded run time from the sensor cursor, or use a default
    last_checked_time = datetime.fromisoformat(
        context.cursor or datetime.min.replace(tzinfo=timezone.utc).isoformat()
    )
    max_modified_time = last_checked_time

    for blob in blobs_list:
        # list_blobs yields BlobProperties; last_modified is a timezone-aware datetime
        blob_last_modified = blob.last_modified

        # Check if the blob was modified since the last time the sensor checked
        if blob_last_modified > last_checked_time:
            # Track the most recent modification time seen in this evaluation
            max_modified_time = max(max_modified_time, blob_last_modified)

            # Define the run config for the job
            run_config = {
                # Your run config here
                # Example:
                "ops": {
                    "process_blob": {
                        "config": {
                            "blob_name": blob.name,
                            "container_name": "your_container_name",
                        }
                    }
                }
            }

            # Yield a RunRequest to trigger a job run with the specified run config
            yield RunRequest(
                run_key=blob.name,
                run_config=run_config,
            )

    # Advance the cursor once, after scanning all blobs
    context.update_cursor(max_modified_time.isoformat())
```
In this example, `your_dagster_job` should be replaced with the actual job you want to trigger. The `run_config` dictionary should be structured according to the configuration schema of your job. The `process_blob` op is a hypothetical op that would process the blob; you should replace it with the actual op names and configuration that your job requires. The `context.cursor` is used to store the last time the sensor checked for modifications. This value is advanced to the most recent `blob_last_modified` time after all blobs have been scanned, which ensures that the next time the sensor runs, it only considers blobs modified after the last recorded time. Please ensure that you have the necessary error handling, logging, and any other required Azure Blob Storage SDK interactions to make this example work in your environment.
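To make the run config above concrete, here is a minimal sketch of a hypothetical `process_blob` op and job whose config schema matches the keys used in the sensor (the op name and fields are placeholders, not from the documentation):

```python
from dagster import job, op

@op(config_schema={"blob_name": str, "container_name": str})
def process_blob(context):
    # Placeholder logic; replace with your actual blob processing
    context.log.info(
        f"Processing {context.op_config['blob_name']} "
        f"from {context.op_config['container_name']}"
    )

@job
def your_dagster_job():
    process_blob()
```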
c
@Scout How would this be implemented if `your_dagster_job` has a partition, e.g. daily_partition?
s
In Dagster, if `your_dagster_job` is a partitioned job, you would typically trigger runs for specific partitions based on the data you're processing. For a daily partitioned job, you would map each blob to the partition representing the day it belongs to. Here's a conceptual example of how you might implement a sensor for a daily partitioned job that processes blobs from Azure Blob Storage:
```python
from dagster import RunRequest, daily_partitioned_config, job, op, sensor
from azure.storage.blob import BlobServiceClient
from datetime import datetime, timezone

# Initialize the BlobServiceClient with your Azure Storage Account
blob_service_client = BlobServiceClient.from_connection_string("your_connection_string")

@op(config_schema={"date": str})
def process_blob(context):
    # Hypothetical op that processes the blobs belonging to a given day
    context.log.info(f"Processing blobs for {context.op_config['date']}")

# Generate the run config for each daily partition;
# adjust start_date to the earliest day your data covers
@daily_partitioned_config(start_date=datetime(2023, 1, 1))
def daily_run_config(start: datetime, _end: datetime):
    return {
        # Your run config here
        # Example:
        "ops": {
            "process_blob": {
                "config": {
                    "date": start.strftime("%Y-%m-%d"),
                }
            }
        }
    }

@job(config=daily_run_config)
def your_dagster_job():
    process_blob()

@sensor(job=your_dagster_job)
def azure_blob_storage_sensor(context):
    container_client = blob_service_client.get_container_client("your_container_name")
    blobs_list = container_client.list_blobs(name_starts_with="your_folder_name/")

    # Retrieve the last recorded run time from the sensor cursor, or use a default
    last_checked_time = datetime.fromisoformat(
        context.cursor or datetime.min.replace(tzinfo=timezone.utc).isoformat()
    )
    max_modified_time = last_checked_time

    for blob in blobs_list:
        blob_last_modified = blob.last_modified

        # Check if the blob was modified since the last time the sensor checked
        if blob_last_modified > last_checked_time:
            max_modified_time = max(max_modified_time, blob_last_modified)

            # Determine the daily partition for the blob from its last modified date
            partition_key = blob_last_modified.strftime("%Y-%m-%d")

            # Yield a RunRequest for that partition; Dagster resolves the
            # run config for the partition from daily_run_config
            yield RunRequest(
                run_key=f"{blob.name}:{partition_key}",
                partition_key=partition_key,
            )

    # Advance the cursor once, after scanning all blobs
    context.update_cursor(max_modified_time.isoformat())
```
In this example, `daily_partitioned_config` defines how your job runs are partitioned: the decorated function generates the run configuration for each daily partition, and declaring the job with `config=daily_run_config` makes it a daily partitioned job. The sensor checks each blob's last modified time, derives the daily partition key from that date, and yields a `RunRequest` with the corresponding `partition_key`; Dagster then resolves the run configuration for that partition. Please note that this is a conceptual example and you will need to adapt it to fit your specific use case. You should ensure that the partitioning logic matches how your data and job are structured. Additionally, you may need to handle edge cases, such as time zone differences or blobs that don't neatly fit into a daily partition.
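For the time zone edge case specifically, one option is to normalize each blob's timestamp to UTC before deriving the partition key. This is a hypothetical helper, not from the documentation:

```python
from datetime import datetime, timezone

def partition_key_for_blob(last_modified: datetime) -> str:
    # Normalize to UTC so timestamps from any source map onto
    # the same daily partition boundaries
    return last_modified.astimezone(timezone.utc).strftime("%Y-%m-%d")
```

The sensor would then call `partition_key_for_blob(blob.last_modified)` instead of formatting the timestamp inline.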