https://dagster.io/ logo
#ask-community
Title
# ask-community
a

Alexander Buck

08/23/2023, 8:17 PM
I’m struggling to understand what the high-level architecture for the following task would be in Dagster: - My data consists of a series of folders with a common structure and several files within it. Folders get added periodically over time. - As new folders are added, I need to process the files within each folder, appending values to database tables. - This processing is done by reading a summary file, which defines the full list of other files to process. - Process all of those files through a second job definition to materialize further assets. With that background, this is how I think I would do it. The arrows (->) indicate what Dagster tool I think I should be using: 1. Scan object storage (filesystem or S3, doesn’t matter) for a new file named
Summary
-> Use a
@sensor
2. Start a job with that file that creates several assets -> Create a
job
that the
@sensor
triggers with the file in the
RunConfig
and an appropriate
run_key
that identifies this particular
Summary
file from other summary files that exist in other folders. 3. One of those assets is a list of other files. Start an instance of a given job for every file in this list. -> ?? Not sure how an asset can trigger another job. (
@asset_sensor?
). 1. These jobs can fail because those other may not yet exist, i.e. the file upload process is ongoing and these files may not have copied yet. I’d like this to keep retrying until they do exist. 2. The assets in this second job set are all dependent on assets materialized in the first job. I just don’t know how to do step 3. If this is not at all the right way of tackling this problem with Dagster, I’m open to suggestions on how to re-think this task.
@asset_sensor
doesn’t seem like the right fit, because it runs once for a new materialization. If the additional files don’t yet exist (i.e. they’re still copying into the object storage) then their RunRequests will fail and never get re-attempted.
j

jamie

08/23/2023, 8:31 PM
hey @Alexander Buck - based on my understanding, here’s what i would do 1. start with a sensor that scans object store for new summary files - exactly what you had for step 1 2. similar to what you have for 2, have the sensor trigger a job that will take the info from the summary files and append the data to the various database tables. i’m not sure using assets here makes much sense, but i could be missing something from your overall problem setup 3. if you then need to run additional jobs on the database tables, you could use another sensor to trigger the additional jobs when the first job (in step 2) succeeds. You could do this with a run status sensor https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#run-status-sensors
blob cheer 1
a

Alexander Buck

08/23/2023, 8:57 PM
Oh I didn't know about
@run_status_sensor
I'll give that a look! Thanks! I think using assets with IO managers to handle the database transactions should be the same as an op that interacts with the table directly. Writing the list of subsequent files to parse to a database was an idea a coworker had. A second sensor would query that table and try to dispatch jobs on those. That run status sensor could come in handy here so I can update that table when a job runs successfully and the file is no longer needed in the table!
3 Views