# ask-community
a
Hey Dagster team! I would like your input on how to best use Dagster for the following use-case.

**Description of the workflow**
We have a set of pipelines which process satellite images. The satellite images come in `.zip` files, where each `.zip` contains between one and three images. We apply preprocessing and an ML algorithm to each image, then update an index DB with the image metadata, save paths, etc. See the figure below.

**Overview of our particular use-case**
We currently have four Dagster jobs, one per processing step:
1. Unzip job (each zip file contains multiple images)
2. Preprocess job (needs to run for each image)
3. ML algorithm job (needs to run for each image)
4. Index into image catalog (needs to run for each image)

Important to note:
• Each job contains multiple Dagster ops, and runtime is 30-90 minutes per job, depending on the image size.
• Due to robustness issues and the inherent complexities of satellite imagery, these jobs occasionally fail (this was the motivation for splitting them into separate jobs).

**How have we tackled this?**
We want to run the entire process automatically every time a new `.zip` file is uploaded. This means running the preprocess, ML algorithm and indexing jobs for each image contained in the `.zip` file. To do this, we materialize an asset at each step, and use a `multi_asset_sensor` to trigger the next job on asset materialization (a minimal sketch of this wiring is shown below the questions). This is indicated for the unzip -> preprocess step in the diagram, in red. We follow this approach to connect all the jobs in the diagram.

**Questions**
• Is this "_asset materialization -> asset sensor which triggers the next job_" approach the right way to go about this? The current flow feels quite clunky, and we often have sensors failing because of cursor issues or multiple asset materializations not being picked up (because they are materialized in quick succession, and the sensor only takes the last one).
• How would this look using the software-defined assets paradigm? Would each image be considered a separate asset? How would you then trigger a run for each of these assets?
• Alternatively, would it make sense to consider each "image" as a partition somehow, and use partitioned assets?
TL;DR: my question boils down to: how can we use "assets" when we have a growing list of assets which need to be processed by various jobs? All the examples I've found so far use assets and materialization to keep items up to date. However, how do you use assets when you have a growing list of images which you are processing?
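One possible answer to the "growing list" question is dynamic partitions: keep a single partitioned asset (or asset graph) and add a partition per image as it arrives. Below is a hedged sketch, assuming a recent Dagster version with `DynamicPartitionsDefinition`; the partition set name `images`, the directory path, and the job name are hypothetical.

```python
import os

from dagster import (
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

# One partition per image; partitions are added at runtime as new images show up.
images_partitions = DynamicPartitionsDefinition(name="images")


@asset(partitions_def=images_partitions)
def preprocessed_image(context):
    # context.partition_key is the image identifier (e.g. the image file name).
    context.log.info(f"Preprocessing {context.partition_key}")


process_image_job = define_asset_job(
    "process_image_job",
    selection=[preprocessed_image],
    partitions_def=images_partitions,
)


@sensor(job=process_image_job)
def new_image_sensor(context):
    # Hypothetical: unzipped images land in this directory.
    image_names = os.listdir("/data/unzipped_images")
    new_images = [
        name
        for name in image_names
        if not context.instance.has_dynamic_partition("images", name)
    ]
    return SensorResult(
        run_requests=[RunRequest(partition_key=name) for name in new_images],
        dynamic_partitions_requests=[images_partitions.build_add_request(new_images)],
    )
```

With this shape, each image shows up in the UI as a partition of the downstream assets, and re-processing an image is just re-materializing that partition.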
l
I am using a "generic" asset to materialize multiple paths, and I am running into a similar issue to yours: "we often have sensors failing because of cursor issues or multiple asset materializations not being picked up (because they are materialized in quick succession, and the sensor only takes the last one)". I am trying to establish a better run key for the sensor. Have you got any information on the sensor not kicking off due to too-quick succession? Thanks.
a
I'm using a `run_key` equal to the full path of the image we're processing. However, this causes the sensor to `Skip` whenever I reprocess the same image (because the process was updated / needs re-running for some reason). I then tried adding a date-time to the `run_key` (`run_key=f"{path_to_image}_{datetime}"`), but this caused multiple runs from older asset materializations, because now all the run keys were different 🤯

The behavior I'm looking for is: every time `import_job` completes, a subsequent set of job(s) is requested, depending on the files generated in the `import_job`. The cursor and run_key logic is making this very convoluted in this case.
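One way to sidestep the run_key convolution is to drop `run_key` entirely and let the sensor cursor do the deduplication: track the storage id of the last handled materialization, and emit one `RunRequest` per new materialization record. A minimal sketch, assuming a hypothetical asset key `imported_files`, a hypothetical `path` metadata entry, and a hypothetical downstream job; metadata access assumes a recent Dagster version.

```python
from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    sensor,
)

from my_project.jobs import downstream_job  # hypothetical import


@sensor(job=downstream_job)
def import_output_sensor(context):
    # Cursor = storage id of the last materialization event we have handled.
    last_storage_id = int(context.cursor) if context.cursor else None

    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("imported_files"),  # hypothetical asset key
            after_cursor=last_storage_id,
        ),
        ascending=True,
    )

    for record in records:
        materialization = (
            record.event_log_entry.dagster_event.event_specific_data.materialization
        )
        path = materialization.metadata["path"].value  # hypothetical metadata key
        # No run_key: dedupe is handled purely by the cursor, so reprocessing the
        # same path later (a new materialization) still yields a new run.
        yield RunRequest(
            run_config={"ops": {"process": {"config": {"path": path}}}},
        )
        context.update_cursor(str(record.storage_id))
```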
> Have you got any information on the sensor not kicking off due to too-quick succession?
E.g. if I materialize the "generic" asset three times within a 30-second time-frame, only the latest materialization yields a run request. The way around this was to use a `multi_asset_sensor`, as recommended here, but this caused multiple runs to get triggered outside of the recent materialization scope, because the run_key and cursor were in a state that didn't allow the runs to be skipped.