# Prepare data for forecasting Electricity Demand with Glue


This notebook was tested using the `Studio SparkAnalytics 1.0 - Glue PySpark Kernel` running on an `ml.t3.medium` instance and connected to a serverless Spark cluster managed by AWS Glue. Please make sure you see `Glue PySpark (SparkAnalytics 1.0)` in the top right of your notebook.

In this notebook, you will see how to:
* Prepare and process a dataset using a remote, distributed Spark cluster
* Save processed data to S3 for model building

## Dataset

We'll use a ~700 MB dataset of electricity consumption by 370 clients over time. This [dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) comes from the UCI Machine Learning Repository and was used in the academic papers [[1](https://media.nips.cc/nipsbooks/nipspapers/paper_files/nips29/reviews/526.html)] and [[2](https://arxiv.org/abs/1704.04110)]. The dataset comes in the following format:

|    | date                | client   |   value |
|---:|:--------------------|:---------|--------:|
|  0 | 2011-01-01 00:15:00 | MT_001   |       0 |
|  1 | 2011-01-01 00:30:00 | MT_001   |       0 |
|  2 | 2011-01-01 00:45:00 | MT_001   |       0 |
|  3 | 2011-01-01 01:00:00 | MT_001   |       0 |
|  4 | 2011-01-01 01:15:00 | MT_001   |       0 |

The `date` column contains the timestamp of each observation in 15-minute increments. The `client` column uniquely identifies each time series (i.e., the customer), and the `value` column gives the electricity usage (kW) for that interval.


In [None]:
%help

%session_id_prefix aim313-task4
%glue_version 3.0
%idle_timeout 60

In [None]:
spark

# Initial Setup
In the following cells, we'll perform some preliminary setup steps:
1. Look up the AWS account ID and Region to build the name of the default SageMaker bucket where outputs will be stored
2. Define the S3 locations of the input data and of the processed output

In [None]:
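import boto3

# Look up the AWS account ID of the current credentials
sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]

# Region of the current session
region = boto3.session.Session().region_name

# Default SageMaker bucket naming convention: sagemaker-<region>-<account-id>
bucket = f"sagemaker-{region}-{account_id}"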

In [None]:
key_prefix = "forecasting-electricity"
s3_processed_data_location = f"s3://{bucket}/{key_prefix}/data/processed/" # location where spark will write the processed data for training

s3_input_data_location = "s3://ee-assets-prod-us-east-1/modules/183f0dce72fc496f85c6215965998db5/v1/deep-ar-electricity/LD2011_2014.csv"
schema = "date TIMESTAMP, client STRING, value FLOAT" # source data schema

In [None]:
from pyspark.sql.functions import split, lower, hour
print(spark.version)
day_to_analyze = "2022-01-05"
df = spark.read.json(f"s3://openaq-fetches/realtime-gzipped/{day_to_analyze}/1641409725.ndjson.gz")
df_air = spark.read.schema(df.schema).json(f"s3://openaq-fetches/realtime-gzipped/{day_to_analyze}/*")

In [None]:
df_city = df_air.filter(lower((df_air.city)).contains('delhi')).filter(df_air.parameter == "no2").cache()
df_avg = df_city.withColumn("Hour", hour(df_city.date.utc)).groupBy("Hour").avg("value").withColumnRenamed("avg(value)", "no2_avg")

df_avg.sort("Hour").show(10)

In [None]:
# Examples of reading / writing to other data stores: 
# https://github.com/aws-samples/aws-glue-samples/tree/master/examples/notebooks

df_avg.write.mode("overwrite").parquet(f"s3://{bucket}/runs2/{day_to_analyze}.parquet")  # overwrite so the cell can be re-run

In [None]:
df_avg.show(10)

# Data preprocessing with Apache Spark

For DeepAR, we need to transform the time series data into JSON Lines format, where each line is a JSON object representing one client with the following schema: <br>
`{"start": ..., "target": [0, 0, 0, 0], "dynamic_feat": [[0, 1, 1, 0]], "cat": [0, 0]}` <br>
We'll use only three attributes. The `start` attribute contains the start timestamp of the time series, the `target` attribute contains the observations, and the `cat` attribute encodes each client as a category. DeepAR also supports additional categorical and continuous features (such as `dynamic_feat`) that can improve the quality of the forecast.
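As a minimal sketch, the DeepAR JSON Lines format can be produced with the standard-library `json` module. This assumes one record per client and uses made-up values. The helper name `to_deepar_line` is hypothetical and not part of this notebook.

```python
import json

def to_deepar_line(start, target, cat):
    """Serialize one time series as a single DeepAR JSON Lines record.

    Hypothetical helper: `start` is a "YYYY-MM-DD HH:MM:SS" string,
    `target` the ordered observations, `cat` the integer category codes.
    """
    record = {"start": start, "target": [float(v) for v in target], "cat": list(cat)}
    # json.dumps without indent keeps the whole record on a single line
    return json.dumps(record)

# Illustrative values only, not taken from the dataset
line = to_deepar_line("2011-01-01 00:00:00", [0, 0, 1.5], [3])
print(line)
# {"start": "2011-01-01 00:00:00", "target": [0.0, 0.0, 1.5], "cat": [3]}
```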

Here we'll read the data from S3 and then use a combination of PySpark and pandas UDFs to get it into the right format.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

import random
import pyspark.sql.functions as fn
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType, IntegerType

In [None]:
data = (spark
        .read
        .schema(schema)
        .options(sep=",", header=True, mode="FAILFAST", timestampFormat="yyyy-MM-dd HH:mm:ss")
        .csv(s3_input_data_location)
       )
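The `timestampFormat` option uses Spark's datetime pattern `yyyy-MM-dd HH:mm:ss`. The equivalent Python `strptime` format is `%Y-%m-%d %H:%M:%S`. The sketch below is a local illustration only. It uses the standard `csv` module and a made-up two-row sample in the assumed column layout, and parses each row the way the schema `date TIMESTAMP, client STRING, value FLOAT` describes.

```python
import csv
import io
from datetime import datetime

# Made-up sample in the assumed column layout of the source CSV (header row included)
sample = "date,client,value\n2011-01-01 00:15:00,MT_001,0\n2011-01-01 00:30:00,MT_001,2.5\n"

rows = []
for rec in csv.DictReader(io.StringIO(sample)):
    rows.append((
        datetime.strptime(rec["date"], "%Y-%m-%d %H:%M:%S"),  # same pattern as timestampFormat
        rec["client"],
        float(rec["value"]),
    ))

print(rows[1])
```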

In [None]:
# Cache for faster performance
data.cache() 

In [None]:
data.show(10)

In [None]:
# Resample from 15-minute intervals to one hour to speed up training
data = (data.groupBy(fn.date_trunc("HOUR", fn.col("date")).alias("date"),
                    fn.col("client"))
 .agg(fn.mean("value").alias("value"))
)
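`date_trunc("HOUR", ...)` maps every timestamp to the start of its hour. The mean is then taken per (hour, client) pair. Below is a pure-Python sketch of the same aggregation on made-up readings. With truncation, a reading stamped `01:00:00` falls into the 01:00 bucket, so the first hourly bucket here holds three readings.

```python
from collections import defaultdict
from datetime import datetime

# Made-up 15-minute readings for one client: (timestamp, client, kW)
readings = [
    (datetime(2011, 1, 1, 0, 15), "MT_001", 1.0),
    (datetime(2011, 1, 1, 0, 30), "MT_001", 2.0),
    (datetime(2011, 1, 1, 0, 45), "MT_001", 3.0),
    (datetime(2011, 1, 1, 1, 0), "MT_001", 4.0),
]

sums = defaultdict(float)
counts = defaultdict(int)
for ts, client, value in readings:
    hour_start = ts.replace(minute=0, second=0, microsecond=0)  # like date_trunc("HOUR", date)
    sums[(hour_start, client)] += value
    counts[(hour_start, client)] += 1

# Mean value per (hour, client), like .agg(fn.mean("value"))
hourly = {key: sums[key] / counts[key] for key in sums}
for (hour_start, client), mean in sorted(hourly.items()):
    print(hour_start, client, mean)
# 2011-01-01 00:00:00 MT_001 2.0
# 2011-01-01 01:00:00 MT_001 4.0
```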

In [None]:
# Create a dictionary that integer-encodes each client
client_list = data.select("client").distinct().collect()
# Sort so that the client -> integer mapping is reproducible across runs
client_list = sorted(rec["client"] for rec in client_list)
client_encoder = dict(zip(client_list, range(len(client_list))))
len(client_encoder)
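`distinct()` does not guarantee row order. As a result, the integer assigned to each client can change between runs unless the list is sorted first. Below is a minimal sketch of a deterministic encoder, using made-up client IDs.

```python
# Made-up client IDs in arbitrary order, as distinct() might return them
client_list = ["MT_003", "MT_001", "MT_002"]

# Sorting first makes the client -> integer mapping reproducible across runs
client_encoder = {client: idx for idx, client in enumerate(sorted(client_list))}
print(client_encoder)
# {'MT_001': 0, 'MT_002': 1, 'MT_003': 2}
```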

In [None]:
random_client_list = random.sample(client_list, 6)

random_clients_pandas_df = (data.where(fn.col("client")
                                            .isin(random_client_list)) 
                                 .groupBy("date")
                                 .pivot("client").max().toPandas()
                                )
random_clients_pandas_df.set_index("date", inplace=True)

DeepAR requires time series without gaps. For example, if your data only covers Monday to Friday (e.g., stock trading activity), you would have to insert NaN data points for Saturdays and Sundays. A quick way to check whether our data has any gaps is to count each client's observations per day of the week. Running the commands below shows that the highest and lowest weekday counts differ by 24, i.e., one day of hourly observations. This is fine, because it just means that the last data point falls midweek. The counts also match across all clients, so this dataset does not appear to have any gaps.
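To see why a series that ends partway through a week shows a 24-observation difference, the sketch below counts hourly timestamps per weekday for a made-up date range. It uses Python's `isoweekday()` (Monday = 1) rather than Spark's `dayofweek` (Sunday = 1). This changes only the labels, not the counts.

```python
from collections import Counter
from datetime import datetime, timedelta

def hourly_weekday_counts(start, end):
    """Count hourly timestamps in [start, end) per ISO weekday (Monday=1 ... Sunday=7)."""
    counts = Counter()
    ts = start
    while ts < end:
        counts[ts.isoweekday()] += 1
        ts += timedelta(hours=1)
    return counts

# Made-up range: two full weeks starting Monday 2011-01-03, plus one extra Monday
counts = hourly_weekday_counts(datetime(2011, 1, 3), datetime(2011, 1, 18))
print(dict(sorted(counts.items())))
# {1: 72, 2: 48, 3: 48, 4: 48, 5: 48, 6: 48, 7: 48}
print(max(counts.values()) - min(counts.values()))
# 24
```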

In [None]:
weekday_counts = (data
 .withColumn("dayofweek", fn.dayofweek("date"))
 .groupBy("client")
 .pivot("dayofweek")
 .count()
)

In [None]:
weekday_counts.show(5) # show aggregates for several clients
weekday_counts.agg(*[fn.min(col) for col in weekday_counts.columns[1:]]).show() # show minimum counts of observations across all clients
weekday_counts.agg(*[fn.max(col) for col in weekday_counts.columns[1:]]).show() # show maximum counts of observations across all clients
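The weekday aggregation is a quick heuristic. A more direct check compares the observed timestamps against the full hourly range and reports any that are missing. It is sketched here in plain Python on a made-up list of hourly timestamps for a single client. `find_missing_hours` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timedelta

def find_missing_hours(timestamps):
    """Return hourly timestamps absent between the earliest and latest observation."""
    observed = set(timestamps)
    start, end = min(observed), max(observed)
    missing = []
    ts = start
    while ts <= end:
        if ts not in observed:
            missing.append(ts)
        ts += timedelta(hours=1)
    return missing

# Made-up series with the 02:00 observation dropped
series = [datetime(2011, 1, 1, h) for h in (0, 1, 3, 4)]
print(find_missing_hours(series))
# [datetime.datetime(2011, 1, 1, 2, 0)]
```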

## Split the time series into training and test sets

In [None]:
train_start_date = data.select(fn.min("date").alias("date")).collect()[0]["date"]
test_start_date = "2014-01-01"
end_date = data.select(fn.max("date").alias("date")).collect()[0]["date"]

In [None]:
print(f"overall date span: {train_start_date} to {end_date}")

In [None]:
# split the data into train and test set
train_data = data.where(fn.col("date") < test_start_date)
test_data = data.where(fn.col("date") >= test_start_date)
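The split is purely chronological. Everything before `test_start_date` goes to training and the rest goes to testing, so the training set contains no observations from the test period. Below is a plain-Python sketch of the same rule on made-up rows.

```python
from datetime import datetime

test_start_date = datetime(2014, 1, 1)

# Made-up (timestamp, client, value) rows straddling the cutoff
rows = [
    (datetime(2013, 12, 31, 22), "MT_001", 1.0),
    (datetime(2013, 12, 31, 23), "MT_001", 2.0),
    (datetime(2014, 1, 1, 0), "MT_001", 3.0),
]

# Same predicates as the Spark where() clauses: < cutoff for train, >= cutoff for test
train_rows = [r for r in rows if r[0] < test_start_date]
test_rows = [r for r in rows if r[0] >= test_start_date]
print(len(train_rows), len(test_rows))
# 2 1
```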

In [None]:
# Pandas UDFs require an output schema; this one matches the format required by DeepAR
deep_ar_schema = StructType([StructField("target", ArrayType(DoubleType())),
                             StructField("cat", ArrayType(IntegerType())),
                             StructField("start", StringType())
                            ])

In [None]:
@pandas_udf(deep_ar_schema, PandasUDFType.GROUPED_MAP)
def prep_deep_ar(df):
    # Sort chronologically and reset the index so that label 0 is the earliest observation
    df = df.sort_values(by="date").reset_index(drop=True)
    client_name = df.loc[0, "client"]
    targets = df["value"].values.tolist()
    cat = [client_encoder[client_name]]
    start = str(df.loc[0, "date"])  # start timestamp of this client's series
    
    return pd.DataFrame([[targets, cat, start]], columns=["target", "cat", "start"])
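For each client, the grouped-map UDF does three things. It sorts the rows by date, takes the earliest timestamp as `start`, and collects the values in order as `target`. The plain-Python sketch below mirrors that logic on made-up rows so it can be checked without a Spark session. `build_deepar_records` and the sample data are hypothetical. On Spark 3.x, the same kind of per-group function can also be applied with `groupBy(...).applyInPandas(func, schema=...)`, which supersedes the `PandasUDFType.GROUPED_MAP` style used above.

```python
from collections import defaultdict
from datetime import datetime

def build_deepar_records(rows, encoder):
    """Group (date, client, value) rows by client and build DeepAR-style records."""
    by_client = defaultdict(list)
    for date, client, value in rows:
        by_client[client].append((date, value))

    records = []
    for client, points in by_client.items():
        points.sort()  # chronological order, like df.sort_values(by="date")
        records.append({
            "target": [float(v) for _, v in points],
            "cat": [encoder[client]],
            "start": str(points[0][0]),  # earliest timestamp of the sorted series
        })
    return records

# Made-up, deliberately unsorted input
rows = [
    (datetime(2011, 1, 1, 1), "MT_002", 5.0),
    (datetime(2011, 1, 1, 0), "MT_002", 4.0),
    (datetime(2011, 1, 1, 0), "MT_001", 1.0),
]
records = build_deepar_records(rows, {"MT_001": 0, "MT_002": 1})
for rec in records:
    print(rec)
```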

In [None]:
train_data = train_data.groupBy("client").apply(prep_deep_ar)
train_data.show(5)

In [None]:
# Set flag so that _SUCCESS meta files are not written to S3
# DeepAR actually skips these files anyway, but it's a good practice when using directories as inputs to algorithms
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [None]:
# Data is ready for DeepAR and can be written to the specified output destination
train_data.write.mode("overwrite").json(s3_processed_data_location)

In [None]:
print(f"Preprocessed data written to s3: {s3_processed_data_location}")
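Before launching training, it can be worth spot-checking that each output line parses as a standalone JSON object with the expected fields. Below is a minimal validator sketch over JSON Lines text. The function name and the made-up sample are hypothetical. In practice, the text would come from one of the part files written to `s3_processed_data_location`.

```python
import json

def validate_deepar_lines(text):
    """Return a list of (line_number, problem) tuples for malformed DeepAR records."""
    problems = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            rec = json.loads(line)
        except json.JSONDecodeError as err:
            problems.append((lineno, f"invalid JSON: {err.msg}"))
            continue
        if not isinstance(rec.get("start"), str):
            problems.append((lineno, "missing or non-string 'start'"))
        target = rec.get("target")
        if not isinstance(target, list) or not target:
            problems.append((lineno, "missing or empty 'target'"))
        cat = rec.get("cat")
        if cat is not None and not (isinstance(cat, list) and all(isinstance(c, int) for c in cat)):
            problems.append((lineno, "'cat' must be a list of integers"))
    return problems

sample = (
    '{"target":[0.0,1.5],"cat":[0],"start":"2011-01-01 00:00:00"}\n'
    '{"target":[],"cat":[1],"start":"2011-01-01 00:00:00"}\n'
)
print(validate_deepar_lines(sample))
# [(2, "missing or empty 'target'")]
```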

In this notebook, you performed the same preprocessing as in Task 2, but using a Glue interactive session, so there was no cluster infrastructure to provision or manage.

You can now run the same training job as in Task 2, using the DeepAR built-in algorithm, to train your model.

# Cleanup

Because the session is serverless, the Glue session terminates automatically after the idle timeout set with `%idle_timeout` at the start of this notebook (60 minutes of inactivity).
