{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# What-if Analysis with RTS/TTS filling options\n", "\n", "This notebook describes how to perform what-if analysis using two different missing value filling options. As a sample use-case, we use product demand and price data with some missing values, and see how product demand forecast changes if we increase the prices of products.\n", "\n", "Following is the steps:\n", "\n", "1. [Import libraries and setup AWS resources](#Import-libraries-and-setup-AWS-resources)\n", "2. [Prepare training dataset CSVs](#Prepare-training-dataset-CSVs)\n", "3. [Create DatasetGroup and Datasets](#Create-DatasetGroup-and-Datasets)\n", "4. [Import the target time series data, and related time series data](#Import-the-target-time-series-data,-and-related-time-series-data)\n", "5. [Create the first Predictor](#Create-the-first-Predictor)\n", "6. [Create Forecast from the first Predictor](#Create-Forecast-from-the-first-Predictor)\n", "7. [Create 2nd Predictor with different futurefill option](#Create-2nd-Predictor-with-different-futurefill-option)\n", "8. [Create Forecast from the 2nd predictor](#Create-Forecast-from-the-2nd-predictor)\n", "9. [Query forecasts, visualize and compare](#Query-forecasts,-visualize-and-compare)\n", "10. [Resource Cleanup](#Resource-cleanup)\n", "\n", "**Note** : In order to get two versions of forecast with different filling options, this notebook is creating two Predictors, but please note that it is also possible to perform what-if analysis with just one Predictor with multiple imports of related time series dataset." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import libraries and setup AWS resources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import os\n", "import time\n", "import datetime\n", "\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "import boto3\n", "\n", "# importing forecast notebook utility from notebooks/common directory\n", "sys.path.insert( 0, os.path.abspath(\"../../common\") )\n", "import util" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Configure the S3 bucket name and region name for this lesson.\n", "\n", "- If you don't have an S3 bucket, create it first on S3.\n", "- Although we have set the region to us-west-2 as a default value below, you can choose any of the regions that the service is available in." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "text_widget_bucket = util.create_text_widget( \"bucket_name\", \"input your S3 bucket name\" )\n", "text_widget_region = util.create_text_widget( \"region\", \"input region name.\", default_value=\"us-west-2\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket_name = text_widget_bucket.value\n", "assert bucket_name, \"bucket_name not set.\"\n", "\n", "region = text_widget_region.value\n", "assert region, \"region not set.\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "session = boto3.Session(region_name=region)\n", "s3 = session.client('s3')\n", "forecast = session.client(service_name='forecast')\n", "forecastquery = session.client(service_name='forecastquery')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the IAM role to provide to Amazon Forecast.\n", "role_name = \"ForecastNotebookRole-WhatIfAnalysis\"\n", "role_arn = util.get_or_create_iam_role( role_name = role_name )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare training dataset CSVs\n", " \n", "1. Load the historical product demand data\n", "2. Check the loaded data, and confirm missing values\n", "3. Split into target time series (demand) and related time series (price)\n", "4. Upload them to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv( \"./data/product_demand_with_nan.csv\" )\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Visualize demand (TTS) and price (RTS) for a single item.\n", "# You can see gaps in the lines (missing values).\n", "df[ df[\"item_id\"]==\"item_001\"].plot( x=\"timestamp\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[df[\"item_id\"]==\"item_001\" ]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check how many missing values exist in the data.\n", "df.isna().sum()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_tts = df[[\"item_id\", \"timestamp\", \"demand\" ]]\n", "df_rts = df[[\"item_id\", \"timestamp\", \"price\" ]]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_tts.to_csv( \"./data/tts.csv\", index=False )\n", "df_rts.to_csv( \"./data/rts.csv\", index=False )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Upload to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "version = '0'\n", "project = \"whatif_and_filling\"+\"_\"+version\n", "\n", "key_tts = \"%s/tts.csv\" % project\n", "key_rts = \"%s/rts.csv\" % project\n", "\n", "s3.upload_file( Filename=\"./data/tts.csv\", Bucket=bucket_name, Key=key_tts )\n", "s3.upload_file( Filename=\"./data/rts.csv\", Bucket=bucket_name, Key=key_rts )\n", "\n", "s3_data_path_tts = \"s3://\" + bucket_name + \"/\" + key_tts\n", "s3_data_path_rts = \"s3://\" + bucket_name + \"/\" + key_rts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create DatasetGroup and Datasets\n", " \n", "We create a single DatasetGroup and a single pair of Datasets. Note that we don't have to create two RELATED_TIME_SERIES datasets: both Predictors share the same one and differ only in their filling options."
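] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before creating the Forecast resources, it can help to see locally what mean-based filling would do to the gaps we confirmed above. The sketch below approximates per-item \"mean\" filling with pandas. It is an illustration only: the actual filling is performed by Amazon Forecast during training, and the per-item behavior shown here is our assumption for demonstration purposes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only: approximate per-item \"mean\" filling with pandas.\n", "# Amazon Forecast performs the real filling during training.\n", "df_fill_demo = df.copy()\n", "df_fill_demo[\"demand_filled\"] = (\n", " df_fill_demo.groupby(\"item_id\")[\"demand\"]\n", " .transform(lambda s: s.fillna(s.mean()))\n", ")\n", "df_fill_demo[ df_fill_demo[\"item_id\"]==\"item_001\" ][[\"timestamp\", \"demand\", \"demand_filled\"]]"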
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_dataset_group(\n", " DatasetGroupName = project + \"_dsg\",\n", " Domain=\"RETAIL\",\n", " )\n", "\n", "dataset_group_arn = response['DatasetGroupArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATASET_FREQUENCY = \"M\"\n", "TIMESTAMP_FORMAT = \"yyyy-MM-dd\"\n", "\n", "schema ={\n", " \"Attributes\":[\n", " {\n", " \"AttributeName\":\"item_id\",\n", " \"AttributeType\":\"string\"\n", " },\n", " {\n", " \"AttributeName\":\"timestamp\",\n", " \"AttributeType\":\"timestamp\"\n", " },\n", " {\n", " \"AttributeName\":\"demand\",\n", " \"AttributeType\":\"float\"\n", " },\n", " ]\n", "}\n", "\n", "response = forecast.create_dataset(\n", " Domain = \"RETAIL\",\n", " DatasetType = 'TARGET_TIME_SERIES',\n", " DatasetName = project + \"_tts\",\n", " DataFrequency = DATASET_FREQUENCY, \n", " Schema = schema\n", ")\n", "\n", "tts_dataset_arn = response['DatasetArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "schema ={\n", " \"Attributes\":[\n", " {\n", " \"AttributeName\":\"item_id\",\n", " \"AttributeType\":\"string\"\n", " },\n", " {\n", " \"AttributeName\":\"timestamp\",\n", " \"AttributeType\":\"timestamp\"\n", " },\n", " {\n", " \"AttributeName\":\"price\",\n", " \"AttributeType\":\"float\"\n", " },\n", " ]\n", "}\n", "\n", "response = forecast.create_dataset(\n", " Domain = \"RETAIL\",\n", " DatasetType = 'RELATED_TIME_SERIES',\n", " DatasetName = project + \"_rts\",\n", " DataFrequency = DATASET_FREQUENCY, \n", " Schema = schema\n", ")\n", "\n", "rts_dataset_arn = response['DatasetArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.update_dataset_group( \n", " DatasetGroupArn = dataset_group_arn, \n", " DatasetArns = [\n", " tts_dataset_arn,\n", " rts_dataset_arn,\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import the target time series data, and related time series data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_dataset_import_job(\n", " DatasetImportJobName = project + \"_tts_import\",\n", " DatasetArn = tts_dataset_arn,\n", " DataSource = {\n", " \"S3Config\" : {\n", " \"Path\" : s3_data_path_tts,\n", " \"RoleArn\" : role_arn\n", " }\n", " },\n", " TimestampFormat = TIMESTAMP_FORMAT\n", ")\n", "\n", "tts_dataset_import_job_arn = response['DatasetImportJobArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_dataset_import_job(\n", " DatasetImportJobName = project + \"_rts_import1\",\n", " DatasetArn = rts_dataset_arn,\n", " DataSource = {\n", " \"S3Config\" : {\n", " \"Path\" : s3_data_path_rts,\n", " \"RoleArn\" : role_arn\n", " }\n", " },\n", " TimestampFormat = TIMESTAMP_FORMAT\n", ")\n", "\n", "rts_dataset_import_job_arn = response['DatasetImportJobArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status_indicator = util.StatusIndicator()\n", "\n", "while True:\n", " status = forecast.describe_dataset_import_job( DatasetImportJobArn = tts_dataset_import_job_arn )['Status']\n", " status_indicator.update(status)\n", " if status in ('ACTIVE', 'CREATE_FAILED'): break\n", " time.sleep(10)\n", "\n", "status_indicator.end()" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "status_indicator = util.StatusIndicator()\n", "\n", "while True:\n", " status = forecast.describe_dataset_import_job( DatasetImportJobArn = rts_dataset_import_job_arn )['Status']\n", " status_indicator.update(status)\n", " if status in ('ACTIVE', 'CREATE_FAILED'): break\n", " time.sleep(10)\n", "\n", "status_indicator.end()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create the first Predictor\n", "\n", "Creating the 1st Predictor using futurefill option \"min\"." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PREDICTOR_NAME = project + \"_predictor_1\"\n", "FORECAST_HORIZON = 3\n", "config = [\n", " {\n", " # for Target time series\n", " \"AttributeName\": \"demand\",\n", " \"Transformations\": {\"frontfill\": \"none\",\"middlefill\": \"mean\", \"backfill\": \"mean\"}\n", " },\n", " {\n", " # for Related time series\n", " \"AttributeName\": \"price\",\n", " \"Transformations\": {\"middlefill\": \"mean\", \"backfill\": \"mean\",\"futurefill\": \"min\"}\n", " }\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = \\\n", " forecast.create_auto_predictor(PredictorName = PREDICTOR_NAME,\n", " ForecastHorizon = FORECAST_HORIZON,\n", " ForecastFrequency = DATASET_FREQUENCY,\n", " DataConfig = {\n", " 'DatasetGroupArn': dataset_group_arn, \n", " 'AttributeConfigs':config\n", " })" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_arn_1 = response['PredictorArn']\n", "print(f\"Waiting for Predictor with ARN {predictor_arn_1} to become ACTIVE. Depending on data size and predictor setting,it can take several hours to be ACTIVE.\\n\\nCurrent Status:\")\n", "status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn_1))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.describe_auto_predictor(PredictorArn=predictor_arn_1)\n", "print(f\"\\n\\nThe Predictor with ARN {predictor_arn_1} is now {response['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Forecast from the first Predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_forecast(\n", " ForecastName = project + \"_forecast_1\",\n", " PredictorArn = predictor_arn_1\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast_arn_1 = response['ForecastArn']\n", "print(f\"Waiting for Forecast with ARN {forecast_arn_1} to become ACTIVE. Depending on data size and predictor settings,it can take several hours to be ACTIVE.\\n\\nCurrent Status:\")\n", "status = util.wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn_1))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.describe_forecast(ForecastArn=forecast_arn_1)\n", "print(f\"\\n\\nThe Forecast with ARN {forecast_arn_1} is now {response['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create 2nd Predictor with different futurefill option\n", "\n", "Creating the 2nd Predictor using futurefill option \"max\"." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PREDICTOR_NAME = project + \"_predictor_2\"\n", "FORECAST_HORIZON = 3\n", "config = [\n", " {\n", " # for the target time series (demand); same as the first Predictor\n", " \"AttributeName\": \"demand\",\n", " \"Transformations\": {\"frontfill\": \"none\", \"middlefill\": \"mean\", \"backfill\": \"mean\"}\n", " },\n", " {\n", " # for the related time series (price); this time we use \"max\" for the futurefill option\n", " \"AttributeName\": \"price\",\n", " \"Transformations\": {\"middlefill\": \"mean\", \"backfill\": \"mean\", \"futurefill\": \"max\"}\n", " }\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_auto_predictor(PredictorName = PREDICTOR_NAME,\n", " ForecastHorizon = FORECAST_HORIZON,\n", " ForecastFrequency = DATASET_FREQUENCY,\n", " DataConfig = {\n", " 'DatasetGroupArn': dataset_group_arn, \n", " 'AttributeConfigs': config\n", " })" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_arn_2 = response['PredictorArn']\n", "print(f\"Waiting for Predictor with ARN {predictor_arn_2} to become ACTIVE. Depending on data size and predictor settings, it can take several hours to become ACTIVE.\\n\\nCurrent Status:\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn_2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.describe_auto_predictor(PredictorArn=predictor_arn_2)\n", "print(f\"\\n\\nThe Predictor with ARN {predictor_arn_2} is now {response['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Forecast from the 2nd predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_forecast(\n", " ForecastName = project + \"_forecast_2\",\n", " PredictorArn = predictor_arn_2\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast_arn_2 = response['ForecastArn']\n", "print(f\"Waiting for Forecast with ARN {forecast_arn_2} to become ACTIVE. Depending on data size and predictor settings, it can take several hours to become ACTIVE.\\n\\nCurrent Status:\")\n", "status = util.wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn_2))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.describe_forecast(ForecastArn=forecast_arn_2)\n", "print(f\"\\n\\nThe Forecast with ARN {forecast_arn_2} is now {response['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query forecasts, visualize and compare\n", "\n", "We now have two Forecasts created with different futurefill options (\"min\" vs. \"max\"). Let's query the forecasted product demand, visualize it, and compare. A minimal single-item query below shows the response shape, followed by the full comparison plot."
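] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, a minimal query for a single item (item_001 is one of the items in the training data), to show the shape of the QueryForecast response that the plotting code below relies on." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Minimal example: the response carries p10/p50/p90 lists of\n", "# {Timestamp, Value} records, which the plotting code below iterates over.\n", "response = forecastquery.query_forecast(\n", " ForecastArn = forecast_arn_1,\n", " Filters = { \"item_id\" : \"item_001\" }\n", ")\n", "predictions = response[\"Forecast\"][\"Predictions\"]\n", "for p50 in predictions[\"p50\"]:\n", " print( p50[\"Timestamp\"], p50[\"Value\"] )"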
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query both Forecasts for each item, then plot the actual demand together\n", "# with the two forecasts (p50 lines and shaded p10-p90 bands).\n", "training_data_period = ( df_tts[\"timestamp\"].min(), df_tts[\"timestamp\"].max() )\n", "\n", "def plot_compare( item_id ):\n", " plt.figure(figsize=(12, 6))\n", " plt.title(item_id)\n", " \n", " df_item_actual = df_tts[ df_tts[\"item_id\"]==item_id ]\n", " plt.plot( pd.to_datetime(df_item_actual[\"timestamp\"]), df_item_actual[\"demand\"], label=\"actual\", color=(1,0,0) )\n", "\n", " def plot_forecast( single_item_forecast, label, color, hatch ):\n", "\n", " x = []\n", " y_p10 = []\n", " y_p50 = []\n", " y_p90 = []\n", "\n", " # visually connect the last actual value with the forecasts\n", " df_connect = df_item_actual[ df_item_actual[\"timestamp\"]==training_data_period[1] ].reset_index(drop=True)\n", " x.append( datetime.datetime.strptime( df_connect.at[ 0, \"timestamp\" ], \"%Y-%m-%d\" ) )\n", " y_p10.append( df_connect.at[0,\"demand\"] )\n", " y_p50.append( df_connect.at[0,\"demand\"] )\n", " y_p90.append( df_connect.at[0,\"demand\"] )\n", "\n", " for p10, p50, p90 in zip( single_item_forecast[\"p10\"], single_item_forecast[\"p50\"], single_item_forecast[\"p90\"] ):\n", "\n", " date = datetime.datetime.strptime(p50[\"Timestamp\"],\"%Y-%m-%dT00:00:00\").date()\n", " x.append(date)\n", "\n", " y_p10.append(p10[\"Value\"])\n", " y_p50.append(p50[\"Value\"])\n", " y_p90.append(p90[\"Value\"])\n", "\n", " plt.plot( x, y_p50, label=\"%s p50\" % label, color=color )\n", " plt.fill_between( x, y_p10, y_p90, label=\"%s p10-p90\" % label, color=color, alpha=0.2, hatch=hatch )\n", "\n", " # Optional helper to overlay the price (RTS) series as a dotted line.\n", " # It is not called below; use it if you also want to plot prices.\n", " def plot_price( single_item_price, label, color ):\n", " x = []\n", " y = []\n", " \n", " for timestamp, price in zip( single_item_price[\"timestamp\"], single_item_price[\"price\"] ):\n", " date = datetime.datetime.strptime(timestamp,\"%Y-%m-%d\").date()\n", " x.append(date)\n", " y.append(price)\n", "\n", " plt.plot( x, y, label=label, color=color, linestyle=\":\" )\n", " \n", " response = forecastquery.query_forecast(\n", " ForecastArn = forecast_arn_1,\n", " Filters = { \"item_id\" : item_id }\n", " )\n", " plot_forecast( response[\"Forecast\"][\"Predictions\"], \"forecast1\", (0,0,1), \"+\" )\n", "\n", " response = forecastquery.query_forecast(\n", " ForecastArn = forecast_arn_2,\n", " Filters = { \"item_id\" : item_id }\n", " )\n", " plot_forecast( response[\"Forecast\"][\"Predictions\"], \"forecast2\", (1,0,1), \"x\" )\n", "\n", " bottom, top = plt.ylim()\n", " plt.ylim((-top*0.03, top*1.03))\n", "\n", " plt.legend( loc='lower left' )\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for item_id in [ \"item_132\",\"item_151\",\"item_234\" ]:\n", " plot_compare(item_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Resource cleanup\n", "\n", "To clean up the resources created in this notebook, uncomment and run the cells below. Delete the resources in the order shown, since a resource cannot be deleted while resources that depend on it (for example, Forecasts created from a Predictor) still exist."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete forecasts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.wait_till_delete(lambda: forecast.delete_forecast(ForecastArn = forecast_arn_1))\n", "# util.wait_till_delete(lambda: forecast.delete_forecast(ForecastArn = forecast_arn_2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete predictors" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn = predictor_arn_1))\n", "# util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn = predictor_arn_2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete dataset import jobs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.wait_till_delete(lambda: forecast.delete_dataset_import_job(DatasetImportJobArn = tts_dataset_import_job_arn))\n", "# util.wait_till_delete(lambda: forecast.delete_dataset_import_job(DatasetImportJobArn = rts_dataset_import_job_arn))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete datasets" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.wait_till_delete(lambda: forecast.delete_dataset(DatasetArn = tts_dataset_arn))\n", "# util.wait_till_delete(lambda: forecast.delete_dataset(DatasetArn = rts_dataset_arn))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete dataset group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.wait_till_delete(lambda: forecast.delete_dataset_group(DatasetGroupArn = dataset_group_arn))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete IAM role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# util.delete_iam_role( role_name )" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 2 }