{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Forecasting Electricity Demand with DeepAR\n", "\n", "\n", "This notebook is tested using `Studio SparkMagic - PySpark Kernel` running on a `ml.t3.medium` instance and connected to an EMR cluster with an `m5.xlarge` Master node and 2 `m5.xlarge` Core nodes. Please ensure that you see `PySpark (SparkMagic)` in the top right of your notebook.\n", "\n", "In this notebook, we will see how to:\n", "* Prepare and process a dataset using a remote distributed Spark cluster\n", "* Use the SageMaker Python SDK to train a DeepAR model\n", "* Run batch inference with the trained model and compare its forecasts against actual demand\n", "\n", "\n", "Connect to the **ExistingCluster** by following the same directions as Task 1. You should see a cell similar to the one below for initiating the connection.\n", "```\n", "%load_ext sagemaker_studio_analytics_extension.magics\n", "%sm_analytics emr connect --cluster-id j-xxxxxxxxxxxx --auth-type None \n", "```\n", "Wait for the cluster to be connected before executing the remaining cells in this notebook." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dataset\n", "\n", "We'll use a ~700MB dataset of energy consumption by 370 clients over time. This [dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014) comes from the UCI Machine Learning Repository and was used in the academic papers [[1](https://media.nips.cc/nipsbooks/nipspapers/paper_files/nips29/reviews/526.html)] and [[2](https://arxiv.org/abs/1704.04110)]. The dataset comes in the following format:\n", "\n", "| | date | client | value |\n", "|---:|:--------------------|:---------|--------:|\n", "| 0 | 2011-01-01 00:15:00 | MT_001 | 0 |\n", "| 1 | 2011-01-01 00:30:00 | MT_001 | 0 |\n", "| 2 | 2011-01-01 00:45:00 | MT_001 | 0 |\n", "| 3 | 2011-01-01 01:00:00 | MT_001 | 0 |\n", "| 4 | 2011-01-01 01:15:00 | MT_001 | 0 |\n", "\n", "The first column contains the timestamp of the observation in 15-minute increments. The `client` column uniquely identifies each timeseries (i.e. the customer), and the `value` column provides the electricity (kW) usage for that interval.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook Scoped Dependencies\n", "Notebook-scoped libraries provide you with the following benefits:\n", "\n", "* Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster (see the example below).\n", "* Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.\n", "* Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster."
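, "\n", "For example, to install a specific version of a library for just this session, you could run a cell like the following (the package and version shown are purely illustrative):\n", "```\n", "sc.install_pypi_package(\"pandas==1.0.5\")\n", "```"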
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%configure -f\n", "{ \"conf\":{\n", " \"spark.pyspark.python\": \"python3\",\n", " \"spark.pyspark.virtualenv.enabled\": \"true\",\n", " \"spark.pyspark.virtualenv.type\":\"native\",\n", " \"spark.pyspark.virtualenv.bin.path\":\"/usr/bin/virtualenv\"\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check Pre-Installed Python Packages from Bootstrap\n", "sc.list_packages()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install pyarrow to run vectorized UDFs\n", "sc.install_pypi_package(\"pyarrow\") " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Initial Setup\n", "In the following cells we'll perform some preliminary setup steps, including:\n", "1. Importing the SageMaker Python SDK\n", "2. Setting up variables for the execution role, bucket, and S3 location of our data and artifacts\n", "3. Creating a schema that will be used by Spark when reading the data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "\n", "# Utilize the SageMaker Studio instance to run commands locally in the notebook\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "\n", "\n", "role = get_execution_role()\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "key_prefix = \"forecasting-electricity\"\n", "s3_processed_data_location = f\"s3://{bucket}/{key_prefix}/data/processed/\" # location where Spark will write the processed data for training\n", "\n", "s3_input_data_location = \"s3://ee-assets-prod-us-east-1/modules/183f0dce72fc496f85c6215965998db5/v1/deep-ar-electricity/LD2011_2014.csv\"\n", "schema = \"date TIMESTAMP, client STRING, value FLOAT\" # source data schema" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have all we need to preprocess the data with Spark. We'll send the Spark cluster the location of the input data, the S3 location where we'd like the output to go, and the schema information." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i s3_input_data_location -t str -n s3_input_data_location" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i s3_processed_data_location -t str -n s3_processed_data_location" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i schema -t str -n schema" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data preprocessing with Apache Spark\n", "\n", "For DeepAR, we'll need to transform the timeseries data into a JSON Lines format, where each line contains a JSON object representing a single client and having the following schema:
\n", "`{\"start\": ..., \"target\": [0, 0, 0, 0], \"dynamic_feat\": [[0, 1, 1, 0]], \"cat\": [0, 0]}`
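\n", "\n", "For instance, a record for a single client in this dataset might end up looking like the following, without the optional `dynamic_feat` field (the values shown are illustrative rather than taken from the actual data):\n", "\n", "```\n", "{\"start\": \"2011-01-01 00:00:00\", \"target\": [2.5, 0.0, 3.1], \"cat\": [0]}\n", "```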
\n", "We'll only use the `start` attribute, which contains the start date for the timeseries, the `target` attribute, which contains the observations, and the `cat` attribute, which we will use to encode each client as a category. DeepAR supports providing additional categorical and continuous features to improve the quality of the forecast.\n", "\n", "Here we will read the data from S3, and then use a combination of PySpark and Pandas UDFs to get the data into the right format." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "import random\n", "import pyspark.sql.functions as fn\n", "from pyspark.sql.functions import pandas_udf, PandasUDFType\n", "from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType, IntegerType" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = (spark\n", " .read\n", " .schema(schema)\n", " .options(sep=',', header=True, mode=\"FAILFAST\", timestampFormat=\"yyyy-MM-dd HH:mm:ss\")\n", " .csv(s3_input_data_location)\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Cache for faster performance\n", "data.cache() " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%pretty\n", "data.show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Resample from 15min intervals to one hour to speed up training\n", "data = (data.groupBy(fn.date_trunc(\"HOUR\", fn.col(\"date\")).alias(\"date\"),\n", " fn.col(\"client\"))\n", " .agg(fn.mean(\"value\").alias(\"value\"))\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a dictionary to integer-encode each client\n", "client_list = data.select(\"client\").distinct().collect()\n", "client_list = [rec[\"client\"] for rec in client_list]\n", "client_encoder = dict(zip(client_list, range(len(client_list)))) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Let's visualize the timeseries data for a random subset of clients" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_client_list = random.sample(client_list, 6)\n", "\n", "random_clients_pandas_df = (data.where(fn.col(\"client\")\n", " .isin(random_client_list)) \n", " .groupBy(\"date\")\n", " .pivot(\"client\").max().toPandas()\n", " )\n", "random_clients_pandas_df.set_index(\"date\", inplace=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.clf()\n", "axes = random_clients_pandas_df.resample(\"1D\").max().plot(subplots=True,\n", " figsize=(14, 6),\n", " layout=(3, 2),\n", " title=random_client_list,\n", " legend=False,\n", " rot=0,\n", " lw=1, \n", " color=\"k\")\n", "for ax in axes.flatten():\n", " ax.set_xlabel('')\n", "\n", "plt.suptitle(\"Electricity Demand\")\n", "plt.gcf().tight_layout()\n", "\n", "%matplot plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "DeepAR requires that there be no gaps in your data. For example, if you have data that only comes in Monday to Friday (e.g. stock trading activity), we'd have to insert NaN data points to account for Saturdays and Sundays. A quick way to check whether our data has any gaps is to aggregate by the day of the week.
Running the commands below, we can see that the difference between the highest and lowest counts is 24 (one day's worth of hourly observations), which is fine: it just means that the dataset ends midweek, so some days of the week occur one more time than others. Also, the counts match across all clients, so it appears that this dataset does not have any gaps." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "weekday_counts = (data\n", " .withColumn(\"dayofweek\", fn.dayofweek(\"date\"))\n", " .groupBy(\"client\")\n", " .pivot(\"dayofweek\")\n", " .count()\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%pretty\n", "weekday_counts.show(5) # show aggregates for several clients\n", "weekday_counts.agg(*[fn.min(col) for col in weekday_counts.columns[1:]]).show() # show minimum counts of observations across all clients\n", "weekday_counts.agg(*[fn.max(col) for col in weekday_counts.columns[1:]]).show() # show maximum counts of observations across all clients" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Split our timeseries dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_start_date = data.select(fn.min(\"date\").alias(\"date\")).collect()[0][\"date\"]\n", "test_start_date = \"2014-01-01\"\n", "end_date = data.select(fn.max(\"date\").alias(\"date\")).collect()[0][\"date\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"overall date span: {train_start_date} to {end_date}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# split the data into train and test sets\n", "train_data = data.where(fn.col(\"date\") < test_start_date)\n", "test_data = data.where(fn.col(\"date\") >= test_start_date)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pandas UDFs require an output schema. This one matches the format required for DeepAR\n",
"deep_ar_schema = StructType([StructField(\"target\", ArrayType(DoubleType())),\n", " StructField(\"cat\", ArrayType(IntegerType())),\n", " StructField(\"start\", StringType())\n", " ])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pandas_udf(deep_ar_schema, PandasUDFType.GROUPED_MAP)\n", "def prep_deep_ar(df):\n", " \n", " # sort each client's rows chronologically and reset the index so that .loc[0] refers to the earliest observation\n", " df = df.sort_values(by=\"date\").reset_index(drop=True)\n", " client_name = df.loc[0, \"client\"]\n", " targets = df[\"value\"].values.tolist()\n", " cat = [client_encoder[client_name]]\n", " start = str(df.loc[0,\"date\"])\n", " \n", " return pd.DataFrame([[targets, cat, start]], columns=[\"target\", \"cat\", \"start\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_data = train_data.groupBy(\"client\").apply(prep_deep_ar)\n", "train_data.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set flag so that _SUCCESS meta files are not written to S3\n", "# DeepAR actually skips these files anyway, but it's a good practice when using directories as inputs to algorithms\n", "spark.conf.set(\"mapreduce.fileoutputcommitter.marksuccessfuljobs\", \"false\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# data is ready for DeepAR and can be written to the specified output destination\n", "train_data.write.mode(\"overwrite\").json(s3_processed_data_location)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "sagemaker.s3.S3Downloader().list(s3_processed_data_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Train model with SageMaker DeepAR\n", "Switching back to the local notebook, we can now configure a DeepAR training job.
\n", "We need to provide the location of the training data and specify several hyperparameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "from sagemaker import image_uris\n", "image_uri = image_uris.retrieve(\"forecasting-deepar\", sess.boto_region_name)\n", "freq = '1H' # 1 hour frequency\n", "prediction_length = 168 # predict one week forward\n", "context_length = 168 # look at the past week of data\n", "s3_output_path = f\"s3://{bucket}/{key_prefix}/output\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "hyperparameters = {\n", " \"time_freq\": freq,\n", " \"context_length\": str(context_length),\n", " \"prediction_length\": str(prediction_length),\n", " \"num_cells\": \"40\",\n", " \"num_layers\": \"3\",\n", " \"likelihood\": \"gaussian\",\n", " \"epochs\": \"5\",\n", " \"mini_batch_size\": \"32\",\n", " \"learning_rate\": \"0.001\",\n", " \"dropout_rate\": \"0.05\",\n", " \"early_stopping_patience\": \"10\",\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "deepar_estimator = sagemaker.estimator.Estimator(\n", " sagemaker_session=sess,\n", " image_uri=image_uri,\n", " role=role,\n", " instance_count=1,\n", " instance_type=\"ml.c5.2xlarge\",\n", " base_job_name=\"deepar-electricity-demand\",\n", " hyperparameters=hyperparameters,\n", " output_path=s3_output_path\n", ")\n", "\n", "deepar_estimator.fit({\"train\": s3_processed_data_location})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Run Batch Inference\n", "Now that we have a trained model, let's set up a batch transform job. We will provide the final month of our training data (December 2013) as the input and have DeepAR forecast the first week of the test data. We will then compare the predictions against the actual values.
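\n\nBecause we will pass `join_source=\"Input\"` to the transform job, each line of the batch transform output will contain the original input record plus a `SageMakerOutput` attribute holding the forecast. A single output line should look roughly like this (the field values and quantile keys shown are illustrative):\n\n```\n{\"start\": \"2013-12-01 00:00:00\", \"cat\": [0], \"target\": [...], \"SageMakerOutput\": {\"mean\": [...], \"quantiles\": {\"0.1\": [...], \"0.5\": [...], \"0.9\": [...]}}}\n```\n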
\n", "Note that for DeepAR, we need to provide a minimum of `context_length` worth of data points to get a forecast for the `prediction_length`. Providing more data during inference (ideally the entire timeseries) could result in better predictions, as DeepAR is better able to account for longer-term trends." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "s3_batch_transform_input = f\"s3://{bucket}/{key_prefix}/bt_input\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i s3_batch_transform_input -t str -n s3_batch_transform_input" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# to avoid having to recreate a DeepAR input, we'll get the index of the start date of the batch transform input and slice the target column in the train_data\n", "bt_input_start = \"2013-12-01 00:00:00\"\n", "date_range = pd.date_range(train_start_date, end_date, freq=\"1H\") # date range for the entire dataset\n", "bt_start_index = date_range.get_loc(bt_input_start) + 1 # +1 because Spark's slice() below uses 1-based indexing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bt_input_data = train_data.select(fn.lit(bt_input_start).alias(\"start\"), \n", " fn.col(\"cat\"),\n", " fn.slice(\"target\", bt_start_index, 10_000).alias(\"target\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bt_input_data.write.mode(\"overwrite\").json(s3_batch_transform_input)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "s3_bt_output_path = f\"s3://{bucket}/{key_prefix}/bt_output\"\n", "\n", "deepar_transformer = deepar_estimator.transformer(instance_count=1,\n", " instance_type=\"ml.m5.xlarge\",\n", " strategy=\"SingleRecord\",\n", " assemble_with=\"Line\",\n", " accept=\"application/jsonlines\",\n", " output_path=s3_bt_output_path\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "deepar_transformer.transform(s3_batch_transform_input, \n", " content_type=\"application/jsonlines\", \n", " join_source=\"Input\", \n", " split_type=\"Line\",\n", " logs=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i s3_bt_output_path -t str -n s3_bt_output_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bt_output = spark.read.json(s3_bt_output_path) # read batch transform output from S3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize Forecast Results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_forecast(client, actual, time_range, predictions):\n", " \n", " # note: the p25/p75 variables below actually hold the 0.2 and 0.8 quantiles, matching the P20/P80 band labels\n", " p10 = predictions['quantiles']['0.1']\n", " p25 = predictions['quantiles']['0.2']\n", " p50 = predictions['quantiles']['0.5']\n", " p75 = predictions['quantiles']['0.8']\n", " p90 = predictions['quantiles']['0.9']\n", " prediction = predictions['mean']\n", " \n", " fig, ax = plt.subplots(figsize=(14,6))\n", " ts = ax.plot(time_range[-prediction_length:], prediction, label=\"Prediction\", marker=\"o\")\n", " ts = ax.plot(time_range[:len(actual)], actual, label=\"Actual\", marker=\"X\")\n", " ax.fill_between(time_range[-prediction_length:], p10, p25, alpha=0.5, label=\"P10-P20\", color=\"#2A9D8F\")\n", " 
ax.fill_between(time_range[-prediction_length:], p25, p75, alpha=0.5, label=\"P20-P80\", color=\"#E9C46A\")\n", " ax.fill_between(time_range[-prediction_length:], p75, p90, alpha=0.5, label=\"P80-P90\", color=\"#E76F51\")\n", " ax.legend(loc=\"best\")\n", " \n", " ax.set(title=f\"{client} Electricity Demand Forecast\", xlabel=\"date\", ylabel=\"demand\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# let's visualize the forecast of a single random customer and compare against the actual values\n", "rnd_client, rnd_client_enc = random.choice(list(client_encoder.items()))\n", "forecast = bt_output.filter(fn.col(\"cat\")[0] == rnd_client_enc).collect()[0][\"SageMakerOutput\"].asDict()\n", "prediction_length = len(forecast[\"mean\"])\n", "\n", "forecast_start_index = date_range.get_loc(test_start_date).start\n", "forecast_end_index = forecast_start_index + prediction_length\n", "forecast_date_range = date_range[forecast_start_index:forecast_end_index]\n", "forecast_date_range_str = list(map(str, forecast_date_range.to_list()))\n", "actual_values = (data\n", " .where((fn.col(\"client\") == rnd_client) & \n", " (fn.col(\"date\").isin(forecast_date_range_str)))\n", " .orderBy(\"date\")\n", " .toPandas()\n", " )[\"value\"].values.tolist()\n", "\n", "plt.clf()\n", "plot_forecast(rnd_client, actual_values, forecast_date_range, forecast)\n", "%matplot plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Terminate Livy sessions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%cleanup -f" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }