{ "cells": [ { "source": [ "# MDM demos" ], "cell_type": "code", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make sure replace the following variables to the corresponding ones in your environment.\n", "- athena_output_bucket\n", "- APIUrl\n", "- ML_endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyathena import connect \n", "import pandas as pd \n", "import matplotlib.pyplot as plt\n", "import altair as alt\n", "from vega_datasets import data\n", "import json\n", "import urllib3\n", "\n", "athena_output_bucket = 'ml-prediction-pipeline-athenaquerybucket-xxxxxxxx'\n", "region = 'us-east-1'\n", "\n", "connection = connect(s3_staging_dir='s3://{}/'.format(athena_output_bucket), region_name=region) \n", "APIUrl = 'https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/{}'\n", "ML_endpoint = \"ml-endpoint-weather-0731\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Real-time forecast for a specific meter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Invoke API gateway to send forecast request via Lambda to Sagemaker endpoint\n", "# if using notebook, Sagemaker role needs to have API gateway invoke permission\n", "def get_forecast(meter_id, start, end):\n", " # Access API to get cluster endpoint name and temporary credentials\n", " http = urllib3.PoolManager()\n", " endpoint = \"forecast/{}?data_start={}&data_end={}&ml_endpoint_name={}\".format(meter_id, start, end, ML_endpoint)\n", " forecast_api_url = APIUrl.format(endpoint)\n", "\n", " response = http.request('GET', forecast_api_url)\n", " return response.data.decode()\n", "\n", "resp = get_forecast('MAC004734', \"2013-05-01\", \"2013-10-01\")\n", "\n", "# convert response to dataframe and visualize\n", "df = pd.read_json(resp)\n", "df.plot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get forecast from batch forecast result, can be one or many meters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "meter_range = ['MAC000002', 'MAC000010']\n", "query = '''select meter_id, datetime, consumption from \"meter-data\".forecast\n", " where meter_id between {} and {};'''.format(meter_range[0], meter_range[1])\n", "df = pd.read_sql(query, connection)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get Anomaly for a specific meter \n", "This visualization example requires weather data although the API supports w/o weather data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "def plot_anomalies_wo_weather(forecasted):\n", " interval = alt.Chart(forecasted).mark_area(interpolate=\"basis\", color = '#7FC97F').encode(\n", " x=alt.X('ds:T', title ='date'),\n", " y='yhat_upper',\n", " y2='yhat_lower',\n", " tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper']\n", " ).interactive().properties(\n", " title='Anomaly Detection'\n", " )\n", "\n", " fact = alt.Chart(forecasted).mark_line(color = '#774009').encode(\n", " x='ds:T',\n", " y=alt.Y('consumption', title='consumption')\n", " ).interactive()\n", "\n", " #apparenttemperature = alt.Chart(forecasted).mark_line(color = '#40F9F9').encode(\n", " # x='ds:T',\n", " # y='apparenttemperature'\n", " #)\n", "\n", " anomalies = alt.Chart(forecasted[forecasted.anomaly!=0]).mark_circle(size=30, color = 'Red').encode(\n", " x='ds:T',\n", " y=alt.Y('consumption', title='consumption'),\n", " tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper'],\n", " size = alt.Size( 'importance', legend=None)\n", " ).interactive()\n", "\n", " return alt.layer(interval, fact, anomalies)\\\n", " .properties(width=870, height=450)\\\n", " .configure_title(fontSize=20)\n", "\n", "def plot_anomalies(forecasted):\n", " interval = alt.Chart(forecasted).mark_area(interpolate=\"basis\", color = '#7FC97F').encode(\n", " x=alt.X('ds:T', title ='date'),\n", " y='yhat_upper',\n", " y2='yhat_lower',\n", " tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper', 'temperature', 'apparenttemperature']\n", " ).interactive().properties(\n", " title='Anomaly Detection'\n", " )\n", "\n", " fact = alt.Chart(forecasted).mark_line(color = '#774009').encode(\n", " x='ds:T',\n", " y=alt.Y('consumption', title='consumption')\n", " ).interactive()\n", "\n", " apparenttemperature = alt.Chart(forecasted).mark_line(color = '#40F9F9').encode(\n", " x='ds:T',\n", " y='apparenttemperature'\n", " )\n", "\n", " anomalies = alt.Chart(forecasted[forecasted.anomaly!=0]).mark_circle(size=30, color = 'Red').encode(\n", " x='ds:T',\n", " y=alt.Y('consumption', title='consumption'),\n", " tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper', 'temperature', 'apparenttemperature'],\n", " size = alt.Size( 'importance', legend=None)\n", " ).interactive()\n", "\n", " return alt.layer(interval, fact, apparenttemperature, anomalies)\\\n", " .properties(width=870, height=450)\\\n", " .configure_title(fontSize=20)\n", "\n", "def get_forecast(meter_id, start, end, outlier_only):\n", " # Access API to get cluster endpoint name and temporary credentials\n", " http = urllib3.PoolManager()\n", " endpoint = \"anomaly/{}?data_start={}&data_end={}&outlier_only=0\".format(meter_id, start, end)\n", " anomaly_api_url = APIUrl.format(endpoint)\n", "\n", "\n", " response = http.request('GET', anomaly_api_url)\n", "\n", " return response.data.decode()\n", "\n", "# Call rest API to get anomaly\n", "resp = get_forecast('MAC000005', \"2013-01-01\", \"2013-12-31\", 0)\n", "\n", "# convert response to dataframe and visualize\n", "df = pd.read_json(resp)\n", "plot_anomalies_wo_weather(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get outage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_outage(start, end):\n", " # Access API to get cluster endpoint name and temporary credentials\n", " http = urllib3.PoolManager()\n", " endpoint = \"outage?start_date_time={}&end_date_time={}\".format(start, end)\n", " outageAPIUrl = APIUrl.format(endpoint)\n", "\n", " response = http.request('GET', outageAPIUrl)\n", "\n", " return response.data\n", "\n", "# Call rest API to get outages\n", "resp = get_outage(\"2013-01-03 09:00:01\", \"2013-01-03 10:59:59\")\n", "data = json.loads(resp)\n", "df = pd.DataFrame(data['Items']) \n", "df_result = df[['meter_id', 'lat', 'long']].drop_duplicates()\n", "df_result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from vega_datasets import data\n", "\n", "counties = alt.topo_feature(data.us_10m.url, 'counties')\n", "\n", "# New York state background\n", "# County id code starts with state id. 36 is NY state\n", "map_newyork =(\n", " alt.Chart(data = counties)\n", " .mark_geoshape(\n", " stroke='black',\n", " strokeWidth=1\n", " )\n", " .transform_calculate(state_id = \"(datum.id / 1000)|0\")\n", " .transform_filter((alt.datum.state_id)==36)\n", " .encode(color=alt.value('lightgray'))\n", " .properties(\n", " width=800,\n", " height=640\n", " )\n", ")\n", "\n", "# meter positions on background\n", "points = alt.Chart(df_result.head(500)).mark_circle().encode(\n", " longitude='long:Q',\n", " latitude='lat:Q',\n", " color=alt.value('orange'),\n", " tooltip=['meter_id']\n", ").properties(\n", " title='Power outage in New York'\n", ")\n", "\n", "map_newyork + points" ] } ], "metadata": { "kernelspec": { "display_name": "Meter_demo", "language": "python", "name": "meter_demo" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }