#ask-community

KekronBekron

02/05/2023, 6:39 AM
Hello, I have a question about a file processing pattern. Say I have 100+ files with names like <sysname>.<domain>.some.thing, where there are a dozen possible <sysname> values and a dozen possible <domain> values. In the asset graph, I want to group assets together by <domain>, which, as you can see, can be retrieved from the file names. Currently, I re-process all files through the same Jupyter notebook even if just one upstream file changes. I'm considering breaking things out, so that if only a file in <domain1> is updated (modified timestamp), I run the Jupyter notebook to process just the domain1 files. At the end, I can run another script or notebook to combine the CSV files generated for each domain. In my case it's fine to rerun the whole thing, since it only takes a few minutes anyway, but I want to use this opportunity to get a structured solution and showcase Dagster. Suggestions, please?
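For reference, a minimal plain-Python sketch of the grouping step described above. It assumes every file name follows the `<sysname>.<domain>.some.thing` pattern; the function names and sample file names are hypothetical and not part of Dagster.

```python
from collections import defaultdict


def domain_of(filename: str) -> str:
    """Return the <domain> part of a '<sysname>.<domain>.some.thing' file name."""
    parts = filename.split(".")
    if len(parts) < 2:
        raise ValueError(f"unexpected file name: {filename!r}")
    return parts[1]


def group_by_domain(filenames: list[str]) -> dict[str, list[str]]:
    """Map each domain to the files that belong to it, preserving input order."""
    groups: dict[str, list[str]] = defaultdict(list)
    for name in filenames:
        groups[domain_of(name)].append(name)
    return dict(groups)


# Hypothetical sample file names.
files = [
    "sysA.finance.some.thing",
    "sysB.finance.some.thing",
    "sysA.hr.some.thing",
]
grouped = group_by_domain(files)
print(grouped)
```

Each key of `grouped` is a natural candidate for one partition, so a change to a file in one domain only needs that domain's group to be re-processed.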

Sean Davis

02/05/2023, 11:06 PM
You might want to look at static partitions; you could use domain as a partition.

KekronBekron

02/06/2023, 2:14 AM
Hi Sean, I'm not sure I follow the example given here: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#defining-a-job-with-static-partitions It looks like what I'm after, but I'm afraid I don't fully get the example.

sean

02/06/2023, 4:13 PM
Hi Kekron, thanks for your question. Sean is right that partitions are a good solution here. If you create a job (or asset representing your CSV file) with an associated `PartitionsDefinition`, where each partition corresponds to one of your domains, then you can process only the files for a specific domain in each run of the job. I highly recommend reading the full “Partitions assets and jobs” concept page to get a sense of what is possible: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions

KekronBekron

02/07/2023, 10:29 AM
Thank you, I'll read up 👍
Hi, congrats on the 1.2 release. I see that Dynamically Partitioned Assets is available now, which seems like a good candidate for my requirement.
However, reading the example only frustrates me. I don't understand at all what this line is supposed to mean:
if not context.instance.has_dynamic_partition(images_partitions_def.name, img_filename)
As someone new to the data orchestration world (no prior Airflow experience), at least I feel that there are way too many keywords to remember.

sean

03/13/2023, 4:47 PM
Hi Kekron, let me break that line down for you:
• the `context` is the `SensorEvaluationContext`
• it gives you access to the Dagster instance via `instance`. The `instance` provides APIs for all persisted data in Dagster
• there is a method called `has_dynamic_partition` on the instance. You can use this to check if a partition already exists for a particular partitions definition (in this case, `images_partitions_def`).
This feature is still under active development, so your feedback is appreciated.

KekronBekron

03/14/2023, 2:29 AM
Hi Sean, thanks for writing back with a clarification. I understood this after looking at it for 5 minutes (it's entirely possible that I'm slow with code in general). What annoyed me was that a lot of things refer back to some definition or other that lives elsewhere, so jumping between them was frustrating. So, `images_partitions_def` defines a dynamic partitioning instance.
But then what decides what the dynamic partitioning is based on? I see the answer is `img_filename`, but where is that defined/decided?
Can it not be something as simple as a glob, hiding the `if not context.instance.has_dynamic_partition` check under the covers? Something like `DynamicPartitionsDefinition(name="some-grouping", partition_by="<method>, <param-for-method>")`. Ex: `DynamicPartitionsDefinition(name='systems', partition_by=(glob, 'filename-2023-05-*.csv'))`
BTW, apologies for the lack of proper code formatting. Paste, for some reason, isn't behaving well for me in Slack.

sean

03/14/2023, 8:20 PM
> But then what decides what the dynamic partitioning is based on? I see the answer is img_filename, but where is that defined/decided?
`img_filename` is just a local variable in the list comprehension that defines `new_images`; `images_partitions_def` has no reference to it. In fact the definition of `images_partitions_def` is dead simple:
images_partitions_def = DynamicPartitionsDefinition(name="images")
Basically it’s just a container for arbitrary string partition keys. In this case, we happen to be filling it with keys derived from the filenames found inside the `MY_DIRECTORY` directory, but `images_partitions_def` knows nothing about this; it’s more or less just a set of strings.
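To make the "set of strings" idea concrete, here is a plain-Python analogy, not Dagster code. The `store` dict, `has_key`, and `scan_for_new_keys` are hypothetical stand-ins for the instance's partition storage, `has_dynamic_partition`, and the sensor body, and the temporary directory plays the role of `MY_DIRECTORY`.

```python
import os
import tempfile

# Stand-in for the instance's storage (not Dagster API): maps a partitions
# definition name to the set of string keys registered so far.
store: dict[str, set[str]] = {"images": set()}


def has_key(name: str, key: str) -> bool:
    """Analogous to asking the instance whether a partition key already exists."""
    return key in store[name]


def scan_for_new_keys(name: str, directory: str) -> list[str]:
    """Return file names not yet registered under `name`, then register them."""
    new_keys = sorted(f for f in os.listdir(directory) if not has_key(name, f))
    store[name].update(new_keys)
    return new_keys


with tempfile.TemporaryDirectory() as my_directory:
    for fname in ("a.png", "b.png"):
        open(os.path.join(my_directory, fname), "w").close()
    first_scan = scan_for_new_keys("images", my_directory)   # both files are new
    second_scan = scan_for_new_keys("images", my_directory)  # nothing new
    open(os.path.join(my_directory, "c.png"), "w").close()
    third_scan = scan_for_new_keys("images", my_directory)   # only the new file

print(first_scan, second_scan, third_scan)
```

The key set itself has no idea where its strings come from. The scanning code decides that, which is why a glob, a directory listing, or any other source of names could fill it equally well.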