Sara
04/21/2022, 12:06 PM
Samuel Stütz
04/21/2022, 12:50 PM
train_job = training.to_job(
resource_defs={
"bq": bigquery_resource,
"io_manager": gcs_parquet_pickle_io_manager,
...
},
config={
"inputs": {
"grouping": "brand",
"time_interval": "daily",
"split_date": f"{datetime.now() - timedelta(days=14):%Y-%m-%d}",
"forecast_horizon_days": 30,
"confidence_interval": 0.95,
"skip_train": False,
},
"resources": {
"io_manager": IO_CONFIG_GCS,
...
},
},
)
The job above is built from this graph:
@graph
def training(
    grouping: str,
    time_interval: str,
    split_date: str,
    forecast_horizon_days: int,
    confidence_interval: float,
    skip_train: bool,
):
    """Train a single model and register it.

    Wires the steps together in this order: ``feature_gen`` ->
    ``fit_model`` -> ``forecast_unlogged`` -> ``evaluate`` ->
    ``model_validation`` -> ``log_model``. Every parameter is forwarded
    unchanged to the step(s) that consume it.

    Args:
        grouping: Passed to ``feature_gen``, ``fit_model`` and ``evaluate``.
        time_interval: Passed to ``feature_gen`` and ``fit_model``.
        split_date: Passed to ``feature_gen``.
            NOTE(review): presumably the train/test cut-off date - confirm.
        forecast_horizon_days: Passed to ``forecast_unlogged``.
        confidence_interval: Passed to ``forecast_unlogged``.
        skip_train: Passed to ``fit_model``.

    Returns:
        The output of ``fit_model``.
    """
    # feature_gen yields four outputs; the training frame is unpacked but
    # not consumed by any later step in this graph.
    train_frame, test_frame, train_sql, test_sql = feature_gen(
        grouping,
        time_interval,
        split_date,
    )

    fitted = fit_model(train_sql, test_sql, grouping, time_interval, skip_train)

    forecast_frame = forecast_unlogged(
        fitted,
        forecast_horizon_days,
        confidence_interval,
    )
    evaluation = evaluate(fitted, test_frame, forecast_frame, grouping)

    # Only the first output of model_validation is used downstream.
    passed, _ = model_validation(prev=evaluation)

    # The result of log_model is bound but not used again in this graph.
    logged_uri = log_model(fitted, passed)

    return fitted
So the job gets its inputs injected as parameters, via the "inputs" section of the job config.
Alternatively, you can have a single op that outputs those parameters and feed its outputs into the rest of the graph.