{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Use AWS Glue Databrew from within Jupyter notebooks to prepare data for ML models \n", "\n", "\n", "---\n", "\n", "This notebook walks through the steps to configure and use open source Jupyterlab extension for AWS Glue Databrew to prepare data for a sample anomaly detection model.\n", "\n", "The [electricity consumption dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014#) is used in this notebook. A subset of original dataset with 4 customer datapoints is used as a starting point. A series of DataBrew transformations are applied on the dataset to prepare it for Random Cut Forests anomaly detection model. On the prepared dataset, a RCF model is trained and deployed in SageMaker\n", "\n", "\n", "Please make sure the kernel is set to 'python3'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install the packages needed to run this notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install awswrangler\n", "!pip install --upgrade sagemaker\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Import the packages " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker as sm\n", "from sagemaker import *\n", "import awswrangler as wr\n", "import matplotlib.pyplot as plt\n", "import os\n", "import pandas as pd\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### S3 bucket where the raw and transformed data will be stored and the role details" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "session = sm.Session()\n", "### **** 'data_bucket' should point to bucket name you are using DataBrew and model Training ***** #### \n", "data_bucket=session.default_bucket() \n", "#s3_bucket=#input_s3_bucket#\n", "role_arn=session.get_caller_identity_arn()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data Preparation using AWS Glue DataBrew" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Exploring the prepared data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pc_processed_path=os.path.join('s3://',data_bucket,'prefix_where_DataBrew_output_is_stored')\n", "columns=['timestamp','client_id','hourly_consumption']\n", "pc_processed_df = wr.s3.read_csv(path=pc_processed_path)\n", "pc_processed_df=pc_processed_df [columns]\n", "#columns[0]='timestamp'\n", "#pc_processed_df.columns=columns\n", "pc_processed_df.client_id.unique()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### plotting the raw timeseries electricity consumption" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure, axes = plt.subplots(3, 1)\n", "figure.set_figheight(8)\n", "figure.set_figwidth(15)\n", "pc_processed_df[pc_processed_df['client_id']=='MT_012'].plot(ax=axes[0],title='MT_012') \n", "pc_processed_df[pc_processed_df['client_id']=='MT_013'].plot(ax=axes[1],title='MT_013')\n", "pc_processed_df[pc_processed_df['client_id']=='MT_132'].plot(ax=axes[2],title='MT_132')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Lets train our model with ***MT_132*** consumption data. Since RCF requires one time series and integer values, lets filter and convert the consumption data to inetger data type" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df=pc_processed_df[(pc_processed_df['client_id']=='MT_132') & (pc_processed_df['timestamp']<'2014-11-01')]\n", "train_df=train_df.drop(['timestamp','client_id'],axis=1)\n", "train_df.hourly_consumption=train_df.hourly_consumption.astype('int32')\n", "train_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train RCF Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_train_path=os.path.join('s3://',data_bucket,'databrew_rcf','training','train.csv')\n", "s3_model_path=os.path.join('s3://',data_bucket,'databrew_rcf','model')\n", "\n", "wr.s3.to_csv(df=train_df,path=s3_train_path,header=False,index=False)\n", "training_channel=sm.inputs.TrainingInput(s3_data=s3_train_path,content_type='text/csv;label_size=0',distribution='ShardedByS3Key')\n", "channels={'train':training_channel}\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rcf_algo_uri=image_uris.retrieve('randomcutforest',session.boto_region_name)\n", "rcf_estimator= sm.estimator.Estimator(rcf_algo_uri,role=role_arn,instance_count=1,instance_type='ml.m5.large',output_path=s3_model_path)\n", "rcf_estimator.set_hyperparameters(feature_dim=1)\n", "rcf_estimator.fit(channels)\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the trained model " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rcf_endpoint_name='databrew-rcf-demo-endpoint'\n", "rcf_predictor=rcf_estimator.deploy(endpoint_name=rcf_endpoint_name,instance_type='ml.t2.medium',initial_instance_count=1,serializer=serializers.CSVSerializer(),deserializer=deserializers.JSONDeserializer())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Predictions and Cleanup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from statistics import mean,stdev\n", "test_df=pc_processed_df[(pc_processed_df['client_id']=='MT_012') & (pc_processed_df['timestamp'] >= '2014-01-01') &(pc_processed_df['hourly_consumption'] != 0)]\n", "test_df=test_df.tail(500)\n", "test_df_values=test_df['hourly_consumption'].astype('str').tolist()\n", "response=rcf_predictor.predict(test_df_values)\n", "scores = [datum[\"score\"] for datum in response[\"scores\"]]\n", "scores_mean=mean(scores)\n", "scores_std=stdev(scores)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### plot the prediction scores taking mean + or - 2*standard_deviation as the baseline " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_df['hourly_consumption'].plot(figsize=(40,10))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.figure(figsize=(40,10))\n", "plt.plot(scores)\n", "plt.autoscale(tight=True)\n", "plt.axhline(y=scores_mean+2*scores_std,color='red')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up by deleting the endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rcf_predictor.delete_endpoint()" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }