# ask-community

geoHeil

05/15/2022, 7:49 AM
How can I handle a partitioned data asset in dagster when there is no clear notion (i.e. date) of when an update occurs? For example, let's assume some external (public) dataset like https://data.world/buffalony/ru4s-wz29 or https://data.buffalony.gov/Quality-of-Life/Recyclable-Materials/ru4s-wz29 should be polled for changes (once every day/week/whatever), and if fresh data is available, it is downloaded and stored in the data warehouse as a new partition of this asset. I.e. how can I harmonize the notion of a partitioned asset with a daily trigger that checks for updates, for an asset which might only update every now and then?

sandy

05/16/2022, 3:28 PM
Hey @geoHeil - to help make this concrete for me, do you have an example of what one of the partition names could be in this kind of scenario?

geoHeil

05/16/2022, 9:29 PM
Sure! To stick to the example above I will refer to the same dataset (even though the described use case should be more generically applicable). Assume the dataset exposes some metadata (for this particular example http://api.us.socrata.com/api/catalog/v1?ids=ru4s-wz29 in the key `updatedAt`). The metadata should be polled on some schedule (let's assume daily). In case the `updatedAt` is:
- larger/newer/unseen compared to the last successful run: the actual dataset is extracted from the source system (https://data.buffalony.gov/api/views/ru4s-wz29/rows.csv?&accessType=DOWNLOAD) and stored in the data warehouse
- not applicable because the asset was never materialized yet: simply take the current value as the starting partition
- unchanged/equal to the last seen point in time: do not trigger a data update

Thus the partition names would be the concrete timestamps (let's assume dates for simplicity) of the `updatedAt` key.
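Roughly what I mean, as a plain-Python sketch (the URLs are the real endpoints from above; the helper names and the exact JSON path to `updatedAt` in the catalog response are just my assumption):

```python
import requests

CATALOG_URL = "http://api.us.socrata.com/api/catalog/v1?ids=ru4s-wz29"
DATA_URL = "https://data.buffalony.gov/api/views/ru4s-wz29/rows.csv?&accessType=DOWNLOAD"


def fetch_updated_at() -> str:
    """Read the updatedAt timestamp from the dataset's catalog metadata.

    Assumes the value sits at results[0].resource.updatedAt in the catalog
    response; adjust the path if the payload differs.
    """
    resp = requests.get(CATALOG_URL)
    resp.raise_for_status()
    return resp.json()["results"][0]["resource"]["updatedAt"]


def maybe_ingest(last_seen):
    """Download the data only if the metadata is newer than the last run.

    Returns the new partition name (the updatedAt date) or None for a no-op.
    """
    updated_at = fetch_updated_at()
    if last_seen is not None and updated_at <= last_seen:
        return None  # unchanged -> do not trigger a data update
    csv_bytes = requests.get(DATA_URL).content
    # ... store csv_bytes in the warehouse under the new partition ...
    return updated_at[:10]  # use the date part as the partition name
```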

sandy

05/16/2022, 10:41 PM
That makes sense. We don't currently support "runtime" partitions. I.e. you have to define the set of partitions in the definition and can't add partitions when you kick off runs. Let me check with the team about how difficult this would be to add.

geoHeil

05/17/2022, 10:59 AM
great - looking forward to it already (and hoping it is not super complex to add)
The most minimal (admittedly dumb) workaround I could think of in this case: poll on a fixed, regular (i.e. daily partitioned) schedule and only update the actual data asset (physically materialize it) when updated data is available.
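A sketch of that workaround (reusing `fetch_updated_at` from above; `load_last_seen` and `store_in_warehouse` are hypothetical helpers, and the asset key / cron are just placeholders):

```python
from dagster import AssetMaterialization, MetadataValue, ScheduleDefinition, job, op


@op
def poll_and_maybe_ingest(context):
    updated_at = fetch_updated_at()      # helper from the sketch above
    last_seen = load_last_seen(context)  # hypothetical: e.g. read the latest materialization's metadata
    if last_seen is not None and updated_at <= last_seen:
        context.log.info(f"No data fresher than {last_seen} - skipping materialization")
        return
    store_in_warehouse(DATA_URL, partition=updated_at[:10])  # hypothetical warehouse writer
    context.log_event(
        AssetMaterialization(
            asset_key="recyclable_materials",
            partition=updated_at[:10],
            metadata={"updatedAt": MetadataValue.text(updated_at)},
        )
    )


@job
def recyclable_materials_ingest():
    poll_and_maybe_ingest()


# poll once a day; the asset only materializes when updatedAt has actually moved
daily_poll = ScheduleDefinition(job=recyclable_materials_ingest, cron_schedule="0 6 * * *")
```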
https://dagster.slack.com/archives/C01U954MEER/p1650011014029099 has landed in main, i.e. `asset_key = next(iter(my_asset.asset_keys))` should work now. However, in a case like this one (i.e. dynamic partitions and triggering an update only when recent data is available), where I query `DagsterEventType.ASSET_MATERIALIZATION` for the recent materializations of self (i.e. the asset currently being defined): how can I get a reference to `self`'s asset key?
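For reference, this is roughly how I fetch the latest materialization today (a sketch; the hardcoded `AssetKey(["my_prefix", "my_asset"])` is a placeholder and exactly the part I would like to derive from self instead):

```python
from dagster import AssetKey, DagsterEventType, EventRecordsFilter


def latest_materialization_record(instance, asset_key=AssetKey(["my_prefix", "my_asset"])):
    """Return the newest ASSET_MATERIALIZATION event record for the given key, or None."""
    records = list(
        instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
            ),
            limit=1,
            ascending=False,
        )
    )
    return records[0] if records else None
```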
In particular, when an asset key prefix is used (defined outside the asset, when instantiating the asset group), getting a reference to self is hard, I guess. Does this mean I need to think about either a) hardcoding the fully qualified asset key inside its definition, or b) restructuring the job, i.e. into an asset which holds nothing more than the metadata, plus a sensor + job which then performs the update steps (in case new data is available) and materializes a new partition as well as AssetMetadata/Observation details? A sketch of variant b) is below.
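What I mean by variant b), as a sketch using the sensor cursor to remember the last seen `updatedAt` (the sensor name, the job from the earlier sketch and the tag key are just illustrative):

```python
from dagster import RunRequest, SkipReason, sensor


@sensor(job=recyclable_materials_ingest)
def recyclable_materials_update_sensor(context):
    updated_at = fetch_updated_at()  # helper from the sketch above
    if context.cursor == updated_at:
        return SkipReason(f"No new data, still at {updated_at}")
    context.update_cursor(updated_at)
    # the run (and its ops / IO manager) can pick the partition up from the tag
    return RunRequest(run_key=updated_at, tags={"updated_at": updated_at})
```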
Furthermore, for a downstream job (i.e. the ingestion named above followed by a downstream cleaning step) there should be a 1:1 mapping of the partitions, and the downstream job should only be triggered once a new partition was ingested.
Even more complex: I am currently trying to implement something which works by passing additional metadata to the IO manager. In case no fresher data is available I want: 1. the job to NO-OP, i.e. not trigger any further computation or cleaning, and 2. still keep the monitoring (to see that something was executed but no newer data was available). However, as outlined in https://dagster.slack.com/archives/C01U954MEER/p1652876973832529, asset materializations are logged even if only a NO-OP happened. This breaks my current logic, which reads the asset materialization events (only the latest one) and retrieves the metadata to decide if fresher data is available: if a NO-OP event is retrieved, the logic falsely concludes that no tag was ever written. Currently the only alternative I see is to retrieve the materialization events (for a particular asset) without a limit (i.e. a full scan) and iterate over all of them manually until a first match is found, which obviously is not really desirable/scalable. Do you have thoughts for a better solution? Perhaps pushing the predicate down, maybe via the GraphQL API (ideally via Python ;))?
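For completeness, the fallback I have in mind looks roughly like this (a sketch; it only replaces the unlimited full scan with newest-first paging, the underlying problem of NO-OP events polluting the history remains):

```python
from dagster import DagsterEventType, EventRecordsFilter


def last_real_materialization(instance, asset_key, page_size=10):
    """Scan materialization events newest-first, skipping NO-OP ones without the updatedAt metadata."""
    cursor = None
    while True:
        records = list(
            instance.get_event_records(
                EventRecordsFilter(
                    event_type=DagsterEventType.ASSET_MATERIALIZATION,
                    asset_key=asset_key,
                    before_cursor=cursor,
                ),
                limit=page_size,
                ascending=False,
            )
        )
        if not records:
            return None  # no materialization carrying the metadata found at all
        for record in records:
            event_data = record.event_log_entry.dagster_event.event_specific_data
            materialization = event_data.materialization
            if any(entry.label == "updatedAt" for entry in materialization.metadata_entries):
                return materialization
        cursor = records[-1].storage_id  # continue with the next (older) page
```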
https://dagster.slack.com/archives/C01U954MEER/p1652888107498619 is related, where I try to dynamically generate the partitions in the IO manager - but there I face problems passing tags (since no additional run is launched; this happens during the computation/materialization of the asset itself).