# ask-community
Oren
Hey, trying again 🙂 I'm looking for advice on how to structure a Dagster pipeline for the following case: we want a pipeline we can use for running ML models on parts of our historical data (a list of time intervals, usually several hours each, not a full day) with different parameters every time (for example, a different version of the model or a different configuration).
• The request arrives as a configuration file with the list of intervals and the model config
• Each unit of work (time interval + model config) can take hours to run and uses a lot of resources (CPU, memory, GPUs)
• We want to run many of these time intervals in parallel
Our current solution uses a sensor that reads the file from a bucket and creates many runs of a job that does the calculation; the sensor passes the time interval and model config as part of the run config (rough sketch at the end of this message). Each run ends up spinning up a separate k8s job with all the resources it needs, and it all scales very nicely. However, managing it is sometimes challenging compared to a solution that uses partitions. For example, if something fails (after several retries), we need to find the specific run that failed and re-execute it. It would also be nice to be able to find all of the partitions that belong to a specific request (filename) and rerun the ones that failed. Is there a better way to structure this? Can I change the job to use a `DynamicPartitionsDefinition` (or a multi-partition with dynamic partitions)? Would it be easier to implement this using assets? I thought of having the sensor add a multi-partition key of the form `request file|line number` for each unit of work (so I can later easily find all of the partitions belonging to a specific request file). Thanks!
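Roughly, the current sensor looks something like this (just a sketch with illustrative names, `load_request_file` is a placeholder for reading the file from the bucket):
```python
from dagster import Permissive, RunRequest, job, op, sensor


def load_request_file():
    # Placeholder: fetch and parse the request file from the bucket
    return {
        "filename": "request.json",
        "intervals": ["2023-01-01T00:00/2023-01-01T04:00"],
        "model_config": {"version": "v1"},
    }


@op(config_schema={"interval": str, "model_config": Permissive()})
def run_model(context):
    # One unit of work: a time interval plus a model config
    interval = context.op_config["interval"]
    model_config = context.op_config["model_config"]
    ...


@job
def model_job():
    run_model()


@sensor(job=model_job)
def request_file_sensor(context):
    request = load_request_file()
    # One run per interval; each run becomes its own k8s job
    for line_number, interval in enumerate(request["intervals"]):
        yield RunRequest(
            run_key=f"{request['filename']}|{line_number}",
            run_config={
                "ops": {
                    "run_model": {
                        "config": {
                            "interval": interval,
                            "model_config": request["model_config"],
                        }
                    }
                }
            },
        )
```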
Sean
Hi Oren, IIUC I see two possibilities for you:
• Stick with ops and use a `DynamicPartitionsDefinition` containing the time interval (rough sketch below). We do have a `TimeWindowPartitionsDefinition` class, but it requires a cron schedule, and it sounds like your intervals are irregular. This solution is an incremental step from what you have and might be easier, but the benefits may be limited.
• Switch to using assets with either `DynamicPartitionsDefinition` or `MultiPartitionsDefinition`. Assets are the future of Dagster, so this is in principle more desirable, but the current asset model might not have the flexibility to accommodate you.
What does a run of your job look like -- are there a lot of different ops?
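A sketch of the first option, assuming the op reads its interval from the partition key (names are illustrative, not tested):
```python
from dagster import DynamicPartitionsDefinition, job, op

# Each partition key is one time interval (plus whatever else you encode in it)
interval_partitions = DynamicPartitionsDefinition(name="intervals")


@op
def run_model(context):
    # The interval this run is responsible for comes from the partition key
    interval = context.partition_key
    ...


@job(partitions_def=interval_partitions)
def model_job():
    run_model()
```
The sensor would then add partition keys for the new intervals and request one run per key instead of passing the interval through run config.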
Oren
Thanks Sean! Yes, I think that for a job, a `DynamicPartitionsDefinition` would work better. It's not just that the time intervals are irregular; there's also the model version dimension (we run different versions of the model for the same date so we can compare some of the results). The current job has 4 steps: 1. read the list of images (for the given time range), 2. download the images, 3. run the model on the images, and 4. write the results to the DB. Packing these into a single op to make life easier for the asset might not be too bad, and I could use IO managers instead of some of the steps. Is there an example of a `MultiPartitionsDefinition` with only dynamic partitions? Anything I need to know about it, or is it just a matter of creating the correct `MultiPartitionKey`s from the sensor?
Sean
For the steps you listed above, it doesn't sound like you have a meaningful data intermediate, so I think a single multi-dimensionally partitioned asset with a time interval dimension and a model version dimension is the way to go.
> Is there an example of a MultiPartitionsDefinition with only dynamic partitions?
I'm not aware of one, but cc @claire for confirmation.
Claire
Hi Oren. Here's a simple example:
```python
from dagster import DynamicPartitionsDefinition, MultiPartitionsDefinition, asset


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            # Both dimensions of this multipartitioned asset are dynamic
            "a": DynamicPartitionsDefinition(name="a"),
            "b": DynamicPartitionsDefinition(name="b"),
        }
    )
)
def multipartitioned_with_dynamic_dimensions():
    ...
```
Multipartitioned assets with dynamic dimensions function the same as other multipartitioned assets; as you mentioned, you'll just need to create the correct `MultiPartitionKey`s.
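From the sensor side, you'd register the dynamic partition keys and request runs with `MultiPartitionKey`s, roughly like this (the `load_request_file` helper and the key format are placeholders for your own logic):
```python
from dagster import (
    AssetSelection,
    DynamicPartitionsDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

intervals = DynamicPartitionsDefinition(name="interval")
model_versions = DynamicPartitionsDefinition(name="model_version")


@asset(
    partitions_def=MultiPartitionsDefinition(
        {"interval": intervals, "model_version": model_versions}
    )
)
def model_results(context):
    # Each materialization handles one (interval, model_version) pair
    dims = context.partition_key.keys_by_dimension
    interval, model_version = dims["interval"], dims["model_version"]
    ...


model_results_job = define_asset_job(
    "model_results_job", selection=AssetSelection.assets(model_results)
)


@sensor(job=model_results_job)
def request_file_sensor(context):
    request = load_request_file()  # placeholder: read the request file from the bucket
    # Encode "request file|line number" in the interval dimension so all
    # partitions belonging to one request file are easy to find later
    interval_keys = [
        f"{request['filename']}|{line}" for line, _ in enumerate(request["intervals"])
    ]
    version_key = request["model_version"]
    return SensorResult(
        run_requests=[
            RunRequest(
                partition_key=MultiPartitionKey(
                    {"interval": key, "model_version": version_key}
                )
            )
            for key in interval_keys
        ],
        dynamic_partitions_requests=[
            intervals.build_add_request(interval_keys),
            model_versions.build_add_request([version_key]),
        ],
    )
```
Since the request filename is embedded in the interval keys, all of the partitions for one request file share that prefix, which makes it straightforward to find them and re-materialize just the ones that failed.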