In [None]:
# MDM demos

Before running, replace the following variables with the corresponding values from your environment:
- athena_output_bucket
- APIUrl
- ML_endpoint

In [None]:
import io
import json
import urllib.parse

import altair as alt
import matplotlib.pyplot as plt
import pandas as pd
import urllib3
from pyathena import connect
from vega_datasets import data

# Environment-specific settings: replace the placeholder values before running.
region = 'us-east-1'
athena_output_bucket = 'ml-prediction-pipeline-athenaquerybucket-xxxxxxxx'

# Athena connection; query results are staged under the bucket above.
connection = connect(s3_staging_dir=f's3://{athena_output_bucket}/', region_name=region)

# Base URL of the API Gateway stage; '{}' is filled with the resource path and query string.
APIUrl = 'https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/{}'
# Name of the SageMaker endpoint passed to the forecast API.
ML_endpoint = "ml-endpoint-weather-0731"

## Real-time forecast for a specific meter

In [None]:
# Invoke API Gateway, which forwards the forecast request via Lambda to the SageMaker endpoint.
# If running from a SageMaker notebook, its execution role needs API Gateway invoke permission.
def get_forecast(meter_id, start, end, ml_endpoint_name=None):
    """Request a real-time consumption forecast for one meter from the REST API.

    Args:
        meter_id: Meter identifier, e.g. 'MAC004734'.
        start: Sent as the ``data_start`` query parameter (e.g. "2013-05-01").
        end: Sent as the ``data_end`` query parameter.
        ml_endpoint_name: SageMaker endpoint name sent as ``ml_endpoint_name``;
            defaults to the ``ML_endpoint`` configured at the top of the notebook.

    Returns:
        The decoded response body (a JSON string).

    Raises:
        RuntimeError: if the API responds with a non-2xx status, so an error
            payload is not mistaken for forecast data.
    """
    if ml_endpoint_name is None:
        # Resolved at call time so edits to the config cell are picked up.
        ml_endpoint_name = ML_endpoint
    http = urllib3.PoolManager()
    endpoint = "forecast/{}?data_start={}&data_end={}&ml_endpoint_name={}".format(
        meter_id, start, end, ml_endpoint_name)
    forecast_api_url = APIUrl.format(endpoint)

    response = http.request('GET', forecast_api_url)
    body = response.data.decode()
    if not 200 <= response.status < 300:
        raise RuntimeError("Forecast API returned HTTP {}: {}".format(response.status, body))
    return body

# Request a real-time forecast for one meter.
forecast_json = get_forecast('MAC004734', "2013-05-01", "2013-10-01")

# Convert the JSON response to a DataFrame and plot it.
# Wrap the string in StringIO: pandas >= 2.1 deprecates passing literal JSON to read_json.
forecast_df = pd.read_json(io.StringIO(forecast_json))
forecast_df.plot(title='Real-time forecast for meter MAC004734');

## Get forecasts from the batch forecast results (one or many meters)

```python
meter_range = ['MAC000002', 'MAC000010']
# Meter ids are strings, so they must be quoted as SQL string literals;
# unquoted, Athena would parse them as column identifiers.
query = '''select meter_id, datetime, consumption from "meter-data".forecast
 where meter_id between '{}' and '{}';'''.format(meter_range[0], meter_range[1])
df = pd.read_sql(query, connection)
```

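## Get anomalies for a specific meter
The `plot_anomalies` helper below requires weather data (`temperature` and `apparenttemperature` columns). The API also supports requests without weather data, and this example uses `plot_anomalies_wo_weather`, which does not need the weather columns.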

In [None]:

def plot_anomalies_wo_weather(forecasted):
    """Plot consumption, its expected band and flagged anomalies (no weather columns needed).

    Args:
        forecasted: DataFrame from the anomaly API with columns ``ds``,
            ``consumption``, ``yhat_lower``, ``yhat_upper``, ``anomaly`` and
            ``importance``.

    Returns:
        A layered Altair chart: the ``yhat_lower``..``yhat_upper`` band, the
        consumption line, and a red circle (sized by ``importance``) for every
        row whose ``anomaly`` value is non-zero.
    """
    interval = alt.Chart(forecasted).mark_area(interpolate="basis", color='#7FC97F').encode(
        x=alt.X('ds:T', title='date'),
        y='yhat_upper',
        y2='yhat_lower',
        tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper']
    ).interactive().properties(
        title='Anomaly Detection'
    )

    fact = alt.Chart(forecasted).mark_line(color='#774009').encode(
        x='ds:T',
        y=alt.Y('consumption', title='consumption')
    ).interactive()

    # Only rows flagged as anomalous are drawn as points.
    anomalies = alt.Chart(forecasted[forecasted.anomaly != 0]).mark_circle(size=30, color='Red').encode(
        x='ds:T',
        y=alt.Y('consumption', title='consumption'),
        tooltip=['ds', 'consumption', 'yhat_lower', 'yhat_upper'],
        size=alt.Size('importance', legend=None)
    ).interactive()

    return alt.layer(interval, fact, anomalies)\
        .properties(width=870, height=450)\
        .configure_title(fontSize=20)

def plot_anomalies(forecasted):
    """Layered anomaly chart with weather: expected band, consumption, apparent temperature, anomalies.

    ``forecasted`` must provide the ``ds``, ``consumption``, ``yhat_lower``,
    ``yhat_upper``, ``temperature``, ``apparenttemperature``, ``anomaly`` and
    ``importance`` columns. Rows with a non-zero ``anomaly`` are drawn as red
    circles sized by ``importance``.
    """
    hover_fields = ['ds', 'consumption', 'yhat_lower', 'yhat_upper', 'temperature', 'apparenttemperature']
    flagged_rows = forecasted[forecasted.anomaly != 0]

    expected_band = (
        alt.Chart(forecasted)
        .mark_area(interpolate="basis", color='#7FC97F')
        .encode(
            x=alt.X('ds:T', title='date'),
            y='yhat_upper',
            y2='yhat_lower',
            tooltip=hover_fields,
        )
        .interactive()
        .properties(title='Anomaly Detection')
    )

    consumption_line = (
        alt.Chart(forecasted)
        .mark_line(color='#774009')
        .encode(x='ds:T', y=alt.Y('consumption', title='consumption'))
        .interactive()
    )

    temperature_line = (
        alt.Chart(forecasted)
        .mark_line(color='#40F9F9')
        .encode(x='ds:T', y='apparenttemperature')
    )

    anomaly_points = (
        alt.Chart(flagged_rows)
        .mark_circle(size=30, color='Red')
        .encode(
            x='ds:T',
            y=alt.Y('consumption', title='consumption'),
            tooltip=hover_fields,
            size=alt.Size('importance', legend=None),
        )
        .interactive()
    )

    layered = alt.layer(expected_band, consumption_line, temperature_line, anomaly_points)
    return layered.properties(width=870, height=450).configure_title(fontSize=20)

def get_anomaly(meter_id, start, end, outlier_only=0):
    """Request anomaly-detection results for one meter from the REST API.

    Named ``get_anomaly`` so it does not shadow the real-time ``get_forecast``
    helper defined in an earlier cell.

    Args:
        meter_id: Meter identifier, e.g. 'MAC000005'.
        start: Sent as the ``data_start`` query parameter (e.g. "2013-01-01").
        end: Sent as the ``data_end`` query parameter.
        outlier_only: Sent as the ``outlier_only`` query parameter. Presumably
            0 returns all points and 1 only outliers -- confirm against the API.

    Returns:
        The decoded response body (a JSON string).

    Raises:
        RuntimeError: if the API responds with a non-2xx status.
    """
    http = urllib3.PoolManager()
    endpoint = "anomaly/{}?data_start={}&data_end={}&outlier_only={}".format(
        meter_id, start, end, outlier_only)
    anomaly_api_url = APIUrl.format(endpoint)

    response = http.request('GET', anomaly_api_url)
    body = response.data.decode()
    if not 200 <= response.status < 300:
        raise RuntimeError("Anomaly API returned HTTP {}: {}".format(response.status, body))
    return body

# Call the REST API to get anomalies for one meter.
anomaly_json = get_anomaly('MAC000005', "2013-01-01", "2013-12-31", outlier_only=0)

# Convert the JSON response to a DataFrame and visualize it.
# Wrap the string in StringIO: pandas >= 2.1 deprecates passing literal JSON to read_json.
anomaly_df = pd.read_json(io.StringIO(anomaly_json))
plot_anomalies_wo_weather(anomaly_df)

## Get outages within a time window

In [None]:
def get_outage(start, end):
    """Query the REST API for outage records between two timestamps.

    Args:
        start: Start timestamp, e.g. "2013-01-03 09:00:01"; sent as ``start_date_time``.
        end: End timestamp; sent as ``end_date_time``.

    Returns:
        The raw response body as bytes (JSON; ``json.loads`` accepts bytes directly).

    Raises:
        RuntimeError: if the API responds with a non-2xx status.
    """
    http = urllib3.PoolManager()
    # The timestamps contain spaces, which are not valid in a URL; percent-encode them.
    endpoint = "outage?start_date_time={}&end_date_time={}".format(
        urllib.parse.quote(str(start)), urllib.parse.quote(str(end)))
    outage_api_url = APIUrl.format(endpoint)

    response = http.request('GET', outage_api_url)
    if not 200 <= response.status < 300:
        raise RuntimeError("Outage API returned HTTP {}: {}".format(
            response.status, response.data.decode(errors='replace')))
    return response.data

# Call the REST API to get outages in the given window.
outage_resp = get_outage("2013-01-03 09:00:01", "2013-01-03 10:59:59")
# Use a distinct name: `data` is the vega_datasets module used by the map cell below.
outage_payload = json.loads(outage_resp)
# Selecting the columns at construction time also yields a correctly shaped
# (empty) frame when the API returns no outage items.
df_result = pd.DataFrame(outage_payload['Items'], columns=['meter_id', 'lat', 'long']).drop_duplicates()
df_result

In [None]:
from vega_datasets import data

counties = alt.topo_feature(data.us_10m.url, 'counties')

# New York state background.
# County ids start with the state id (integer part of id / 1000); 36 is New York.
map_newyork = (
    alt.Chart(counties)
    .mark_geoshape(stroke='black', strokeWidth=1)
    .transform_calculate(state_id="(datum.id / 1000)|0")
    .transform_filter(alt.datum.state_id == 36)
    .encode(color=alt.value('lightgray'))
    .properties(width=800, height=640)
)

# Meter positions (first 500 rows of df_result) layered on the background.
points = (
    alt.Chart(df_result.head(500))
    .mark_circle()
    .encode(
        longitude='long:Q',
        latitude='lat:Q',
        color=alt.value('orange'),
        tooltip=['meter_id'],
    )
    .properties(title='Power outage in New York')
)

map_newyork + points