Sebastian Charrier
02/08/2023, 12:28 AM@asset(
ins={
"daily_order_summary": AssetIn(key_prefix=["analytics"]),
"order_forecast_model": AssetIn(),
},
compute_kind="ml_tool",
key_prefix=["forecasting"],
)
def predicted_orders(
    daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
) -> pd.DataFrame:
    """Predict orders for the next 30 days based on the fit parameters.

    Args:
        daily_order_summary: Upstream order summary. NOTE(review): this input
            is currently overwritten by a test file read from GCS (see below),
            so the value passed in is not used.
        order_forecast_model: The two fit parameters ``(a, b)`` that are
            forwarded to ``model_func``.

    Returns:
        A DataFrame with an ``order_date`` column (the last known order date
        through 30 days after it, inclusive) and a ``num_orders`` column
        holding the values returned by ``model_func``.
    """
    a, b = order_forecast_model

    # NOTE(review): debugging override -- replaces the asset input with a fixed
    # test file. The path must be a plain ``gs://`` URL; the surrounding
    # ``<...>`` (chat link markup) made pandas fail to recognise it as a URL.
    daily_order_summary = pd.read_json("gs://data-analytics/test-data/test.json")

    # Convert the latest order date once and reuse it below.
    # NOTE(review): unit="s" assumes order_date holds epoch seconds; pandas
    # ``to_json`` writes epoch milliseconds by default -- confirm the unit.
    start_date = pd.to_datetime(daily_order_summary.order_date.max(), unit="s")
    print(start_date)

    future_dates = pd.date_range(
        start=start_date, end=start_date + pd.DateOffset(days=30)
    )
    # daily_order_summary.to_json("gs://data-analytics/test-data/test.json")

    # model_func receives the dates as int64 values.
    predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
    return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})
"io_manager_gcs": gcs_pickle_io_manager.configured(
{
"gcs_bucket": "data-analytics",
"gcs_prefix": "garys-prefix"
}
),
"gcs": gcs_resource,
Sean Lopp
02/08/2023, 4:59 AM
scott simpson
02/11/2023, 5:07 AMservice_account_creds = os.environ['GCP_SERVICE_ACCOUNT_CREDS']
# Build explicit service-account credentials from the JSON text held in
# service_account_creds (parsed with json.loads).
gcs_credentials = service_account.Credentials.from_service_account_info(json.loads(service_account_creds))
# Storage client that authenticates with those credentials.
storage_client = storage.Client(credentials=gcs_credentials)
# Resource definitions, keyed by resource name.
resource_defs = {
# Pickle IO manager configured with the target bucket and key prefix.
"data_lake_io_manager": gcs_pickle_io_manager.configured(
{
"gcs_bucket": "my_bucket",
"gcs_prefix": "my_prefix"
}
),
# Expose the pre-built, credentialed client under the "gcs" key.
# NOTE(review): presumably gcs_pickle_io_manager looks up a resource named
# "gcs" -- confirm against the dagster-gcp documentation.
"gcs": ResourceDefinition.hardcoded_resource(storage_client)
}
The `gcs` param at the end was the key to getting it working.