Jonathan Neo
07/20/2023, 1:05 PM
`FreshnessPolicy` trigger if the upstream `observable_source_asset` shows that the asset hasn't changed? I have a scenario where we have hourly FreshnessPolicies on our dbt models. However, the data in the source system may not have changed in that hour, so I don't want to trigger the Fivetran assets to move the data into Snowflake, because that would mean re-processing data that has already been processed. I want to trigger asset materializations only if both of the conditions below are true:
1. The `FreshnessPolicy` attached to my leaf node is triggered.
2. The `observable_source_asset` returns a new data version for the root / starting node.
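The two conditions above combine into a single AND check. Below is a minimal, framework-free sketch. It assumes the freshness result and the data versions are already available as plain values, for example version strings derived from the source's latest `event_timestamp`. `should_materialize` and its parameters are hypothetical names, not part of Dagster's API:

```python
from typing import Optional


def should_materialize(
    freshness_policy_triggered: bool,
    last_consumed_version: Optional[str],
    current_source_version: str,
) -> bool:
    """Hypothetical gate: return True only when both conditions hold.

    1. The leaf node's FreshnessPolicy has been triggered.
    2. The observable source reports a data version different from the one
       consumed by the last materialization (None means never consumed).
    """
    has_new_source_data = current_source_version != last_consumed_version
    return freshness_policy_triggered and has_new_source_data


# Policy triggered, but the source version is unchanged: skip the run.
print(should_materialize(True, "2023-07-20T12:00", "2023-07-20T12:00"))  # False
# Policy triggered and the source has a new version: materialize.
print(should_materialize(True, "2023-07-20T12:00", "2023-07-20T13:00"))  # True
```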
Is that possible?
Jonathan Neo
07/20/2023, 2:07 PM
`AutoMaterializePolicy` but it only solves half the problem. https://docs.dagster.io/concepts/assets/asset-auto-execution#auto-materializing-assets
I feel like `AutoMaterializePolicy` could be extended to cater for the use case I've described above.
Jonathan Neo
07/20/2023, 2:24 PM
• `@observable_source_asset`: This decorator tells us whether an asset has fresh data or not (functionally, it's essentially the same as dbt's source freshness check).
• `AutoMaterializePolicy`: This is supposed to be the magic sauce that ties the two things above together. However, it doesn't do what we want today. Ideally, it should only materialize `ceo_report` if the following conditions are true:
  ◦ The `ceo_report` freshness policy has been triggered AND the `observable_source_asset` indicates that there's fresh source data to pull from. If the `observable_source_asset` doesn't indicate that there's fresh data, then `ceo_report` should not materialize, to avoid wasting compute, and an alert should be raised informing the CEO about stale data ("Hey, your dashboard is displaying stale data!").
Dagster Jarred
07/21/2023, 4:40 AM
Zachary Bluhm
07/21/2023, 1:24 PM
Joel Olazagasti
07/21/2023, 2:04 PM
Duke
07/21/2023, 2:57 PM
owen
07/21/2023, 4:28 PM
owen
07/21/2023, 4:30 PM
`C` with parents `A` and `B`, and you'd want `B` to only update daily, but `A` and `C` to update hourly?
owen
07/21/2023, 4:31 PM
Jonathan Neo
07/21/2023, 5:08 PM
> as every time you run your Fivetran sync, you're just getting "fully up to date", rather than "filling in X hour of data", is that accurate?
@owen Yes. In the example I gave above, Fivetran was doing a full refresh, because some connectors only support that, and that's what sparked our interest initially. But after thinking it over, it doesn't matter whether Fivetran is configured for an incremental sync. Even with an incremental sync, the downstream assets that get triggered are dbt assets, which execute on Snowflake compute. Snowflake bills warehouse compute with a 60-second minimum each time a warehouse resumes, so even if a run only takes 1 second, we still get billed for 1 minute of compute. It would be better not to run Fivetran and dbt at all when no fresh data is detected in the source system, which would save on unnecessary and expensive Snowflake costs.
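To make the cost argument concrete: Snowflake bills a running warehouse per second, with a 60-second minimum each time the warehouse resumes. The sketch below assumes every triggered run resumes a suspended warehouse (auto-suspend between runs) and ignores warehouse size and credit rates. `billed_seconds` is a hypothetical helper, and the "3 hours with fresh data" scenario is purely illustrative:

```python
from typing import List

MINIMUM_BILLED_SECONDS = 60  # charged each time a suspended warehouse resumes


def billed_seconds(run_durations_seconds: List[int]) -> int:
    """Total billed seconds, assuming each run resumes a suspended warehouse."""
    return sum(max(d, MINIMUM_BILLED_SECONDS) for d in run_durations_seconds)


# Hourly runs that each take 1 second: 24 runs per day.
every_hour = billed_seconds([1] * 24)
# Hypothetical day where the source only changed in 3 of those hours.
only_when_fresh = billed_seconds([1] * 3)
print(every_hour, only_when_fresh)  # 1440 180
```

Under these assumptions, skipping runs with no new source data cuts billed time from 24 minutes to 3 minutes per day, even though the useful work is the same.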
Jonathan Neo
07/21/2023, 5:21 PM
> how are you conceptualizing what the true "source data time" of your assets is?
So the "source" refers to the data source that Fivetran is pulling from. This could be a database (e.g. Postgres, MySQL), a file system (e.g. an S3 bucket), or an API. The "source data time" depends on the source type:
• For a database:
  ◦ Dagster will need to keep track of a `state` object that captures the latest value of a user-specified column, e.g. `event_timestamp`. Dagster should then compare the result of `select max(event_timestamp) from my_table` against the value stored in the `state` object.
• For a file system:
  ◦ Dagster will need to keep track of a `state` object that captures the `last_modified` timestamp for a given file path or folder in the bucket. Dagster should then compare the current `last_modified` timestamp against the value stored in the `state` object.
• For an API:
  ◦ Dagster will need to keep track of a `state` object that captures the latest value of a field the developer chooses, e.g. `event_timestamp`. Dagster should then compare the latest value returned by the API against the value stored in the `state` object.
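One way to picture the `state` object described above: remember the last watermark seen per source, and report new data only when the freshly observed watermark differs. This is a minimal stdlib sketch, not Dagster code. It assumes an in-memory dict as the state store (a real implementation would need to persist it), a `sqlite3` table standing in for Postgres/MySQL, a local file's modification time standing in for an S3 object, and a plain callable for the API case. All class and function names are hypothetical:

```python
import os
import sqlite3
import tempfile
from typing import Callable, Dict


def db_watermark(conn: sqlite3.Connection, table: str, column: str) -> str:
    """Database case: MAX() of a user-chosen column, e.g. event_timestamp."""
    # Table/column names are interpolated into SQL: pass trusted identifiers only.
    row = conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
    return str(row[0])


def file_watermark(path: str) -> str:
    """File system case: last-modified time of a local file (stand-in for S3)."""
    return str(os.path.getmtime(path))


class WatermarkState:
    """Hypothetical 'state' object: last watermark seen per source, in memory."""

    def __init__(self) -> None:
        self._last_seen: Dict[str, str] = {}

    def has_new_data(self, source: str, observe: Callable[[], str]) -> bool:
        current = observe()
        is_new = self._last_seen.get(source) != current
        self._last_seen[source] = current  # remember for the next comparison
        return is_new


state = WatermarkState()

# Database: an in-memory sqlite3 table stands in for Postgres/MySQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (event_timestamp TEXT)")
conn.execute("INSERT INTO my_table VALUES ('2023-07-21T10:00:00')")


def observe_db() -> str:
    return db_watermark(conn, "my_table", "event_timestamp")


print(state.has_new_data("my_table", observe_db))  # True (first observation)
print(state.has_new_data("my_table", observe_db))  # False (nothing new)
conn.execute("INSERT INTO my_table VALUES ('2023-07-21T11:00:00')")
print(state.has_new_data("my_table", observe_db))  # True

# File system: a temporary file stands in for an object in a bucket.
with tempfile.NamedTemporaryFile(delete=False) as f:
    path = f.name
print(state.has_new_data("landing_file", lambda: file_watermark(path)))  # True
print(state.has_new_data("landing_file", lambda: file_watermark(path)))  # False
os.remove(path)

# API: the developer supplies their own observation function.
print(state.has_new_data("my_api", lambda: "2023-07-21T09:30:00"))  # True
```

The per-source observation functions are the part that could be packaged for popular databases and file systems, while the API case stays a user-supplied callable.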
Classes could be written to automate the functionality described above for popular databases and file systems. APIs have a very long tail, and it won't be possible to implement this for every API, so users can write the functionality themselves using the existing `@observable_source_asset` decorator.
Jonathan Neo
07/21/2023, 5:23 PM
owen
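07/21/2023, 11:22 PM
```python
from dagster import observable_source_asset

# Proposal: DataTime is a type suggested in this thread, not an existing Dagster class.
@observable_source_asset
def my_observable_source():
    # time = select max(event_timestamp) from ...
    return DataTime(time)
```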
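The appeal of a time-based value over today's `DataVersion` (an opaque string that Dagster compares for equality) is that a timestamp can be checked against a freshness policy's allowed lag. A stdlib sketch of that check, assuming the source data time is a timezone-aware `datetime` and the policy is expressed as a maximum lag in minutes, mirroring `FreshnessPolicy(maximum_lag_minutes=...)`. `is_within_lag` is a hypothetical helper, not Dagster API:

```python
from datetime import datetime, timedelta, timezone


def is_within_lag(
    source_data_time: datetime, now: datetime, maximum_lag_minutes: float
) -> bool:
    """True if data as of `source_data_time` is within the allowed lag at `now`."""
    return now - source_data_time <= timedelta(minutes=maximum_lag_minutes)


now = datetime(2023, 7, 21, 12, 0, tzinfo=timezone.utc)
# Latest source event was 45 minutes ago: an hourly policy can still be met.
print(is_within_lag(now - timedelta(minutes=45), now, 60))  # True
# Latest source event was 3 hours ago: re-running Fivetran and dbt cannot help,
# because the source itself has nothing newer.
print(is_within_lag(now - timedelta(hours=3), now, 60))  # False
```

When the source data time is itself older than the allowed lag, materializing downstream assets cannot bring them back within policy, which is exactly the case where a stale-data alert is more useful than a run.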
Jonathan Neo
07/22/2023, 4:54 AM
In addition, I'd like another `AutoMaterializePolicy` that will only materialize if (1) the `FreshnessPolicy` is triggered (similar to the current `.lazy()` behaviour), and (2) fresh data is detected at the source.
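```python
from dagster import AutoMaterializePolicy, asset

# Proposal: lazy_source_freshness() is a suggested policy, not an existing Dagster API.
@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy_source_freshness()
)
def my_asset(my_observable_source):
    ...
```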
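A rough simulation of how the proposed `lazy_source_freshness()` policy might behave over one day. It assumes an hourly freshness policy that is triggered at every hourly tick, a hypothetical set of hours in which the source received new data, and, per the earlier requirement for `ceo_report`, a stale-data alert whenever the policy is triggered but the source has nothing new. Function and variable names are hypothetical:

```python
from typing import List, Set, Tuple


def simulate_day(source_update_hours: Set[int]) -> Tuple[List[int], List[int]]:
    """Return (hours that materialize, hours that raise a stale-data alert).

    Assumes the hourly FreshnessPolicy is triggered at every hourly tick, and
    that the observable source reports new data only in `source_update_hours`.
    """
    materialized: List[int] = []
    alerts: List[int] = []
    for hour in range(24):
        freshness_triggered = True  # hourly policy, evaluated hourly
        new_source_data = hour in source_update_hours
        if freshness_triggered and new_source_data:
            materialized.append(hour)  # run the Fivetran sync, then dbt
        elif freshness_triggered:
            alerts.append(hour)  # skip the run, warn that the dashboard is stale


    return materialized, alerts


runs, alerts = simulate_day({8, 13, 17})
print(runs)  # [8, 13, 17]
print(len(alerts))  # 21
```

With the freshness policy alone, all 24 ticks would trigger a run; under the proposed policy, only the 3 hours with new source data do, and the remaining ticks become alerts instead of Snowflake compute.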
owen
07/25/2023, 11:02 PM
Jonathan Neo
07/26/2023, 1:22 AM