# dagster-feedback
Am I correct in understanding that if an Asset Reconciliation Sensor selects a Partitioned Asset that has incomplete partitions, it launches a job for each unfilled partition, even in the past? So if I add a new asset that requires a backfill, it will basically launch the backfill, but by way of RunRequests for each individual partition? I'm torn on this behavior, but mostly I think that the sensor should only fill partitions in the future, and not launch any backfills. Current behavior creates the possibility of surprise-launching hundreds (or thousands) of jobs, either by targeting an asset that isn't fully backfilled, or adding a new asset that requires a backfill.
👍 5
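The per-partition behavior described above can be sketched schematically. This is an illustration only, not Dagster's actual implementation, and the function name is invented: a reconciliation pass compares the asset's full partition set against its materialized partitions and emits one run request per gap, however old.

```python
# Hypothetical sketch of the behavior described above -- not Dagster's
# real internals. One RunRequest-equivalent is produced for every
# unfilled partition, including historical ones.

def partitions_to_request(all_partition_keys, materialized_keys):
    """Return every partition key the sensor would launch a run for."""
    materialized = set(materialized_keys)
    return [key for key in all_partition_keys if key not in materialized]

# A newly added asset with 30 daily partitions and a single materialization:
all_keys = [f"2023-01-{day:02d}" for day in range(1, 31)]
requested = partitions_to_request(all_keys, materialized_keys=["2023-01-30"])
# 29 historical partitions would each get their own run -- the "surprise backfill"
```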
This is an interesting point - we are also considering moving over to a fully declarative pipeline, as that seems to be the "way of the future", but I am also very wary of the limited flexibility the current asset reconciliation sensor provides out of the box. Ideally we would have the flexibility to:
• Declare that a net-new asset should not be backfilled by the reconciliation sensor (Stephen's point above)
• Declare that an existing asset should only be reconciled for net-new partitions
👍 2
➕ 1
These are great points, and the "surprise backfill" issue in particular is a pretty rough one. A question: if you have told Dagster not to launch runs for old missing partitions, and you are missing a few old days' worth of data, would you want that asset to still show up as out of date in the UI? My thinking is that it should be shown as out-of-date, which implies that this sort of information is conceptually different from the FreshnessPolicy of the asset. I'm imagining a declarative ReconciliationPolicy which can be attached to the asset definition (maybe subsidiary to the FreshnessPolicy?). Sketching out some things that might go into this policy:
• An indicator of how far back the reconciliation sensor is allowed to materialize partitions for a given asset, probably in terms of number of partitions. This could potentially default to some reasonable number (5? 10?), but I'm not confident there.
• An indicator of whether upstream version updates to old partitions should cause updates to this asset (@Zachary Bluhm's second point). This would default to allowing old partitions to be updated.
• A timing preference: either "asap", meaning that this asset should be updated as soon as its upstreams update, or "lazy", meaning you let the sensor make a smarter/more efficient decision on when to run it in order to minimize total runs. This would default to "lazy".
One extra thing to note is that these concepts mostly only make sense in the world of time-based partitions (which, admittedly, make up a large majority of partition use cases). Static partition definitions, I think, should generally be treated as N separate non-partitioned assets.
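The three bullets above can be condensed into a small sketch. To be clear, this ReconciliationPolicy and all of its field names are hypothetical, mirroring the proposal in the message; nothing here exists in Dagster's API.

```python
from dataclasses import dataclass
from typing import Literal, Optional

# Hypothetical sketch of the ReconciliationPolicy floated above.
# Every name is invented for illustration; defaults follow the message.

@dataclass(frozen=True)
class ReconciliationPolicy:
    # how many trailing partitions the sensor may materialize (bullet 1);
    # None would mean "no limit"
    max_partitions_back: Optional[int] = 5
    # whether upstream updates to old partitions propagate here (bullet 2)
    allow_old_partition_updates: bool = True
    # "asap" = run as soon as upstreams update; "lazy" = let the sensor
    # batch runs to minimize total executions (bullet 3)
    timing: Literal["asap", "lazy"] = "lazy"

policy = ReconciliationPolicy(max_partitions_back=10, timing="asap")
```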
hey Owen! To answer your question: I think our users are always going to want more flexibility 😛 The 3 main use cases that drive this are:
1. We have some super expensive table that we might want to "backfill" on our own, i.e. we add a single column that isn't used in other parts of the table and that we will load manually. Or, we want the ability to "schedule" when the backfill happens (maybe on the weekend so the resource consumption doesn't impact day-to-day work). This is rare, but when it happens this is a super nice control to have.
2. A table may be "append-only", and we never want to rewrite its history, even if its upstreams have modified partitions. But we still want it to "wait" on its upstreams to execute net-new partitions before running itself for that same partition.
3. A net-new table is created, but there is no desire to backfill it at all. However, it should still reconcile net-new partitions.
For case 1, it makes sense to mark the dataset as stale. For case 2, I think it doesn't make sense to mark it as stale (unless a net-new partition has not been materialized). Case 3 is tricky, but I think the general opinion here would be to not mark it as stale by default for already-existing partitions. This also makes me wonder if it makes sense (in our case at least) for tables to be able to define their own partition definition, or at least its start date. I also want to frame all of this in the context of dbt - I can imagine there might be other takes from folks with other use cases.
I don’t love the idea of yet another policy to manage -- there’s a ton of 3d chess already happening with partitions and assets and sensors and staleness, and even versioning, which I hope to never have to touch -- but obviously there is a lot of nuance. Here's my mental model:
• A FreshnessPolicy declares when an asset is marked stale. (Default: when at least one upstream asset has a more recent materialization.)
• A reconciliation sensor submits stale assets for re-run. (Period.)
Right now, there is this really weird situation where the sensor will "decide" when to submit upstream assets for materialization. This is a terrifying feature to me, and the reason is that it violates the model above -- we have a sensor submitting assets for rematerialization that are not stale. In that light, here's what I would add to the freshness policy:
1. Mark as stale when (at least one // all) upstream assets are more recent. cf. here.
2. Mark as stale when any of the past N partitions are missing (default=0 ==> only future).
3. Mark as stale if downstream assets can request more recent materializations. (Basically, on-demand rematerialization.)
The point here is that "staleness" is the abstraction I want to reason about with an asset, because it keeps a clean break with whatever sensor logic does/does not exist, as well as with new additions to the asset graph. (I don't want someone else to publish a new asset that changes the behavior of existing assets.)
❤️ 1
💯 1
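The three staleness rules proposed in the message above could be sketched as a single predicate. Every name below is invented for this illustration and is not part of Dagster's API; timestamps and partition ages are simplified to integers.

```python
from dataclasses import dataclass, field
from typing import List

# Illustrative-only sketch of the three proposed staleness rules.

@dataclass
class AssetState:
    last_materialized: int                # logical timestamp of this asset
    upstream_materialized: List[int]      # logical timestamps of upstreams
    # ages of missing partitions, where 1 = the most recent past partition
    missing_partition_ages: List[int] = field(default_factory=list)
    downstream_requested: bool = False    # a downstream asked for fresher data

def is_stale(state, require_all_upstreams=False, lookback_n=0, allow_on_demand=False):
    # Rule 1: (at least one // all) upstream assets are more recent
    newer = [t > state.last_materialized for t in state.upstream_materialized]
    if newer and (all(newer) if require_all_upstreams else any(newer)):
        return True
    # Rule 2: any of the past N partitions are missing (default 0 => only future)
    if any(age <= lookback_n for age in state.missing_partition_ages):
        return True
    # Rule 3: downstream assets may request on-demand rematerialization
    if allow_on_demand and state.downstream_requested:
        return True
    return False

fresh = AssetState(last_materialized=10, upstream_materialized=[5, 8])
behind = AssetState(last_materialized=10, upstream_materialized=[5, 12])
```

Note how the default `lookback_n=0` reproduces the "only future partitions" behavior asked for earlier in the thread: an old missing partition alone never marks the asset stale.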
Thanks again both of you for the detailed responses! @Stephen Bailey I am largely in agreement with the larger point that the FreshnessPolicy is/should be the main abstraction that users focus on, and that more flexibility in that layer would be useful.

One nuance that I want to dig into is the idea of assets that have somewhat ambiguous “staleness” values. This situation is actually quite common in the non-partitioned asset world. For example, if you have an online transactional database table (source asset) which gets sync’d to an analytical database table (regular asset), it is always possible to materialize the downstream table asset in order to pull in new data. It may be the case that you’re fairly indifferent to how often this syncing operation takes place (running it more often may have negligible cost implications, as each run only updates new rows), so setting a FreshnessPolicy on that particular node may be undesirable. You just want that table to be updated at whatever times allow downstream assets (which do have FreshnessPolicies) to get the data they need in time. I don’t think this pattern (which I would argue is a core use case) can coexist with “the reconciliation sensor should submit only and all stale assets for rerun”.

Currently, we treat assets in that category as neither fresh nor stale. They’re not fresh because they have no freshness policy defined, and not stale because there’s no upstream materialization event (we are just assuming that the upstream data for the source asset is always changing) and… well, it’d be annoying to have a bunch of assets marked as stale with no way to fix it. If we had some default freshness policy for all assets, it’d probably be categorized as “always fresh”, which would be undesirable, as then it’d never be executed by the reconciliation logic.
At the risk of being too pedantic, I think it’d be more accurate to say something like “the reconciliation sensor should only submit assets which are not fresh for rerun”. In this case, if you are certain of your desired behavior for that aforementioned regular asset (maybe it really should only be run once a day), then you could simply set a FreshnessPolicy on that asset specifying that fact, and prevent runs from being kicked off at a higher frequency than you’ve specified. Otherwise, that asset is at the whims of its downstream assets. Some extra miscellaneous points:
• In the nearish future, we are adding the capability to materialize any/all missing/stale assets (i.e. things that show up as “stale” in Dagit) on a schedule. This is a more straightforward approach to the problem than the current reconciliation logic, but it doesn’t work for the situation described above (for the reasons detailed above). It also means that you need to wait for your assets to become stale before Dagster fixes them, rather than Dagster anticipating when an asset will become stale and launching runs to keep it up to date before that happens. However, this may be the sort of nice "safe" functionality you're looking for.
• In the mediumish future, we’re planning on moving the reconciliation sensor into a daemon, which should not only make the reconciliation stuff feel less “special” (it’s a bit weird to have to add a special sensor to your repository/Definitions, and having reconciliation-specific properties on an asset feels very off if the only thing that interprets those properties is that special sensor), but also means that it can work across code locations.
• Along those lines, I do think there are some properties that purely exist in the reconciliation domain, rather than the freshness domain. The first is the property of “should this asset be allowed to be kicked off by reconciliation logic at all”.
You may have a FreshnessPolicy on something indicating when you expect it to be available, but have some custom logic for determining when it needs to get kicked off (and so you don’t want Dagster to try to kick off that asset with its own logic). Another property would concern when an asset should be materialized. In many situations, there’s a large window of times during which it’d be acceptable to materialize an upstream asset in order to get all of the data that you need. In general, given the FreshnessPolicy that’s defined, it should be acceptable to wait past the first acceptable moment in order to minimize the number of times that upstream asset needs to get executed (multiple downstream freshness policies may depend on this upstream asset at different cadences). The reconciliation logic takes this into account (for non-partitioned assets), but you might want some way of telling it not to for specific nodes. I think these properties can potentially be hidden away in more elegant ways than a whole new complex policy to manage, but I did want to point that out.
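The "wait past the first acceptable moment" idea above can be sketched as a tiny scheduling rule. This is an illustration only (the function name and integer time units are invented, not Dagster's scheduling logic): run the upstream as late as possible while still meeting the earliest deadline implied by downstream freshness policies, so one run can potentially serve several downstream cadences.

```python
# Sketch of lazy upstream scheduling: pick the latest start time that
# still satisfies every downstream deadline, instead of running eagerly.
# Names and time units are illustrative only.

def latest_acceptable_start(now, downstream_deadlines, run_duration):
    """Latest time this upstream can start and still meet all deadlines."""
    earliest_deadline = min(downstream_deadlines)
    return max(now, earliest_deadline - run_duration)

# Downstreams need fresh data by t=100 and t=160; a run takes 10 time units.
start = latest_acceptable_start(now=0, downstream_deadlines=[100, 160], run_duration=10)
# Starting at t=90 meets the t=100 deadline, and the t=160 consumer may be
# able to reuse the same materialization if it is still fresh enough by then.
```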
Yeah, I guess I could see that "fine-to-materialize" approach being a typical use case, although I just don't like how potentially unexpected it is. Our asset graphs are not 2 layers deep; many of them are 8 layers deep and cross multiple systems -- that long-distance lineage seems to me to be the main draw, and I'm not confident in Dagster's ability to work it out -- a sort of "canal gating" seems more controllable and less risky. (My nightmare is someone adding a FreshnessPolicy to an end-of-stream asset that starts kicking off dbt runs and ML model training jobs.) But again, I'm not thinking systematically, just about our graph, which is one big connected 600-node asset graph. At a high level, I would just encourage you all to think really clearly about the way users are able to reason about this functionality. It's deeply new, and I know that my data scientists (who use Dagster as a tool) need a simple rule of thumb. I actually tend to think of the value here as a Unified Event Stream, where it's <Upstream Asset Materialized -> Trigger Downstream Asset>, or <Asset Became Stale -> Materialize Asset>, which is why I like the simple framework.
I think a consequence of this discussion was this change in the last version:
> For time-window-partitioned assets, no runs will be kicked off for partitions whose windows end more than one day before the latest window for that asset. In essence, this gives a time bound of ~1 day for the sensor to materialize a partition of an asset once that partition comes into being.
While I understand this reduces the trouble of generating countless surprise backfills when one adds a new asset into the reconciliation group, it also prevents the automatic backfill of missed partitions that this sensor allowed before. And this use case is rather important for us. Our infrastructure is not great, so it's possible the reconciliation couldn't run during the weekend, or during the vacations (damn French people) of the guy in charge of this pipeline who knows how to backfill. With this change to a 1-day look-back, when we put Dagster back online the next Monday, the reconciliation sensor will not be able to automatically backfill the weekend's or the week's missed partitions. So picking up from your initial suggestions @owen, we would like to be able to customize that 1-day time delta, so we can set it to 1 week or 1 month. What we love about Dagster is that we can put our efforts into writing smart sensors that greatly reduce the number of manual, infrastructure-issue-related backfills we have to do.
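The cutoff rule in question, and the customization being asked for, can be sketched as follows. This is an illustration only, not Dagster's source code, and the function name is invented.

```python
from datetime import date, timedelta

# Sketch of the look-back cutoff: skip any missing partition whose window
# ends more than `look_back` before the end of the asset's latest
# partition window. Illustrative only, not Dagster's implementation.

def backfillable_partitions(missing_windows, latest_window_end,
                            look_back=timedelta(days=1)):
    """missing_windows: list of (start, end) dates for missing partitions."""
    cutoff = latest_window_end - look_back
    return [(start, end) for start, end in missing_windows if end >= cutoff]

# Daily partitions; the sensor was offline over a 3-day weekend outage,
# leaving the Jan 5, 6, and 7 windows unfilled.
missing = [(date(2023, 1, d), date(2023, 1, d + 1)) for d in range(5, 8)]
latest_end = date(2023, 1, 8)
kept_default = backfillable_partitions(missing, latest_end)                  # 1-day look-back
kept_week = backfillable_partitions(missing, latest_end, timedelta(days=7))  # customized
```

With the default 1-day look-back, the oldest weekend partition is dropped; widening `look_back` to a week recovers all of them, which is the customization requested above.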
@Nicolas Parot Alvarez yep you're totally right -- we do plan on making this customizable (ideally per-asset) in the future, but we haven't decided on exactly how we want to expose that functionality to users. Do you have an ideal way of expressing that in mind?
Hello, in my current usage, the easiest way would be to have a new parameter in `build_asset_reconciliation_sensor`:

```python
build_asset_reconciliation_sensor(look_back_days: int = 1, ...)
```

But being able to fully define the reconciliation at the moment of the asset definition would probably be best, and would allow removing the reconciliation sensor, which sticks out as a different breed from the other sensors. Something like:

```python
@asset(reconciliation_policy: ReconciliationPolicy = None)

ReconciliationPolicy(
    assets: Union[Sequence[AssetKey], Sequence[AssetDefinition], AssetSelection] = this_asset.upstream(depth=1),
    look_back_days: int = 1,
    ...
)
```

Or maybe allow both, and have the sensor overwrite the assets' reconciliation policies.