# dagster-feedback
j
Hey there, would it be possible to ignore a `FreshnessPolicy` trigger if the upstream `observable_source_asset` shows that the asset hasn't changed? I have a scenario where we have `FreshnessPolicies` on dbt models every hour. However, the data in the source system may not have changed in that hour, and therefore I don't want to trigger Fivetran assets to move the data into Snowflake, because that would re-process data that has already been processed. I want to only trigger asset materializations if the below conditions are true:
1. The `FreshnessPolicy` attached to my leaf node is activated
2. The `observable_source_asset` returns a new data version for the root / starting node
Is that possible?
šŸ‘ 2
I have looked at `AutoMaterializePolicy`, but it only solves half the problem. https://docs.dagster.io/concepts/assets/asset-auto-execution#auto-materializing-assets I feel like `AutoMaterializePolicy` could be extended to cater for the use case I've described above.
There are 3 components that could do what we want; however, some things are missing at the moment that prevent it. Here are the 3 components:
• `FreshnessPolicy`: This allows end users to specify and define when they want to receive data assets, e.g. a CEO wants their dashboard to have fresh data by 8am every day.
• `@observable_source_asset`: This decorator tells us whether an asset has fresh data or not (it's essentially the same functionality as dbt's source freshness check).
• `AutoMaterializePolicy`: This is supposed to be the magic sauce that ties the two things above together. However, it's not doing what we want it to today. Ideally it should only materialize `ceo_report` if the following condition is true:
  ◦ The `ceo_report` freshness policy has been triggered AND the `observable_source_asset` indicates that there's fresh source data to pull from. If the `observable_source_asset` doesn't indicate that there's fresh data, then `ceo_report` should not materialize, to avoid wasting unnecessary compute, and an alert should be raised informing the CEO about stale data ("Hey, your dashboard is displaying stale data!").
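For concreteness, a minimal sketch of how these three pieces are declared with the current API (the asset names, the one-hour lag, and the placeholder data version are all illustrative; the missing piece is the source-freshness-aware auto-materialize behaviour described above):
```python
from dagster import (
    AutoMaterializePolicy,
    DataVersion,
    FreshnessPolicy,
    asset,
    observable_source_asset,
)

@observable_source_asset
def source_system():
    # Illustrative: report a version for the source data. A real
    # implementation would derive this from the source system itself.
    return DataVersion("2023-01-01T00:00:00")

@asset(
    non_argument_deps={"source_system"},
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def ceo_report():
    # Today, the lazy policy materializes this asset to satisfy the
    # freshness policy regardless of whether source_system reported
    # a new data version.
    ...
```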
šŸ‘ 3
d
cc @sandy
z
Thanks for outlining that use case, Jonathan - I think we have a very similar use case that caused us to ditch freshness policies some time ago, for the reasons mentioned. That last bullet point really captures in a nutshell how we'd like everything to work as well šŸ‘
šŸ‘ 2
j
Looking for the same functionality here! We have a lot of cases where we want downstream assets refreshed as often as, or more often than, some of their upstreams.
šŸ‘ 3
d
I think this could extend to any upstream nodes, not just the source assets? Dagster already knows the path it needs to take to refresh the leaf node, so it should be able to skip any nodes that don't need a refresh.
o
Hey @Jonathan Neo! Thanks for putting this together -- it seems like in this case it wouldn't make sense to model your data as explicitly partitioned, as every time you run your Fivetran sync you're just getting "fully up to date", rather than "filling in X hours of data". Is that accurate? I definitely agree on the vision here, and there are some pieces of your setup that I'd like to clarify as well. In your case, how are you conceptualizing what the true "source data time" of your assets is? I'm assuming this is some sort of setup where a "chunk" of data is dropped into some location at point in time T, and then Fivetran moves data from that location to your database. In this case, what do these chunks of data look like, and how do you know what "data time" they represent? Are these chunks of data some pre-known size (i.e. 3 hours at a time), do they represent "all the data since the last chunk dropped", or is it unknown until you actually inspect the contents?
@Joel Olazagasti In your case, is it safe to assume that there is at least one upstream of each of these downstream assets that will be refreshing as often as it is? As in, you'd have some asset `C` with parents `A` and `B`, and you'd want `B` to only update daily, but `A` and `C` to update hourly?
I definitely appreciate the feedback on this thread, though; these are use cases we're interested in solving for, so this input helps a ton!
j
as every time you run your Fivetran sync, you're just getting "fully up to date", rather than "filling in X hour of data", is that accurate?
@owen Yes, in the example I gave above, Fivetran was doing a full refresh because some connectors only support that, and that's what sparked our interest initially. But after doing some thinking, it doesn't matter whether Fivetran could've been configured to use an incremental sync. Even with Fivetran incremental syncs, the downstream assets that are triggered are dbt assets, which execute against Snowflake compute. Snowflake compute is charged on a per-minute basis, so even if a run only takes 1 second, we still get billed for 1 minute of compute. It would be good to just not run Fivetran and dbt at all if no fresh data is detected at the source system, thus saving on unnecessary and expensive Snowflake costs.
how are you conceptualizing what the true "source data time" of your assets is?
So the "source" refers to the data source that Fivetran is pulling from. This could be a database (e.g. Postgres, MySQL), a file system (e.g. an S3 bucket), or an API. The "source data time" depends on what source type it is (database, file system, API).
• For a database:
  ◦ Dagster will need to keep track of a `state` object that captures the latest value of a column the user specifies, e.g. `event_timestamp`. Dagster should then compare the `select max(event_timestamp) from my_table` value against the value stored in the `state` object.
• For a file system:
  ◦ Dagster will need to keep track of a `state` object that captures the last_modified timestamp for a given filepath or folder in the bucket. Dagster should then compare the last_modified timestamp against the value stored in the `state` object.
• For an API:
  ◦ Dagster will need to keep track of a `state` object that captures the latest value of a field the developer specifies, e.g. `event_timestamp`. Dagster should then compare that field's latest value against the value stored in the `state` object.
Classes can be written to automate the functionality described above for popular databases and filesystems. APIs have a very long tail, and it won't be possible to implement this for every API, so users can write the functionality themselves using the existing `@observable_source_asset` decorator.
It's like a sensor that polls the source at specified intervals, except I don't want the sensor to trigger the materialization of downstream assets. I just want the sensor to say "data is fresh :blob-smile-happy:" or "data is not fresh :sadge:", and let the FreshnessPolicy perform the trigger only if data is fresh.
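A rough sketch of the database case described above, using the existing `@observable_source_asset` API. Everything here is an assumption for illustration: the table, the helper function, and the choice to encode the high-water mark as the data version are not an agreed-upon design.
```python
from dagster import DataVersion, observable_source_asset

def _fetch_max_event_timestamp() -> str:
    # Hypothetical helper: in practice this would run
    # `select max(event_timestamp) from my_table` against the source database.
    return "2023-01-01T00:00:00"

@observable_source_asset
def my_source_table():
    # Encode the high-water mark as the data version: the version only
    # changes when new rows arrive, so an unchanged version means
    # "data is not fresh" and downstream runs could be skipped.
    return DataVersion(_fetch_max_event_timestamp())
```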
šŸ‘ 1
o
That makes a lot of sense! I think if I could distill this into a specific feature request, it'd be something along the lines of "allow users to explicitly interact with the data time system". Right now, the data time of an asset is purely determined by either:
• The materialization timestamp of a root asset (if no observable source asset)
• The observation timestamp of an observable source asset (if you've consumed the latest version of the observable source, you're totally up to date; otherwise you have a data time equal to the timestamp of the last observation you did consume)
For your use case, you'd want more flexibility in defining how these things relate to each other -- would that be accurate? For an API, you could imagine something like:
```python
@observable_source_asset
def my_observable_source():
    # time = select max(event_timestamp) from ...
    return DataTime(time)
```
j
Hey @owen yes, the API you've suggested is what I'm looking for:
```python
@observable_source_asset
def my_observable_source():
    # time = select max(event_timestamp) from ...
    return DataTime(time)
```
In addition, I'd like another `AutoMaterializePolicy` that will only materialize if (1) the `FreshnessPolicy` is triggered (similar to the current `.lazy()` behaviour), and (2) there is fresh data detected at the source.
```python
@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy_source_freshness()
)
def my_asset(my_observable_source):
    ...
```
o
Gotcha, that makes a lot of sense. Another potential interpretation (which you may or may not find appealing) is to model your upstream data as a time-partitioned asset. However, instead of having discrete time partitions (such as hourly or daily), it might be more like a `ContinuousTimeWindowPartitionsDefinition` or something along those lines, where the observable source asset produces a new "high water mark" of data that exists for that asset, which Dagster would keep track of (and incorporate into freshness-based logic). One advantage of this approach is that it would become more natural to have regular time window (e.g. daily) partitions definitions downstream of such an asset, as you could explicitly define a partition mapping. Do you have thoughts/opinions on that sort of API?
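The proposed `ContinuousTimeWindowPartitionsDefinition` doesn't exist, but the partition-mapping half of this idea can be sketched with today's APIs, substituting an hourly-partitioned upstream for the hypothetical continuous one (asset names are illustrative):
```python
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    HourlyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

# Stand-in for the hypothetical continuous upstream: an hourly-partitioned
# asset tracking data as it lands.
@asset(partitions_def=HourlyPartitionsDefinition(start_date="2023-01-01-00:00"))
def raw_events():
    ...

# A daily downstream asset that explicitly maps each daily partition onto
# the upstream hourly partitions falling inside its time window.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    ins={"raw_events": AssetIn(partition_mapping=TimeWindowPartitionMapping())},
)
def daily_summary(raw_events):
    ...
```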
j
Hey @owen, yes, that makes sense to me, although I haven't used Dagster partitions yet, so I might not see the full picture.