Hi all! Is there some nice way to model "non-pure ...
# ask-community
Hi all! Is there some nice way to model "non-pure functions" as assets? e.g. I have some non-pure code, that produces a "batch" of raw data every time it runs. Then from there we have various pure functions (to transform/combine the data). I care about all batches of data ever produced, not just the most recent one. What I could do: • Model the non-pure function as a
• Use an S3 sensor to create a dynamically partitioned
for each output written to S3 by this job • Use `asset`s for the (pure) downstream tasks But in this model I don't have a single place in dagit to see my data end-to-end. The jobs and their logs don't show up in the asset graph here. The link between the job and the assets is implicit via S3, not explicit, like with assets that depend on other assets.
🤖 1
Assuming I'm not entirely missing a feature, I guess what I'm looking for is one of: • UI changes so I can see my job and assets in a single lineage graph. • A special type of non-re-materializable asset, where every "run" just creates a new partition instead of re-creating the existing data.
Hi Timo, I don’t see why you couldn’t use a partitioned asset at the root of your graph to represent a non-pure function. I understand that’s not quite the same as the “special type of non-re-materializable asset” you’re inquiring about, but we don’t currently have that in our API (though it’s an interesting idea-- you might want to open a feature request).
If my nonpure function is something like "download top 100 hackernews stories" won't that create big risk that a rematerialization of a partition "overwrites" data I collected in the past with new/different data? My goal is to only ever add new data. Perhaps there's some way I can make a partition know if it's been materialized in the past and exit immediately (or return the data it produced last time)?
End result being is that if I create a new partition, then the initial materialization goes off and collects a batch of data. But any re-materializations of that partition are no-ops.
Yes, that risk is there-- one thing you could do is inside the materialization function, query the instance for any previous materializations:
Copy code
def my_asset(context):
    if context.partition_key in context.instance.get_materialization_count_by_partition(AssetKey(["my_asset"]):
    raise Exception()
That’s a private API, but I can expand the public
to work with partitions for this week, that will enable you to tell if it’s been materialized too.
Thanks for the suggestions! I will go play and see what I can do with these. I've also created a feature request here: https://github.com/dagster-io/dagster/issues/13336
🙏 1
Somewhat unrelated: I've seen your name in a number of GitHub issues I've been following as well. Just wanted to say your community support efforts are noticed and appreciated. Thanks! 🙌🙏
Hacked something together which suits my needs for now. Sharing in case someone else finds this message: https://github.com/dagster-io/dagster/issues/13336#issuecomment-1495217302