{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Predict energy consumption with SageMaker DeepAR\n", "[DeepAR](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) is a supervised learning algorithm for forecasting scalar time series. This notebook demonstrates how to prepare a dataset of time series for training DeepAR and how to use the trained model for inference. We demonstrate the following topics:\n", "- Prepare the dataset\n", "- Use the SageMaker Python SDK to train a DeepAR model and deploy it\n", "- Make requests to the deployed model to obtain forecasts interactively\n", "\n", "[Dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) \n", "[Source notebook](https://github.com/aws/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/deepar_electricity/DeepAR-Electricity.ipynb)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from __future__ import print_function\n", "\n", "%matplotlib inline\n", "\n", "import sys\n", "import zipfile\n", "from dateutil.parser import parse\n", "import json\n", "from random import shuffle\n", "import random\n", "import datetime\n", "import os\n", "\n", "import boto3\n", "import s3fs\n", "import sagemaker\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "from datetime import timedelta\n", "\n", "from ipywidgets import interact, interactive, fixed, interact_manual\n", "import ipywidgets as widgets\n", "from ipywidgets import IntSlider, FloatSlider, Checkbox" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# set random seeds for reproducibility\n", "np.random.seed(42)\n", "random.seed(42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()\n", "region = sagemaker_session.boto_region_name\n", "role = sagemaker.get_execution_role() # IAM role to use by SageMaker\n", "\n", "s3_bucket = sagemaker.Session().default_bucket() # replace with an existing bucket if needed\n", "s3_prefix = \"deepar-electricity-demo-notebook\" # prefix used for all data stored within the bucket\n", "s3_data_path = \"s3://{}/{}/data\".format(s3_bucket, s3_prefix)\n", "s3_output_path = \"s3://{}/{}/output\".format(s3_bucket, s3_prefix)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, \"forecasting-deepar\", \"latest\")\n", "image_name = sagemaker.image_uris.retrieve(framework='forecasting-deepar',region=region)\n", "print(image_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import electricity dataset and upload it to S3 to make it available for Sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATA_HOST = \"sagemaker-sample-files\"\n", "DATA_PATH = \"datasets/timeseries/uci_electricity/\"\n", "ARCHIVE_NAME = \"LD2011_2014.txt.zip\"\n", "FILE_NAME = ARCHIVE_NAME[:-4]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client = boto3.client(\"s3\")\n", "\n", "if not os.path.isfile(FILE_NAME):\n", " print(\"downloading dataset (258MB), can take a few minutes depending on your connection\")\n", " s3_client.download_file(DATA_HOST, DATA_PATH + ARCHIVE_NAME, ARCHIVE_NAME)\n", "\n", " print(\"\\nextracting data archive\")\n", " zip_ref = zipfile.ZipFile(ARCHIVE_NAME, \"r\")\n", " zip_ref.extractall(\"./\")\n", " zip_ref.close()\n", "else:\n", " print(\"File found skipping download\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = pd.read_csv(FILE_NAME, sep=\";\", index_col=0, parse_dates=True, decimal=\",\")\n", "num_timeseries = data.shape[1]\n", "data_kw = data.resample(\"2H\").sum() / 8\n", "timeseries = []\n", "for i in range(num_timeseries):\n", " timeseries.append(np.trim_zeros(data_kw.iloc[:, i], trim=\"f\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_start_date = \"2014-01-01\"\n", "sample_end_date = \"2014-01-21\"\n", "\n", "fig, axs = plt.subplots(5, 2, figsize=(20, 20), sharex=True)\n", "axx = axs.ravel()\n", "for i in range(0, 10):\n", " timeseries[i].loc[sample_start_date:sample_end_date].plot(ax=axx[i])\n", " axx[i].set_xlabel(\"date\")\n", " axx[i].set_ylabel(\"kW consumption\")\n", " axx[i].grid(which=\"minor\", axis=\"x\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convert data to DeepAR JSON lines format" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# we use 2 hour frequency for the time series\n", "freq = \"2H\"\n", "\n", "prediction_days = 7\n", "\n", "# we predict for `prediction_days` days\n", "prediction_length = prediction_days * 12\n", "\n", "# we also use `prediction_days` days as context length, this is the number of state updates accomplished before making predictions\n", "context_length = prediction_days * 12" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Training data\n", "We specify here the portion of the data that is used for training: the model sees data from 2014-01-01 to 2014-09-01 for training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start_dataset = pd.Timestamp(\"2014-01-01 00:00:00\", freq=freq)\n", "end_training = pd.Timestamp(\"2014-09-01 00:00:00\", freq=freq)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_data = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": ts[\n", " start_dataset : end_training - timedelta(days=1)\n", " ].tolist(), # We use -1, because pandas indexing includes the upper bound\n", " }\n", " for ts in timeseries\n", "]\n", "print(len(training_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Test data\n", "As test data, we will consider time series extending beyond the training range: these will be used for computing test scores, by using the trained model to forecast their trailing 7 days, and comparing predictions with actual values. To evaluate our model performance on more than one week, we generate test data that extends to 1, 2, 3, 4 weeks beyond the training range. This way we perform rolling evaluation of our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "num_test_windows = 4\n", "\n", "test_data = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": ts[start_dataset : end_training + timedelta(days=k * prediction_length)].tolist(),\n", " }\n", " for k in range(1, num_test_windows + 1)\n", " for ts in timeseries\n", "]\n", "print(len(test_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write data to JSON lines" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def write_dicts_to_file(path, data):\n", " with open(path, \"wb\") as fp:\n", " for d in data:\n", " fp.write(json.dumps(d).encode(\"utf-8\"))\n", " fp.write(\"\\n\".encode(\"utf-8\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "write_dicts_to_file(\"train.json\", training_data)\n", "write_dicts_to_file(\"test.json\", test_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Copy data to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp train.json {s3_data_path}/train/train.json\n", "!aws s3 cp train.json {s3_data_path}/test/test.json" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!head -2 ./train.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train a model\n", "- Create estimator with container image URI\n", "- Set hyperparameters\n", "- Configure data channel\n", "- Fit the estimator" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator = sagemaker.estimator.Estimator(\n", " image_uri=image_name,\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", " instance_count=1,\n", " instance_type=\"ml.c4.4xlarge\",\n", " base_job_name=\"deepar-electricity-demo\",\n", " output_path=s3_output_path,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparameters = {\n", " \"time_freq\": freq,\n", " \"epochs\": \"400\",\n", " \"early_stopping_patience\": \"40\",\n", " \"mini_batch_size\": \"64\",\n", " \"learning_rate\": \"5E-4\",\n", " \"context_length\": str(context_length),\n", " \"prediction_length\": str(prediction_length),\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.set_hyperparameters(**hyperparameters)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "data_channels = {\"train\": \"{}/train/\".format(s3_data_path), \"test\": \"{}/test/\".format(s3_data_path)}\n", "\n", "estimator.fit(inputs=data_channels, wait=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create endpoint and predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.serializers import IdentitySerializer" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class DeepARPredictor(sagemaker.predictor.Predictor):\n", " def __init__(self, *args, **kwargs):\n", " super().__init__(\n", " *args,\n", " # serializer=JSONSerializer(),\n", " serializer=IdentitySerializer(content_type=\"application/json\"),\n", " **kwargs,\n", " )\n", "\n", " def predict(\n", " self,\n", " ts,\n", " cat=None,\n", " dynamic_feat=None,\n", " num_samples=100,\n", " return_samples=False,\n", " quantiles=[\"0.1\", \"0.5\", \"0.9\"],\n", " ):\n", " \"\"\"Requests the prediction of for the time series listed in `ts`, each with the (optional)\n", " corresponding category listed in `cat`.\n", "\n", " ts -- `pandas.Series` object, the time series to predict\n", " cat -- integer, the group associated to the time series (default: None)\n", " num_samples -- integer, number of samples to compute at prediction time (default: 100)\n", " return_samples -- boolean indicating whether to include samples in the response (default: False)\n", " quantiles -- list of strings specifying the quantiles to compute (default: [\"0.1\", \"0.5\", \"0.9\"])\n", "\n", " Return value: list of `pandas.DataFrame` objects, each containing the predictions\n", " \"\"\"\n", " prediction_time = ts.index[-1] + ts.index.freq\n", " quantiles = [str(q) for q in quantiles]\n", " req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)\n", " res = super(DeepARPredictor, self).predict(req)\n", " return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)\n", "\n", " def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):\n", " instance = series_to_dict(\n", " ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None\n", " )\n", "\n", " configuration = {\n", " \"num_samples\": num_samples,\n", " \"output_types\": [\"quantiles\", \"samples\"] if return_samples else [\"quantiles\"],\n", " \"quantiles\": quantiles,\n", " }\n", "\n", " http_request_data = {\"instances\": [instance], \"configuration\": configuration}\n", "\n", " return json.dumps(http_request_data).encode(\"utf-8\")\n", "\n", " def __decode_response(self, response, freq, prediction_time, return_samples):\n", " # we only sent one time series so we only receive one in return\n", " # however, if possible one will pass multiple time series as predictions will then be faster\n", " predictions = json.loads(response.decode(\"utf-8\"))[\"predictions\"][0]\n", " prediction_length = len(next(iter(predictions[\"quantiles\"].values())))\n", " prediction_index = pd.date_range(\n", " start=prediction_time, freq=freq, periods=prediction_length\n", " )\n", " if return_samples:\n", " dict_of_samples = {\"sample_\" + str(i): s for i, s in enumerate(predictions[\"samples\"])}\n", " else:\n", " dict_of_samples = {}\n", " return pd.DataFrame(\n", " data={**predictions[\"quantiles\"], **dict_of_samples}, index=prediction_index\n", " )\n", "\n", " def set_frequency(self, freq):\n", " self.freq = freq\n", "\n", "\n", "def encode_target(ts):\n", " return [x if np.isfinite(x) else \"NaN\" for x in ts]\n", "\n", "\n", "def series_to_dict(ts, cat=None, dynamic_feat=None):\n", " \"\"\"Given a pandas.Series object, returns a dictionary encoding the time series.\n", "\n", " ts -- a pands.Series object with the target time series\n", " cat -- an integer indicating the time series category\n", "\n", " Return value: a dictionary\n", " \"\"\"\n", " obj = {\"start\": str(ts.index[0]), \"target\": encode_target(ts)}\n", " if cat is not None:\n", " obj[\"cat\"] = cat\n", " if dynamic_feat is not None:\n", " obj[\"dynamic_feat\"] = dynamic_feat\n", " return obj" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor = estimator.deploy(\n", " initial_instance_count=1, instance_type=\"ml.m5.large\", predictor_cls=DeepARPredictor\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Make predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.predict(ts=timeseries[120], quantiles=[0.10, 0.5, 0.90]).head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot(\n", " predictor,\n", " target_ts,\n", " cat=None,\n", " dynamic_feat=None,\n", " forecast_date=end_training,\n", " show_samples=False,\n", " plot_history=7 * 12,\n", " confidence=80,\n", "):\n", " freq = target_ts.index.freq\n", " print(\n", " \"calling served model to generate predictions starting from {}\".format(str(forecast_date))\n", " )\n", " assert confidence > 50 and confidence < 100\n", " low_quantile = 0.5 - confidence * 0.005\n", " up_quantile = confidence * 0.005 + 0.5\n", "\n", " # we first construct the argument to call our model\n", " args = {\n", " \"ts\": target_ts[:forecast_date],\n", " \"return_samples\": show_samples,\n", " \"quantiles\": [low_quantile, 0.5, up_quantile],\n", " \"num_samples\": 100,\n", " }\n", "\n", " if dynamic_feat is not None:\n", " args[\"dynamic_feat\"] = dynamic_feat\n", " fig = plt.figure(figsize=(20, 6))\n", " ax = plt.subplot(2, 1, 1)\n", " else:\n", " fig = plt.figure(figsize=(20, 3))\n", " ax = plt.subplot(1, 1, 1)\n", "\n", " if cat is not None:\n", " args[\"cat\"] = cat\n", " ax.text(0.9, 0.9, \"cat = {}\".format(cat), transform=ax.transAxes)\n", "\n", " # call the end point to get the prediction\n", " prediction = predictor.predict(**args)\n", "\n", " # plot the samples\n", " if show_samples:\n", " for key in prediction.keys():\n", " if \"sample\" in key:\n", " prediction[key].plot(color=\"lightskyblue\", alpha=0.2, label=\"_nolegend_\")\n", "\n", " # plot the target\n", " target_section = target_ts[\n", " forecast_date - plot_history * freq : forecast_date + prediction_length * freq\n", " ]\n", " target_section.plot(color=\"black\", label=\"target\")\n", "\n", " # plot the confidence interval and the median predicted\n", " ax.fill_between(\n", " prediction[str(low_quantile)].index,\n", " prediction[str(low_quantile)].values,\n", " prediction[str(up_quantile)].values,\n", " color=\"b\",\n", " alpha=0.3,\n", " label=\"{}% confidence interval\".format(confidence),\n", " )\n", " prediction[\"0.5\"].plot(color=\"b\", label=\"P50\")\n", " ax.legend(loc=2)\n", "\n", " # fix the scale as the samples may change it\n", " ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)\n", "\n", " if dynamic_feat is not None:\n", " for i, f in enumerate(dynamic_feat, start=1):\n", " ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)\n", " feat_ts = pd.Series(\n", " index=pd.date_range(\n", " start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)\n", " ),\n", " data=f,\n", " )\n", " feat_ts[\n", " forecast_date - plot_history * freq : forecast_date + prediction_length * freq\n", " ].plot(ax=ax, color=\"g\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "style = {\"description_width\": \"initial\"}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@interact_manual(\n", " customer_id=IntSlider(min=0, max=369, value=91, style=style),\n", " forecast_day=IntSlider(min=0, max=100, value=51, style=style),\n", " confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),\n", " history_weeks_plot=IntSlider(min=1, max=20, value=1, style=style),\n", " show_samples=Checkbox(value=False),\n", " continuous_update=False,\n", ")\n", "def plot_interact(customer_id, forecast_day, confidence, history_weeks_plot, show_samples):\n", " plot(\n", " predictor,\n", " target_ts=timeseries[customer_id],\n", " forecast_date=end_training + datetime.timedelta(days=forecast_day),\n", " show_samples=show_samples,\n", " plot_history=history_weeks_plot * 12 * 7,\n", " confidence=confidence,\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_model()\n", "predictor.delete_endpoint()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Release resources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%html\n", "\n", "

Shutting down your kernel for this notebook to release resources.

\n", "\n", " \n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.9 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "vscode": { "interpreter": { "hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49" } } }, "nbformat": 4, "nbformat_minor": 4 }