Mycchaka Kleinbort
07/17/2023, 1:40 PM
import pandas as pd
from dagster import asset

@asset
def data() -> pd.DataFrame:
    df = ...  # some data
    return df

@asset
def model(data):
    model_ = ...  # some ML stuff
    return model_

@asset
def model_validation_report(model, data):
    report = ...  # some really expensive SHAP value, GridSearch, Optuna stuff...
    return report

@asset
def deploy_model_endpoint(model, model_validation_report):
    # This should run when model changes, and depends on model_validation_report
    # to keep the documentation in sync, but model_validation_report doesn't have
    # to be re-run each time data or model change.
    ...
Here I'd like to avoid re-running model_validation_report each time data and/or model change.
Liam Pieri
07/17/2023, 2:12 PM
Mycchaka Kleinbort
07/17/2023, 3:21 PM
Mycchaka Kleinbort
07/17/2023, 3:23 PM
model_validation_report is usually run manually ~1/week to check for model drift, etc., but the latest models are deployed daily.
Mycchaka Kleinbort
07/17/2023, 3:28 PM
Mycchaka Kleinbort
07/17/2023, 3:31 PM
Liam Pieri
07/17/2023, 5:24 PM
Mycchaka Kleinbort
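07/18/2023, 11:33 AM
from datetime import datetime, timedelta

from dagster import asset

@asset
def model_validation_report(model, data):
    # load_previous_report, run_report and save are user-defined helpers (not shown)
    last_ran_report, run_date = load_previous_report(...)
    if datetime.now() - run_date < timedelta(days=7):
        # The last report is less than a week old: reuse it
        return last_ran_report
    else:
        # Otherwise recompute the expensive report and persist it
        new_report = run_report(model, data)
        save(new_report)
        return new_report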
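The reuse-if-younger-than-a-week idea in the message above can be exercised without Dagster. This is a minimal, stdlib-only sketch; `WeeklyReportCache`, `CachedReport` and `MAX_AGE` are hypothetical names standing in for the unspecified `load_previous_report` and `save` helpers, and the clock is passed in explicitly so the behaviour is easy to test.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Optional

# Assumed refresh interval, matching the ~1/week manual cadence
MAX_AGE = timedelta(days=7)


@dataclass
class CachedReport:
    report: Any
    run_date: datetime


class WeeklyReportCache:
    """Hypothetical in-memory stand-in for load_previous_report/save."""

    def __init__(self, run_report: Callable[[Any, Any], Any]) -> None:
        self._run_report = run_report
        self._cached: Optional[CachedReport] = None

    def get(self, model: Any, data: Any, now: datetime) -> Any:
        # Reuse the stored report while it is younger than MAX_AGE
        if self._cached is not None and now - self._cached.run_date < MAX_AGE:
            return self._cached.report
        # Otherwise recompute the expensive report and record when it ran
        report = self._run_report(model, data)
        self._cached = CachedReport(report=report, run_date=now)
        return report


if __name__ == "__main__":
    calls = []

    def fake_report(model, data):
        calls.append(model)  # record which models triggered a full run
        return f"report for {model}"

    cache = WeeklyReportCache(fake_report)
    start = datetime(2023, 7, 18)
    for day in range(10):  # one new model per day, as with the daily deploys
        cache.get(f"model-day{day}", None, start + timedelta(days=day))
    print(calls)  # ['model-day0', 'model-day7']
```

On days 1 to 6 the cache returns the report computed for `model-day0`, so the report can lag the deployed model by up to a week; that is the trade-off a weekly cadence accepts.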
claire
07/18/2023, 6:46 PM
deploy_model_endpoint and model_validation_report to have different notions of "staleness" when receiving the same "model" input.
claire
07/18/2023, 6:54 PM
model as a regular asset, but also as an observable source asset. By using an observable source asset, you can continue to load the model and pass it to downstream assets while assigning it a custom data version. This data version can be bumped when the report asset hasn't been materialized in over a week.
• Observe the observable source asset on a schedule, so it is regularly updated to check whether the data version has changed.
• Then, make model_validation_report downstream of the observable source asset, so it is only considered stale after the observable source asset is observed to be out of date.
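The mechanism in the two bullets above can be illustrated with a toy, stdlib-only model of version-based staleness. This is not Dagster's implementation: it only assumes that an asset counts as stale when an upstream data version differs from the one it consumed at its last materialization, and the `window-N` versions for the observer are a hypothetical choice.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    name: str
    parents: List[str]
    # Upstream data versions recorded at this node's last materialization
    consumed: Dict[str, str] = field(default_factory=dict)


def is_stale(node: Node, versions: Dict[str, str]) -> bool:
    # Never materialized, or some parent's version changed since then
    if not node.consumed:
        return True
    return any(node.consumed[p] != versions[p] for p in node.parents)


def simulate(days: int = 14) -> Dict[str, List[int]]:
    report = Node("model_validation_report", ["report_observer"])
    deploy = Node("deploy", ["model", "model_validation_report"])
    versions: Dict[str, str] = {}
    runs: Dict[str, List[int]] = {report.name: [], deploy.name: []}
    for day in range(days):
        versions["model"] = f"model-{day}"  # a new model is trained every day
        # Daily observation; the observed version only changes every 7 days
        versions["report_observer"] = f"window-{day // 7}"
        for node in (report, deploy):  # upstream before downstream
            if is_stale(node, versions):
                node.consumed = {p: versions[p] for p in node.parents}
                runs[node.name].append(day)
                versions[node.name] = f"{node.name}-{day}"  # new output version
    return runs


print(simulate())
```

Over 14 simulated days the report is materialized only on days 0 and 7, while `deploy` is materialized every day because `model` changes daily, which is the split between the two notions of staleness described in the thread.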
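from dagster import (
    DataVersion,
    Definitions,
    IOManager,
    asset,
    observable_source_asset,
)

@asset
def model():
    ...

class LoadModel(IOManager):
    def handle_output(self, context, obj) -> None:
        # IOManager requires handle_output; this manager only loads, so nothing is stored
        pass

    def load_input(self, context) -> object:
        loaded_model = ...  # loads the model
        return loaded_model

@observable_source_asset(io_manager_key="load_model_input_manager")
def report_observer():
    data_version = ...  # get new data version if model_validation_report hasn't been materialized in the last week
    return DataVersion(data_version)

@asset
def model_validation_report(context, report_observer):
    # report_observer is loaded through LoadModel, so it holds the model itself
    ...

@asset
def deploy(model, model_validation_report):
    ...

defs = Definitions(
    assets=[model, report_observer, model_validation_report, deploy],
    resources={
        "load_model_input_manager": LoadModel(),
    },
)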
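The version computation in `report_observer` is left as `...`. One simple, hypothetical way to get a data version that changes at most once a week is to bucket the current time into fixed 7-day windows; `weekly_data_version`, `WINDOW` and `ANCHOR` are illustrative names, not part of Dagster. Using the report's own last-materialization timestamp directly as the version would change the version right after every report run, making the report look stale again immediately, which the fixed-window approach avoids.

```python
from datetime import datetime, timedelta, timezone

# Assumed refresh cadence for the validation report
WINDOW = timedelta(days=7)
# Hypothetical fixed anchor (Monday 2023-01-02, UTC); any constant aware instant works
ANCHOR = datetime(2023, 1, 2, tzinfo=timezone.utc)


def weekly_data_version(now: datetime) -> str:
    """Version string that stays constant within each 7-day window after ANCHOR.

    `now` must be timezone-aware, since it is subtracted from an aware ANCHOR.
    """
    window_index = (now - ANCHOR) // WINDOW  # timedelta // timedelta -> int
    return f"report-window-{window_index}"


if __name__ == "__main__":
    tuesday = datetime(2023, 7, 18, 9, 0, tzinfo=timezone.utc)
    print(weekly_data_version(tuesday))  # report-window-28
    print(weekly_data_version(tuesday + timedelta(days=6)))  # report-window-29
```

Inside `report_observer`, one option would be returning `DataVersion(weekly_data_version(datetime.now(timezone.utc)))`. The trade-off is that the report becomes stale at window boundaries rather than exactly seven days after its last run, so a report materialized late in a window goes stale sooner.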