# ask-community
l
Hello, I have been trying to build a Sensor that watches for new additions to a GCS bucket and then uses an IOManager to read those files into Python pickle objects, transform them, and push them to a folder in the bucket. Following the tutorials, I can see that the ConfigurablePickledObjectGCSIOManager from the dagster_gcp package will work for the output part. However, I am having trouble using the IOManager on the input part of this pipeline. This is what I have tried so far:
1. Build the following Sensor to watch for new additions to the bucket:
Copy code
from datetime import datetime
from zoneinfo import ZoneInfo

from dagster import RunRequest, sensor
from dagster_gcp import GCSResource


@sensor(job=ktg_etl_job, minimum_interval_seconds=10)
def parse_bucket_for_new_files(context, gcs: GCSResource):
    timezone = ZoneInfo("Europe/Paris")
    context.log.info(context.cursor)
    # The cursor is persisted as an ISO-8601 string; fall back to "now" on the first tick
    last_run_time = datetime.fromisoformat(context.cursor) if context.cursor else datetime.now(tz=timezone)
    max_creation_time = last_run_time
    # GCSResource exposes the underlying google-cloud-storage client via get_client()
    for blob in gcs.get_client().list_blobs("test-bucket"):
        blob_name = blob.name
        file_creation_time = blob.time_created.astimezone(timezone)
        context.log.info(file_creation_time)
        context.log.info(last_run_time)
        if file_creation_time > last_run_time:
            run_key = f"{blob_name}:{file_creation_time}"
            run_config = {"ops": {"process_gcs_file": {"config": {"filename": blob_name}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_creation_time = max(max_creation_time, file_creation_time)
    context.update_cursor(max_creation_time.isoformat())
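(An aside on the cursor above: it is persisted as a plain string between sensor ticks, so the datetime has to survive a string round-trip. A small check of the ISO-8601 round-trip used in the snippet; the timestamp itself is arbitrary:)
Copy code
from datetime import datetime
from zoneinfo import ZoneInfo

# isoformat()/fromisoformat() round-trip timezone-aware datetimes losslessly,
# including the UTC offset, so the cursor parses back exactly.
ts = datetime(2024, 1, 1, 12, 0, tzinfo=ZoneInfo("Europe/Paris"))
assert datetime.fromisoformat(ts.isoformat()) == ts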
2. Load my IO manager:
Copy code
io_manager = ConfigurablePickledObjectGCSIOManager(
    gcs=gcs,
    gcs_bucket="test-bucket",
    gcs_prefix="modified",
)
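(For completeness, a hedged sketch of how these pieces could be registered together. Definitions and the default "io_manager" resource key are standard Dagster; the GCP project id is a placeholder:)
Copy code
from dagster import Definitions
from dagster_gcp import GCSResource

defs = Definitions(
    assets=[delivery],  # the graph asset defined in step 4 below
    jobs=[ktg_etl_job],
    sensors=[parse_bucket_for_new_files],
    resources={
        "gcs": GCSResource(project="my-gcp-project"),
        # Under the default "io_manager" key, asset outputs get pickled
        # to gs://test-bucket/modified/ by the IO manager above
        "io_manager": io_manager,
    },
)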
3. Define an op that loads the files in memory using the IOManager (this is where I have trouble figuring out what to load into the IOManager):
Copy code
@op(ins=In(io_manager_key=io_manager))
def process_gcs_file(context: InputContext):
    context.log.info(context.config)
    ???
    return loaded_file
4. Use this op in a graph asset with my two assets (here, raw_delivery as the upstream asset and delivery as the final asset):
Copy code
@graph_asset
def delivery(raw_delivery):
    return process_gcs_file(raw_delivery)
Here are the questions I have regarding this structure:
1. Does this look like a correct structure to you?
2. How should I build my op so it grabs the raw file that later assets can use?
3. Is there a structure using only assets instead of a graph-based one?
4. Should I build a custom IOManager for the bucket-reading part?
Thanks in advance for your help!
j
hey @louis bertolotti this looks almost there! I think the main thing to clear up is the difference between an IO manager and a more general resource. In your case, what you actually want is a resource that fetches the file from the bucket. An IO manager is specifically meant to store what’s returned from an asset and to load that return value in any downstream assets. You should be able to use the GCSResource (like you did in the sensor) to get the files:
Copy code
@op
def process_gcs_file(context: OpExecutionContext, gcs: GCSResource):
    file_name = context.op_config["filename"]
    file = gcs.<whatever_function_to_download_the_file>(file_name)
    transformed_file = transform_the_file(file)
    gcs.<whatever_function_to_upload_the_transformed_file>(transformed_file)
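Filling in those placeholders, here is a minimal runnable sketch. It assumes the google-cloud-storage client returned by GCSResource.get_client(); transform_the_file is a stand-in for your own logic, and the bucket and prefix names just mirror the earlier snippets:
Copy code
from dagster import OpExecutionContext, op
from dagster_gcp import GCSResource


def transform_the_file(data: bytes) -> bytes:
    # Placeholder for the actual transformation logic
    return data


@op
def process_gcs_file(context: OpExecutionContext, gcs: GCSResource):
    file_name = context.op_config["filename"]
    client = gcs.get_client()  # a google.cloud.storage.Client
    bucket = client.bucket("test-bucket")

    # Download the raw file as bytes
    raw_bytes = bucket.blob(file_name).download_as_bytes()

    transformed = transform_the_file(raw_bytes)

    # Write the result back under the "modified" prefix
    bucket.blob(f"modified/{file_name}").upload_from_string(transformed)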
l
This is exactly what I needed, thanks for your help and for the clarification about the role of the IOManager!