{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### NY Taxi Train Amazon Forecast with Weather Index\n", "\n", "Our goal is to predict the number of NY City yellow taxi pickups in the next 7 days for each of 260 pickup zones.
\n", "\n", "To do this, we will use Amazon Forecast with 1 hour frequency and 7 day forecast horizon. For the demo, we'll use 8 months of historical data for training, and we will use the built-in Weather Index feature of Amazon Forecast that reads in 14-day weather predictions as related data.
\n", "\n", "\n", "\n", "This notebook covers inputting data into Amazon Forecast and training a model using the Weather Index. \n", "
  • See blog announcement how to use weather as built-in related time series to forecasting.
  • \n", "\n", "\n", "When you're ready, take a look at our Automation Solution that uses S3 trigger, AWS Lambda, Step Functions and Amazon QuickSight for BI visualization to integrate Amazon Forecast with upstream/downstream Data applications. Once you've run through a notebook once or twice, you'll probably decide you don't want to call Forecast APIs anymore via notebook! More efficient use of your time is to call APIs from automation. \n", "
  • See our automation solution Improving Forecast Accuracy
  • \n", "\n", "\n", "
    \n", "\n", "\n", "# Table of Contents\n", "\n", "* Step 0: [Setting up](#setup)\n", "* Step 1: [Read raw data and visualize](#read)\n", "* Step 2: [Importing the Data into Forecast](#import)\n", " * Step 2a: [Creating a Dataset Group](#create)\n", " * Step 2b: [Creating a Target Dataset](#target)\n", " * Step 2c: [Update the Dataset Group](#update)\n", " * Step 2d: [Creating a Target Time Series Dataset Import Job](#targetImport)\n", "* Step 3: [Train a predictor](#train)\n", "* Step 4: [Export backtest files](#backtest)\n", "* Step 5: [Visualize accuracy of predictors with and without weather](#visualize)\n", "* Step 6: [Cleaning up your Resources](#cleanup)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# First let us setup Amazon Forecast\n", "\n", "This section sets up the permissions and relevant endpoints." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import os\n", "import shutil\n", "import datetime\n", "\n", "import pandas as pd\n", "print('pandas: {}'.format(pd.__version__))\n", "import numpy as np\n", "print('numpy: {}'.format(np.__version__))\n", "\n", "# get region from boto3\n", "import boto3\n", "REGION = boto3.Session().region_name\n", "\n", "# importing forecast notebook utility from notebooks/common directory\n", "sys.path.insert( 0, os.path.abspath(\"../../common\") )\n", "import util\n", "\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline \n", "plt.rcParams['figure.figsize'] = (15.0, 5.0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#########\n", "# Function to concat .part files in predictor backtest file export folder\n", "#########\n", "\n", "def read_backtest_predictions(BUCKET_NAME, s3_path):\n", " \"\"\"Read predictor backtest predictions export files\n", " Inputs: \n", " BUCKET_NAME = S3 bucket name\n", " s3_path = S3 path to Predictor.part files\n", " , everything after \"s3://BUCKET_NAME/\" in S3 URI path to your .part files\n", " Return: Pandas dataframe with all .part files concatenated row-wise\n", " \"\"\"\n", " # set s3 path\n", " s3 = boto3.resource('s3')\n", " s3_bucket = boto3.resource('s3').Bucket(BUCKET_NAME)\n", " s3_depth = s3_path.split(\"/\")\n", " s3_depth = len(s3_depth) - 1\n", " \n", " # set local path\n", " local_write_path = \"backtest_exports\"\n", " if (os.path.exists(local_write_path) and os.path.isdir(local_write_path)):\n", " shutil.rmtree('backtest_exports')\n", " if not(os.path.exists(local_write_path) and os.path.isdir(local_write_path)):\n", " os.makedirs(local_write_path)\n", " \n", " # concat part files\n", " part_filename = \"\"\n", " part_files = list(s3_bucket.objects.filter(Prefix=s3_path))\n", " print(f\"Number .part files found: {len(part_files)}\")\n", " for file in part_files:\n", " # There will be a collection of CSVs if the forecast is large, modify this to go get them all\n", " if \"csv\" in file.key:\n", " part_filename = file.key.split('/')[s3_depth]\n", " window_object = s3.Object(BUCKET_NAME, file.key)\n", " file_size = window_object.content_length\n", " if file_size > 0:\n", " s3.Bucket(BUCKET_NAME).download_file(file.key, local_write_path+\"/\"+part_filename)\n", " \n", " # Read from local dir and combine all the part files\n", " temp_dfs = []\n", " for entry in os.listdir(local_write_path):\n", " if os.path.isfile(os.path.join(local_write_path, entry)):\n", " df = pd.read_csv(os.path.join(local_write_path, entry), index_col=None, header=0)\n", " temp_dfs.append(df)\n", "\n", " # Return assembled .part files as pandas Dataframe\n", " fcst_df = pd.concat(temp_dfs, axis=0, ignore_index=True, sort=False)\n", " return fcst_df\n", "\n", "\n", "#########\n", "# Functions to classify items as \"top movers\" or not\n", "#########\n", "\n", "def get_velocity_per_item(df, timestamp_col, item_id_col=\"item_id\"):\n", " \"\"\"Calculate item velocity as item demand per hour. \n", " \"\"\"\n", " df[timestamp_col] = pd.to_datetime(df[timestamp_col], format='%Y-%m-%d %H:%M:%S')\n", " \n", " max_time_df = df.groupby([item_id_col], as_index=False).max()[[item_id_col, timestamp_col]]\n", " max_time_df.columns = [item_id_col, 'max_time']\n", " \n", " min_time_df = df.groupby([item_id_col], as_index=False).min()[[item_id_col, timestamp_col]]\n", " min_time_df.columns = [item_id_col, 'min_time']\n", " \n", " df = df.merge(right=max_time_df, on=item_id_col)\n", " df = df.merge(right=min_time_df, on=item_id_col)\n", " \n", " df['time_span'] = df['max_time'] - df['min_time']\n", " df['time_span'] = df['time_span'].apply(lambda x: x.seconds / 3600 + 1) # add 1 to include start datetime and end datetime\n", " df = df.groupby([item_id_col], as_index=False).agg({'time_span':'mean', 'target_value':'sum'})\n", " df['velocity'] = df['target_value'] / df['time_span']\n", " return df\n", "\n", "\n", "def get_top_moving_items(gt_df\n", " , timestamp_col\n", " , target_value_col\n", " , item_id_col=\"item_id\"):\n", " \"\"\"Calculate mean velocity over all items as \"criteria\".\n", " Assign each item into category \"top\" or not depending on whether its velocity > criteria.\n", " \"\"\"\n", " gt_df_velocity = gt_df[[item_id_col, timestamp_col, target_value_col]].copy().reset_index(drop=True)\n", " gt_df_velocity = get_velocity_per_item(gt_df_velocity, timestamp_col, item_id_col)\n", " criteria = gt_df_velocity['velocity'].mean()\n", " gt_df_velocity['top_moving'] = gt_df_velocity['velocity'] > criteria\n", " print('average velocity of all items:', criteria)\n", " \n", " top_moving_items = gt_df_velocity[gt_df_velocity['top_moving'] == True][item_id_col].to_list()\n", " slow_moving_items = gt_df_velocity[gt_df_velocity['top_moving'] == False][item_id_col].to_list()\n", " return top_moving_items, slow_moving_items\n", "\n", "\n", "###########\n", "# Functions to calculate metrics\n", "###########\n", "\n", "def truncate_negatives_to_zero(the_df, target_value_col, quantile_cols):\n", " \"\"\"In case you are expecting positive numbers for actuals and predictions,\n", " round negative values up to zero.\n", " \n", " Be careful that this is acceptable treatment of negatives for your use case.\n", " \"\"\"\n", " \n", " df = the_df.copy()\n", " \n", " for q in quantile_cols:\n", " num_neg_predictions = df[q].lt(0).sum()\n", " print(f\"Num negative {q} predictors: {num_neg_predictions}\")\n", "\n", " # replace\n", " df[q] = df[q].mask(df[q] < 0, 0)\n", "\n", " # check you did the right thing\n", " num_neg_predictions = df[q].lt(0).sum()\n", " print(f\"Num negative {q} predictors: {num_neg_predictions}\")\n", "\n", " # truncate negative actuals\n", " num_neg_actuals = df[target_value_col].lt(0).sum()\n", " print(f\"Num negative actuals: {num_neg_actuals}\")\n", "\n", " # replace\n", " df[target_value_col] = df[target_value_col].mask(df[target_value_col] < 0, 0)\n", "\n", " # check you did the right thing\n", " num_neg_actuals = df[target_value_col].lt(0).sum()\n", " print(f\"Num negative actuals: {num_neg_actuals}\")\n", " \n", " return df\n", "\n", "\n", "def calc_quantile_loss(actual, pred, quantile):\n", " \"\"\"Calculate weighted quantile loss for a specific quantile and window\n", " Input: single numbers for actual and forecast\n", " Output: wql = floating point number\n", " \"\"\"\n", " denom = sum(np.abs(actual))\n", " num = sum([(1-quantile) * abs(y_hat-y) if y_hat > y\n", " else quantile * abs(y_hat-y) for y_hat, y in zip(pred, actual)])\n", " if denom != 0:\n", " return 2 * num / denom\n", " else:\n", " return None\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Configure the S3 bucket name and region name for this lesson.\n", "\n", "- If you don't have an S3 bucket, create it first on S3.\n", "- Although we have set the region to us-west-2 as a default value below, you can choose any of the regions that the service is available in." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Get user inputs for S3 bucket name and region\n", "\n", "default_bucket = \"taxi-demo\" #default taxi-demo\n", "bucket_name = input(\"S3 bucket name [enter to accept default]: \") or default_bucket\n", "default_region = REGION\n", "REGION = input(f\"region [enter to accept default]: {default_region} \") or default_region " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Connect API session\n", "session = boto3.Session(region_name=REGION) \n", "forecast = session.client(service_name='forecast') \n", "#forecast_query = session.client(service_name='forecastquery') #not used in this notebook\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the role to provide to Amazon Forecast.\n", "role_name = \"ForecastNotebookRole-Weather\"\n", "print(f\"Creating Role {role_name} ...\")\n", "default_role = util.get_or_create_iam_role( role_name = role_name )\n", "role_arn = default_role\n", "\n", "## echo user inputs without account\n", "print(f\"Success! Created role arn = {role_arn.split('/')[1]}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 1: Read raw data and visualize" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_raw = pd.read_csv(\"data/TTS.csv\")\n", "print(df_raw.shape)\n", "\n", "# correct data types\n", "df_raw.item_id = df_raw.item_id.astype(str)\n", "df_raw.timestamp = pd.to_datetime(df_raw.timestamp\n", " , format=\"%Y-%m-%d %H:%M:%S\", errors='coerce')\n", "print(df_raw.dtypes)\n", "\n", "# check size and number items\n", "print(f\"num items: {df_raw.item_id.nunique()}\")\n", "start_time = df_raw.timestamp.min()\n", "end_time = df_raw.timestamp.max()\n", "print(f\"start time: {start_time}\")\n", "print(f\"end time: {end_time}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us plot overall time series first." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_raw.plot(x='timestamp', y='actual_value', figsize=(15, 8))\n", "\n", "# hmm, according to news, there was a cold snap and tornado on Nov 3rd:\n", "# https://www.nbcnewyork.com/news/local/overnight-temperatures-to-dip-into-30s-snow-potential-friday/2084847/\n", "# https://newyork.cbslocal.com/2019/11/03/national-weather-service-confirmed-tornado-new-jersey/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's drill down into a few time series" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_raw.loc[(df_raw.actual_value >= 1400), :]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# out of curiosity, we can lookup locations 79 and 148 \n", "\n", "locations = pd.read_csv(\"data/taxi_zones.csv\")\n", "locations.LocationID = locations.LocationID.astype(str)\n", "locations.loc[(locations.LocationID.isin(['141', '151', '79', '148'])), :]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# assemble data for plotting actual values\n", "\n", "random_items = ['141', '151', '79', '148']\n", "df_plot = df_raw.loc[(df_raw.item_id.isin(random_items)), ['item_id', 'timestamp', 'actual_value']].copy()\n", "df_plot = df_plot.merge(locations[['LocationID', 'zone']], how=\"left\"\n", " , left_on=\"item_id\", right_on=\"LocationID\")\n", "df_plot.set_index('timestamp', inplace=True)\n", "df_plot.head(2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "fig, axs = plt.subplots(len(random_items), 1, figsize=(15, 15), sharex=True)\n", "\n", "# select backtest window range when there was a weather event\n", "zoomed = df_plot.copy()\n", "zoomed = zoomed.loc[\"2019-10-28\":\"2020-01-01\"].copy()\n", "\n", "\n", "for i in range(len(random_items)):\n", " \n", " item = random_items[i]\n", " zoomed2 = zoomed.loc[(zoomed['item_id']==item), :]\n", " zone = zoomed2.zone[0]\n", "\n", " zoomed2[['actual_value']].plot(ax=axs[i])\n", " \n", " axs[i].set_title(f\"{zone}\") \n", " fig.text(0.04, 0.5, 'Hourly demand', va='center', rotation='vertical')\n", " \n", " # format the x ticks\n", " axs[i].set_xlabel(\"Datetime\") \n", " \n", " # format the grid\n", " axs[i].grid(False)\n", " axs[i].grid(which='major', axis='x')\n", "\n", "plt.plot();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Above, we can see the regular effects of weekends. For irregular effects, we can see Thanksgiving, Christmas, and New Years holidays. Looking at the Nov 3rd spike, it appears another irregular effect was weather, since google search on Nov3, 2019 turns up no special events except a large winter storm. \n", "\n", "Conveniently, both weather and holidays are built into Amazon Forecast for you to choose if you think they are applicable to your use case and could improve your forecast accuracy. Amazon Forecast does the work of gathering the data, featurizing holidays and weather data, and loading into Related Time Series. As long as forecast horizon is under 2 weeks away, we also take care of the overhead customers usually struggle with, keeping Related Time Series a forecast horizon ahead of the Target Time Series, since we use 14-day weather forecasts. \n", "\n", "For an intro to Amazon Forecast, see our \n", "
  • Documentation page.
  • \n", "\n", "
    \n", "
    " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 2. Importing the Data\n", "\n", "Now we are ready to import the datasets into the Forecast service. As an example, a retailer normally records the transaction record such as timestamp, item_id, target_value.\n", "\n", "
    \n", " \n", "In order to use Weather Index, you need to add to the above, a geolocation-type column. The geolocation-type column connects your location column with geolocations either through postal codes or with latitude_longitude and Timezone. For more details, see:\n", "
    \n", " \n", "For an example of a geolocation column that uses Lat_Lon data, run the cell below and look at the Target Time Series included in the data folder of this notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## View the geolocation column \"location\"\n", "df_raw.sample(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Define a dataset group name and version number for naming purposes\n", "project = \"nyctaxi_demo\"\n", "idx = 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2a. Creating a Dataset Group\n", "First let's create a dataset group and then update it later to add our datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below, we specify key input data and forecast parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_group = f\"{project}_{idx}\"\n", "dataset_arns = []\n", "create_dataset_group_response = forecast.create_dataset_group(\n", " Domain=\"CUSTOM\",\n", " DatasetGroupName=dataset_group,\n", " DatasetArns=dataset_arns)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below, we specify key input data and forecast parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "freq = \"H\"\n", "forecast_horizon = 168\n", "timestamp_format = \"yyyy-MM-dd HH:mm:ss\"\n", "delimiter = ','" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f'Creating dataset group {dataset_group}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_group_arn = create_dataset_group_response['DatasetGroupArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2b. Creating a Target Dataset\n", "In this example, we will define a target time series. This is a required dataset to use the service." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below we specify the target time series name af_demo_ts_4." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_dataset_name = f\"{project}_tts_{idx}\"\n", "print(ts_dataset_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we specify the schema of our dataset below. Make sure the order of the attributes (columns) matches the raw data in the files. We follow the same three attribute format as the above example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_schema_val = [{\"AttributeName\": \"timestamp\", \"AttributeType\": \"timestamp\"},\n", " {\"AttributeName\": \"item_id\", \"AttributeType\": \"string\"},\n", " {\"AttributeName\": \"target_value\", \"AttributeType\": \"float\"},\n", " {\"AttributeName\": \"location\", \"AttributeType\": \"geolocation\"} ]\n", "ts_schema = {\"Attributes\": ts_schema_val}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f'Creating target dataset {ts_dataset_name}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_dataset(\n", " Domain=\"CUSTOM\",\n", " DatasetType='TARGET_TIME_SERIES',\n", " DatasetName=ts_dataset_name,\n", " DataFrequency=freq,\n", " Schema=ts_schema\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_dataset_arn = response['DatasetArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset(DatasetArn=ts_dataset_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2c. Updating the dataset group with the datasets we created\n", "You can have multiple datasets under the same dataset group. Update it with the datasets we created before." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# verify status 200\n", "dataset_arns = []\n", "dataset_arns.append(ts_dataset_arn)\n", "forecast.update_dataset_group(DatasetGroupArn=dataset_group_arn, DatasetArns=dataset_arns)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2d. Creating a Target Time Series Dataset Import Job\n", " \n", "Below, we save the Target Time Series to your bucket on S3, since Amazon Forecast expects to be able to import the data from S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Save FILE to S3\n", "local_file = \"data/TTS.csv\"\n", "\n", "# Save local file to S3\n", "key = f\"{project}/data/TTS.csv\"\n", "boto3.Session().resource('s3').Bucket(bucket_name).Object(key).upload_file(local_file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_s3_data_path = f\"s3://{bucket_name}/{project}/data/TTS.csv\"\n", "print(ts_s3_data_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_dataset_import_job_response = forecast.create_dataset_import_job(\n", " DatasetImportJobName=dataset_group,\n", " DatasetArn=ts_dataset_arn,\n", " DataSource= {\n", " \"S3Config\" : {\n", " \"Path\": ts_s3_data_path,\n", " \"RoleArn\": role_arn\n", " } \n", " },\n", " TimestampFormat=timestamp_format,\n", " GeolocationFormat=\"LAT_LONG\",\n", " TimeZone=\"America/New_York\"\n", " )\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts_dataset_import_job_arn=ts_dataset_import_job_response['DatasetImportJobArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn))\n", "assert status" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3. Train a predictor with related time series Weather \n", " \n", "Once the datasets are specified with the corresponding schema, Amazon Forecast will automatically aggregate all the relevant pieces of information for each item, such as holidays, weather, historical sales (or historical target_value), price, promotions, as well as categorical attributes, and generate the desired dataset. \n", "
    \n", "\n", "In addition to choosing an algorithm, you can also choose from among two built-in Supplementary Features - Holidays and Weather. \n", " \n", "\n", "
    \n", "\n", "\n", " \n", "Note: If you decide to create your own Related Time Series that utilizes geolocation, you will need to make sure you include the geolocation-type column in the Related Time Series data before you upload it to Amazon forecast." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3a. Train a model without Weather\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Quantiles for which you want forecasts generated\n", "forecast_types = [\"0.50\", \"0.60\", \"0.70\"]\n", "\n", "WEATHER_DATASET = [{\n", " 'Name': 'weather',\n", " 'Configuration': {\n", " 'Value': ['true']\n", " }\n", "}]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_name_no_weather = f'no_weather_predictor_{idx}'\n", "\n", "print(f'[{predictor_name_no_weather}] Creating predictor {predictor_name_no_weather} ...')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = forecast.create_auto_predictor(PredictorName = predictor_name_no_weather,\n", " ForecastHorizon = forecast_horizon,\n", " ForecastFrequency = freq,\n", " ForecastTypes = forecast_types,\n", " DataConfig = {\n", " 'DatasetGroupArn': dataset_group_arn\n", " }\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_arn_no_weather = response['PredictorArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn_no_weather))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_response_no_weather = forecast.describe_auto_predictor(PredictorArn=predictor_arn_no_weather)\n", "print(f\"\\n\\nThe Predictor with ARN {predictor_arn_no_weather} is now {describe_response_no_weather['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3b. Train a model with Weather" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_name_weather = f'weather_predictor_{idx}'\n", "\n", "print(f'[{predictor_name_weather}] Creating predictor {predictor_name_weather} ...')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response= \\\n", " forecast.create_auto_predictor(PredictorName = predictor_name_weather,\n", " ForecastHorizon = forecast_horizon,\n", " ForecastFrequency = freq,\n", " ForecastTypes = forecast_types,\n", " DataConfig = {\n", " 'DatasetGroupArn': dataset_group_arn, \n", " 'AdditionalDatasets':WEATHER_DATASET\n", " })" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor_arn_weather = response['PredictorArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn_weather))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_predictor_weather = forecast.describe_auto_predictor(PredictorArn=predictor_arn_weather)\n", "print(f\"\\n\\nThe Predictor with ARN {predictor_arn_weather} is now {describe_predictor_weather['Status']}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 4. Export Backtest Predictions
    " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After creating the predictors, we can export the Backtest Predictions to evaluate the performance of the algorithm, using custom item-level metrics if desired. \"Backtesting\" is a cross-validation technique for time series that uses multiple train/test splits that keep time order of the data. Using multiple train-test splits (i.e. more than 1 backtest window) will result in more models being trained, and in turn, a more robust estimate how the model will perform on unseen data. \n", "
    \n", "
    \n", "Inspecting backtest predictions is the step \"Inspect ML model\" which comes between steps Train ML model and Host ML model, see overview image at beginning of this notebook. \n", "\n", "\n", "\n", "
  • More details about backtesting on the Amazon Forecast documentation page.
  • \n", "
  • Example notebook that uses raw item-level backtest forecasts to calculate custom ML model metrics.
  • \n", "\n", "\n", "In the next cell, we use your Predictor name and export backtest files to your S3 bucket.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Export No Weather Predictor backtest files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# No Weather Predictor, decide automatically where to save exported files\n", "datetime_string = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M\")\n", "\n", "default_base_export_path = \\\n", " f's3://{bucket_name}/{project}/backtest_exports/{predictor_arn_no_weather}_{datetime_string}/'\n", "print(f\"exporting to: {default_base_export_path}\")\n", "\n", "no_weather_export_path = default_base_export_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "## Call CreatePredictorBacktestExportJob using predictor Arn and S3 export path\n", "\n", "backtestExportJobName = predictor_name_no_weather\n", "backtest_export_job_response = forecast.create_predictor_backtest_export_job(\n", " PredictorBacktestExportJobName=backtestExportJobName,\n", " PredictorArn=predictor_arn_no_weather,\n", " Destination= {\n", " \"S3Config\" : {\n", " \"Path\":no_weather_export_path,\n", " \"RoleArn\": role_arn\n", " } \n", " })" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check for HTTPStatusCode 200\n", "backtest_export_job_arn_no_weather = backtest_export_job_response['PredictorBacktestExportJobArn']\n", "backtest_export_job_response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(\n", " lambda: forecast.describe_predictor_backtest_export_job(\n", " PredictorBacktestExportJobArn=backtest_export_job_arn_no_weather))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## No Weather Predictor, decide automatically where to save backtest forecast files\n", "\n", "# path to files is everything after BUCKET_NAME/, it should end in \"/\"\n", "s3_path_to_files = no_weather_export_path.split(bucket_name)[1][1:]\n", "s3_path_to_files = s3_path_to_files + \"forecasted-values/\"\n", "print(f\"path to files: {s3_path_to_files}\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "## READ THE No Weather BACKTEST FORECASTS FILE\n", "\n", "# Assemble and read no weather predictor backtest forecasts\n", "df_no_weather = read_backtest_predictions(bucket_name, s3_path_to_files)\n", "\n", "# correct data types\n", "df_no_weather.item_id = df_no_weather.item_id.astype(str)\n", "df_no_weather.target_value = pd.to_numeric(df_no_weather.target_value, errors='coerce')\n", "df_no_weather.timestamp = pd.to_datetime(df_no_weather.timestamp\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "df_no_weather['backtestwindow_start_time'] = pd.to_datetime(df_no_weather['backtestwindow_start_time']\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "df_no_weather['backtestwindow_end_time'] = pd.to_datetime(df_no_weather['backtestwindow_end_time']\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "# convert UTC timestamp to timezone unaware\n", "df_no_weather.timestamp = df_no_weather.timestamp.dt.tz_localize(None)\n", "\n", "# drop duplicates\n", "print(df_no_weather.shape)\n", "df_no_weather.drop_duplicates(inplace=True)\n", "print(df_no_weather.shape)\n", "\n", "# check\n", "num_items = len(df_no_weather['item_id'].value_counts(normalize=True, dropna=False))\n", "print(f\"Num items: {num_items}\")\n", "print()\n", "print(\"Backtest Window Start Dates\")\n", "print(df_no_weather.backtestwindow_start_time.unique())\n", "\n", "print(df_no_weather.dtypes)\n", "df_no_weather.sample(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Save FILE\n", "# local_file = \"tts.csv\"\n", "# # Save merged file locally\n", "# df_no_weather.to_csv(local_file, header=True, index=False)\n", "\n", "# # Save merged file to S3\n", "# key = f\"{project}/demo/no_weather_holidays_{datetime_string}.csv\"\n", "# boto3.Session().resource('s3').Bucket(bucket_name).Object(key).upload_file(local_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Export Weather Predictor backtest files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Weather Predictor, decide automatically where to save exported files\n", "\n", "default_weather_export_path = \\\n", " f's3://{bucket_name}/{project}/backtest_exports/{predictor_name_weather}_{datetime_string}/'\n", "print(f\"exporting to: {default_weather_export_path}\")\n", "\n", "weather_export_path = default_weather_export_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "## Call CreatePredictorBacktestExportJob using predictor Arn and S3 export path\n", "\n", "backtestExportJobName = predictor_name_weather\n", "backtest_export_job_response = forecast.create_predictor_backtest_export_job(\n", " PredictorBacktestExportJobName=backtestExportJobName,\n", " PredictorArn=predictor_arn_weather,\n", " Destination= {\n", " \"S3Config\" : {\n", " \"Path\":weather_export_path,\n", " \"RoleArn\": role_arn\n", " } \n", " })" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check for HTTPStatusCode 200\n", "backtest_export_job_arn_weather = backtest_export_job_response['PredictorBacktestExportJobArn']\n", "backtest_export_job_response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = util.wait(\n", " lambda: forecast.describe_predictor_backtest_export_job(\n", " PredictorBacktestExportJobArn=backtest_export_job_arn_weather))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Weather Predictor, decide automatically where to save backtest forecast files\n", "\n", "# path to files is everything after BUCKET_NAME/, it should end in \"/\"\n", "s3_path_to_files = weather_export_path.split(bucket_name)[1][1:]\n", "s3_path_to_files = s3_path_to_files + \"forecasted-values/\"\n", "print(f\"path to files: {s3_path_to_files}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## READ THE Weather BACKTEST FORECASTS FILE\n", "\n", "# Assemble and read weather predictor backtest forecasts\n", "df = read_backtest_predictions(bucket_name, s3_path_to_files)\n", "\n", "# correct data types\n", "df.item_id = df.item_id.astype(str)\n", "df.target_value = pd.to_numeric(df.target_value, errors='coerce')\n", "df.timestamp = pd.to_datetime(df.timestamp\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "df['backtestwindow_start_time'] = pd.to_datetime(df['backtestwindow_start_time']\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "df['backtestwindow_end_time'] = pd.to_datetime(df['backtestwindow_end_time']\n", " , format=\"%Y-%m-%dT%H:%M:%S\", errors='coerce')\n", "# convert UTC timestamp to timezone unaware\n", "df.timestamp = df.timestamp.dt.tz_localize(None)\n", "\n", "# drop duplicates\n", "print(df.shape)\n", "df.drop_duplicates(inplace=True)\n", "print(df.shape)\n", "\n", "# check\n", "num_items = len(df['item_id'].value_counts(normalize=True, dropna=False))\n", "print(f\"Num items: {num_items}\")\n", "print()\n", "print(\"Backtest Window Start Dates\")\n", "print(df.backtestwindow_start_time.unique())\n", "\n", "print(df.dtypes)\n", "df.sample(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Save FILE\n", "# local_file = \"tts.csv\"\n", "# # Save merged file locally\n", "# df.drop(\"velocity\", axis=1).to_csv(local_file, header=True, index=False)\n", "\n", "# # Save merged file to S3\n", "# key = f\"{project}/demo/weather_holidays_{datetime_string}.csv\"\n", "# boto3.Session().resource('s3').Bucket(bucket_name).Object(key).upload_file(local_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 5. Compare Predictors with and without Weather\n", "\n", "Now we use the backtest export files to compare the trained Forecast models with and without weather. This is equivalent to the train/valid inspection step in traditional Machine Learning.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get quantile columns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Map column names in your data to expected key words\n", "item_id = \"item_id\"\n", "target_value = \"target_value\"\n", "timestamp = \"timestamp\"\n", "location_id = \"item_id\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# set predictor dimensions from forecast df\n", "predictor_cols = ['item_id', 'timestamp', 'location'\n", " , 'backtestwindow_start_time', 'backtestwindow_end_time']\n", "# exclude cols to automatically find quantiles\n", "exclude_cols = predictor_cols.copy()\n", "exclude_cols.append(target_value)\n", "\n", "# get quantile columns from forecast dataframe\n", "quantile_cols = [c for c in df.columns if c not in exclude_cols] \n", "\n", "# check for \"velocity\" if found remove it\n", "if \"velocity\" in quantile_cols:\n", " df.drop(\"velocity\", axis=1, inplace=True)\n", " quantile_cols = [c for c in df.columns if c not in exclude_cols] \n", "\n", "# save quantile cols for later\n", "num_quantiles = len(quantile_cols)\n", "print(f\"num quantiles: {num_quantiles}\")\n", "quantile_cols" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# correct data types weather forecasts\n", "for q in quantile_cols:\n", " df[q] = pd.to_numeric(df[q], errors='coerce')\n", "print(df.dtypes)\n", "df.sample(5)\n", "\n", "\n", "# correct data types no weather forecasts\n", "for q in quantile_cols:\n", " df_no_weather[q] = pd.to_numeric(df_no_weather[q], errors='coerce')\n", "print(df_no_weather.dtypes)\n", "df_no_weather.sample(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " #### Before calling error calcs, truncate negative actuals and predictions to 0\n", "If you are not expecting negatives, such as for counts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### Before calling error calcs, truncate negative actuals and predictions to 0\n", "print(\"truncating weather predictions...\")\n", "df = truncate_negatives_to_zero(df\n", " , target_value_col=target_value\n", " , quantile_cols=quantile_cols)\n", "print()\n", "print(\"truncating no weather predictions...\")\n", "df_no_weather = truncate_negatives_to_zero(df_no_weather\n", " , target_value_col=target_value\n", " , quantile_cols=quantile_cols)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## CALCULATE DEMAND VOLUME OF ITEMS\n", "\n", "# categorize items as \"top movers\" or not\n", "top_moving_items, slow_moving_items = get_top_moving_items(df, timestamp, target_value, item_id)\n", "\n", "# assign item to top movers or not\n", "df['velocity'] = \"slow\"\n", "df.loc[(df.item_id.isin(top_moving_items)), 'velocity'] = 'top'\n", "\n", "# checkit\n", "print(df.velocity.value_counts(normalize=True, dropna=False))\n", "df.sample(5)\n", "\n", "\n", "## Display breakdown: how many top-moving items\n", "total_items_cnt = len(top_moving_items) + len(slow_moving_items)\n", "print(f\"number of top moving items: {len(top_moving_items)}, ratio:{len(top_moving_items) / total_items_cnt}\")\n", "print(f\"number of slow moving items: {len(slow_moving_items)}, ratio: {len(slow_moving_items) / total_items_cnt}\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize item-level accuracy\n", "\n", "Choose items that have above average sales volume for the rest of the analysis. \n", "\n", "The reason is typically item sales have negative binomial distribution - meaning about 20% of the items account for about 80% of sales. We want to do our analysis around the top-selling items; otherwise we might do analysis around mostly items with 0 sales." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Assemble plot data\n", "winning_quantile = \"p60\" \n", "random_items = ['161', '162', '163', '170', '142']\n", "\n", "# assemble no_weather and weather values\n", "df_plot = df.loc[(df.item_id.isin(random_items)), [item_id, timestamp, target_value, winning_quantile]].copy()\n", "temp = df_no_weather.loc[(df_no_weather.item_id.isin(random_items)), [item_id, timestamp, winning_quantile]].copy()\n", "df_plot = df_plot.merge(temp, how=\"inner\", on=[\"item_id\", timestamp])\n", "df_plot.columns = [item_id, timestamp, \"actual_value\"\n", " , f\"{winning_quantile}_weather\", f\"{winning_quantile}_no_weather\"]\n", "df_plot.set_index(timestamp, inplace=True)\n", "df_plot.head(2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## VISUALIZE BASE AND WEATHER FORECASTS\n", "\n", "\n", "# Note to self: if plot below shows in scrolling window, select Cell > Current Outputs > Toggle scrolling\n", "# np.warnings.filterwarnings('ignore') \n", "fig, axs = plt.subplots(len(random_items), 1, figsize=(15, 15), sharex=True)\n", "\n", "# Adjust the scaling factor to fit your legend text completely outside the plot\n", "# (smaller value results in more space being made for the legend)\n", "plt.subplots_adjust(right=0.85)\n", "\n", "# select backtest window range when there was a weather event\n", "zoomed = df_plot.copy()\n", "zoomed = zoomed.loc[\"2020-02-23\":\"2020-02-29\"].copy()\n", "\n", "\n", "for i in range(len(random_items)):\n", " \n", " item = random_items[i]\n", " zoomed2 = zoomed.loc[(zoomed[item_id]==item), :].copy()\n", "\n", " zoomed2[['actual_value']].plot(ax=axs[i], color='k')\n", " colors = ['mediumpurple', 'orange', 'deepskyblue']\n", " \n", " zoomed2[[f\"{winning_quantile}_weather\"]].plot(ax=axs[i], color=colors[2])\n", " zoomed2[[f\"{winning_quantile}_no_weather\"]].plot(ax=axs[i], color=colors[1])\n", " \n", " axs[i].set_title(f\"item_id={item}\") \n", " # set common Y label \n", " fig.text(0.04, 0.5, 'Hourly demand', va='center', rotation='vertical')\n", " \n", " # format the x ticks\n", " axs[i].set_xlabel(\"Datetime\") \n", " \n", " # remove each individual subplot legend\n", " axs[i].get_legend().remove()\n", " # create single legend outside plot and show only 1x\n", " if i == 0:\n", " fig.legend(loc=\"center right\",\n", " edgecolor='g', framealpha=0.5,\n", " borderaxespad=2)\n", " \n", " # format the grid\n", " axs[i].grid(False)\n", " axs[i].grid(which='minor', axis='x')\n", " \n", "plt.plot();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Visualize items - zoom in to see hours\n", "\n", "fig, axs = plt.subplots(len(random_items), 1, figsize=(15, 15), sharex=True)\n", "\n", "# Adjust the scaling factor to fit your legend text completely outside the plot\n", "# (smaller value results in more space being made for the legend)\n", "plt.subplots_adjust(right=0.85)\n", "\n", "# select backtest window range when there was a weather event\n", "zoomed = df_plot.copy()\n", "zoomed = zoomed.loc[\"2020-02-26\":\"2020-02-27\"].copy()\n", "\n", "\n", "for i in range(len(random_items)):\n", " \n", " item = random_items[i]\n", " zoomed2 = zoomed.loc[(zoomed[item_id]==item), :]\n", "\n", " zoomed2[['actual_value']].plot(ax=axs[i], color='k')\n", " colors = ['mediumpurple', 'orange', 'deepskyblue']\n", " \n", " zoomed2[[f\"{winning_quantile}_weather\"]].plot(ax=axs[i], color=colors[2])\n", " zoomed2[[f\"{winning_quantile}_no_weather\"]].plot(ax=axs[i], color=colors[1])\n", " \n", " axs[i].set_title(f\"Item_id={item}\") \n", " # set common Y label instead \n", " fig.text(0.04, 0.5, 'Hourly demand', va='center', rotation='vertical')\n", " \n", " # format the x ticks\n", " axs[i].set_xlabel(\"Datetime\") \n", " \n", " # remove each individual subplot legend\n", " axs[i].get_legend().remove()\n", " # create single legend outside plot and show only 1x\n", " if i == 0:\n", " fig.legend(loc=\"center right\",\n", " edgecolor='g', framealpha=0.5,\n", " borderaxespad=2)\n", " \n", " # format the grid\n", " axs[i].grid(False)\n", " axs[i].grid(which='minor', axis='x')\n", "\n", "plt.plot();\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more details about item-level accuracy metrics see blogs and notebook/\n", "\n", " \n", "For now, we'll calculate overall Weighted Quantile Loss (wQL) for the Weather and No Weather models. Then we'll decide which model to deploy based on which model has lowest wQL. \n", "\n", "In Amazon Forecast, \"deploying a model\" means generate Forecasts from the selected Predictor model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### CALCULATE WQL FOR THE NO WEATHER MODEL\n", "\n", "metrics = df_no_weather.loc[(df_no_weather.item_id.isin(top_moving_items)), :].copy()\n", "all_items = metrics.item_id.unique()\n", "g = metrics.groupby([item_id]).sum()\n", "metrics = g.copy()\n", "metrics = metrics.reset_index(inplace=True)\n", "\n", "for q in quantile_cols:\n", " # parse the quantile\n", " if q == \"mean\":\n", " len_quantile_cols = len(quantile_cols) - 1\n", " continue #skip mean since it's not a quantile\n", " else:\n", " quantile = pd.to_numeric(q.split('p')[1]) / 100\n", " \n", " agg = pd.DataFrame()\n", " for i in all_items:\n", " temp = g.copy()\n", " temp.reset_index(inplace=True)\n", " temp = temp.loc[(temp.item_id==i), :]\n", "\n", " # wql at item level\n", " temp[f\"wql_no_weather_{q}\"] = calc_quantile_loss(temp[target_value]\n", " , temp[q]\n", " , quantile)\n", " agg = pd.concat([agg, temp], axis=0)\n", "\n", " # align indexes for low memory merging\n", " agg.set_index([item_id], inplace=True)\n", " metrics = pd.concat([metrics, agg[[f\"wql_no_weather_{q}\"]]], axis=1) # join='inner'\n", "\n", "# metrics.drop(quantile_cols, axis=1, inplace=True)\n", "print(metrics.shape)\n", "\n", "# Show mean wQL for all top-selling items\n", "np.mean(metrics)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### CALCULATE WQL FOR THE WEATHER MODEL\n", "\n", "metrics = df.loc[(df.velocity==\"top\"), :].copy()\n", "all_items = metrics.item_id.unique()\n", "g = metrics.groupby([item_id]).sum()\n", "metrics = g.copy()\n", "metrics = metrics.reset_index(inplace=True)\n", "\n", "for q in quantile_cols:\n", " # parse the quantile\n", " if q == \"mean\":\n", " len_quantile_cols = len(quantile_cols) - 1\n", " continue #skip mean since it's not a quantile\n", " else:\n", " quantile = pd.to_numeric(q.split('p')[1]) / 100\n", " \n", " agg = pd.DataFrame()\n", " for i in all_items:\n", " temp = g.copy()\n", " temp.reset_index(inplace=True)\n", " temp = temp.loc[(temp.item_id==i), :]\n", "\n", " # wql at item level\n", " temp[f\"wql_weather_{q}\"] = calc_quantile_loss(temp[target_value]\n", " , temp[q]\n", " , quantile)\n", " agg = pd.concat([agg, temp], axis=0)\n", "\n", " # align indexes for low memory merging\n", " agg.set_index([item_id], inplace=True)\n", " metrics = pd.concat([metrics, agg[[f\"wql_weather_{q}\"]]], axis=1) # join='inner'\n", "\n", "# metrics.drop(quantile_cols, axis=1, inplace=True)\n", "print(metrics.shape)\n", "\n", "# Show mean wQL for all top-selling items\n", "np.mean(metrics)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since the weather model has lowest Weighted Quantile Loss (wQL), we would select the Weather Index Predictor and create Forecasts from that predictor.\n", "\n", "We'll skip this step for now. You can see examples how to create forecasts, query, and delete forecasts by API calls in other sample notebooks, for example \n", "
  • Amazon forecast notebook using related time series\n", "
  • \n", "
    " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 6. Cleaning up your Resources" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once we have completed the above steps, we can start to cleanup the resources we created. All delete jobs, except for `delete_dataset_group` are asynchronous, so we have added the helpful `wait_till_delete` function. \n", "Resource Limits documented here. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### For clean-up, this should be uncommented." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete Backtest Export File jobs for both algorithms\n", "\n", "# util.wait_till_delete(lambda: \\\n", "# forecast.delete_predictor_backtest_export_job(\n", "# PredictorBacktestExportJobArn = backtest_export_job_arn_weather)\n", "# )\n", "# util.wait_till_delete(lambda: \\\n", "# forecast.delete_predictor_backtest_export_job(\n", "# PredictorBacktestExportJobArn = backtest_export_job_arn_no_weather)\n", "# )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete predictors\n", "# util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn = predictor_arn_weather))\n", "# util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn = predictor_arn_no_weather))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete the target time series import job\n", "# util.wait_till_delete(lambda: forecast.delete_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete the target time series datasets\n", "# util.wait_till_delete(lambda: forecast.delete_dataset(DatasetArn=ts_dataset_arn))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete dataset group\n", "# forecast.delete_dataset_group(DatasetGroupArn=dataset_group_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Delete IAM role\n", "# util.delete_iam_role(role_name )" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }