{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SageMaker/DeepAR demo on electricity dataset\n", "\n", "This notebook complements the [DeepAR introduction notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/deepar_synthetic/deepar_synthetic.ipynb). \n", "\n", "Here, we will consider a real use case and show how to use DeepAR on SageMaker for predicting energy consumption of 370 customers over time, based on a [dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) that was used in the academic papers [[1](https://media.nips.cc/nipsbooks/nipspapers/paper_files/nips29/reviews/526.html)] and [[2](https://arxiv.org/abs/1704.04110)]. \n", "\n", "In particular, we will see how to:\n", "* Prepare the dataset\n", "* Use the SageMaker Python SDK to train a DeepAR model and deploy it\n", "* Make requests to the deployed model to obtain forecasts interactively\n", "* Illustrate advanced features of DeepAR: missing values, additional time features, non-regular frequencies and category information\n", "\n", "For more information, see the DeepAR [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) or [paper](https://arxiv.org/abs/1704.04110).\n", "\n", "### Lab time\n", "Running this notebook takes around 35 to 40 minutes when training on an ml.c4.2xlarge instance; inference is done on an ml.m4.xlarge instance (the usage time depends on how long you leave your served model running)."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import timeit\n", "start_time = timeit.default_timer()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import sys\n", "from urllib.request import urlretrieve\n", "import zipfile\n", "from dateutil.parser import parse\n", "import json\n", "from random import shuffle\n", "import random\n", "import datetime\n", "import os\n", "\n", "import boto3\n", "import s3fs\n", "import sagemaker\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "from __future__ import print_function\n", "from ipywidgets import interact, interactive, fixed, interact_manual\n", "import ipywidgets as widgets\n", "from ipywidgets import IntSlider, FloatSlider, Checkbox" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# set random seeds for reproducibility\n", "np.random.seed(42)\n", "random.seed(42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting, we can override the default values for the following:\n", "- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.\n", "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# s3_bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed\n", "s3_bucket = ''  # customize to your bucket\n", "s3_prefix = 'deepar-electricity-demo-notebook'  # prefix used for all data stored within the bucket\n", "\n", "role = sagemaker.get_execution_role()  # IAM role used by SageMaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "region = sagemaker_session.boto_region_name\n", "\n", "s3_data_path = \"s3://{}/{}/data\".format(s3_bucket, s3_prefix)\n", "s3_output_path = \"s3://{}/{}/output\".format(s3_bucket, s3_prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we configure the container image to be used for the region that we are running in." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, \"forecasting-deepar\", \"latest\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import electricity dataset and upload it to S3 to make it available for SageMaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a first step, we need to download the original data set from the UCI data set repository."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATA_HOST = \"https://archive.ics.uci.edu\"\n", "DATA_PATH = \"/ml/machine-learning-databases/00321/\"\n", "ARCHIVE_NAME = \"LD2011_2014.txt.zip\"\n", "FILE_NAME = '/tmp/' + ARCHIVE_NAME[:-4]  # Modified to use '/tmp' directory, Pil 8thOct2018" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def progress_report_hook(count, block_size, total_size):\n", "    mb = int(count * block_size // 1e6)\n", "    if count % 500 == 0:\n", "        sys.stdout.write(\"\\r{} MB downloaded\".format(mb))\n", "        sys.stdout.flush()\n", "\n", "if not os.path.isfile(FILE_NAME):\n", "    print(\"downloading dataset (258MB), can take a few minutes depending on your connection\")\n", "    urlretrieve(DATA_HOST + DATA_PATH + ARCHIVE_NAME, '/tmp/' + ARCHIVE_NAME, reporthook=progress_report_hook)\n", "\n", "    print(\"\\nextracting data archive\")\n", "    zip_ref = zipfile.ZipFile('/tmp/' + ARCHIVE_NAME, 'r')\n", "    zip_ref.extractall(\"/tmp\")\n", "    zip_ref.close()\n", "else:\n", "    print(\"File found, skipping download\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, we load and parse the dataset and convert it to a collection of Pandas time series, which makes common time series operations such as indexing by time periods or resampling much easier. The data is originally recorded at 15-minute intervals, which we could use directly. Here, however, we want to forecast longer periods (one week), so we resample the data to a granularity of 2 hours."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = pd.read_csv(FILE_NAME, sep=\";\", index_col=0, parse_dates=True, decimal=',')\n", "num_timeseries = data.shape[1]\n", "data_kw = data.resample('2H').sum() / 8\n", "timeseries = []\n", "for i in range(num_timeseries):\n", "    timeseries.append(np.trim_zeros(data_kw.iloc[:,i], trim='f'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us plot the resulting time series for the first ten customers for the time period spanning the first two weeks of 2014." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, axs = plt.subplots(5, 2, figsize=(20, 20), sharex=True)\n", "axx = axs.ravel()\n", "for i in range(0, 10):\n", "    timeseries[i].loc[\"2014-01-01\":\"2014-01-14\"].plot(ax=axx[i])\n", "    axx[i].set_xlabel(\"date\")\n", "    axx[i].set_ylabel(\"kW consumption\")\n", "    axx[i].grid(which='minor', axis='x')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train and Test splits\n", "\n", "Often one is interested in evaluating the model or tuning its hyperparameters by looking at error metrics on a hold-out test set. Here we split the available data into train and test sets for evaluating the trained model. For standard machine learning tasks such as classification and regression, one typically obtains this split by randomly separating examples into train and test sets. However, in forecasting it is important to do this train/test split based on time rather than by time series.\n", "\n", "In this example, we will reserve the last section of each of the time series for evaluation purposes and use only the first part as training data.
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# we use 2 hour frequency for the time series\n", "freq = '2H'\n", "\n", "# we predict for 7 days\n", "prediction_length = 7 * 12\n", "\n", "# we also use 7 days as context length, this is the number of state updates accomplished before making predictions\n", "context_length = 7 * 12" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We specify here the portion of the data that is used for training: the model sees data from 2014-01-01 to 2014-09-01 for training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start_dataset = pd.Timestamp(\"2014-01-01 00:00:00\", freq=freq)\n", "end_training = pd.Timestamp(\"2014-09-01 00:00:00\", freq=freq)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The DeepAR JSON input format represents each time series as a JSON object. In the simplest case each time series just consists of a start time stamp (``start``) and a list of values (``target``). For more complex cases, DeepAR also supports the fields ``dynamic_feat`` for time-series features and ``cat`` for categorical features, which we will use later." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_data = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": ts[start_dataset:end_training - 1].tolist() # We use -1, because pandas indexing includes the upper bound \n", " }\n", " for ts in timeseries\n", "]\n", "print(len(training_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As test data, we will consider time series extending beyond the training range: these will be used for computing test scores, by using the trained model to forecast their trailing 7 days, and comparing predictions with actual values.\n", "To evaluate our model performance on more than one week, we generate test data that extends to 1, 2, 3, 4 weeks beyond the training range. This way we perform *rolling evaluation* of our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "num_test_windows = 4\n", "\n", "test_data = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": ts[start_dataset:end_training + k * prediction_length].tolist()\n", " }\n", " for k in range(1, num_test_windows + 1) \n", " for ts in timeseries\n", "]\n", "print(len(test_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now write the dictionary to the `jsonlines` file format that DeepAR understands (it also supports gzipped jsonlines and parquet)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def write_dicts_to_file(path, data):\n", "    with open(path, 'wb') as fp:\n", "        for d in data:\n", "            fp.write(json.dumps(d).encode(\"utf-8\"))\n", "            fp.write(\"\\n\".encode('utf-8'))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "write_dicts_to_file(\"/tmp/train.json\", training_data)\n", "write_dicts_to_file(\"/tmp/test.json\", test_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have the data files locally, let us copy them to S3 where DeepAR can access them. Depending on your connection, this may take a couple of minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = boto3.resource('s3')\n", "def copy_to_s3(local_file, s3_path, override=False):\n", "    assert s3_path.startswith('s3://')\n", "    split = s3_path.split('/')\n", "    bucket = split[2]\n", "    path = '/'.join(split[3:])\n", "    buk = s3.Bucket(bucket)\n", "\n", "    if len(list(buk.objects.filter(Prefix=path))) > 0:\n", "        if not override:\n", "            # report the bucket and key parsed from s3_path\n", "            print('File s3://{}/{} already exists.\\nSet override to upload anyway.\\n'.format(bucket, path))\n", "            return\n", "        else:\n", "            print('Overwriting existing file')\n", "    with open(local_file, 'rb') as data:\n", "        print('Uploading file to {}'.format(s3_path))\n", "        buk.put_object(Key=path, Body=data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "copy_to_s3(\"/tmp/train.json\", s3_data_path + \"/train/train.json\")\n", "copy_to_s3(\"/tmp/test.json\", s3_data_path + \"/test/test.json\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's have a look at what we just wrote to S3."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3filesystem = s3fs.S3FileSystem()\n", "with s3filesystem.open(s3_data_path + \"/train/train.json\", 'rb') as fp:\n", " print(fp.readline().decode(\"utf-8\")[:100] + \"...\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are all set with our dataset processing, we can now call DeepAR to train a model and generate predictions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a model\n", "\n", "Here we define the estimator that will launch the training job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator = sagemaker.estimator.Estimator(\n", " sagemaker_session=sagemaker_session,\n", " image_name=image_name,\n", " role=role,\n", " train_instance_count=1,\n", " train_instance_type='ml.c4.2xlarge',\n", " base_job_name='deepar-electricity-demo',\n", " output_path=s3_output_path\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we need to set the hyperparameters for the training job. For example frequency of the time series used, number of data points the model will look at in the past, number of predicted data points. The other hyperparameters concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options (number of epochs, batch size, learning rate...). We use default parameters for every optional parameter in this case (you can always use [Sagemaker Automated Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune them)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparameters = {\n", " \"time_freq\": freq,\n", " \"epochs\": \"400\",\n", " \"early_stopping_patience\": \"40\",\n", " \"mini_batch_size\": \"64\",\n", " \"learning_rate\": \"5E-4\",\n", " \"context_length\": str(context_length),\n", " \"prediction_length\": str(prediction_length)\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.set_hyperparameters(**hyperparameters)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are ready to launch the training job. SageMaker will start an EC2 instance, download the data from S3, start training the model and save the trained model.\n", "\n", "If you provide the `test` data channel as we do in this example, DeepAR will also calculate accuracy metrics for the trained model on this test. This is done by predicting the last `prediction_length` points of each time-series in the test set and comparing this to the actual value of the time-series. \n", "\n", "**Note:** the next cell may take a few minutes to complete, depending on data size, model complexity, training options." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "data_channels = {\n", " \"train\": \"{}/train/\".format(s3_data_path),\n", " \"test\": \"{}/test/\".format(s3_data_path)\n", "}\n", "\n", "estimator.fit(inputs=data_channels, wait=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since you pass a test set in this example, accuracy metrics for the forecast are computed and logged (see bottom of the log).\n", "You can find the definition of these metrics from [our documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html). 
You can use these to optimize the parameters and tune your model, or use SageMaker's [Automated Model Tuning service](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune the model for you." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create endpoint and predictor" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have a trained model, we can use it to perform predictions by deploying it to an endpoint.\n", "\n", "**Note: Remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To query the endpoint and perform predictions, we can define the following utility class: this allows making requests using `pandas.Series` objects rather than raw JSON strings." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class DeepARPredictor(sagemaker.predictor.RealTimePredictor):\n", "\n", "    def __init__(self, *args, **kwargs):\n", "        super().__init__(*args, content_type=sagemaker.content_types.CONTENT_TYPE_JSON, **kwargs)\n", "\n", "    def predict(self, ts, cat=None, dynamic_feat=None,\n", "                num_samples=100, return_samples=False, quantiles=[\"0.1\", \"0.5\", \"0.9\"]):\n", "        \"\"\"Requests the prediction for the time series `ts`, with the (optional)\n", "        corresponding category `cat`.\n", "\n", "        ts -- `pandas.Series` object, the time series to predict\n", "        cat -- integer, the group associated to the time series (default: None)\n", "        dynamic_feat -- list of feature value lists sent as `dynamic_feat` (default: None)\n", "        num_samples -- integer, number of samples to compute at prediction time (default: 100)\n", "        return_samples -- boolean indicating whether to include samples in the response (default: False)\n", "        quantiles -- list of strings specifying the quantiles to compute (default: [\"0.1\", \"0.5\", \"0.9\"])\n", "\n", "        Return value: `pandas.DataFrame` object containing the predictions\n", "        \"\"\"\n",
 "        prediction_time = ts.index[-1] + 1\n", "        quantiles = [str(q) for q in quantiles]\n", "        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)\n", "        res = super(DeepARPredictor, self).predict(req)\n", "        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)\n", "\n", "    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):\n", "        instance = series_to_dict(ts, cat, dynamic_feat if dynamic_feat else None)\n", "\n", "        configuration = {\n", "            \"num_samples\": num_samples,\n", "            \"output_types\": [\"quantiles\", \"samples\"] if return_samples else [\"quantiles\"],\n", "            \"quantiles\": quantiles\n", "        }\n", "\n", "        http_request_data = {\n", "            \"instances\": [instance],\n", "            \"configuration\": configuration\n", "        }\n", "\n", "        return json.dumps(http_request_data).encode('utf-8')\n", "\n", "    def __decode_response(self, response, freq, prediction_time, return_samples):\n", "        # we only sent one time series so we only receive one in return\n", "        # however, when possible, pass multiple time series in one request as predictions will then be faster\n", "        predictions = json.loads(response.decode('utf-8'))['predictions'][0]\n", "        prediction_length = len(next(iter(predictions['quantiles'].values())))\n", "        prediction_index = pd.DatetimeIndex(start=prediction_time, freq=freq, periods=prediction_length)\n", "        if return_samples:\n", "            dict_of_samples = {'sample_' + str(i): s for i, s in enumerate(predictions['samples'])}\n", "        else:\n", "            dict_of_samples = {}\n", "        return pd.DataFrame(data={**predictions['quantiles'], **dict_of_samples}, index=prediction_index)\n", "\n", "    def set_frequency(self, freq):\n", "        self.freq = freq\n", "\n", "def encode_target(ts):\n", "    return [x if np.isfinite(x) else \"NaN\" for x in ts]\n", "\n", "def series_to_dict(ts, cat=None, dynamic_feat=None):\n",
 "    \"\"\"Given a pandas.Series object, returns a dictionary encoding the time series.\n", "\n", "    ts -- a pandas.Series object with the target time series\n", "    cat -- an integer indicating the time series category\n", "    dynamic_feat -- a list of feature value lists (default: None)\n", "\n", "    Return value: a dictionary\n", "    \"\"\"\n", "    obj = {\"start\": str(ts.index[0]), \"target\": encode_target(ts)}\n", "    if cat is not None:\n", "        obj[\"cat\"] = cat\n", "    if dynamic_feat is not None:\n", "        obj[\"dynamic_feat\"] = dynamic_feat\n", "    return obj" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can deploy the model and create an endpoint that can be queried using our custom DeepARPredictor class." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor = estimator.deploy(\n", "    initial_instance_count=1,\n", "    instance_type='ml.m4.xlarge',\n", "    predictor_cls=DeepARPredictor)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Make predictions and plot results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can use the `predictor` object to generate predictions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.predict(ts=timeseries[120], quantiles=[0.10, 0.5, 0.90]).head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below we define a plotting function that queries the model and displays the forecast."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot(\n", "    predictor,\n", "    target_ts,\n", "    cat=None,\n", "    dynamic_feat=None,\n", "    forecast_date=end_training,\n", "    show_samples=False,\n", "    plot_history=7 * 12,\n", "    confidence=80\n", "):\n", "    print(\"calling served model to generate predictions starting from {}\".format(str(forecast_date)))\n", "    assert(confidence > 50 and confidence < 100)\n", "    low_quantile = 0.5 - confidence * 0.005\n", "    up_quantile = confidence * 0.005 + 0.5\n", "\n", "    # we first construct the argument to call our model\n", "    args = {\n", "        \"ts\": target_ts[:forecast_date],\n", "        \"return_samples\": show_samples,\n", "        \"quantiles\": [low_quantile, 0.5, up_quantile],\n", "        \"num_samples\": 100\n", "    }\n", "\n", "\n", "    if dynamic_feat is not None:\n", "        args[\"dynamic_feat\"] = dynamic_feat\n", "        fig = plt.figure(figsize=(20, 6))\n", "        ax = plt.subplot(2, 1, 1)\n", "    else:\n", "        fig = plt.figure(figsize=(20, 3))\n", "        ax = plt.subplot(1,1,1)\n", "\n", "    if cat is not None:\n", "        args[\"cat\"] = cat\n", "        ax.text(0.9, 0.9, 'cat = {}'.format(cat), transform=ax.transAxes)\n", "\n", "    # call the end point to get the prediction\n", "    prediction = predictor.predict(**args)\n", "\n", "    # plot the samples\n", "    if show_samples:\n", "        for key in prediction.keys():\n", "            if \"sample\" in key:\n", "                prediction[key].plot(color='lightskyblue', alpha=0.2, label='_nolegend_')\n", "\n", "\n", "    # plot the target\n", "    target_section = target_ts[forecast_date-plot_history:forecast_date+prediction_length]\n", "    target_section.plot(color=\"black\", label='target')\n", "\n", "    # plot the confidence interval and the median predicted\n", "    ax.fill_between(\n", "        prediction[str(low_quantile)].index,\n", "        prediction[str(low_quantile)].values,\n", "        prediction[str(up_quantile)].values,\n", "        color=\"b\", alpha=0.3, label='{}% confidence interval'.format(confidence)\n", "    )\n",
 "    prediction[\"0.5\"].plot(color=\"b\", label='P50')\n", "    ax.legend(loc=2)\n", "\n", "    # fix the scale as the samples may change it\n", "    ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)\n", "\n", "    if dynamic_feat is not None:\n", "        for i, f in enumerate(dynamic_feat, start=1):\n", "            ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)\n", "            feat_ts = pd.Series(\n", "                index=pd.DatetimeIndex(start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)),\n", "                data=f\n", "            )\n", "            feat_ts[forecast_date-plot_history:forecast_date+prediction_length].plot(ax=ax, color='g')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can interact with the previously defined function to look at the forecast of any customer at any point in (future) time.\n", "\n", "For each request, the predictions are obtained by calling our served model on the fly.\n", "\n", "Here we forecast the consumption of an office after the weekend (note the lower weekend consumption).\n", "You can select any time series and any forecast date; just click on `Run Interact` to generate the predictions from our served endpoint and see the plot."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "style = {'description_width': 'initial'}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@interact_manual(\n", "    customer_id=IntSlider(min=0, max=369, value=91, style=style),\n", "    forecast_day=IntSlider(min=0, max=100, value=51, style=style),\n", "    confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),\n", "    history_weeks_plot=IntSlider(min=1, max=20, value=1, style=style),\n", "    show_samples=Checkbox(value=False),\n", "    continuous_update=False\n", ")\n", "def plot_interact(customer_id, forecast_day, confidence, history_weeks_plot, show_samples):\n", "    plot(\n", "        predictor,\n", "        target_ts=timeseries[customer_id],\n", "        forecast_date=end_training + datetime.timedelta(days=forecast_day),\n", "        show_samples=show_samples,\n", "        plot_history=history_weeks_plot * 12 * 7,\n", "        confidence=confidence\n", "    )" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Additional features\n", "\n", "We have seen how to prepare a dataset and run DeepAR for a simple example.\n", "\n", "In addition, DeepAR supports the following features:\n", "\n", "* Missing values: DeepAR can handle missing values in the time series during training as well as inference.\n", "* Additional time features: DeepAR provides a set of default time features, such as hour of day. You can also provide additional feature time series via the `dynamic_feat` field.\n", "* Generalized frequencies: any integer multiple of the base frequencies (minutes `min`, hours `H`, days `D`, weeks `W`, months `M`) is allowed, e.g., `15min`. We already demonstrated this above by using the `2H` frequency.\n", "* Categories: If your time series belong to different groups (e.g.
types of product, regions, etc.), this information can be encoded as one or more categorical features using the `cat` field.\n", "\n", "We will now demonstrate the support for missing values and time features. For this part, we reuse the electricity dataset but make some artificial changes to demonstrate the new features:\n", "* We will randomly mask parts of the time series to demonstrate the missing values support.\n", "* We will include a \"special-day\" that occurs on different days for different time series. During this day we introduce a strong up-lift.\n", "* We train the model on this dataset, giving \"special-day\" as a custom time series feature." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As discussed above, we will create a \"special-day\" feature and create an up-lift for the time series during this day. This simulates real-world applications where, for example, a product promotion runs for a certain time or a special event influences your time series." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_special_day_feature(ts, fraction=0.1):\n", "    # First select random day indices (plus the forecast day)\n", "    num_days = (ts.index[-1] - ts.index[0]).days\n", "    rand_indices = list(np.random.randint(0, num_days, int(num_days * fraction))) + [num_days]\n", "\n", "    feature_value = np.zeros_like(ts)\n", "    for i in rand_indices:\n", "        feature_value[i * 12: (i + 1) * 12] = 1.0\n", "    feature = pd.Series(index=ts.index, data=feature_value)\n", "    return feature\n", "\n", "def drop_at_random(ts, drop_probability=0.1):\n", "    assert(0 <= drop_probability < 1)\n", "    random_mask = np.random.random(len(ts)) < drop_probability\n", "    return ts.mask(random_mask)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "special_day_features = [create_special_day_feature(ts) for ts in timeseries]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now create the up-lifted time series and randomly remove time points.\n", "\n", "The figures below show some example time series and the `special_day` feature value in green.
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "timeseries_uplift = [ts * (1.0 + feat) for ts, feat in zip(timeseries, special_day_features)]\n", "time_series_processed = [drop_at_random(ts) for ts in timeseries_uplift]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, axs = plt.subplots(5, 2, figsize=(20, 20), sharex=True)\n", "axx = axs.ravel()\n", "for i in range(0, 10):\n", " ax = axx[i]\n", " ts = time_series_processed[i][:400]\n", " ts.plot(ax=ax)\n", " ax.set_ylim(-0.1 * ts.max(), ts.max())\n", " ax2 = ax.twinx()\n", " special_day_features[i][:400].plot(ax=ax2, color='g')\n", " ax2.set_ylim(-0.2, 7)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "training_data_new_features = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": encode_target(ts[start_dataset:end_training]),\n", " \"dynamic_feat\": [special_day_features[i][start_dataset:end_training].tolist()]\n", " }\n", " for i, ts in enumerate(time_series_processed)\n", "]\n", "print(len(training_data_new_features))\n", "\n", "# as in our previous example, we do a rolling evaluation over the next 7 days\n", "num_test_windows = 7\n", "\n", "test_data_new_features = [\n", " {\n", " \"start\": str(start_dataset),\n", " \"target\": encode_target(ts[start_dataset:end_training + 2*k*prediction_length]),\n", " \"dynamic_feat\": [special_day_features[i][start_dataset:end_training + 2*k*prediction_length].tolist()]\n", " }\n", " for k in range(1, num_test_windows + 1) \n", " for i, ts in enumerate(timeseries_uplift)\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def check_dataset_consistency(train_dataset, test_dataset=None):\n", " d = train_dataset[0]\n", " has_dynamic_feat = 'dynamic_feat' in d\n", " if has_dynamic_feat:\n", " num_dynamic_feat = len(d['dynamic_feat'])\n", " 
has_cat = 'cat' in d\n", " if has_cat:\n", " num_cat = len(d['cat'])\n", " \n", " def check_ds(ds):\n", " for i, d in enumerate(ds):\n", " if has_dynamic_feat:\n", " assert 'dynamic_feat' in d\n", " assert num_dynamic_feat == len(d['dynamic_feat'])\n", " for f in d['dynamic_feat']:\n", " assert len(d['target']) == len(f)\n", " if has_cat:\n", " assert 'cat' in d\n", " assert len(d['cat']) == num_cat\n", " check_ds(train_dataset)\n", " if test_dataset is not None:\n", " check_ds(test_dataset)\n", " \n", "check_dataset_consistency(training_data_new_features, test_data_new_features)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "write_dicts_to_file(\"/tmp/train_new_features.json\", training_data_new_features)\n", "write_dicts_to_file(\"/tmp/test_new_features.json\", test_data_new_features)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "s3_data_path_new_features = \"s3://{}/{}-new-features/data\".format(s3_bucket, s3_prefix)\n", "s3_output_path_new_features = \"s3://{}/{}-new-features/output\".format(s3_bucket, s3_prefix)\n", "\n", "print('Uploading to S3 this may take a few minutes depending on your connection.')\n", "copy_to_s3(\"/tmp/train_new_features.json\", s3_data_path_new_features + \"/train/train_new_features.json\", override=True)\n", "copy_to_s3(\"/tmp/test_new_features.json\", s3_data_path_new_features + \"/test/test_new_features.json\", override=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "estimator_new_features = sagemaker.estimator.Estimator(\n", " sagemaker_session=sagemaker_session,\n", " image_name=image_name,\n", " role=role,\n", " train_instance_count=1,\n", " train_instance_type='ml.c4.2xlarge',\n", " base_job_name='deepar-electricity-demo-new-features',\n", " output_path=s3_output_path_new_features\n", ")\n", "\n", "hyperparameters = {\n", " 
\"time_freq\": freq,\n", " \"context_length\": str(context_length),\n", " \"prediction_length\": str(prediction_length),\n", " \"epochs\": \"400\",\n", " \"learning_rate\": \"5E-4\",\n", " \"mini_batch_size\": \"64\",\n", " \"early_stopping_patience\": \"40\",\n", " \"num_dynamic_feat\": \"auto\", # this will use the `dynamic_feat` field if it's present in the data\n", "}\n", "estimator_new_features.set_hyperparameters(**hyperparameters)\n", "\n", "estimator_new_features.fit(\n", " inputs={\n", " \"train\": \"{}/train/\".format(s3_data_path_new_features),\n", " \"test\": \"{}/test/\".format(s3_data_path_new_features)\n", " }, \n", " wait=True\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As before, we spawn an endpoint to visualize our forecasts on examples we send on the fly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "predictor_new_features = estimator_new_features.deploy(\n", " initial_instance_count=1,\n", " instance_type='ml.m4.xlarge',\n", " predictor_cls=DeepARPredictor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_id = 120\n", "predictor_new_features.predict(\n", " ts=time_series_processed[customer_id][:-prediction_length], \n", " dynamic_feat=[special_day_features[customer_id].tolist()], \n", " quantiles=[0.1, 0.5, 0.9]\n", ").head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As before, we can query the endpoint to see predictions for arbitrary time series and time points." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@interact_manual(\n", "    customer_id=IntSlider(min=0, max=369, value=13, style=style),\n", "    forecast_day=IntSlider(min=0, max=100, value=21, style=style),\n", "    confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),\n", "    missing_ratio=FloatSlider(min=0.0, max=0.95, value=0.2, step=0.05, style=style),\n", "    show_samples=Checkbox(value=False),\n", "    continuous_update=False\n", ")\n", "def plot_interact(customer_id, forecast_day, confidence, missing_ratio, show_samples):\n", "    forecast_date = end_training + datetime.timedelta(days=forecast_day)\n", "    target = time_series_processed[customer_id][start_dataset:forecast_date + prediction_length]\n", "    target = drop_at_random(target, missing_ratio)\n", "    dynamic_feat = [special_day_features[customer_id][start_dataset:forecast_date + prediction_length].tolist()]\n", "    plot(\n", "        predictor_new_features,\n", "        target_ts=target,\n", "        dynamic_feat=dynamic_feat,\n", "        forecast_date=forecast_date,\n", "        show_samples=show_samples,\n", "        plot_history=7*12,\n", "        confidence=confidence\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Delete endpoints" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_new_features.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# report the total notebook run time in minutes\n", "elapsed = timeit.default_timer() - start_time\n", "print(elapsed/60)" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": 
"python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "notice": "Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 2 }