Mycchaka Kleinbort
07/17/2023, 4:19 PMdata
import pandas as pd
from dagster import StaticPartitionsDefinition, asset, FreshnessPolicy
# Static partition set with one partition key per ocean.
oceans_partitions_def = StaticPartitionsDefinition(
    [
        "arctic",
        "atlantic",
        "indian",
        "pacific",
        "southern",
    ]
)
@asset(
    freshness_policy=FreshnessPolicy(
        # Cron needs five space-separated fields; the original "*/1*" fused
        # the hour and day-of-month fields together.
        cron_schedule='0 */1 * * *',
        maximum_lag_minutes=5,
    )
)  # Update this every hour
def data() -> pd.DataFrame:
    """Load every row of the ocean data table into a DataFrame.

    The freshness policy attached to this asset uses an hourly cron
    schedule (minute 0 of each hour) with a maximum lag of 5 minutes.

    Returns:
        The result of ``SELECT *`` on
        ``my_ocean_data_that_updates_every_minute``.
    """
    # NOTE(review): pandas.read_sql also requires a connection (``con``)
    # argument; none is shown in this snippet, so one must be supplied
    # before this can run.
    return pd.read_sql('SELECT * FROM my_ocean_data_that_updates_every_minute')
@asset(partitions_def=oceans_partitions_def)  # How do I mark this as stale when `data` updates?
def ml_model_for_each_ocean(context, data):
    """Build one model per ocean partition from the upstream ``data`` asset.

    Args:
        context: Dagster execution context; used to look up the partition
            key of the output being materialized.
        data: Upstream ``data`` asset; filtered on its ``'ocean'`` column.

    Returns:
        Whatever ``build_model`` returns for the rows whose ``'ocean'``
        value equals the current partition key.
    """
    # Resolve the partition key once instead of on every lambda evaluation.
    ocean = context.asset_partition_key_for_output()
    relevant_data = data.loc[lambda x: x['ocean'] == ocean]
    # NOTE(review): build_model is not defined in this snippet; it must be
    # provided elsewhere.
    model = build_model(relevant_data)
    return model
claire
07/17/2023, 11:48 PM