# ask-community
m
Hi, I'm asking here before we home-brew a hack... is there any way to override the staleness criteria of an asset? Effectively, I'm trying to set up a TTL-style cache that ignores upstream data changes. The use case is that we have a comprehensive set of "metrics" we compute for some assets, which takes a substantial amount of time and compute. Generally we only want to run this when the underlying asset code has changed, but not when the data has changed. Here is some pseudocode:
```python
import pandas as pd
from dagster import asset


@asset
def data() -> pd.DataFrame:
    ...  # some data
    return df


@asset
def model(data):
    ...  # some ML stuff
    return model_


@asset
def model_validation_report(model, data):
    ...  # some really expensive SHAP value, grid search, Optuna stuff...
    return report


@asset
def deploy_model_endpoint(model, model_validation_report):
    ...  # This should run when model changes, and depends on
    # model_validation_report to keep the documentation in sync,
    # but model_validation_report doesn't have to be re-run
    # each time data or model changes
```
Here I'd like to avoid re-running `model_validation_report` each time `data` and/or `model` change.
l
How are you currently dealing with new versions of the model?
m
A properly new model (e.g. a change to the model definition / feature set / hyperparameters) is handled as a new StaticPartition of the model asset (note that the model is a partitioned asset). Simply re-training the model with the latest data is a new asset version (all versions are persisted to blob storage). Does that make sense?
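Roughly, the partitioning looks like this (a quick sketch; the version names are made up):
```python
from dagster import StaticPartitionsDefinition, asset

# Hypothetical partition keys: one per model definition / feature set / hyperparameter set
model_versions = StaticPartitionsDefinition(["v1-baseline", "v2-more-features"])


@asset(partitions_def=model_versions)
def model(context, data):
    ...  # train the variant named by context.partition_key
    return model_
```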
The `model_validation_report` is usually run manually ~1/week to check for model drift, etc., but the latest models are deployed daily.
Here is a rough sketch:
• The data the model is predicting on refreshes hourly.
• The model itself is re-trained EOD, but used throughout the day to generate predictions for the latest data.
• Usually there is no major change between training runs of the model, given that only one day's worth of data has changed, so we don't usually run the Model Report daily.
• However, I don't know how to easily automate "Upload Stuff to Blob Storage", given that updates to Data / Model mark the Model Report as stale.
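In Dagster terms, those cadences might map to schedules roughly like this (a sketch; the job names and cron strings are illustrative):
```python
from dagster import ScheduleDefinition, define_asset_job

# Hypothetical jobs matching the cadences above
hourly_data_job = define_asset_job("refresh_data", selection="data")
daily_train_job = define_asset_job("retrain_model", selection="model")

hourly_data_schedule = ScheduleDefinition(job=hourly_data_job, cron_schedule="0 * * * *")
daily_train_schedule = ScheduleDefinition(job=daily_train_job, cron_schedule="0 18 * * *")  # EOD
```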
l
The new model = new partition is an interesting pattern! My next idea gets into home-brewy territory, but maybe you could run a scheduled job that pulls the latest model and runs the report weekly?
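Something like this, perhaps (a sketch; the job name and cron string are placeholders):
```python
from dagster import ScheduleDefinition, define_asset_job

# Hypothetical weekly job that re-runs just the report asset
weekly_report_job = define_asset_job(
    "weekly_model_report",
    selection="model_validation_report",
)
weekly_report_schedule = ScheduleDefinition(
    job=weekly_report_job,
    cron_schedule="0 6 * * 1",  # Mondays at 06:00
)
```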
m
That's more or less what I do - the report asset basically "runs" every day, but the code I wrote just loads and returns the last report if it's less than one week old. Something like this (pseudocode, can't be bothered):
```python
from datetime import datetime, timedelta


@asset
def model_validation_report(model, data):
    last_ran_report, run_date = load_previous_report(...)

    if datetime.now() - run_date < timedelta(days=7):
        return last_ran_report
    else:
        new_report = run_report(model, data)
        save(new_report)
        return new_report
```
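For what it's worth, `load_previous_report` could look something like this (a sketch assuming a local pickle cache at a made-up path; the real version would presumably read from blob storage):
```python
import pickle
from datetime import datetime
from pathlib import Path

REPORT_PATH = Path("/tmp/model_validation_report.pkl")  # hypothetical cache location


def load_previous_report(path: Path = REPORT_PATH):
    # Returns (report, run_date); run_date is recovered from the file's mtime
    report = pickle.loads(path.read_bytes())
    run_date = datetime.fromtimestamp(path.stat().st_mtime)
    return report, run_date
```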
c
Hi Mycchaka, this is an interesting use case. We don't have a great way to handle staleness differently between downstream assets, since it seems like you want `deploy_model_endpoint` and `model_validation_report` to have different notions of "staleness" when receiving the same `model` input.
Wondering if you've seen our guide on data versions and code versions? https://docs.dagster.io/guides/dagster/asset-versioning-and-caching
I think one somewhat-hacky way you could represent your dependency graph to get the behavior you're looking for is the following:
• Represent `model` as a regular asset, but also as an observable source asset. By using an observable source asset, you can continue to load the model and pass it to downstream assets, but you can assign a custom data version. This data version can be bumped when the report asset hasn't been materialized in over a week.
• Observe the observable source asset on a schedule, so it is regularly updated to check if the data version has changed.
• Then, you can have `model_validation_report` be downstream of the observable source asset, so it is only considered stale after the observable source asset is observed to be out of date.
```python
from dagster import (
    DataVersion,
    Definitions,
    IOManager,
    asset,
    observable_source_asset,
)


@asset
def model():
    ...


class LoadModel(IOManager):
    def load_input(self, context) -> object:
        # loads the model
        return model

    def handle_output(self, context, obj):
        # source assets are never materialized, so there is nothing to store
        pass


@observable_source_asset(io_manager_key="load_model_input_manager")
def report_observer():
    # get a new data version if model_validation_report hasn't been
    # materialized in the last week
    data_version = ...
    return DataVersion(data_version)


@asset
def model_validation_report(context, report_observer):
    ...


@asset
def deploy(model, model_validation_report):
    ...


defs = Definitions(
    assets=[model, report_observer, model_validation_report, deploy],
    resources={
        "load_model_input_manager": LoadModel(),
    },
)
```
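And one way the observer's data version could be computed (a sketch; `get_last_report_time` is a hypothetical helper that reads the report's last-materialized timestamp, e.g. from blob storage):
```python
from datetime import datetime, timedelta


def compute_report_data_version(get_last_report_time) -> str:
    # Keep a stable version while the report is fresh; once the report is
    # over a week old, return a changing value so downstream assets go stale.
    last_run = get_last_report_time()
    if datetime.now() - last_run > timedelta(days=7):
        return datetime.now().strftime("stale-%Y-%m-%d")
    return last_run.strftime("fresh-%Y-%m-%d")
```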