Daniel Gafni
04/04/2023, 2:57 PM
`observe_fn` and `@observable_source_asset`
Hey guys.
I have some `SourceAsset`s that are a bit tricky - I have around 10 of them (potentially more coming in the future) and each one is either a `.parquet` or an `.avro` file. The assets are partitioned by date (the path to the file includes it).
I've implemented a custom `IOManager` for them. I'm able to load these assets and do stuff with them.
Now I'm trying to make my `SourceAsset`s observable. Mostly because right now Dagit complains about missing upstream partitions whenever I'm using these assets to materialize downstream assets.
However, I'm not sure how to efficiently reuse this weird code I have in my custom `IOManager`. Sure, I could create common functions and use them in both the `IOManager` and `observable_source_asset`, but this seems unnecessary. My `IOManager` already knows how to read this asset, so why would I need to tell Dagster how to do it again (inside `observable_source_asset` or `observe_fn`)?
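For reference, the "common functions" option could look roughly like the sketch below. It is plain Python, not Dagster API: `partition_path`, `FileStore` (a stand-in for the custom IOManager), and `observe_partition` are hypothetical names, and the `<base>/<asset>/<date>.<ext>` layout is an assumption.

```python
from dataclasses import dataclass
from pathlib import Path


def partition_path(base_dir: str, asset_name: str, date: str, extension: str) -> Path:
    """Single place that knows where a date partition lives (assumed layout)."""
    return Path(base_dir) / asset_name / f"{date}.{extension}"


@dataclass
class FileStore:
    """Hypothetical stand-in for the custom IOManager; only the path logic matters."""

    base_dir: str

    def load_bytes(self, asset_name: str, date: str, extension: str) -> bytes:
        # Reuses the shared helper instead of building the path inline.
        return partition_path(self.base_dir, asset_name, date, extension).read_bytes()


def observe_partition(base_dir: str, asset_name: str, date: str, extension: str) -> dict:
    """Hypothetical observe-style function: reports existence and size without loading."""
    path = partition_path(base_dir, asset_name, date, extension)
    exists = path.is_file()
    return {
        "path": str(path),
        "exists": exists,
        "size": path.stat().st_size if exists else None,
    }
```

The only shared piece is the path construction, so the duplication between the two call sites stays small.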
The best scenario would be calling `io_manager.load_input(context)`. But I'm not sure how to access the `io_manager` instance from the `op` / `observable_source_asset` / `observe_fn` function body. The `SourceAsset` has an `io_manager_key` attribute, but the `OpExecutionContext` doesn't have it (perhaps because the IOManagers are supposed to do their job before an `op` starts). Right now I'm just using the default key `"io_manager"` to get the `io_manager` from `context.resources`, but this is the default `io_manager`. Even tho I've passed my custom IOManager to `io_manager_def` in `SourceAsset`, I can't access my custom IOManager in the `observe_fn`.
What would be the best way to do it? Also, seems like this case should be pretty common. Perhaps Dagster should provide a convenient way to create such an `observable_source_asset`? Perhaps the `observe_fn` could be a method of the `IOManager`? Or a `data_version_fn` could be specified for the `SourceAsset`? This way it could automatically use the provided IOManager to load the object and apply the `data_version_fn` to it.
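One way to picture the `data_version_fn` idea is a helper that takes the loader the IO manager already has plus a `data_version_fn`, and returns an observe function. None of the names below (`make_observe_fn`, `load_fn`, `sha256_of_rows`) exist in Dagster; this is only a plain-Python sketch of the shape, and the version is returned as a plain string.

```python
import hashlib
from typing import Any, Callable


def make_observe_fn(
    load_fn: Callable[[str], Any],
    data_version_fn: Callable[[Any], str],
) -> Callable[[str], str]:
    """Compose an existing loader with a version function (hypothetical helper)."""

    def observe(partition_key: str) -> str:
        obj = load_fn(partition_key)  # reuse the loading logic as-is
        return data_version_fn(obj)  # derive a version string from the loaded object

    return observe


def sha256_of_rows(rows: list) -> str:
    """Example data_version_fn: hash the repr of each row, in order."""
    digest = hashlib.sha256()
    for row in rows:
        digest.update(repr(row).encode("utf-8"))
    return digest.hexdigest()
```

With this shape the user would only write the `data_version_fn`, and loading would stay wherever it already lives.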
Or maybe I'm getting it all wrong and there is another way to achieve the same result...
fyi @sandy
Another issue: seems like it's not possible to launch a partitioned observation from Dagit
Daniel Gafni
04/04/2023, 6:35 PM
`observe_fn` that's supposed to load the data if the `IOManager` already specifies data loading? Shouldn't we just have `data_version_fn` which takes a loaded object and computes the `DataVersion`?
sandy
04/04/2023, 10:47 PM
Daniel Gafni
04/04/2023, 10:55 PM
Daniel Gafni
04/05/2023, 7:23 AM
Daniel Gafni
04/05/2023, 8:01 PM
Daniel Gafni
04/05/2023, 8:05 PM
sandy
04/05/2023, 8:55 PM
Daniel Gafni
04/12/2023, 1:09 PM
`IOManager` should support computing the `DataVersion` so the user doesn't have to call it every time manually.
This should be straightforward for normal assets, right? And probably for source assets too
sandy
04/14/2023, 5:33 PM
> Seems like you definitely want to use the actual data to compute DataVersion at least if you are the one writing it (not for SourceAssets).
I think there's a significant tradeoff here. Computing the hash of a large dataset can be very expensive. In many cases, I think it's OK to sacrifice some accuracy to avoid this expense. It's also not always straightforward how to do so - e.g. how do you compute the hash of a table or of a Spark DataFrame? Also, another solution to "cosmetic" changes in the code is just to not update the code version when those cosmetic changes occur.
You also don't necessarily need to load a value into memory in order to compute its data version. For example, if data is stored in a file, you might just want to look at the bytes of the file instead of spending the cycles to deserialize it into a DataFrame.
That all said, I still agree with you that there's value in connecting this to the IO manager. At the very least, the IO manager usually contains logic to locate the data in storage (e.g. construct the file name).
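A rough sketch of the "look at the bytes of the file" idea, using only the Python standard library. The function names are hypothetical and nothing here is Dagster API; the second function shows the cheaper, less accurate end of the tradeoff (file size and modification time instead of content).

```python
import hashlib
from pathlib import Path


def file_data_version(path: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file's raw bytes in chunks, without deserializing it."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as f:
        # Read fixed-size chunks so large files never sit fully in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def cheap_data_version(path: str) -> str:
    """Cheaper, less accurate alternative: file size and modification time only."""
    stat = Path(path).stat()
    return f"{stat.st_size}-{stat.st_mtime_ns}"
```

The content hash changes only when the bytes change, while the size/mtime version can also change when a file is rewritten with identical content.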
Daniel Gafni
04/14/2023, 7:37 PM
So what would be the next steps in connecting this to the `IOManager`?
Daniel Gafni
04/18/2023, 5:02 PM
sandy
04/21/2023, 11:50 PM
> So what would be the next steps in connecting this to the `IOManager`?
This is a topic that we're unfortunately likely to move slowly on because:
• I think there are a lot of ways we could implement this and I don't want to jump into one without considering the larger design space
• This is not something that I have bandwidth to put a ton of thought into at the moment, although I would like to be able to do so in the future
One next step would be to try to write out what the ideal code that a user would write would look like for all the situations discussed above?