{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Household energy consumption forecast\n", "#### *- Leveraging Amazon Forecast API -*\n", "---\n", "\n", "Previously, we leveraged the Amazon Forecast console to predict household energy consumption. We are now going to use the high-level APIs of this service to achieve the same result: **this notebook takes approximately an hour to run.**\n", "\n", "The overall process for using Amazon Forecast is the following:\n", "\n", "1. Create a **Dataset Group**: this is the large box that isolates models and the data they are trained on from each other. You can see it as an independent forecasting \"project\".\n", "1. Create a **Dataset**: in Forecast there are 3 types of dataset, *Target Time Series*, *Related Time Series*, and *Item Metadata*. The *Target Time Series* is required; the others provide additional context that certain algorithms can leverage.\n", "1. **Import data**: this moves the information from S3 into a storage volume where the data can be used for training and validation. You can see this as the ingestion process into the Forecast dataset.\n", "1. **Train a model**: Forecast automates this process for you, but you can also select particular algorithms, and you can provide your own hyperparameters or use Hyperparameter Optimization (HPO) to determine the best-performing values for your data.\n", "1. **Deploy a Predictor**: here you are deploying your model so you can use it to generate a forecast.\n", "1. **Query the Forecast**: given a request bounded by time for an item, return the forecast for it. Once you have this, you can evaluate its performance or use it to guide your decisions about the future.\n", "\n", "In this notebook we will walk through all of the steps outlined above. 
One additional task is to hold out part of our data so that we can measure the accuracy of the forecast against the actual observed values.\n", "\n", "## Table Of Contents\n", "* **Preparation:**\n", "  * Setup\n", "  * Data Preparation\n", "  * Creating the Dataset Group and Dataset\n", "* **Building a predictor:**\n", "  * Create a Predictor\n", "  * Deploy a Predictor\n", "  * Obtain a Forecast\n", "* **Evaluating your forecast:** now is the time to pull down the predictions from this Predictor, and compare them to the actual observed values:\n", "  * Obtaining a Prediction\n", "  * Plotting the Actual Results\n", "  * Plotting the Prediction\n", "  * Comparing the Prediction to Actual Results\n", "* **Cleanup:** after completing the notebook, you may want to delete the following to prevent any unwanted charges:\n", "  * Forecasts\n", "  * Predictors\n", "  * Datasets\n", "  * Dataset Groups\n", "\n", "For more information about the Amazon Forecast APIs, please check the [documentation](https://docs.aws.amazon.com/forecast/latest/dg/what-is-forecast.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Preparation\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## IAM role and authorizations\n", "For the purpose of this sample, you need to make sure this SageMaker instance has the following authorizations:\n", "* `IAMFullAccess`\n", "* `AmazonForecastFullAccess`\n", "* `AmazonS3FullAccess`\n", "* `AmazonSageMakerFullAccess`\n", "\n", "Browse to the IAM Console and check that the role your SageMaker instance runs with is configured like this:\n", "\n", "![Datasets](pictures/iam-policy.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The **PassRole** policy visible above is configured as an inline policy:\n", "\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Action\": [\n", " \"iam:PassRole\"\n", " ],\n", " \"Effect\": \"Allow\",\n", " 
\"Resource\": \"*\"\n", " }\n", " ]\n", "}\n", "```\n", "\n", "Last but not least, also make sure that the **trust relationships** of this same role includes at least the following:\n", "\n", "![Datasets](pictures/iam-trust-relationship.png)\n", "\n", "You can click on the **Edit trust relationship** button as seen on the picture above and fill in the following JSON document:\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [{\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": [\n", " \"sagemaker.amazonaws.com\",\n", " \"forecast.amazonaws.com\",\n", " \"s3.amazonaws.com\"\n", " ]\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }]\n", "}\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import the standard Python libraries that are used in this lesson." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import botocore.exceptions\n", "import dateutil.parser\n", "import json\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import os\n", "import pandas as pd\n", "import sagemaker\n", "import sys\n", "import time\n", "\n", "from datetime import datetime\n", "from sagemaker import get_execution_role\n", "\n", "%matplotlib inline\n", "plt.style.use('Solarize_Light2')\n", "prop_cycle = plt.rcParams['axes.prop_cycle']\n", "colors = prop_cycle.by_key()['color']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next part of the setup process is to validate that your account can communicate with Amazon Forecast, the cell below does just that. 
We also configure the S3 bucket name and region name for this notebook:\n", "- If you don't have an S3 bucket, create it first on S3 or use the default bucket attached to this notebook\n", "- Although we have set the region to eu-west-1 as a default value below, you can choose any of the regions that the service is available in." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "BUCKET = sagemaker.Session().default_bucket()\n", "PREFIX = 'forecast-workshop'\n", "region = 'eu-west-1'\n", "\n", "session = boto3.Session(region_name=region) \n", "forecast = session.client(service_name='forecast') \n", "forecastquery = session.client(service_name='forecastquery')\n", "role = get_execution_role()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Paths to local data:\n", "DATA = 'data'\n", "PROCESSED_DATA = 'data/processed'\n", "\n", "os.makedirs(PROCESSED_DATA, exist_ok=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Preparation\n", "In the Data ingestion & EDA notebook, we already stored the hourly consumption of our households in a CSV file, we will load it here and prepare an appropriate input for Forecast. To begin, use Pandas to read the CSV and to show a sample of the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hourly_df = pd.read_csv(os.path.join(DATA, 'electricityusagedata.csv'), header=None, parse_dates=[0])\n", "hourly_df.columns = ['timestamp', 'energy', 'client_id']\n", "hourly_df = hourly_df.set_index('timestamp')\n", "hourly_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice in the output above there are 3 columns of data:\n", "\n", "1. The **Timestamp** (timestamp)\n", "1. A **Value** (energy)\n", "1. An **Item** (client_id)\n", "\n", "These are the 3 key required pieces of information to generate a forecast with Amazon Forecast. 
More can be added but these 3 must always remain present. Note that we don't have to rename our columns, as the data will be pushed without headers nor indexes to Forecast. This is just done for convenience in this notebook." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's have a look at one of the time series: feel free to change the `client_id` in the next cell to explore other inputs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client_df = hourly_df[hourly_df['client_id'] == 'client_1']\n", "daily_rolling_average = client_df['energy'].rolling(window=24).mean()\n", "\n", "fig = plt.figure(figsize=(24,6))\n", "plt.plot(client_df['energy'], alpha=0.8, label='Energy consumption')\n", "\n", "# Adding daily rolling average:\n", "plt.plot(daily_rolling_average.index, daily_rolling_average, alpha=0.5, color='white', linewidth=4)\n", "plt.plot(daily_rolling_average.index, daily_rolling_average, color=colors[5], linewidth=1.5, label='Rolling average')\n", "\n", "plt.title(r'Energy consumption for $\\bf{client\\_1}$', fontsize=16)\n", "plt.xlabel('Time')\n", "plt.ylabel('Energy consumption (kWh)')\n", "plt.legend(fontsize=14)\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(np.min(hourly_df.index))\n", "print(np.max(hourly_df.index))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset happens to span January 01, 2014 to January 01, 2015. 
We will use the following train test split:\n", "\n", "* Training start: 2014-01-01 01:00:00\n", "* Training end: 2014-09-30 23:00:00\n", "* Testing start: 2014-10-01 00:00:00\n", "* Testing end: 2015-01-01 00:00:00\n", "\n", "Let's store these two ranges in different CSV files:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_start = pd.to_datetime('2014-01-01 01:00:00')\n", "training_end = pd.to_datetime('2014-09-30 23:00:00')\n", "testing_start = pd.to_datetime('2014-10-01 00:00:00')\n", "testing_end = pd.to_datetime('2015-01-01 00:00:00')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_df = hourly_df[training_start:training_end].reset_index()\n", "testing_df = hourly_df[testing_start:testing_end].reset_index()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's export them to CSV files and place them into our `data/processed` folder: Amazon Forecast expects your CSV file to not include any header or index:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_df.to_csv(f\"{PROCESSED_DATA}/train.csv\", header=False, index=False)\n", "testing_df.to_csv(f\"{PROCESSED_DATA}/test.csv\", header=False, index=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!ls -lh $PROCESSED_DATA/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this time the data is ready to be sent to S3 where Forecast will use it later. The following cells will upload the data to S3." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "KEY = f'{PREFIX}/train.csv'\n", "boto3.Session().resource('s3').Bucket(BUCKET).Object(KEY).upload_file(f'{PROCESSED_DATA}/train.csv')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating the Dataset Group and Dataset \n", "\n", "In Amazon Forecast, a dataset is a collection of one or more files that contain data relevant to a forecasting task. A dataset must conform to a schema provided by Amazon Forecast. \n", "\n", "More details about `Domain` and dataset types can be found in the [documentation](https://docs.aws.amazon.com/forecast/latest/dg/howitworks-domains-ds-types.html). For this example, we are using the [CUSTOM](https://docs.aws.amazon.com/forecast/latest/dg/custom-domain.html) domain with 3 required attributes: `timestamp`, `target_value` and `item_id`.\n", "\n", "We also need to tell Amazon Forecast how to interpret the time series information. The cell immediately below does that; the next one configures the variable names for the project, dataset group, and dataset." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATASET_FREQUENCY = \"H\" \n", "TIMESTAMP_FORMAT = \"yyyy-MM-dd hh:mm:ss\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project = 'energy_consumption_forecast'\n", "datasetName = project + '_ds'\n", "datasetGroupName = project + '_dsg'\n", "s3DataPath = \"s3://\" + BUCKET + \"/\" + KEY" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Dataset Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Let's try to create a dataset group:\n", "try:\n", "    create_dataset_group_response = forecast.create_dataset_group(\n", "        DatasetGroupName=datasetGroupName,\n", "        Domain=\"CUSTOM\",\n", "    )\n", "    datasetGroupArn = create_dataset_group_response['DatasetGroupArn']\n", "\n", "# Only botocore client errors carry a `response` attribute:\n", "except botocore.exceptions.ClientError as e:\n", "    error_code = e.response['Error']['Code']\n", "\n", "    # If the dataset group already exists, we get its ARN:\n", "    if (error_code == 'ResourceAlreadyExistsException'):\n", "        print('A dataset group with this name already exists, you can use it to create and ingest new datasets')\n", "\n", "        # List all the existing dataset groups:\n", "        forecast_dsg_list = forecast.list_dataset_groups()\n", "\n", "        # Loop through all the Forecast dataset groups:\n", "        for dsg in forecast_dsg_list['DatasetGroups']:\n", "            # Get the dataset group name (the string after the first '/' in the ARN)\n", "            dsg_name = dsg['DatasetGroupArn'].split('/')[1]\n", "\n", "            # Once we find it, we store the ARN and break out of the loop:\n", "            if (dsg_name == datasetGroupName):\n", "                datasetGroupArn = dsg['DatasetGroupArn']\n", "                break\n", "\n", "    # Any other error is re-raised:\n", "    else:\n", "        raise\n", "\n", "print(f'- Dataset group name: {datasetGroupName}')\n", "print(f'- Dataset group ARN: {datasetGroupArn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we browse to the console, 
we will see a new DatasetGroup has been created:\n", "\n", "![Dataset Group](pictures/api-dataset-group.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also use the API to get the metadata associated with this object:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset_group(DatasetGroupArn=datasetGroupArn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now define the schema: valid values for AttributeType are `string`, `integer`, `float` or `timestamp`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "schema = {\n", "    \"Attributes\": [\n", "        { \"AttributeName\": \"timestamp\", \"AttributeType\": \"timestamp\" },\n", "        { \"AttributeName\": \"target_value\", \"AttributeType\": \"float\" },\n", "        { \"AttributeName\": \"item_id\", \"AttributeType\": \"string\" }\n", "    ]\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that we are using the **same order** as the columns of the training dataset uploaded earlier: here, our `target_value` is the `energy` field and the `item_id` is the `client_id` field:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time \n", "\n", "try:\n", "    response = forecast.create_dataset(\n", "        Domain='CUSTOM',\n", "        DatasetType='TARGET_TIME_SERIES',\n", "        DatasetName=datasetName,\n", "        DataFrequency=DATASET_FREQUENCY, \n", "        Schema=schema\n", "    )\n", "    datasetArn = response['DatasetArn']\n", "\n", "# Only botocore client errors carry a `response` attribute:\n", "except botocore.exceptions.ClientError as e:\n", "    error_code = e.response['Error']['Code']\n", "\n", "    # If the dataset already exists, we get its ARN:\n", "    if (error_code == 'ResourceAlreadyExistsException'):\n", "        print('A dataset with this name already exists, you can use it to ingest new data:')\n", "\n", "        # List all the existing datasets:\n", "        forecast_ds_list = forecast.list_datasets()\n", "\n", "        # Loop through all the Forecast datasets:\n", "        for ds in forecast_ds_list['Datasets']:\n", "            # Get the dataset name (the string after the first '/' in the ARN)\n", "            ds_name = ds['DatasetArn'].split('/')[1]\n", "\n", "            # Once we find it, we store the ARN and break out of the loop:\n", "            if (ds_name == datasetName):\n", "                datasetArn = ds['DatasetArn']\n", "                break\n", "\n", "    # Any other error is re-raised:\n", "    else:\n", "        raise\n", "\n", "print(f'- Dataset name: {datasetName}')\n", "print(f'- Dataset ARN: {datasetArn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also use the API to get the metadata associated with this object: we can confirm that this is a target time series and that its frequency is hourly, which is consistent with the training set we uploaded:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset(DatasetArn=datasetArn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add Dataset to Dataset Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.update_dataset_group(DatasetGroupArn=datasetGroupArn, DatasetArns=[datasetArn])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we browse to the console, we will see a new dataset is now attached to our dataset group. Its status is **active** but the latest import status mentions **Not uploaded**:\n", "\n", "![Datasets](pictures/api-datasets.png)" ] }, { "cell_type": "markdown", "metadata": { "toc-hr-collapsed": false }, "source": [ "### Create IAM Role for Forecast\n", "\n", "Like many AWS services, Forecast will need to assume an IAM role in order to interact with your S3 resources securely. 
In this sample notebook, we use the `get_or_create_iam_role()` utility function to create an IAM role:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_or_create_iam_role(role_name):\n", "    iam = boto3.client(\"iam\")\n", "\n", "    # Trust policy allowing Amazon Forecast to assume this role:\n", "    assume_role_policy_document = {\n", "        \"Version\": \"2012-10-17\",\n", "        \"Statement\": [\n", "            {\n", "                \"Effect\": \"Allow\",\n", "                \"Principal\": {\n", "                    \"Service\": \"forecast.amazonaws.com\"\n", "                },\n", "                \"Action\": \"sts:AssumeRole\"\n", "            }\n", "        ]\n", "    }\n", "\n", "    try:\n", "        create_role_response = iam.create_role(\n", "            RoleName = role_name,\n", "            AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", "        )\n", "        role_arn = create_role_response[\"Role\"][\"Arn\"]\n", "        print(\"Created\", role_arn)\n", "\n", "    except iam.exceptions.EntityAlreadyExistsException:\n", "        print(\"The role \" + role_name + \" already exists, skipping creation\")\n", "        role_arn = boto3.resource('iam').Role(role_name).arn\n", "\n", "    print(\"Attaching policies\")\n", "\n", "    iam.attach_role_policy(\n", "        RoleName = role_name,\n", "        PolicyArn = \"arn:aws:iam::aws:policy/AmazonForecastFullAccess\"\n", "    )\n", "\n", "    iam.attach_role_policy(\n", "        RoleName=role_name,\n", "        PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',\n", "    )\n", "\n", "    print(\"Waiting for a minute to allow IAM role policy attachment to propagate\")\n", "    time.sleep(60)\n", "\n", "    print(\"Done.\")\n", "    return role_arn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the role to provide to Amazon Forecast.\n", "role_name = \"ForecastNotebookRole-EnergyConsumptionPrediction\"\n", "role_arn = get_or_create_iam_role(role_name=role_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Data Import Job\n", "\n", "\n", "Now that Forecast knows how to understand the CSV we are providing, the next step is to import the data from S3 into 
Amazon Forecast." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "datasetImportJobName = 'energy_consumption_dataset_import_job'\n", "ds_import_job_response = forecast.create_dataset_import_job(\n", "    DatasetImportJobName=datasetImportJobName,\n", "    DatasetArn=datasetArn,\n", "    DataSource= {\n", "        \"S3Config\" : {\n", "            \"Path\": s3DataPath,\n", "            \"RoleArn\": role_arn\n", "        }\n", "    },\n", "    TimestampFormat=TIMESTAMP_FORMAT\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ds_import_job_arn=ds_import_job_response['DatasetImportJobArn']\n", "print(ds_import_job_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the status of the dataset import job: when the status changes from **CREATE_IN_PROGRESS** to **ACTIVE**, we can continue to the next steps. Depending on the data size, this process will take 5 to 10 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while True:\n", "    status = forecast.describe_dataset_import_job(DatasetImportJobArn=ds_import_job_arn)['Status']\n", "    print(str(pd.to_datetime(datetime.now()))[:19], \"| Data ingestion:\", status)\n", "\n", "    if status in ('ACTIVE', 'CREATE_FAILED'): break\n", "    time.sleep(60)\n", "\n", "print(status)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset_import_job(DatasetImportJobArn=ds_import_job_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point you have successfully imported your data into Amazon Forecast and it is time to start building your first model. 
The import status now mentions **Active**:\n", "\n", "![Datasets](pictures/api-datasets-active.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Building a predictor\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a Predictor\n", "\n", "Now that your data has been imported into Amazon Forecast, we will once again define our dataset information and then start building a model (or **predictor**, in Amazon Forecast's own words)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Forecast horizon` is the number of time points to predict in the future. For weekly data, a value of 12 means 12 weeks. Our example uses hourly data and we would like to forecast the next day, so we set this value to 24 (meaning 24 hours).\n", "\n", "Every Amazon Forecast predictor uses an **algorithm** to train a model, then uses the **trained model** to make a **forecast** using an input dataset group. To help you get started, Amazon Forecast provides the following predefined algorithms:\n", "\n", "* CNN-QR: `arn:aws:forecast:::algorithm/CNN-QR`\n", "* DeepAR+: `arn:aws:forecast:::algorithm/Deep_AR_Plus`\n", "* Prophet: `arn:aws:forecast:::algorithm/Prophet`\n", "* Exponential Smoothing (ETS): `arn:aws:forecast:::algorithm/ETS`\n", "* Autoregressive Integrated Moving Average (ARIMA): `arn:aws:forecast:::algorithm/ARIMA`\n", "* Non-Parametric Time Series (NPTS): `arn:aws:forecast:::algorithm/NPTS`\n", "\n", "For more details about the different algorithms and how to choose them, check the [documentation](https://docs.aws.amazon.com/forecast/latest/dg/aws-forecast-choosing-recipes.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Amazon Forecast DeepAR+** is a supervised learning algorithm for forecasting scalar (one-dimensional) time series using recurrent neural networks (RNNs). 
Classical forecasting methods, such as *autoregressive integrated moving average* (ARIMA) or *exponential smoothing* (ETS), fit a single model to each individual time series, and then use that model to extrapolate the time series into the future.\n", "\n", "In many applications, however, you have many similar time series across a set of cross-sectional units. These time series can relate to different clients, products, server loads, or requests for web pages. In this case, it can be beneficial to train a single model jointly over all of the time series. DeepAR+ takes this approach. **When your dataset contains hundreds of feature time series, the DeepAR+ algorithm typically outperforms the standard ARIMA and ETS methods**. You can also use the trained model to generate forecasts for new time series that are similar to the ones it has been trained on (**cold start** for never-before-seen signals).\n", "\n", "In the case of our energy dataset, we only have 10 different households:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "len(training_df['client_id'].unique())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given this small number of time series, we select ARIMA in the following cell:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictorName = project + '_arima'\n", "forecastHorizon = 24\n", "algorithmArn = 'arn:aws:forecast:::algorithm/ARIMA'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now create our **Predictor** by training a model on the data provided to Amazon Forecast. In the background, Amazon Forecast holds out the most recent data to build a train / validation split. The model is trained on the new training split and the performance metrics are evaluated against the new validation split." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    create_predictor_response = forecast.create_predictor(\n", "        PredictorName = predictorName, \n", "        AlgorithmArn = algorithmArn,\n", "        ForecastHorizon = forecastHorizon,\n", "        PerformAutoML = False,\n", "        PerformHPO = False,\n", "        EvaluationParameters= {\n", "            \"NumberOfBacktestWindows\": 1, \n", "            \"BackTestWindowOffset\": 24\n", "        }, \n", "        InputDataConfig = {\"DatasetGroupArn\": datasetGroupArn},\n", "        FeaturizationConfig = {\n", "            \"ForecastFrequency\": \"H\", \n", "            \"Featurizations\": [{\n", "                \"AttributeName\": \"target_value\", \n", "                \"FeaturizationPipeline\": [{\n", "                    \"FeaturizationMethodName\": \"filling\", \n", "                    \"FeaturizationMethodParameters\": {\n", "                        \"frontfill\": \"none\", \n", "                        \"middlefill\": \"zero\", \n", "                        \"backfill\": \"zero\"\n", "                    }\n", "                }]\n", "            }]\n", "        }\n", "    )\n", "\n", "    predictor_arn = create_predictor_response['PredictorArn']\n", "\n", "# Only botocore client errors carry a `response` attribute:\n", "except botocore.exceptions.ClientError as e:\n", "    error_code = e.response['Error']['Code']\n", "\n", "    # If the predictor already exists, we get its ARN:\n", "    if (error_code == 'ResourceAlreadyExistsException'):\n", "        print('A predictor with this name already exists, you can query it to check its status or request a forecast:')\n", "\n", "        # List all the existing predictors:\n", "        forecast_pred_list = forecast.list_predictors()\n", "\n", "        # Loop through all the Forecast predictors:\n", "        for predictor in forecast_pred_list['Predictors']:\n", "            # Get the predictor name (the string after the first '/' in the ARN)\n", "            predictor_name = predictor['PredictorArn'].split('/')[1]\n", "\n", "            # Once we find it, we store the ARN and break out of the loop:\n", "            if (predictor_name == predictorName):\n", "                predictor_arn = predictor['PredictorArn']\n", "                break\n", "\n", "    # Any other error is re-raised:\n", "    else:\n", "        raise\n", "\n", "print(f'- Predictor name: {predictorName}')\n", "print(f'- Predictor ARN: {predictor_arn}')" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "Our predictor (model) is now training as we can see in the console:\n", "\n", "![Datasets](pictures/api-predictor-training.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the status of the predictor: when the status changes from **CREATE_IN_PROGRESS** to **ACTIVE**, we can continue to the next steps. Depending on data size, model selection and hyperparameters, it can take from 10 minutes to more than one hour to become **ACTIVE**. This is still a training task, so for this model it will take around 15 minutes:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while True:\n", "    status = forecast.describe_predictor(PredictorArn=predictor_arn)['Status']\n", "    print(str(pd.to_datetime(datetime.now()))[:19], \"| Model training:\", status)\n", "\n", "    if status in ('ACTIVE', 'CREATE_FAILED'): break\n", "    time.sleep(60)\n", "\n", "print(status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get Error Metrics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our predictor is now **active** and we can see in the console the associated performance (WAPE and RMSE metrics computed on the validation dataset). You can click on the predictor name to see more details or, for instance, to download the backtest results:\n", "\n", "![Datasets](pictures/api-predictor-active.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also collect these metrics through the API:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.get_accuracy_metrics(PredictorArn=predictor_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a Forecast\n", "\n", "Now we can create a forecast using the predictor that was trained. 
During the **first forecast creation**, Amazon Forecast **retrains** a model on the entire dataset: this time, it doesn't split the training set into train / validation, it uses your whole training data to train a model for every item_id.\n", "\n", "Once the new model is trained, it is hosted so that it can perform inference (similarly to a SageMaker Endpoint, Forecast provisions the resources needed to serve predictions). This `CreateForecast` operation creates a forecast for every item (item_id) in the dataset group that was used to train the predictor. After a forecast is created, you can query the forecast or export it to an Amazon S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecastName = project + '_arima_forecast'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    create_forecast_response = forecast.create_forecast(\n", "        ForecastName=forecastName,\n", "        PredictorArn=predictor_arn\n", "    )\n", "    forecast_arn = create_forecast_response['ForecastArn']\n", "\n", "# Only botocore client errors carry a `response` attribute:\n", "except botocore.exceptions.ClientError as e:\n", "    error_code = e.response['Error']['Code']\n", "\n", "    # If the forecast already exists, we get its ARN:\n", "    if (error_code == 'ResourceAlreadyExistsException'):\n", "        print('A forecast with this name already exists, you can use it to obtain a prediction:')\n", "\n", "        # List all the existing forecasts:\n", "        forecast_fc_list = forecast.list_forecasts()\n", "\n", "        # Loop through all the Forecast forecasts:\n", "        for fc in forecast_fc_list['Forecasts']:\n", "            # Get the forecast name (the string after the first '/' in the ARN)\n", "            fc_name = fc['ForecastArn'].split('/')[1]\n", "\n", "            # Once we find it, we store the ARN and break out of the loop:\n", "            if (fc_name == forecastName):\n", "                forecast_arn = fc['ForecastArn']\n", "                break\n", "\n", "    # Any other error is re-raised:\n", "    else:\n", "        raise\n", "\n", "print(f'- Forecast name: {forecastName}')\n", "print(f'- Forecast ARN: {forecast_arn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our forecast is now provisioning as we can see in the console:\n", "\n", "![Datasets](pictures/api-forecast-provisioning.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the status of the forecast: when the status changes from **CREATE_IN_PROGRESS** to **ACTIVE**, we can continue to the next steps. Depending on the model size (which depends on the algorithm selected) and the number of items, it can take from 10 minutes to more than one hour to become **ACTIVE**. For this model, it will take 15-20 minutes for the **first** forecast:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while True:\n", "    status = forecast.describe_forecast(ForecastArn=forecast_arn)['Status']\n", "    print(str(pd.to_datetime(datetime.now()))[:19], \"| Forecast generation:\", status)\n", "\n", "    if status in ('ACTIVE', 'CREATE_FAILED'): break\n", "    time.sleep(60)\n", "\n", "print(status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our forecast has finished training on the whole dataset and is now ready to be queried:\n", "\n", "![Datasets](pictures/api-forecast-active.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Evaluating your forecast\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get a forecast (obtaining a prediction)\n", "Once created, the forecast results are ready to be viewed. To query a forecast, you can specify start and end dates in ISO 8601 format (`yyyy-MM-dd'T'HH:mm:ss`, for example `2015-01-01T08:00:00`); otherwise `query_forecast` will send back the whole forecast horizon. 
Our forecast horizon is 24 hours, so that will be 24 data points for this example:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client_id = 'client_1'\n", "\n", "print(forecast_arn)\n", "print()\n", "forecastResponse = forecastquery.query_forecast(\n", "    ForecastArn=forecast_arn,\n", "    Filters={'item_id': client_id}\n", ")\n", "print(forecastResponse)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Getting the actual results\n", "\n", "Earlier, we saved the observed values for both the training and the testing periods. We are now going to select a given date and client_id from that data and plot the actual consumption for that customer. We need to reduce the data to just the day we wish to plot, which is October 1st, 2014, and we only keep the rows for household `client_1`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "actual_train_df = pd.read_csv(f'{PROCESSED_DATA}/train.csv', names=['timestamp','actual','item_id'], parse_dates=[0])\n", "actual_train_df = actual_train_df[(actual_train_df['item_id'] == client_id)]\n", "\n", "testing_start = pd.to_datetime('2014-10-01 00:00:00')\n", "testing_end = pd.to_datetime('2014-10-01 23:00:00')\n", "actual_test_df = pd.read_csv(f'{PROCESSED_DATA}/test.csv', names=['timestamp','actual','item_id'], parse_dates=[0])\n", "# Append the first test row to the training data (pd.concat replaces DataFrame.append, which was removed in pandas 2.0):\n", "actual_train_df = pd.concat([actual_train_df, actual_test_df.iloc[[0]]], ignore_index=True)\n", "actual_train_df = actual_train_df.set_index('timestamp')\n", "actual_test_df = actual_test_df.set_index('timestamp')\n", "actual_test_df = actual_test_df[testing_start:testing_end]\n", "actual_test_df = actual_test_df[(actual_test_df['item_id'] == client_id)]\n", "\n", "actual_test_df.tail()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Post-processing the prediction\n", "\n", "Next we need to convert the JSON response from the Predictor to a dataframe 
that we can plot. The cell below generates one dataframe for each of p10, p50 and p90. `p50` gives the prediction median, while the range from `p10` to `p90` gives the 80% confidence interval." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictions_df = dict()\n", "for percentile in ['p10', 'p50', 'p90']:\n", "    predictions_df.update({percentile: pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions'][percentile])})\n", "    predictions_df[percentile]['Timestamp'] = pd.to_datetime(predictions_df[percentile]['Timestamp'])\n", "    predictions_df[percentile].columns = ['Timestamp', percentile]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results_df = pd.merge(actual_test_df.reset_index(), predictions_df['p10'], how='inner', left_on=['timestamp'], right_on='Timestamp')\n", "\n", "for percentile in ['p50', 'p90']:\n", "    results_df = pd.merge(results_df, predictions_df[percentile], how='inner', left_on=['timestamp'], right_on='Timestamp')\n", "\n", "results_df = results_df[['timestamp', 'actual', 'p10', 'p50', 'p90']]\n", "results_df = results_df.set_index('timestamp')\n", "results_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Comparing the prediction to actual results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# First, plot the actual data during the evaluation period:\n", "fig = plt.figure(figsize=(24,6))\n", "plt.plot(results_df['actual'], label='Actual data', linewidth=1, color='#AAAAAA', linestyle='--', alpha=0.8)\n", "# plt.plot(results_df['actual'], linewidth=5, color=colors[4], alpha=0.1)\n", "\n", "# Next, plot the prediction envelope and the median value predicted:\n", "plt.plot(results_df['p50'], label='Prediction median', color=colors[2], linewidth=1.5)\n", "p10 = results_df['p10']\n", "p90 = results_df['p90']\n", "plt.fill_between(p10.index, p10, p90, color=colors[2], alpha=0.1, label='80% 
confidence interval')\n", "plt.plot(results_df['p10'], label='Confidence interval lower bound', color=colors[2], linewidth=0.5, alpha=0.5)\n", "\n", "# And then, add the training period for this item_id:\n", "start = pd.to_datetime('2014-09-25 00:00:00')\n", "end = pd.to_datetime('2014-10-02 01:00:00')\n", "plt.plot(actual_train_df.loc[start:end, 'actual'], label='Historical data', linewidth=1, color='#AAAAAA', alpha=0.8)\n", "\n", "# Show the plot with a legend\n", "plt.legend(fontsize=14, ncol=5, loc='upper center')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Conclusion\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## New forecast generation strategy\n", "To get a new forecast, you can:\n", "* Wait 24 hours and update the dataset with the newly observed data. Then, create a new forecast without retraining the model.\n", "* Train a predictor with a longer forecast horizon (this retrains a model).\n", "* Create a forecast and update the dataset with the output of this forecast. Then, create a new forecast without retraining the model." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "### Helper function to delete Forecast resources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_till_delete(callback, check_time=5, timeout=None):\n", "    # Call the delete callback repeatedly until the resource is reported as not found\n", "    elapsed_time = 0\n", "    while timeout is None or elapsed_time < timeout:\n", "        try:\n", "            out = callback()\n", "\n", "        except botocore.exceptions.ClientError as e:\n", "            # When given the resource not found exception, deletion has occurred\n", "            if e.response['Error']['Code'] == 'ResourceNotFoundException':\n", "                print('Delete successful')\n", "                return\n", "\n", "            else:\n", "                raise\n", "\n", "        time.sleep(check_time)\n", "        elapsed_time += check_time\n", "\n", "    raise TimeoutError('Forecast resource deletion timed out.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Defining the things to clean up\n", "\n", "The cells below delete the resources created in this notebook, one at a time, until all of them have been removed." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('Delete the Forecast: ', end='')\n", "wait_till_delete(lambda: forecast.delete_forecast(ForecastArn=forecast_arn))\n", "\n", "print('Delete the Predictor: ', end='')\n", "wait_till_delete(lambda: forecast.delete_predictor(PredictorArn=predictor_arn))\n", "\n", "print('Delete the Import: ', end='')\n", "wait_till_delete(lambda: forecast.delete_dataset_import_job(DatasetImportJobArn=ds_import_job_arn))\n", "\n", "print('Delete the Dataset: ', end='')\n", "wait_till_delete(lambda: forecast.delete_dataset(DatasetArn=datasetArn))\n", "\n", "print('Delete the DatasetGroup: ', end='')\n", "wait_till_delete(lambda: forecast.delete_dataset_group(DatasetGroupArn=datasetGroupArn))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Delete your file in S3\n", "boto3.Session().resource('s3').Bucket(BUCKET).Object(KEY).delete()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### IAM role and policy cleanup\n", "\n", "The very last step in the notebooks is to detach the policies that were attached to the role and then to delete the role. No changes should be needed here: just execute the cell." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "iam.detach_role_policy(PolicyArn=\"arn:aws:iam::aws:policy/AmazonS3FullAccess\", RoleName=role_name)\n", "iam.detach_role_policy(PolicyArn=\"arn:aws:iam::aws:policy/AmazonForecastFullAccess\", RoleName=role_name)\n", "iam.delete_role(RoleName=role_name)" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }