
Daniel Gafni

04/04/2023, 2:57 PM
Questions regarding `observe_fn` and `@observable_source_asset`
Hey guys. I have some `SourceAsset`s that are a bit tricky - I have around 10 of them (potentially more coming in the future), and each one is either a `.parquet` or an `.avro` file. The assets are partitioned by date (the path to the file includes it). I've implemented a custom `IOManager` for them, and I'm able to load these assets and do stuff with them. Now I'm trying to make my `SourceAsset`s observable, mostly because right now Dagit complains about missing upstream partitions whenever I use these assets to materialize downstream assets. However, I'm not sure how to efficiently reuse the loading code in my custom `IOManager`. Sure, I could factor out common functions and use them in both the `IOManager` and the `observable_source_asset`, but this seems unnecessary: my `IOManager` already knows how to read this asset, so why would I need to tell Dagster how to do it again (inside `observable_source_asset` or `observe_fn`)? The best scenario would be calling `io_manager.load_input(context)`, but I'm not sure how to access the IOManager instance from the body of the `op` / `observable_source_asset` / `observe_fn`. The `SourceAsset` has an `io_manager_key` attribute, but `OpExecutionContext` doesn't expose it (perhaps because IOManagers are supposed to do their job before an `op` starts). Right now I'm using the default key `"io_manager"` to get the IOManager from `context.resources`, but that returns the default IOManager, even though I've passed my custom IOManager to `io_manager_def` in `SourceAsset`. So I can't access my custom IOManager in the `observe_fn`. What would be the best way to do it?

Also, this case seems like it should be pretty common. Perhaps Dagster should provide a convenient way to create such an `observable_source_asset`? For example, the `observe_fn` could be a method of the `IOManager`, or a `data_version_fn` could be specified on the `SourceAsset` - that way Dagster could automatically use the provided IOManager to load the object and apply the `data_version_fn` to it. Or maybe I'm getting it all wrong and there is another way to achieve the same result... fyi @sandy

Another issue: it seems it's not possible to launch a partitioned observation from Dagit.

In other words: why do we need an `observe_fn` that's supposed to load the data if the IOManager already specifies data loading? Shouldn't we just have a `data_version_fn` which takes a loaded object and computes the `DataVersion`?

sandy

04/04/2023, 10:47 PM
That's interesting - I haven't thought about this, but it makes sense. In many cases, you won't actually want to load the asset - you'll instead want to determine the version by looking at the modified timestamp on the file or a hash of the bytes on disk. But I could also see cases where you'd want to look at the in-memory contents as you mentioned. Want to file a github issue for this? Right now, we also don't have the ability to associate a data version with a particular partition of a source asset, although Sean is working on this. cc @sean
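The two cheaper strategies mentioned here can be sketched in plain Python, framework-free (in Dagster, the returned string would just be wrapped in a `DataVersion`; the file contents are illustrative):

```python
import hashlib
import os
import tempfile


def version_from_mtime(path: str) -> str:
    # Cheapest option: changes on every rewrite of the file,
    # even if the bytes are identical.
    return str(os.path.getmtime(path))


def version_from_bytes(path: str) -> str:
    # Content-based option: hashes the raw bytes on disk without
    # spending cycles deserializing them into a DataFrame.
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo on a throwaway file.
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as f:
    f.write(b"fake parquet bytes")
    demo_path = f.name

mtime_version = version_from_mtime(demo_path)
bytes_version = version_from_bytes(demo_path)
```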

Daniel Gafni

04/04/2023, 10:55 PM
Thanks Sandy, I’ll open an issue. Yeah the timestamp might be enough in a lot of cases. I simply didn’t think about it lol. I was computing a hash for the entire polars dataframe. Partitions support would be great to have, but it’s not blocking anything right now. Dagit’s warnings are annoying at most.
Maybe we could also add a file-modification-timestamp `DataVersion` function to the `UPathIOManager` as a solid default behavior.
@sean hey Sean, could you please clarify whether you're working on partitioned source asset observations? It would just be nice to know.
@sandy Here is the issue, is the description good enough?

sandy

04/05/2023, 8:55 PM
that description is great

Daniel Gafni

04/12/2023, 1:09 PM
Hi Sandy! I just took a closer look at the caching & versioning page. It seems like you definitely want to use the actual data to compute the `DataVersion`, at least if you are the one writing it (not for `SourceAsset`s). Otherwise downstream assets become stale even after "cosmetic" updates to the upstream asset, and we don't want that; using the file's last-modified timestamp won't help here. It also seems like the `IOManager` should support computing the `DataVersion` so the user doesn't have to do it manually every time. This should be straightforward for normal assets, right? And probably for source assets too.
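The staleness point can be demonstrated without any framework: rewriting a file with identical bytes bumps the modification time but leaves a content hash unchanged, so only a hash-based version keeps downstream assets from going stale after a cosmetic rewrite (file names and contents here are illustrative):

```python
import hashlib
import os
import tempfile
import time


def content_version(path: str) -> str:
    # Content-based version: depends only on the bytes on disk.
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "asset.parquet")

with open(path, "wb") as f:
    f.write(b"identical bytes")
before = content_version(path)
mtime_before = os.path.getmtime(path)

time.sleep(0.05)  # let the clock tick so the mtime can move

# "Cosmetic" update: the exact same bytes are written again.
with open(path, "wb") as f:
    f.write(b"identical bytes")
after = content_version(path)
mtime_after = os.path.getmtime(path)
```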

sandy

04/14/2023, 5:33 PM
> Seems like you definitely want to use the actual data to compute DataVersion at least if you are the one writing it (not for SourceAssets).

I think there's a significant tradeoff here. Computing the hash of a large dataset can be very expensive, and in many cases I think it's OK to sacrifice some accuracy to avoid this expense. It's also not always straightforward how to do so - e.g., how do you compute the hash of a table or of a Spark DataFrame? Also, another solution to "cosmetic" changes in the code is simply not to bump the code version when those cosmetic changes occur. You also don't necessarily need to load a value into memory in order to compute its data version: for example, if the data is stored in a file, you might just want to look at the bytes of the file instead of spending the cycles to deserialize it into a DataFrame. That all said, I still agree with you that there's value in connecting this to the IO manager. At the very least, the IO manager usually contains the logic to locate the data in storage (e.g., construct the file name).
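On the "how do you hash a table?" question: for in-memory pandas frames at least, `pandas.util.hash_pandas_object` provides a deterministic per-row hash that can be folded into a single version string (this is one possible answer, not a general solution for Spark or warehouse tables):

```python
import hashlib

import pandas as pd


def dataframe_version(df: pd.DataFrame) -> str:
    # hash_pandas_object returns a deterministic uint64 per row
    # (using a fixed default hash key); hashing those values yields
    # a content-based version string for the whole frame.
    row_hashes = pd.util.hash_pandas_object(df, index=True)
    return hashlib.sha256(row_hashes.values.tobytes()).hexdigest()


a = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]})
b = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]})  # same content, new object
c = a.assign(x=[1, 3])                            # genuinely different data
```

Note that this hashes row contents and index, not column names, so a rename alone would not change the version.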

Daniel Gafni

04/14/2023, 7:37 PM
Agree with everything you've said. I also wanted to add that an asset may be re-materialized without changing the data output; the downstream assets should not be updated in that case. I agree there is a tradeoff. Also, sometimes you would still want to hash the large dataset - for example, if it sits at the root of all your other assets and recomputing all of them is actually more expensive. So what would be the next steps in connecting this to the `IOManager`?

Other currently non-implemented features that may be related to this:
• mark a partition as materialized from Dagit (the IOManager could check whether it really exists with the `observe_fn`)
• mark a partition as empty from Dagit

sandy

04/21/2023, 11:50 PM
> So what would be the next steps in connecting this to the `IOManager`?

This is a topic that we're unfortunately likely to move slowly on because:
• I think there are a lot of ways we could implement this, and I don't want to jump into one without considering the larger design space
• This is not something I have the bandwidth to put a ton of thought into at the moment, although I would like to be able to do so in the future

One next step would be to write out what the ideal user code would look like for all the situations discussed above.