# Now you can train on a global dataset and produce forecasts on selected items

We are excited to announce that [Amazon Forecast](https://aws.amazon.com/forecast/) now lets customers generate forecasted data points for a selected subset of time-series items. With this release, customers can import a global dataset, train a predictor (model) on the full set, and then generate forecasted data points (inference) only for the time-series items they need, using the new `TimeSeriesSelector` parameter of the [CreateForecast API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/forecast.html#ForecastService.Client.create_forecast).

This notebook follows the same patterns as other Amazon Forecast notebooks; the key difference is in Step 4 and Step 5, where two CreateForecast jobs are run, each driven by the item_id values listed in a CSV input file stored on S3. The two small forecasts are then exported to S3, and their contents are displayed in the notebook.

The notebook is saved in an executed state, so you can review the outputs without running any cells yourself.

This exercise uses yellow taxi trip records from the [NYC Taxi and Limousine Commission (TLC)](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).


## Table of Contents
* [Initial Setup](#setup)
* Step 1: [Upload sample data to S3](#upload)
* Step 2: [Create a dataset, import data and dataset group](#dataset)
* Step 3: [Create predictor](#predictors)
* Step 4: [Create forecasts for subset of items](#forecasts)
* Step 5: [Export and view forecasted data](#view)
* Step 6: [Cleanup](#cleanup)


# Initial Setup

### Upgrade boto3

Before proceeding, ensure you have upgraded boto3.

Important: the feature highlighted in this notebook requires boto3 1.24.1 or later. To receive this SDK update, you need Python 3.7 or later, as described in [this blog](https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/); support for Python 3.6 in the AWS SDKs ended on May 30, 2022.
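After upgrading (and restarting the kernel so the new version is loaded), you may want the notebook to fail fast on an older SDK by comparing `boto3.__version__` against 1.24.1. The sketch below uses a small hand-rolled comparison of the first three numeric version components, an assumption that fits boto3's `major.minor.patch` release numbers; `packaging.version.Version` is a more robust alternative if that package is installed. The function names are illustrative and not part of boto3.

```python
import re

def version_tuple(version: str) -> tuple:
    """Convert a version string such as '1.24.1' into (1, 24, 1).

    Only the leading digits of each of the first three dot-separated parts
    are used, so a suffix like 'rc1' is ignored.
    """
    parts = []
    for piece in version.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def supports_time_series_selector(installed: str, minimum: str = "1.24.1") -> bool:
    """Return True if the installed boto3 version is at least `minimum`."""
    return version_tuple(installed) >= version_tuple(minimum)


# Example usage in the notebook (after `import boto3`):
# assert supports_time_series_selector(boto3.__version__), "Upgrade boto3 to 1.24.1 or later"
```

Comparing integer tuples rather than raw strings matters here: as strings, "1.9.0" would sort after "1.24.1".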

```python
!pip install boto3 --upgrade
```



### Setup Imports

```python
import boto3
from time import sleep
import subprocess
import sys
import os
import pandas as pd
import calendar

sys.path.insert(0, os.path.abspath("../../common"))

import json
import util
```

### Function to suppress printing account numbers

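```python
import re

def mask_arn(input_string):
    """Replace each ':<12-digit account ID>:' in input_string with 'X' * 12."""
    mask_regex = re.compile(':[0-9]{12}:')
    mask = mask_regex.search(input_string)

    # Keep masking until no account number remains
    while mask:
        input_string = input_string.replace(mask.group(), 'X' * 12)
        mask = mask_regex.search(input_string)

    return input_string
```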
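Because the pattern includes the colons on either side of the account number and the replacement does not, masked ARNs lose those separators; that is why ARNs printed later in this notebook read `us-east-1XXXXXXXXXXXXdataset`. If you prefer masked ARNs that keep their structure, a variant using `re.sub` could look like the following. `mask_arn_keep_colons` is a hypothetical name, the account ID in the example is a placeholder, and the saved outputs in this notebook were produced with the original `mask_arn`.

```python
import re

# Matches a 12-digit AWS account ID between the colons of an ARN
ACCOUNT_ID_PATTERN = re.compile(r":[0-9]{12}:")

def mask_arn_keep_colons(input_string: str) -> str:
    """Replace every 12-digit account ID with 'X' * 12, keeping the ARN colons."""
    return ACCOUNT_ID_PATTERN.sub(":" + "X" * 12 + ":", input_string)

print(mask_arn_keep_colons("arn:aws:forecast:us-east-1:123456789012:dataset/TAXI_TIME_FORECAST_SUBSET_DEMO"))
# arn:aws:forecast:us-east-1:XXXXXXXXXXXX:dataset/TAXI_TIME_FORECAST_SUBSET_DEMO
```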

### Create an instance of AWS SDK client for Amazon Forecast

```python
# Set your region accordingly, us-east-1 as shown
region = 'us-east-1'
session = boto3.Session(region_name=region)
forecast = session.client(service_name='forecast')

# Checking to make sure we can communicate with Amazon Forecast
assert forecast.list_forecasts()
```

### Setup IAM Role used by Amazon Forecast to access your data

```python
role_name = "ForecastNotebookRole-Basic"
print(f"Creating Role {mask_arn(role_name)}...")
role_arn = util.get_or_create_iam_role(role_name=role_name)

# echo user inputs without account
print(f"Success! Created role = {mask_arn(role_arn).split('/')[1]}")
```

```
Creating Role ForecastNotebookRole-Basic...
The role ForecastNotebookRole-Basic already exists, skipping creation
Done.
Success! Created role = ForecastNotebookRole-Basic
```


# Step 1: Upload sample data to S3

The dataset has the following 3 columns:
- timestamp: Timestamp at which pick-ups are requested.
- item_id: Pick-up location ID.
- target_value: Number of pick-ups requested around the timestamp at the pick-up location.
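Before uploading your own data, it can help to check that each row matches this layout and the schema defined in Step 2. The sketch below takes one row already split into fields in the order timestamp, item_id, target_value, and assumes 24-hour timestamps such as `2018-01-01 13:00:00`. `check_tts_row` is a hypothetical helper, and the sample rows are illustrative, not taken from the taxi file.

```python
from datetime import datetime

def check_tts_row(fields):
    """Validate one target-time-series row: [timestamp, item_id, target_value].

    Returns a list of problems; an empty list means the row looks valid.
    Assumes 24-hour timestamps such as '2018-01-01 13:00:00'.
    """
    if len(fields) != 3:
        return [f"expected 3 fields, got {len(fields)}"]
    problems = []
    timestamp, item_id, target_value = fields
    try:
        datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        problems.append(f"bad timestamp: {timestamp!r}")
    if not item_id.strip():
        problems.append("empty item_id")
    try:
        int(target_value)  # the schema declares target_value as an integer
    except ValueError:
        problems.append(f"target_value is not an integer: {target_value!r}")
    return problems

# Illustrative rows (not taken from the taxi dataset)
print(check_tts_row(["2018-01-01 13:00:00", "101", "7"]))  # []
print(check_tts_row(["2018-01-01 1pm", "101", "7.5"]))
```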

In addition, two small files, subset1.csv and subset2.csv, are uploaded. Each lists three locations (item_id values) and is used to drive targeted forecast generation.

Note: As delivered, this cell uploads the sample files from the data folder relative to this notebook. Make sure these files are available to your notebook before running it.
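If you want to target a different set of locations, you can generate your own selector file. The sketch below writes a header-less CSV with one `item_id` per line, the same shape as the subset1.csv contents displayed in Step 4. The `write_subset_file` helper and the temporary output directory are illustrative assumptions; the resulting file would be uploaded to S3 in the same way as the upload cell below.

```python
import csv
import os
import tempfile

def write_subset_file(path, item_ids):
    """Write one item_id per line, with no header row."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")
        for item_id in item_ids:
            writer.writerow([item_id])

out_dir = tempfile.mkdtemp()
subset_path = os.path.join(out_dir, "subset1.csv")
write_subset_file(subset_path, ["101", "102", "103"])

with open(subset_path) as f:
    print(f.read())
```

The explicit `lineterminator="\n"` avoids the `csv` module's default `\r\n` line endings, keeping the file identical to a hand-written list.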

```python
bucket_name = input("\nEnter S3 bucket name for uploading the data and hit ENTER key:")

s3 = boto3.resource('s3')
s3.meta.client.upload_file('./data/taxi-dec2017-jan2019.csv', bucket_name, 'taxi_dec2017_jan2019.csv')
s3.meta.client.upload_file('./data/subset1.csv', bucket_name, 'subset1.csv')
s3.meta.client.upload_file('./data/subset2.csv', bucket_name, 'subset2.csv')
```

```
Enter S3 bucket name for uploading the data and hit ENTER key:forecast-us-east-1-XXXXXXXXXXXX
```


# Step 2: Create a dataset, import data and dataset group

### Create Dataset

```python
DATASET_FREQUENCY = "H"
TS_DATASET_NAME = "TAXI_TIME_FORECAST_SUBSET_DEMO"
TS_SCHEMA = {
    "Attributes": [
        {
            "AttributeName": "timestamp",
            "AttributeType": "timestamp"
        },
        {
            "AttributeName": "item_id",
            "AttributeType": "string"
        },
        {
            "AttributeName": "target_value",
            "AttributeType": "integer"
        }
    ]
}

create_dataset_response = forecast.create_dataset(Domain="CUSTOM",
                                                  DatasetType='TARGET_TIME_SERIES',
                                                  DatasetName=TS_DATASET_NAME,
                                                  DataFrequency=DATASET_FREQUENCY,
                                                  Schema=TS_SCHEMA)

ts_dataset_arn = create_dataset_response['DatasetArn']
describe_dataset_response = forecast.describe_dataset(DatasetArn=ts_dataset_arn)

print(f"Dataset ARN {mask_arn(ts_dataset_arn)} is now {describe_dataset_response['Status']}.")
```

```
Dataset ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXdataset/TAXI_TIME_FORECAST_SUBSET_DEMO is now ACTIVE.
```


### Import the target time series
This TTS file contains data for more than 200 item_id values (locations); later, we forecast a total of six locations in two forecast jobs.

```python
TS_IMPORT_JOB_NAME = 'taxi_dec2017_jan2019'
TIMESTAMP_FORMAT = "yyyy-MM-dd hh:mm:ss"
ts_s3_path = f"s3://{bucket_name}/{TS_IMPORT_JOB_NAME}.csv"
TIMEZONE = "EST"

# frequency of poll event from API to check status of tasks
sleep_duration = 60

ts_dataset_import_job_response = \
    forecast.create_dataset_import_job(DatasetImportJobName=TS_IMPORT_JOB_NAME,
                                       DatasetArn=ts_dataset_arn,
                                       DataSource={
                                           "S3Config": {
                                               "Path": ts_s3_path,
                                               "RoleArn": role_arn
                                           }
                                       },
                                       TimestampFormat=TIMESTAMP_FORMAT,
                                       TimeZone=TIMEZONE)

ts_dataset_import_job_arn = ts_dataset_import_job_response['DatasetImportJobArn']

print(f"Waiting for Dataset Import Job with ARN {mask_arn(ts_dataset_import_job_arn)} to become ACTIVE.")

status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn), sleep_duration)
```

```
Waiting for Dataset Import Job with ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXdataset-import-job/TAXI_TIME_FORECAST_SUBSET_DEMO/taxi_dec2017_jan2019 to become ACTIVE.
CREATE_PENDING
CREATE_IN_PROGRESS .........
ACTIVE
```
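The `util.wait` helper comes from the repository's `../../common` folder and is not shown in this notebook. Judging by its output, it polls a describe call and prints the status until the resource becomes ACTIVE. A minimal sketch of that behavior follows, assuming the describe response carries a `Status` field and that failures end in a status containing `FAILED` (for example `CREATE_FAILED`). `wait_for_active` is a hypothetical stand-in, not the actual `util.wait` implementation.

```python
import time

def wait_for_active(describe_fn, sleep_seconds=60, max_polls=1000):
    """Poll describe_fn() until its 'Status' is ACTIVE; raise if it fails.

    describe_fn is any zero-argument callable returning a dict with a
    'Status' key, e.g. lambda: forecast.describe_forecast(ForecastArn=arn).
    """
    for _ in range(max_polls):
        status = describe_fn()["Status"]
        if status == "ACTIVE":
            return status
        if "FAILED" in status or status == "CREATE_STOPPED":
            raise RuntimeError(f"Resource ended in status {status}")
        time.sleep(sleep_seconds)
    raise TimeoutError("Resource did not become ACTIVE in time")

# Offline demo with a fake describe function that reports progress
statuses = iter(["CREATE_PENDING", "CREATE_IN_PROGRESS", "ACTIVE"])
print(wait_for_active(lambda: {"Status": next(statuses)}, sleep_seconds=0))  # ACTIVE
```

Raising on a failed status, instead of polling forever, surfaces problems such as a malformed import file as soon as Forecast reports them.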


### Create a dataset group

```python
DATASET_GROUP_NAME = "TAXI_TIME_FORECAST_SUBSET_DEMO"
DATASET_ARNS = [ts_dataset_arn]

create_dataset_group_response = \
    forecast.create_dataset_group(Domain="CUSTOM",
                                  DatasetGroupName=DATASET_GROUP_NAME,
                                  DatasetArns=DATASET_ARNS)

dataset_group_arn = create_dataset_group_response['DatasetGroupArn']
describe_dataset_group_response = forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)

print(f"The DatasetGroup with ARN {mask_arn(dataset_group_arn)} is now {describe_dataset_group_response['Status']}.")
```

```
The DatasetGroup with ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXdataset-group/TAXI_TIME_FORECAST_SUBSET_DEMO is now ACTIVE.
```


# Step 3: Create predictor

This step creates a daily predictor with a short, three-day horizon and minimal features for this demo. The imported dataset, as delivered, contains more than 260 unique item_id values, each representing demand at a clustered location. Later in this notebook, forecasts are generated only for the desired locations. The benefit is that a single model is trained on the global dataset, while forecasts can be targeted and produced on demand.
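Two details of this step can be checked by hand. First, the dataset is hourly (`DATASET_FREQUENCY = "H"`) while the forecast is daily, so hourly target values are rolled up to days; the sketch assumes sum aggregation, which we understand to be the default for the target field. Second, with `FORECAST_HORIZON = 3` the forecast covers the three days after the last day of training data; since the data runs through January 2019, that gives 2019-02-01 to 2019-02-03, matching the exported output in Step 5. The hourly values below are made up for illustration, and `hourly_to_daily` and `horizon_dates` are hypothetical helpers.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta

def hourly_to_daily(rows):
    """Sum hourly (timestamp, item_id, value) rows into daily totals per item."""
    totals = defaultdict(int)
    for timestamp, item_id, value in rows:
        day = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date()
        totals[(item_id, day)] += value
    return dict(totals)

def horizon_dates(last_training_day, horizon):
    """Dates covered by a daily forecast that starts the day after training ends."""
    return [last_training_day + timedelta(days=step) for step in range(1, horizon + 1)]

# Illustrative hourly rows for one location (values are made up)
rows = [
    ("2019-01-31 08:00:00", "101", 2),
    ("2019-01-31 09:00:00", "101", 3),
    ("2019-01-31 18:00:00", "101", 1),
]
print(hourly_to_daily(rows))  # {('101', datetime.date(2019, 1, 31)): 6}
print(horizon_dates(date(2019, 1, 31), 3))
# [datetime.date(2019, 2, 1), datetime.date(2019, 2, 2), datetime.date(2019, 2, 3)]
```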

```python
FORECAST_HORIZON = 3
FORECAST_FREQUENCY = "D"

create_auto_predictor_response = \
    forecast.create_auto_predictor(PredictorName='TAXI_TIME_FORECAST_SUBSET_DEMO',
                                   ForecastHorizon=FORECAST_HORIZON,
                                   ForecastFrequency=FORECAST_FREQUENCY,
                                   DataConfig={
                                       'DatasetGroupArn': dataset_group_arn
                                   },
                                   ExplainPredictor=False)

predictor_arn = create_auto_predictor_response['PredictorArn']
print(f"Waiting for Predictor ARN {mask_arn(predictor_arn)} to become ACTIVE.")

# Wait on the predictors to complete and become active
status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn), sleep_duration)
```

```
Waiting for Predictor ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXpredictor/TAXI_TIME_FORECAST_SUBSET_DEMO_01G4GJTP6HNPNVRQ0QVVTPR0T1 to become ACTIVE.
CREATE_PENDING
CREATE_IN_PROGRESS ..................................................
ACTIVE
```


# Step 4: Create forecasts for subset of items

This step is the primary emphasis of this notebook. The file subset1.csv lists three item_id values (locations): 101, 102, and 103. subset2.csv follows the same pattern with a different set of item_id values.

From the local file system, let's view subset1.csv. The same file was uploaded to S3 earlier in this notebook so that Amazon Forecast can read it; it is the data source for `TimeSeriesIdentifiers` in the CreateForecast call below. If your use case has multiple dimensions, each record in the input file should contain all of them as delimited values, and the `Schema` attributes in the API call must match the file's column layout.
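For a dataset with extra dimensions, the selector file and the `Schema` must list the same columns in the same order. One way to keep them in sync is to build both from a single list of column names, as in this sketch. The `payment_type` dimension and its values are hypothetical (the taxi dataset in this notebook uses `item_id` only), declaring every attribute as a string is an assumption that mirrors how `item_id` is declared here, and `build_selector` is an illustrative helper, not part of boto3.

```python
import csv
import io

def build_selector(columns, records):
    """Return (csv_text, schema) for a TimeSeriesIdentifiers selector.

    columns: attribute names in file order, e.g. ["item_id", "payment_type"].
    records: one tuple of values per time series, in the same order.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for record in records:
        if len(record) != len(columns):
            raise ValueError(f"record {record!r} does not match columns {columns}")
        writer.writerow(record)
    schema = {
        "Attributes": [
            {"AttributeName": name, "AttributeType": "string"} for name in columns
        ]
    }
    return buffer.getvalue(), schema

csv_text, schema = build_selector(
    ["item_id", "payment_type"],          # hypothetical second dimension
    [("101", "card"), ("102", "cash")],
)
print(csv_text)
print(schema)
```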

```python
!cat ./data/subset1.csv
```

```
101
102
103
```


The API calls below produce two forecasts, one for each subset of time series that is actually needed, which keeps inference to a minimum. Without a `TimeSeriesSelector`, the CreateForecast API generates forecasts for every time series in the dataset used to train the predictor.

```python
# locations 101, 102, 103
subset1_path = f"s3://{bucket_name}/subset1.csv"

create_forecast_response = \
    forecast.create_forecast(ForecastName='TAXI_TIME_FORECAST_SUBSET_1',
                             ForecastTypes=["0.5"],
                             PredictorArn=predictor_arn,
                             TimeSeriesSelector={
                                 "TimeSeriesIdentifiers": {
                                     "DataSource": {
                                         "S3Config": {
                                             "Path": subset1_path,
                                             "RoleArn": role_arn
                                         }
                                     },
                                     "Format": 'CSV',
                                     "Schema": {
                                         "Attributes": [
                                             {
                                                 "AttributeName": "item_id",
                                                 "AttributeType": "string"
                                             }
                                         ]
                                     }
                                 }
                             })

subset1_forecast_arn = create_forecast_response['ForecastArn']


# locations 201, 202, 203
subset2_path = f"s3://{bucket_name}/subset2.csv"

create_forecast_response = \
    forecast.create_forecast(ForecastName='TAXI_TIME_FORECAST_SUBSET_2',
                             ForecastTypes=["0.5"],
                             PredictorArn=predictor_arn,
                             TimeSeriesSelector={
                                 "TimeSeriesIdentifiers": {
                                     "DataSource": {
                                         "S3Config": {
                                             "Path": subset2_path,
                                             "RoleArn": role_arn
                                         }
                                     },
                                     "Format": 'CSV',
                                     "Schema": {
                                         "Attributes": [
                                             {
                                                 "AttributeName": "item_id",
                                                 "AttributeType": "string"
                                             }
                                         ]
                                     }
                                 }
                             })

subset2_forecast_arn = create_forecast_response['ForecastArn']


print(f"Waiting for Subset 1 Forecast with ARN {mask_arn(subset1_forecast_arn)} to become ACTIVE.")
print(f"Waiting for Subset 2 Forecast with ARN {mask_arn(subset2_forecast_arn)} to become ACTIVE.")

status = util.wait(lambda: forecast.describe_forecast(ForecastArn=subset1_forecast_arn), sleep_duration)
status = util.wait(lambda: forecast.describe_forecast(ForecastArn=subset2_forecast_arn), sleep_duration)
```

```
Waiting for Subset 1 Forecast with ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXforecast/TAXI_TIME_FORECAST_SUBSET_1 to become ACTIVE.
Waiting for Subset 2 Forecast with ARN arn:aws:forecast:us-east-1XXXXXXXXXXXXforecast/TAXI_TIME_FORECAST_SUBSET_2 to become ACTIVE.
CREATE_PENDING
CREATE_IN_PROGRESS ....................
ACTIVE
ACTIVE
```
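The two `create_forecast` calls above differ only in the forecast name and the S3 path. If you need many subsets, a small helper can build the `TimeSeriesSelector` argument once. This is a sketch whose structure mirrors the calls above; `make_time_series_selector` is a hypothetical name, and the bucket and role ARN in the example are placeholders, not real resources.

```python
def make_time_series_selector(s3_path, role_arn, attribute_names=("item_id",)):
    """Build the TimeSeriesSelector argument in the shape used by this notebook.

    The selector points at a CSV file on S3 that lists the time series to
    forecast, plus a schema describing the file's columns.
    """
    return {
        "TimeSeriesIdentifiers": {
            "DataSource": {"S3Config": {"Path": s3_path, "RoleArn": role_arn}},
            "Format": "CSV",
            "Schema": {
                "Attributes": [
                    {"AttributeName": name, "AttributeType": "string"}
                    for name in attribute_names
                ]
            },
        }
    }

# Placeholder values; in the notebook you would pass the result to
# forecast.create_forecast(ForecastName=..., ForecastTypes=["0.5"],
#                          PredictorArn=predictor_arn, TimeSeriesSelector=selector)
selector = make_time_series_selector(
    "s3://example-bucket/subset1.csv",
    "arn:aws:iam::111122223333:role/ExampleRole",
)
print(selector["TimeSeriesIdentifiers"]["DataSource"]["S3Config"]["Path"])
```

With a three-day horizon, the six selected locations produce 18 forecast rows in total (6 × 3), compared with more than 780 (260 × 3) if every location in the dataset were forecast.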


# Step 5: Export and view forecasted data points

The cells below export the forecasted data points (predictions) from the two forecasts created in Step 4 to S3. When each export job completes, its S3 destination contains one or more CSV files with one row per time series and time step, and one column per requested quantile (here, `p50`).

```python
# create task for exporting subset1
subset1_export_path = "s3://" + bucket_name + "/subset1"
subset1_export_name = "subset1_forecast_export"
forecast_response = forecast.create_forecast_export_job(ForecastExportJobName=subset1_export_name,
                                                        ForecastArn=subset1_forecast_arn,
                                                        Destination={
                                                            "S3Config": {
                                                                "Path": subset1_export_path,
                                                                "RoleArn": role_arn
                                                            }
                                                        })

subset1_forecast_export_arn = forecast_response['ForecastExportJobArn']


# create task for exporting subset2
subset2_export_path = "s3://" + bucket_name + "/subset2"
subset2_export_name = "subset2_forecast_export"
forecast_response = forecast.create_forecast_export_job(ForecastExportJobName=subset2_export_name,
                                                        ForecastArn=subset2_forecast_arn,
                                                        Destination={
                                                            "S3Config": {
                                                                "Path": subset2_export_path,
                                                                "RoleArn": role_arn
                                                            }
                                                        })

subset2_forecast_export_arn = forecast_response['ForecastExportJobArn']


# Next, wait on the task to complete
print(f"Waiting for Subset 1 Forecast export with {mask_arn(subset1_forecast_export_arn)} to become ACTIVE.")
print(f"Waiting for Subset 2 Forecast export with {mask_arn(subset2_forecast_export_arn)} to become ACTIVE.\n\n")

status = util.wait(lambda: forecast.describe_forecast_export_job(ForecastExportJobArn=subset1_forecast_export_arn), sleep_duration)
status = util.wait(lambda: forecast.describe_forecast_export_job(ForecastExportJobArn=subset2_forecast_export_arn), sleep_duration)
```

```
Waiting for Subset 1 Forecast export with arn:aws:forecast:us-east-1XXXXXXXXXXXXforecast-export-job/TAXI_TIME_FORECAST_SUBSET_1/subset1_forecast_export to become ACTIVE.
Waiting for Subset 2 Forecast export with arn:aws:forecast:us-east-1XXXXXXXXXXXXforecast-export-job/TAXI_TIME_FORECAST_SUBSET_2/subset2_forecast_export to become ACTIVE.


CREATE_PENDING
CREATE_IN_PROGRESS .
ACTIVE
CREATE_IN_PROGRESS .........
ACTIVE
```


### Download exported forecasts locally using the AWS CLI's sync command

```python
os.system('aws s3 sync ' + subset1_export_path + ' ./data/')
os.system('aws s3 sync ' + subset2_export_path + ' ./data/')
```

```
0
```
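`os.system` passes a single string through the shell and returns only the exit status (the `0` above). A sketch of the same sync using `subprocess.run` with an argument list avoids shell quoting issues and raises an error if the command fails. It assumes the AWS CLI is installed and on your `PATH`; `sync_command` and `sync_to_local` are illustrative names.

```python
import subprocess

def sync_command(s3_prefix, local_dir):
    """Build the AWS CLI command that copies an S3 prefix to a local folder."""
    return ["aws", "s3", "sync", s3_prefix, local_dir]

def sync_to_local(s3_prefix, local_dir="./data/"):
    """Run the sync; check=True raises CalledProcessError on a non-zero exit."""
    subprocess.run(sync_command(s3_prefix, local_dir), check=True)

# In the notebook:
# sync_to_local(subset1_export_path)
# sync_to_local(subset2_export_path)
```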

### View the Predictions from Subset1

In the next cells, compare the item_id values in each input subset*.csv file with the exported forecast: the output contains exactly those item_id values, because the input CSV drove the forecast. You may also notice that the export consists of multiple files. There is no 1:1 relationship between input item_id values and output files; the export job writes its output in parallel, so the number of files, and the rows within each, may differ in your run. Each file starts with its own header row, and some files may contain only a header.
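Concatenating the part files with `cat` repeats the `item_id,date,p50` header once per file. A sketch that merges the parts into a single header and one list of rows with Python's `csv` module follows; it assumes every part shares the same header and that the files match a glob such as `./data/subset1_forecast_export*.csv`. `read_export_parts` is a hypothetical helper, and the demo rows are copied from the subset1 output in this notebook.

```python
import csv
import glob
import os
import tempfile

def read_export_parts(pattern):
    """Read every CSV part matching pattern; return (header, rows) with one header."""
    header, rows = None, []
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="") as f:
            reader = csv.reader(f)
            part_header = next(reader, None)
            if part_header is None:  # completely empty file
                continue
            if header is None:
                header = part_header
            elif part_header != header:
                raise ValueError(f"{path} has a different header: {part_header}")
            rows.extend(reader)
    return header, rows

# Offline demo: a header-only part plus a part holding two subset1 rows
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part0.csv"), "w") as f:
    f.write("item_id,date,p50\n")
with open(os.path.join(demo_dir, "part1.csv"), "w") as f:
    f.write("item_id,date,p50\n"
            "101,2019-02-01T00:00:00Z,5.088283521094429\n"
            "101,2019-02-02T00:00:00Z,4.567379136327473\n")

header, rows = read_export_parts(os.path.join(demo_dir, "*.csv"))
print(header)     # ['item_id', 'date', 'p50']
print(len(rows))  # 2
```

In the notebook, `read_export_parts("./data/subset1_forecast_export*.csv")` should return one row per location and day, so grouping `rows` by `item_id` is a quick way to confirm each selected location has three rows.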

```python
!cat ./data/subset1.csv
```

```
101
102
103
```


```python
!cat ./data/subset1_forecast_export*.csv
```

```
item_id,date,p50
item_id,date,p50
101,2019-02-01T00:00:00Z,5.088283521094429
101,2019-02-02T00:00:00Z,4.567379136327473
101,2019-02-03T00:00:00Z,4.6882656109336915
item_id,date,p50
102,2019-02-01T00:00:00Z,5.999995178848844
102,2019-02-02T00:00:00Z,5.398893783080317
102,2019-02-03T00:00:00Z,5.434501873982511
item_id,date,p50
103,2019-02-01T00:00:00Z,-6.300245287129642
103,2019-02-02T00:00:00Z,-6.499229937267954
103,2019-02-03T00:00:00Z,-6.617409566614679
```


### View the Predictions from Subset2


```python
!cat ./data/subset2.csv
```

```
201
202
203
```


```python
!cat ./data/subset2_forecast_export*
```

```
item_id,date,p50
item_id,date,p50
203,2019-02-01T00:00:00Z,8.769411924588741
203,2019-02-02T00:00:00Z,7.390189030461122
203,2019-02-03T00:00:00Z,6.6944046360308755
item_id,date,p50
202,2019-02-01T00:00:00Z,13.011395465473562
202,2019-02-02T00:00:00Z,14.803267553466718
202,2019-02-03T00:00:00Z,11.073257424886641
item_id,date,p50
201,2019-02-01T00:00:00Z,1.0144781856082785
201,2019-02-02T00:00:00Z,0.936826154303063
201,2019-02-03T00:00:00Z,0.7691473730429197
```


# Step 6: Cleanup

You will need to allow a few minutes for each of these steps to complete.


```python
forecast.delete_resource_tree(ResourceArn=dataset_group_arn)
```

Deleting the dataset group takes a few minutes. Use the next cell to check whether it has been deleted: if your dataset group still appears in the list, wait a couple of minutes and run the cell again. Once it no longer appears, proceed to the next step.

```python
forecast.list_dataset_groups()
```
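Rather than re-running the listing by hand, you can poll until the dataset group no longer appears. The sketch assumes the `list_dataset_groups` response shape returned by boto3 (a `DatasetGroups` list whose entries carry a `DatasetGroupArn`) and, for simplicity, ignores pagination via `NextToken`. `wait_until_group_deleted` is a hypothetical helper.

```python
import time

def wait_until_group_deleted(list_fn, group_arn, sleep_seconds=60, max_checks=60):
    """Return True once group_arn is absent from list_fn()['DatasetGroups'].

    list_fn is a zero-argument callable such as forecast.list_dataset_groups.
    Returns False if the group is still listed after max_checks attempts.
    """
    for _ in range(max_checks):
        arns = {g["DatasetGroupArn"] for g in list_fn()["DatasetGroups"]}
        if group_arn not in arns:
            return True
        time.sleep(sleep_seconds)
    return False

# In the notebook:
# wait_until_group_deleted(forecast.list_dataset_groups, dataset_group_arn)
```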

Delete the dataset import jobs whose ARN contains TAXI_TIME_FORECAST_SUBSET_DEMO (the dataset name).

```python
response = forecast.list_dataset_import_jobs()

for job in response['DatasetImportJobs']:
    # Import job ARNs include the dataset name, so match on it
    if 'TAXI_TIME_FORECAST_SUBSET_DEMO' in job['DatasetImportJobArn']:
        print('Deleting', job['DatasetImportJobName'])
        forecast.delete_dataset_import_job(DatasetImportJobArn=job['DatasetImportJobArn'])
```

Deleting the dataset import jobs takes a few minutes. Once that is complete, delete the dataset with the next cell.

```python
forecast.delete_dataset(DatasetArn=ts_dataset_arn)
```