import boto3, time, s3fs, json, warnings, os
import urllib.request
from datetime import date, timedelta
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import sagemaker
from sagemaker.predictor import Predictor
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.model import Model
from multiprocessing import Pool
from IPython.display import display, HTML, Javascript
from bokeh.plotting import curdoc, figure, output_notebook, show
from bokeh.layouts import column, row, layout
from bokeh.models import GeoJSONDataSource, ColumnDataSource
from bokeh.models import Select, Slider
from bokeh.models.callbacks import CustomJS
from bokeh.tile_providers import CARTODBPOSITRON, get_provider
from bokeh.palettes import inferno
# the train test split date is used to split each time series into train and test sets
train_test_split_date = date.today() - timedelta(days=30)
# the sampling frequency determines the number of hours per sample
# and is used for aggregating and filling missing values
frequency = '1' # hours
# prediction length is how many hours into future to predict values for
prediction_length = 48
# context length is how many prior time steps the predictor needs to make a prediction
context_length = 3
# the file to save predictions to
predictions_file = 'data/predictions.pkl'
# quantiles that will be predicted
quantiles = list(range(1,10))
quantile_names = [f'0.{q}' for q in quantiles]
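# Illustrative example (not part of the lab's data): one DeepAR-style record with the
# fields that predict() sends to the endpoint later in this notebook. With an hourly
# frequency, context_length=3 and prediction_length=48, the model is given a few hours
# of context and asked to forecast the next 48 hours.
example_record = {
    'start': '2021-01-01 00:00:00',   # timestamp of the first value in 'target'
    'target': [12.0, 15.5, 14.2],     # hourly pm10 readings used as context
    'cat': [0],                       # categorical feature, e.g. an encoded location id
}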
warnings.filterwarnings('ignore')
session = boto3.Session()
region = session.region_name
account_id = session.client('sts').get_caller_identity().get('Account')
bucket_name = f"{account_id}-openaq-lab"
console_s3_uri= 'https://s3.console.aws.amazon.com/s3/object/'
s3 = boto3.client('s3', region_name = region)
os.makedirs('model', exist_ok=True)
urllib.request.urlretrieve('https://d8pl0xx4oqh22.cloudfront.net/model.tar.gz', 'model/model.tar.gz')
try:
    if region == 'us-east-1':
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
except s3.exceptions.BucketAlreadyOwnedByYou:
    # the bucket already exists from a previous run, so reuse it
    pass
s3.upload_file('model/model.tar.gz', bucket_name, 'sagemaker/model/model.tar.gz')
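# Hedged sketch (not executed here): the artifact uploaded above can later be wrapped in
# a sagemaker.model.Model and deployed as an endpoint. The image URI, role and instance
# type below are assumptions; the lab's later steps define the real values.
# pretrained_model = Model(
#     image_uri=sagemaker.image_uris.retrieve('forecasting-deepar', region),
#     model_data=f's3://{bucket_name}/sagemaker/model/model.tar.gz',
#     role=sagemaker.get_execution_role(),
# )
# pretrained_model.deploy(initial_instance_count=1, instance_type='ml.m5.large')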
def athena_create_table(query_file, wait=None):
create_table_uri = athena_execute(query_file, 'txt', wait)
return create_table_uri
def athena_query_table(query_file, wait=None):
results_uri = athena_execute(query_file, 'csv', wait)
return results_uri
def athena_execute(query_file, ext, wait):
    with open(query_file) as f:
        query_str = f.read()
    display(HTML(f'Executing query:<pre>{query_str}</pre>'))
    athena = boto3.client('athena')
    s3_dest = f's3://{bucket_name}/athena/results/'
    query_id = athena.start_query_execution(
        QueryString=query_str,
        ResultConfiguration={'OutputLocation': s3_dest}
    )['QueryExecutionId']
    results_uri = f'{s3_dest}{query_id}.{ext}'
    # poll the query status until it succeeds, fails, or the wait time runs out
    start = time.time()
    while wait is None or wait == 0 or time.time() - start < wait:
        result = athena.get_query_execution(QueryExecutionId=query_id)
        status = result['QueryExecution']['Status']['State']
        if wait == 0 or status == 'SUCCEEDED':
            break
        elif status in ['QUEUED', 'RUNNING']:
            time.sleep(3)  # wait between polls instead of hammering the Athena API
        else:
            raise Exception(f'query {query_id} failed with status {status}')
    console_url = f'{console_s3_uri}{bucket_name}/athena/results/{query_id}.{ext}?region={region}&tab=overview'
    display(HTML(f'results are located at <a href="{console_url}">{results_uri}</a>'))
    return results_uri
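# Hedged usage sketch for the Athena helpers above; the .sql file names are assumptions,
# the lab provides its own query files in later steps. s3fs (imported above) lets pandas
# read the result CSV straight from its s3:// URI.
# athena_create_table('sql/create_openaq_table.sql', wait=60)
# results_uri = athena_query_table('sql/query_openaq.sql', wait=300)
# results_df = pd.read_csv(results_uri)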
def display_hpo_tuner_advice(hpo_tuner):
display(HTML(f'''
The hyperparameter tuning job "{hpo_tuner.latest_tuning_job.name}" is now running.
To view it in the console click
here.
'''))
def display_training_job_advice(training_job):
display(HTML(f'''
The training job "{training_job.name}" is now running.
To view it in the console click
here.
'''))
def display_endpoint_advice(session, endpoint_name, wait=False):
display(HTML(f'''
The end point "{endpoint_name}" is now being deployed.
To view it in the console click
here.
'''))
if wait:
session.wait_for_endpoint(endpoint_name)
display(HTML(f'The end point "{endpoint_name}" is now deployed.'))
def filter_dates(df, min_time, max_time, frequency):
min_time = None if min_time is None else pd.to_datetime(min_time)
max_time = None if max_time is None else pd.to_datetime(max_time)
interval = pd.Timedelta(frequency)
def _filter_dates(r):
if min_time is not None and r['start'] < min_time:
start_idx = int((min_time - r['start']) / interval)
r['target'] = r['target'][start_idx:]
r['start'] = min_time
end_time = r['start'] + len(r['target']) * interval
if max_time is not None and end_time > max_time:
end_idx = int((end_time - max_time) / interval)
r['target'] = r['target'][:-end_idx]
return r
filtered = df.apply(_filter_dates, axis=1)
filtered = filtered[filtered['target'].str.len() > 0]
return filtered
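# Small self-contained check of filter_dates: trim a 6-hour toy series to the window
# 02:00-05:00. The values are made up purely for illustration.
toy_series = pd.DataFrame([{
    'start': pd.Timestamp('2021-01-01 00:00:00'),
    'target': [1, 2, 3, 4, 5, 6],
}])
# keeps [3, 4, 5] and moves 'start' forward to 02:00
toy_trimmed = filter_dates(toy_series, '2021-01-01 02:00:00', '2021-01-01 05:00:00', '1H')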
def get_tests(features, split_dates, frequency, context_length, prediction_length):
tests = []
end_date_delta = pd.Timedelta(f'{frequency} hour') * context_length
prediction_id = 0
for split_date in split_dates:
context_end = split_date + end_date_delta
test = filter_dates(features, split_date, context_end, f'{frequency}H')
test['prediction_start'] = context_end
test['prediction_id'] = prediction_id
test['start'] = test['start'].dt.strftime('%Y-%m-%d %H:%M:%S')
tests.append(test)
prediction_id += 1
tests = pd.concat(tests).reset_index().set_index(['id', 'prediction_id', 'prediction_start']).sort_index()
return tests
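# Hedged usage sketch for get_tests: one test window per day across the hold-out period.
# 'features' stands for the per-location DataFrame built earlier in the lab (indexed by
# 'id', with 'start' and 'target' columns).
# split_dates = pd.date_range(train_test_split_date, periods=7, freq='D')
# tests = get_tests(features, split_dates, frequency, context_length, prediction_length)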
def init_predict_process(func, endpoint_name, quantiles):
func.predictor = Predictor(
endpoint_name,
serializer=sagemaker.serializers.JSONSerializer(),
deserializer=sagemaker.deserializers.JSONDeserializer()
)
func.quantiles = quantiles
def call_endpoint(feature):
request = dict(
instances= [feature],
configuration= dict(
num_samples= 20,
output_types= ['quantiles'],
quantiles= call_endpoint.quantiles
)
)
response = call_endpoint.predictor.predict(request)
raw_quantiles = response['predictions'][0]['quantiles']
return {q: [[np.around(v, 2) for v in l]] for q,l in raw_quantiles.items()}
def predict(endpoint_name, samples, quantiles, processes=10):
features = samples[['start', 'target', 'cat']].to_dict(orient='records')
quantile_strs = [f'0.{q}' for q in quantiles]
with Pool(processes, init_predict_process, [call_endpoint, endpoint_name, quantile_strs]) as pool:
inferences = pool.map(call_endpoint, features)
df = pd.concat([pd.DataFrame(inference) for inference in inferences], ignore_index=True)
df = df[sorted(df.columns.values)]
df.set_index(samples.index, inplace=True)
df.index.names = ['id', 'prediction_id', 'start']
df.reset_index(level=2, inplace=True)
return df
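# Hedged usage sketch for predict(): fan the test windows out to a deployed DeepAR
# endpoint (created in a later step) and save the quantile forecasts for analysis.
# 'endpoint_name' and 'tests' are placeholders for objects defined later in the lab.
# predictions = predict(endpoint_name, tests, quantiles)
# predictions.to_pickle(predictions_file)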
def plot_error(actuals, predictions, horizon=None):
def error_for_prediction(predictions):
location_id = predictions.index[0][0]
actual = actuals.loc[location_id]
prediction = predictions.iloc[0]
offset = int((prediction.start - actual.start)/pd.Timedelta('1 hour'))
def error(F):
if horizon and horizon < len(F):
offset_end = offset + horizon
else:
offset_end = offset + len(F)
A = np.array(actual.target[offset: offset_end])
A_mean = A.mean()
F = np.array(F[:len(A)])
error = np.abs((A - F)/A_mean)
return error
return prediction[quantile_names].apply(error)
MAPE = predictions.groupby(level=['id', 'prediction_id']).apply(error_for_prediction)
max_length = MAPE['0.1'].apply(len).max()
MAPE = MAPE[MAPE['0.1'].map(len) == max_length]
return MAPE.mean().plot()
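# Hedged usage sketch for plot_error(): compare forecasts against held-out actuals over
# the first 24 hours of each prediction window. 'actuals' and 'predictions' stand for
# DataFrames produced in later lab steps.
# plot_error(actuals, predictions, horizon=24)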
def moving_average(values, averaging_period=24, roundby=2):
return pd.Series(values)\
.rolling(averaging_period, min_periods=1)\
.mean()\
.round(roundby)\
.to_numpy()
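# Small self-contained example of moving_average(), plus the pattern the lab uses to
# attach it to the actuals (add_data_to_indexdb below expects an 'ma' column).
ma_example = moving_average([1, 2, 3, 4], averaging_period=2)  # -> array([1., 1.5, 2.5, 3.5])
# actuals['ma'] = actuals['target'].apply(moving_average)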
def add_data_to_indexdb(actuals, predictions):
with open('javascript/create_table.js', 'r') as f:
create_table_script = f.read()
display(Javascript(create_table_script))
with open('javascript/index_data.js', 'r') as f:
index_script = f.read()
def exec_js(actual_strs, prediction_strs):
actual_str = '['+','.join(actual_strs)+']'
prediction_str = '['+','.join(prediction_strs)+']'
display(Javascript(f"""
{index_script}
index_data('openaq','actuals', {actual_str});
index_data('openaq','predictions', {prediction_str});
"""))
actual_strs = []
prediction_strs = []
last_actual_lid = None
for index, row in predictions.reset_index().iterrows():
lid = row['id']
if last_actual_lid != lid:
actual_row = actuals.loc[lid]
start = actual_row['start'].strftime('%Y-%m-%dT%H:%M:%SZ')
target_str = json.dumps(actual_row['target'])
ma_str = json.dumps(actual_row['ma'].tolist())
actual_strs.append('{' + f'id:"{lid}",start:"{start}",target:{target_str}, ma:{ma_str}' + '}')
last_actual_lid = lid
pid = row['prediction_id']
start = row['start'].strftime('%Y-%m-%dT%H:%M:%SZ')
quantile_results = []
for q in range(1,10):
quantile = f'0.{q}'
if quantile in row:
values = json.dumps(row[quantile])
quantile_results.append(f'"{quantile}":{values}')
prediction_str = ','.join(quantile_results)
prediction_strs.append('{' + f'id:"{lid}:{pid}",start:"{start}",{prediction_str}' + '}')
if len(prediction_strs) == 100:
exec_js(actual_strs, prediction_strs)
actual_strs = []
prediction_strs = []
    # flush any remaining rows that did not fill a complete batch of 100
    if len(prediction_strs):
        exec_js(actual_strs, prediction_strs)
def create_analysis_chart(metadata, actuals, predictions):
add_data_to_indexdb(actuals, predictions)
######################
    # create the (initially empty) data sources for the actuals and predictions
actuals_source = ColumnDataSource(dict(id=[], start=[], target=[], ma=[])) # empty
filtered_predictions = predictions.reset_index(level=1)
predictions_source = ColumnDataSource(dict(id=[], start=[], **{q:[] for q in quantile_names}))
# create the plot
predictions_plot = figure(
title='',
plot_width=800, plot_height=400,
x_axis_label='date/time',
y_axis_label='pm10 ',
x_axis_type='datetime',
        y_range=[0, predictions['0.5'].apply(max).max()],
tools=''
)
# plot vertical areas for the quantiles
predictions_plot.varea_stack(
stackers=quantile_names,
x='start',
color= inferno(len(quantiles)),
legend_label=quantile_names,
source=predictions_source,
alpha=1,
)
# plot actual values
predictions_plot.line(
x= "start", y= "target",
color= 'red',
source= actuals_source
)
    # plot the moving average of the actual values
predictions_plot.line(
x= "start", y= "ma",
color= 'black',
source= actuals_source
)
    # reverse the legend entries so the highest quantile is listed first
predictions_plot.legend.items.reverse()
#############################
# Create location selector
options = metadata.reset_index()[['id', 'country', 'city', 'location']].astype('str').agg('-'.join, axis=1).tolist()
location_select = Select(title='Select Location:', value=options[0], options=options)
################################
# Create prediction start slider
start_min = filtered_predictions.reset_index()['start'].min()
start_slider = Slider(
start=0,
end= predictions.index.get_level_values(1).unique().max(),
value=0,
step=1,
        title='prediction time delta'
)
#############################
# Create javascript callback
# The javascript callback function connects all the plots and
# gui components together so changes will update the plots.
callback_args=dict(
actuals= actuals_source,
predictions= predictions_source,
location_select= location_select,
start_slider= start_slider
)
with open('javascript/plot_update_callback.js', 'r') as f:
callback_code = f.read()
plot_update_callback = CustomJS(code=callback_code, args=callback_args)
location_select.js_on_change('value', plot_update_callback)
start_slider.js_on_change('value', plot_update_callback)
return column(location_select, predictions_plot, start_slider)
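# Hedged usage sketch: render the interactive analysis chart inline in the notebook.
# 'metadata', 'actuals' and 'predictions' are built in later lab steps.
# output_notebook()
# show(create_analysis_chart(metadata, actuals, predictions))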