# Forecasting Electricity Demand with DeepAR


This notebook is tested using `Studio SparkMagic - PySpark Kernel` running on a `ml.t3.medium` instance and connected to an EMR clsuter with an `m5.xlarge` Master node and 2 `m5.xlarge` Core nodes. Please ensure that you see `PySpark (SparkMagic)` in the top right on your notebook.

In this notebook, will see how to:
* Prepare and process a dataset using a remote distributed Spark Cluster
* Use the SageMaker Python SDK to train a DeepAR model and deploy it
* Make requests to the deployed model to obtain forecasts interactively


## Dataset

We'll use a ~700MB dataset of energy consumption by 370 clients over time. This [dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) comes from the UCI Machine Learning Repostiory and was used in the academic papers [[1](https://media.nips.cc/nipsbooks/nipspapers/paper_files/nips29/reviews/526.html)] and [[2](https://arxiv.org/abs/1704.04110)].  The dataset comes in the following format:

|    | date                | client   |   value |
|---:|:--------------------|:---------|--------:|
|  0 | 2011-01-01 00:15:00 | MT_001   |       0 |
|  1 | 2011-01-01 00:30:00 | MT_001   |       0 |
|  2 | 2011-01-01 00:45:00 | MT_001   |       0 |
|  3 | 2011-01-01 01:00:00 | MT_001   |       0 |
|  4 | 2011-01-01 01:15:00 | MT_001   |       0 |

The first column contains the timestamp of the observation in 15 min increments. The `client` column uniquely identifies each timeseries (i.e. the customer), and the `value` column provides the electricity (kW) usage for that interval.


### Connection to EMR Cluster

In the cell below, the code block is autogenerated. You can generate this code by clicking on the "Cluster" link on the top of the notebook and select the EMR cluster. 

For our workshop we be passing our SageMaker execution role to the cluster, but this works equally well for Kerberos, LDAP and HTTP auth mechanisms
![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)

## Notebook Scoped Dependencies
Notebook-scoped libraries provide you the following benefits:

* Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster.
* Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.
* Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster.

In [None]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

In [None]:
# Check Pre-Installed Python Packages from Bootstrap
sc.list_packages()

In [None]:
# Install pyarrow to run vectorized UDFs
sc.install_pypi_package("pyarrow") 

# Initial Setup
In the following cells we'll performa some preliminary setup steps including:
1. Importing the sagemaker SDK library
2. Setting up variables for the execution role, bucket, and S3 location of our data and artifacts
3. Creating a schema that will be used by spark when reading the data

In [None]:
%%local

# Utilize SageMaker Studio instance to run commands locally on notebook
import sagemaker
from sagemaker import get_execution_role


role = get_execution_role()
sess = sagemaker.Session()
bucket = sess.default_bucket()
key_prefix = "forecasting-electricity"
s3_processed_data_location = f"s3://{bucket}/{key_prefix}/data/processed/" # location where spark will write the processed data for training

s3_input_data_location = "s3://ws-assets-prod-iad-r-iad-ed304a55c2ca1aee/9e2e09b0-7142-4ab8-8b89-531349b817b9/deep-ar-electricity/LD2011_2014.csv.gz"
schema = "date TIMESTAMP, client STRING, value FLOAT" # source data schema

Now we have all we need to preprocess the data with spark. We'll send to spark cluster the location of the input data, the S3 location of where we'd like the output to go, and the schema information

In [None]:
%%send_to_spark -i s3_input_data_location -t str -n s3_input_data_location

In [None]:
%%send_to_spark -i s3_processed_data_location -t str -n s3_processed_data_location

In [None]:
%%send_to_spark -i schema -t str -n schema

# Data preprocessing with Apache Spark

For DeepAR we'll need to transform the timeseries data into a json lines format where each line contains a json object representing each client and having the following schema: <br>
`{"start": ..., "target": [0, 0, 0, 0], "dynamic_feat": [[0, 1, 1, 0]], "cat": [0, 0]}` <br>
We'll only use the `start` attribute which contains the start date for the timesries, the `target` attribute which contains the observations, and the `cat` attribute with which will encode each client as a category. DeepAR supports providing additional categorical and continuous features to improve the quality of the forecast

Here we will read the data from S3, and then use a compination of PySpark and PandasUDFs to get the data into the right format

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

import random
import pyspark.sql.functions as fn
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType, IntegerType

In [None]:
data = (spark
        .read
        .schema(schema)
        .options(sep =',', header=True, mode="FAILFAST", timestampFormat="yyyy-MM-dd HH:mm:ss")
        .csv(s3_input_data_location)
       )

In [None]:
# Cache for faster performance
data.cache() 

In [None]:
%%pretty
data.show(10)

In [None]:
# Resample from 15min intervals to one hour to speed up training
data = (data.groupBy(fn.date_trunc("HOUR", fn.col("date")).alias("date"),
                    fn.col("client"))
 .agg(fn.mean("value").alias("value"))
)

In [None]:
# Create a dictionary to Integer encode each client
client_list = data.select("client").distinct().collect()
client_list = [rec["client"] for rec in client_list]
client_encoder = dict(zip(client_list, range(len(client_list)))) 

## Let's visualize the timeseries data for a random subset of clients

In [None]:
random_client_list = random.sample(client_list, 6)

random_clients_pandas_df = (data.where(fn.col("client")
                                            .isin(random_client_list)) 
                                 .groupBy("date")
                                 .pivot("client").max().toPandas()
                                )
random_clients_pandas_df.set_index("date", inplace=True)

In [None]:
plt.clf()
axes = random_clients_pandas_df.resample("1D").max().plot(subplots=True,
                               figsize=(14, 6),
                               layout=(3, 2),
                               title=random_client_list,
                               legend=False,
                               rot=0,
                               lw=1, 
                               color="k")
for ax in axes.flatten():
    ax.set_xlabel('')

plt.suptitle("Electricity Demand")
plt.gcf().tight_layout()

%matplot plt

DeepAR requires no gaps in your data. So for example if you have data that only comes in Monday to Friday (e.g. stock trading activity), we'd have to insert NaN data points to account for Saturdays and Sundays. A quick way to check if our data has any gaps is to aggregate by the day of the week. Running the commands below we can see that the difference between the count and the lowest count is 24 Hours which is ok as it just means that the last datapoint falls midweek. Also the counts match across all customers so it appears that this dataset does not have any gaps

In [None]:
weekday_counts = (data
 .withColumn("dayofweek", fn.dayofweek("date"))
 .groupBy("client")
 .pivot("dayofweek")
 .count()
)

In [None]:
%%pretty
weekday_counts.show(5) # show aggregates for several clients
weekday_counts.agg(*[fn.min(col) for col in weekday_counts.columns[1:]]).show() # show minimum counts of observations across all clients
weekday_counts.agg(*[fn.max(col) for col in weekday_counts.columns[1:]]).show() # show maximum counts of observations across all clients

## Split our timeseries datasets

In [None]:
train_start_date = data.select(fn.min("date").alias("date")).collect()[0]["date"]
test_start_date = "2014-01-01"
end_date = data.select(fn.max("date").alias("date")).collect()[0]["date"]

In [None]:
print(f"overall date span: {train_start_date} to {end_date}")

In [None]:
# split the data into train and test set
train_data = data.where(fn.col("date") < test_start_date)
test_data = data.where(fn.col("date") >= test_start_date)

In [None]:
# pandasUDFs require an output schema. This one matches the format required for DeepAR
deep_ar_schema = StructType([StructField("target", ArrayType(DoubleType())),
                             StructField("cat", ArrayType(IntegerType())),
                             StructField("start", StringType())
                            ])

In [None]:
@pandas_udf(deep_ar_schema, PandasUDFType.GROUPED_MAP)
def prep_deep_ar(df):
    
    df = df.sort_values(by="date")
    client_name = df.loc[0, "client"]
    targets = df["value"].values.tolist()
    cat = [client_encoder[client_name]]
    start = str(df.loc[0,"date"])
    
    return pd.DataFrame([[targets, cat, start]], columns=["target", "cat", "start"])

In [None]:
train_data = train_data.groupBy("client").apply(prep_deep_ar)
train_data.show(5)

In [None]:
# Set flag so that _SUCCESS meta files are not written to S3
# DeepAR actually skips these files anyway, but it's a good practice when using directories as inputs to algorithms
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [None]:
# data is ready for DeepAR an can be written to the specified output destination
train_data.write.mode("overwrite").json(s3_processed_data_location)

In [None]:
%%local
sagemaker.s3.S3Downloader().list(s3_processed_data_location)

# Train model with SageMaker DeepAR
Switching back to the local notebook, we can now configure a DeepAR training job <br>
We need to provide the location of the training data and specify several hyperparameters

In [None]:
%%local
from sagemaker import image_uris
image_uri = image_uris.retrieve("forecasting-deepar", sess.boto_region_name)
freq = '1H' # 1 hour frequency
prediction_length = 168  # predict one week forward
context_length = 168 # look at the past week of data
s3_output_path = f"s3://{bucket}/{key_prefix }/output"

In [None]:
%%local
hyperparameters = {
    "time_freq": freq,
    "context_length": str(context_length),
    "prediction_length": str(prediction_length),
    "num_cells": "40",
    "num_layers": "3",
    "likelihood": "gaussian",
    "epochs": "5",
    "mini_batch_size": "32",
    "learning_rate": "0.001",
    "dropout_rate": "0.05",
    "early_stopping_patience": "10",
}

In [None]:
%%local
deepar_estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sess,
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.c5.2xlarge",
    base_job_name="deepar-electricity-demand",
    hyperparameters=hyperparameters,
    output_path=s3_output_path
)

deepar_estimator.fit({"train": s3_processed_data_location})

# Run Batch Inference 
Now that we have a trained model, let's setup a batch transform job. We will provide the final month of our training data (December 2013) as the input and have DeepAR forecast the first week of the test data. We will then compare the prediction against the actual values <br>
Note for DeepAR, we need to provide at a minimum of `context_length` worth of data points to get a forecast for the `prediction_length`. Providing more data during inference (ideally the enitre timeseries) could result in better predictions as DeepAR is better able to account for longer term trends

In [None]:
%%local
s3_batch_transform_input = f"s3://{bucket}/{key_prefix}/bt_input"

In [None]:
%%send_to_spark -i s3_batch_transform_input -t str -n s3_batch_transform_input

In [None]:
# to avoid having to recreate a DeepAR input, we'll get the index of the start date of the batch transform input and slice the target column in the train_data
bt_input_start = "2013-12-01 00:00:00"
date_range = pd.date_range(train_start_date, end_date, freq="1H") # date range for the entire dataset
bt_start_index = date_range.get_loc(bt_input_start)

In [None]:
bt_input_data = train_data.select(fn.lit(bt_input_start).alias("start"), 
                                  fn.col("cat"),
                                  fn.slice("target", bt_start_index, 10_000).alias("target"))

In [None]:
bt_input_data.write.mode("overwrite").json(s3_batch_transform_input)

In [None]:
%%local
s3_bt_output_path = f"s3://{bucket}/{key_prefix}/bt_output"

deepar_transformer = deepar_estimator.transformer(instance_count=1,
                                                  instance_type="ml.m5.xlarge",
                                                  strategy="SingleRecord",
                                                  assemble_with="Line",
                                                  accept="application/jsonlines",
                                                  output_path= s3_bt_output_path
                                                 )

In [None]:
%%local
deepar_transformer.transform(s3_batch_transform_input, 
                             content_type="application/jsonlines", 
                             join_source="Input", 
                             split_type="Line",
                             logs=False)

In [None]:
%%send_to_spark -i s3_bt_output_path -t str -n s3_bt_output_path

In [None]:
bt_output = spark.read.json(s3_bt_output_path) # read batch transform output from S3

## Visualize Forecast Results

In [None]:
def plot_forecast(client, actual, time_range, predictions):
    
    
    p10 = predictions['quantiles']['0.1']
    p25 = predictions['quantiles']['0.2']
    p50 = predictions['quantiles']['0.5']
    p75 = predictions['quantiles']['0.8']
    p90 = predictions['quantiles']['0.9']
    prediction = predictions['mean']
    
    fig, ax = plt.subplots(figsize=(14,6))
    ts = ax.plot(time_range[-prediction_length:], prediction,  label="Prediction", marker="o")
    ts = ax.plot(time_range[:len(actual)], actual, label="Actual", marker="X")
    ax.fill_between(time_range[-prediction_length:], p10, p25, alpha=0.5, label="P10-P20", color="#2A9D8F")
    ax.fill_between(time_range[-prediction_length:], p25, p75, alpha=0.5, label="P20-P80", color="#E9C46A")
    ax.fill_between(time_range[-prediction_length:], p75, p90, alpha=0.5, label="P80-P90", color="#E76F51")
    ax.legend(loc="best")
    
    ax.set(title=f"{client} Electricity Demand Forecast", xlabel="date", ylabel="demand")

In [None]:
# let's visualize the forecast of a single random customer and compare against the actual values
rnd_client, rnd_client_enc = random.choice(list(client_encoder.items()))
forecast = bt_output.filter(fn.col("cat")[0] == rnd_client_enc).collect()[0]["SageMakerOutput"].asDict()
prediction_length = len(forecast["mean"])

forecast_start_index = date_range.get_loc(test_start_date).start
forecast_end_index = forecast_start_index + prediction_length
forecast_date_range = date_range[forecast_start_index:forecast_end_index]
forecast_date_range_str = list(map(str, forecast_date_range.to_list()))
actual_values = (data
     .where((fn.col("client") == rnd_client) & 
            (fn.col("date").isin(forecast_date_range_str)))
     .orderBy("date")
     .toPandas()
    )["value"].values.tolist()

plt.clf()
plot_forecast(rnd_client, actual_values, forecast_date_range, forecast)
%matplot plt

# Cleanup


In [None]:
%%cleanup -f