{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Prepare data for forecasting Electricity Demand with Glue\n",
"\n",
"\n",
"This notebook is tested using `Studio SparkAnalytics 1.0 - Glue PySpark Kernel` running on a `ml.t3.medium` instance and connected to a Serverless Spark cluster running using the AWS Glue Managed service. Please ensure that you see `Glue PySpark (SparkAnalytics 1.0)` in the top right on your notebook.\n",
"\n",
"In this notebook, will see how to:\n",
"* Prepare and process a dataset using a remote distributed Spark Cluster\n",
"* Save processed data to S3 for model building"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dataset\n",
"\n",
"We'll use a ~700MB dataset of energy consumption by 370 clients over time. This [dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) comes from the UCI Machine Learning Repostiory and was used in the academic papers [[1](https://media.nips.cc/nipsbooks/nipspapers/paper_files/nips29/reviews/526.html)] and [[2](https://arxiv.org/abs/1704.04110)]. The dataset comes in the following format:\n",
"\n",
"| | date | client | value |\n",
"|---:|:--------------------|:---------|--------:|\n",
"| 0 | 2011-01-01 00:15:00 | MT_001 | 0 |\n",
"| 1 | 2011-01-01 00:30:00 | MT_001 | 0 |\n",
"| 2 | 2011-01-01 00:45:00 | MT_001 | 0 |\n",
"| 3 | 2011-01-01 01:00:00 | MT_001 | 0 |\n",
"| 4 | 2011-01-01 01:15:00 | MT_001 | 0 |\n",
"\n",
"The first column contains the timestamp of the observation in 15 min increments. The `client` column uniquely identifies each timeseries (i.e. the customer), and the `value` column provides the electricity (kW) usage for that interval.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"%help\n",
"\n",
"%session_id_prefix aim313-task4\n",
"%glue_version 3.0\n",
"%idle_timeout 60"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Initial Setup\n",
"In the following cells we'll performa some preliminary setup steps including:\n",
"1. Run the commands to describe the stack and get the bucket name to store outputs to\n",
"2. In the next cell set this value to the bucket variable"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import boto3\n",
"\n",
"sts = boto3.client(\"sts\")\n",
"account_id = sts.get_caller_identity()[\"Account\"]\n",
"\n",
"region = boto3.session.Session().region_name\n",
"\n",
"bucket = f\"sagemaker-{region}-{account_id}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"key_prefix = \"forecasting-electricity\"\n",
"s3_processed_data_location = f\"s3://{bucket}/{key_prefix}/data/processed/\" # location where spark will write the processed data for training\n",
"\n",
"s3_input_data_location = \"s3://ee-assets-prod-us-east-1/modules/183f0dce72fc496f85c6215965998db5/v1/deep-ar-electricity/LD2011_2014.csv\"\n",
"schema = \"date TIMESTAMP, client STRING, value FLOAT\" # source data schema"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from pyspark.sql.functions import split, lower, hour\n",
"print(spark.version)\n",
"day_to_analyze = \"2022-01-05\"\n",
"df = spark.read.json(f\"s3://openaq-fetches/realtime-gzipped/{day_to_analyze}/1641409725.ndjson.gz\")\n",
"df_air = spark.read.schema(df.schema).json(f\"s3://openaq-fetches/realtime-gzipped/{day_to_analyze}/*\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_city = df_air.filter(lower((df_air.city)).contains('delhi')).filter(df_air.parameter == \"no2\").cache()\n",
"df_avg = df_city.withColumn(\"Hour\", hour(df_city.date.utc)).groupBy(\"Hour\").avg(\"value\").withColumnRenamed(\"avg(value)\", \"no2_avg\")\n",
"\n",
"df_avg.sort(\"Hour\").show(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"# Examples of reading / writing to other data stores: \n",
"# https://github.com/aws-samples/aws-glue-samples/tree/master/examples/notebooks\n",
"\n",
"df_avg.write.parquet(f\"s3://{bucket}/runs2/{day_to_analyze}.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_avg.show(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data preprocessing with Apache Spark\n",
"\n",
"For DeepAR we'll need to transform the timeseries data into a json lines format where each line contains a json object representing each client and having the following schema:
\n",
"`{\"start\": ..., \"target\": [0, 0, 0, 0], \"dynamic_feat\": [[0, 1, 1, 0]], \"cat\": [0, 0]}`
\n",
"We'll only use the `start` attribute which contains the start date for the timesries, the `target` attribute which contains the observations, and the `cat` attribute with which will encode each client as a category. DeepAR supports providing additional categorical and continuous features to improve the quality of the forecast\n",
"\n",
"Here we will read the data from S3, and then use a compination of PySpark and PandasUDFs to get the data into the right format"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"\n",
"import random\n",
"import pyspark.sql.functions as fn\n",
"from pyspark.sql.functions import pandas_udf, PandasUDFType\n",
"from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType, IntegerType"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data = (spark\n",
" .read\n",
" .schema(schema)\n",
" .options(sep =',', header=True, mode=\"FAILFAST\", timestampFormat=\"yyyy-MM-dd HH:mm:ss\")\n",
" .csv(s3_input_data_location)\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cache for faster performance\n",
"data.cache() "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data.show(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Resample from 15min intervals to one hour to speed up training\n",
"data = (data.groupBy(fn.date_trunc(\"HOUR\", fn.col(\"date\")).alias(\"date\"),\n",
" fn.col(\"client\"))\n",
" .agg(fn.mean(\"value\").alias(\"value\"))\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a dictionary to Integer encode each client\n",
"client_list = data.select(\"client\").distinct().collect()\n",
"client_list = [rec[\"client\"] for rec in client_list]\n",
"client_encoder = dict(zip(client_list, range(len(client_list)))) \n",
"len(client_encoder)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"random_client_list = random.sample(client_list, 6)\n",
"\n",
"random_clients_pandas_df = (data.where(fn.col(\"client\")\n",
" .isin(random_client_list)) \n",
" .groupBy(\"date\")\n",
" .pivot(\"client\").max().toPandas()\n",
" )\n",
"random_clients_pandas_df.set_index(\"date\", inplace=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"DeepAR requires no gaps in your data. So for example if you have data that only comes in Monday to Friday (e.g. stock trading activity), we'd have to insert NaN data points to account for Saturdays and Sundays. A quick way to check if our data has any gaps is to aggregate by the day of the week. Running the commands below we can see that the difference between the count and the lowest count is 24 Hours which is ok as it just means that the last datapoint falls midweek. Also the counts match across all customers so it appears that this dataset does not have any gaps"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"weekday_counts = (data\n",
" .withColumn(\"dayofweek\", fn.dayofweek(\"date\"))\n",
" .groupBy(\"client\")\n",
" .pivot(\"dayofweek\")\n",
" .count()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"weekday_counts.show(5) # show aggregates for several clients\n",
"weekday_counts.agg(*[fn.min(col) for col in weekday_counts.columns[1:]]).show() # show minimum counts of observations across all clients\n",
"weekday_counts.agg(*[fn.max(col) for col in weekday_counts.columns[1:]]).show() # show maximum counts of observations across all clients"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split our timeseries datasets"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_start_date = data.select(fn.min(\"date\").alias(\"date\")).collect()[0][\"date\"]\n",
"test_start_date = \"2014-01-01\"\n",
"end_date = data.select(fn.max(\"date\").alias(\"date\")).collect()[0][\"date\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"overall date span: {train_start_date} to {end_date}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# split the data into train and test set\n",
"train_data = data.where(fn.col(\"date\") < test_start_date)\n",
"test_data = data.where(fn.col(\"date\") >= test_start_date)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# pandasUDFs require an output schema. This one matches the format required for DeepAR\n",
"deep_ar_schema = StructType([StructField(\"target\", ArrayType(DoubleType())),\n",
" StructField(\"cat\", ArrayType(IntegerType())),\n",
" StructField(\"start\", StringType())\n",
" ])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@pandas_udf(deep_ar_schema, PandasUDFType.GROUPED_MAP)\n",
"def prep_deep_ar(df):\n",
" \n",
" df = df.sort_values(by=\"date\")\n",
" client_name = df.loc[0, \"client\"]\n",
" targets = df[\"value\"].values.tolist()\n",
" cat = [client_encoder[client_name]]\n",
" start = str(df.loc[0,\"date\"])\n",
" \n",
" return pd.DataFrame([[targets, cat, start]], columns=[\"target\", \"cat\", \"start\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_data = train_data.groupBy(\"client\").apply(prep_deep_ar)\n",
"train_data.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set flag so that _SUCCESS meta files are not written to S3\n",
"# DeepAR actually skips these files anyway, but it's a good practice when using directories as inputs to algorithms\n",
"spark.conf.set(\"mapreduce.fileoutputcommitter.marksuccessfuljobs\", \"false\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# data is ready for DeepAR an can be written to the specified output destination\n",
"train_data.write.mode(\"overwrite\").json(s3_processed_data_location)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"print(f\"Preprocessed data written to s3: {s3_processed_data_location}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this notebook, you have performed the same preprocessing as Task-2, but using a Glue interactive session, i.e., you have no cluster to provision or manage the infrastructure for.\n",
"\n",
"Now, you can run the same training job as in Task 2, using DeepAR built-in algorithm, to train your model."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cleanup\n",
"\n",
"Since the session is serverless, the Glue session will auto terminate after the specified minutes (60 minutes) of inactivity\n"
]
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Glue PySpark (SparkAnalytics 1.0)",
"language": "python",
"name": "conda-env-sm_glue_is-glue_pyspark__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkanalytics-v1"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "Python_Glue_Session",
"pygments_lexer": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}