:wave: Hello, everyone! I’m new to Dagster and I’...
# ask-community
👋 Hello, everyone! I’m new to Dagster and I’m trying to figure out the best way to tackle my (likely unconventional) data pipeline. Briefly, I have a pipeline for processing the files in a manifest, and the number of manifests will grow over time. Each manifest should be processed only once unless it’s updated. I’m wondering how to best handle the dynamic nature of my inputs (i.e. the file manifests). 🧵 I can elaborate a bit more in the thread.
The idea of using software-defined assets (SDAs) sounds appealing because I could reconcile downstream assets if a manifest is updated. However, I realized that assets don’t currently support dynamic partitions.
I could use
in a dynamically partitioned non-asset job, but I lose the useful data lineage of SDAs.
Is there an elegant way of having an “asset pipeline” and update the list of expected assets dynamically, perhaps using a separate scheduled job?
hi @Bruno Grande! can you say a little more about what you view as the assets in this scenario? Is it basically one set of assets per file, like (original file) -> (processed file asset) -> (even more processed file asset)? Or is there some merging of these processed files that happens somewhere? There's not currently an elegant solution (although we definitely do want to support this use case in the future), but one roundabout idea that came to mind (based off of your "separate scheduled job" comment) was that you could have a sneaky
that actually reads the set of partition keys from a database, i.e.
Copy code
def get_partitions_def():
    all_filenames = call_to_database()
    return StaticPartitionsDefinition(all_filenames)

my_partitions = get_partitions_def()
You could have a separate job that updates the contents of the database so that it stays somewhat up to date. This isn't really a recommended pattern, because it generally means that every time you import this code, you'll need to make a call to a database, so you'd want to make sure that this was a pretty fast call (and probably cache the result).
Thanks for getting back to me, @owen! For additional context, this is intended for a bioinformatics project, where data tends to be stored as files instead of in databases. Here’s the general flow of data for each manifest file (CSV): 1. Download manifest from data repository using authenticated request 2. Split manifest into logical groups (to enable parallel processing downstream) 3. For each manifest chunk: a. Stage the files on the target compute platform b. Kick off a remote processing job on the target compute platform 4. Collate the output files for each processing job 5. Transfer these output files back into the original data repository I don’t know if it makes sense to make an asset for the data/files associated each instance of these steps. I might also be overestimating the value of assets for my use case. Ultimately, what matters most is making sure that each manifest is processed at most once per update (although I’m not expecting many updates per manifest, if any). I could use memoization, but I don’t know if that’s a conventional use of that feature.
interesting -- how important is it for your use case to have each manifest chunk represented as a separate asset? If you modeled steps 1-4 as 1 large source asset (the data repository with all the manifests), then one large output asset (the collated output files), would this do what you want? I was thinking you could potentially use a graph-backed asset (whose graph is dynamic ) to handle the parallelizing of the chunks, then collect the results together at the end to form a single asset.
I don’t think I need to represent the manifest chunks as separate assets. Just to make sure I understand what you’re proposing, I would have two assets: • An asset for a manifest ◦ This would be backed by an
that downloads the file from the data repository • An asset for the set of processed outputs from all manifest chunks ◦ This would be backed by a dynamic graph, which would handle the splitting of the manifest and the submission of remote processing jobs ◦ This asset would depend on the first one I think this would help achieve what I’m looking for because if the manifest is updated, then I would want to re-materialize the second asset. Do you know if there’s an easy way to use the file checksum (e.g. MD5) to determine whether it’s “out-of-date”? Or does Dagster only currently determine “out-of-dateness” based on whether upstream assets have been re-materialized or not. I wonder if I could use the asset definition’s metadata for this. 🤔
currently, "out-of-dateness" is just based on upstream materializations yeah. I think it would be somewhat hard to use the asset definition metadata for that, just because that's generally set in code (so you wouldn't be able to easily update it automatically). You could attach the hash as part of the metadata on the Output from your asset (one way to do this is
context.add_output_metadata({"hash": my_hash})
). This method will add it to a particular materialization's metadata. You could then query this value in the next run of the asset with
context.instance.get_asset_events(asset_key=my_asset_key, limit=1)
. This event should have that metadata on it somewhere.
and yeah what you described was what I was imagining
Chiming in because this "rematerialise only if underlying data has changed" is something I'd very much like a polished way of achieving as well. Currently evaluating the metadata/hash method as way to get there also