# Introduction

Welcome to your Amazon SageMaker notebook instance! 

This is a fully managed AWS environment that provides you with a Jupyter notebook for working with data. To learn more about Amazon SageMaker notebook instances, check out our [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html).

## Summary

We're looking to build a linear model to help with our Wild Rydes machine learning challenge. A linear model is a supervised learning algorithm used to solve either classification or regression problems. This notebook will help us:

1. Set up serverless querying of data in S3 via [Amazon Athena](https://aws.amazon.com/athena/).
2. Prepare dataframes using [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org).
3. Build and train a machine learning model via the [Amazon SageMaker Python SDK](https://docs.aws.amazon.com/sagemaker/latest/dg/frameworks.html).

To get started, let's input the name of the S3 bucket you created earlier in this workshop:

In [None]:
# ACTION: provide the data bucket NAME you are using for this workshop
data_bucket = '' 

We'll use this bucket throughout the notebook to store and query data.

### Objective: Enable serverless querying via Amazon Athena

The next thing we'll do is install [PyAthena](https://pypi.org/project/PyAthena/) so we can use Amazon Athena. Athena is a serverless query service that lets us query data in S3 as if it were loaded into a database. Run the next cell to install the package we need to get started with Athena.

In [None]:
%%capture
## the line above suppresses installation output from this cell


## Install PyAthena to enable our notebook instance to use Athena
import sys
!{sys.executable} -m pip install PyAthena

PyAthena gives us the ability to connect to Amazon Athena so we can start defining data objects in S3. In the next cell, let's load the PyAthena, pandas, and boto3 libraries so we can start sending SQL queries to Athena. Our first query will create a serverless database for use in this workshop.

In [None]:
# load necessary libraries
from pyathena import connect
import pandas as pd
import boto3

# create place to store athena query results
athena_query_results = ("s3://%s/results/" % data_bucket)

# establish athena connection in the same region as our S3 bucket.
# get_bucket_location returns None for buckets in us-east-1, so fall back to that region.
data_bucket_region = boto3.client('s3').get_bucket_location(Bucket=data_bucket)['LocationConstraint'] or 'us-east-1'

conn = connect(s3_staging_dir=athena_query_results,
               region_name=data_bucket_region)

# create a serverless database
athena_database = "wildrydesworkshop"

sql_stmt_create_db = ("CREATE DATABASE IF NOT EXISTS %s" % athena_database)

pd.read_sql(sql_stmt_create_db, conn)

# list all databases to confirm our new database exists
pd.read_sql(("SHOW DATABASES LIKE '%s'" % athena_database), conn)

We now have a serverless database that we can create external tables in. An external table is a table definition whose underlying data is stored outside the service that queries it. In this case, Athena is our query service and our data is stored separately in S3.
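The `CREATE EXTERNAL TABLE` statements in the upcoming cells all share the same shape: a column list, a SerDe with a field delimiter, and an S3 `LOCATION`. As a minimal sketch, the hypothetical helper `build_external_table_ddl` (not part of PyAthena or Athena) assembles such a statement from a list of `(name, type)` pairs. It only builds the SQL string and does not talk to Athena; the bucket name is a placeholder.

```python
def build_external_table_ddl(database, table, columns, location, skip_header=False):
    """Return a CREATE EXTERNAL TABLE statement for comma-delimited files in S3.

    Hypothetical helper for illustration; `columns` is a list of (name, type) pairs.
    """
    column_sql = "\n,".join("%s %s" % (name, col_type) for name, col_type in columns)
    serde_props = ['"field.delim" = ","']
    if skip_header:
        # tells Athena to skip the first (header) line of each file
        serde_props.append('"skip.header.line.count" = "1"')
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (\n %s\n)\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n"
        "WITH SERDEPROPERTIES (\n%s\n)\n"
        "LOCATION '%s'"
    ) % (database, table, column_sql, "\n,".join(serde_props), location)


ddl = build_external_table_ddl(
    "wildrydesworkshop",
    "ridetelemetry",
    [("distance", "double"), ("healthpoints", "bigint")],
    "s3://my-example-bucket/processed/",  # placeholder bucket name
    skip_header=True,
)
print(ddl)
```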

In the Data Processing portion of this workshop, we processed some telemetry data and saved the results in S3. Let's create an external table of this data so we can query it later.

In [None]:
# create an external table based on the processed ride telemetry data
processed_table = "ridetelemetry"

# drop the table if we're rerunning this cell
pd.read_sql("DROP TABLE IF EXISTS %s.%s" % (athena_database, processed_table),conn)

# create an external table based on the schema output from our data processing module
sql_stmt_create_tb = """
CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
 distance double
,healthpoints bigint
,latitude double
,longitude double
,magicpoints bigint
,name string
,statustime string
,fieldservice bigint
,groundstation string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
"field.delim" = ","
,"skip.header.line.count" = "1"
)
LOCATION 's3://%s/processed/'
""" % (athena_database, processed_table, data_bucket)

pd.read_sql(sql_stmt_create_tb, conn)

# confirm that our new table exists
pd.read_sql(("SHOW TABLES IN %s" % athena_database), conn)

We now have an external table in Athena based on our telemetry data. That's great, but want to know what's even better? We can also create external tables over public datasets published to Amazon S3! That means the historical weather data that NOAA stores in S3 can also be an external table for us to use. Let's create an external table for it.
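Each line of NOAA's GHCN-Daily CSV files follows the eight-column schema used in the next cell, with temperature and precipitation values stored as integers in tenths of a unit. An empty `q_flag` means the value did not fail any of NOAA's quality checks, which is why a later query filters on `q_flag = ''`. A minimal sketch using Python's `csv` module shows how one such row maps onto those columns; the station ID and values in the sample lines are made up for illustration.

```python
import csv
import io

# Column order matches the weather table definition in the next cell
COLUMNS = ["id", "year_date", "element", "data_value",
           "m_flag", "q_flag", "s_flag", "obs_time"]

# Made-up sample rows in the GHCN-Daily CSV layout (not real observations)
sample = "XXX00000001,20170105,TMAX,56,,,S,\nXXX00000001,20170105,PRCP,13,,,S,\n"

rows = [dict(zip(COLUMNS, fields)) for fields in csv.reader(io.StringIO(sample))]

for row in rows:
    # TMAX/TMIN are tenths of degrees C and PRCP is tenths of mm, so divide by 10
    row["value"] = int(row["data_value"]) / 10
    print(row["id"], row["year_date"], row["element"], row["value"], repr(row["q_flag"]))
```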

In [None]:
# create an external table based on NOAA's weather files
weather_table = "weather"

# drop the table if we're rerunning this cell
pd.read_sql("DROP TABLE IF EXISTS %s.%s" % (athena_database, weather_table),conn)

# create an external table based on the data schema @ https://docs.opendata.aws/noaa-ghcn-pds/readme.html
# 'year_date' isn't expressed as a typical datetime type, so we'll worry about it later.
sql_stmt_create_tb = """
CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
 id string
,year_date string
,element string
,data_value double
,m_flag string
,q_flag string
,s_flag string
,obs_time string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
"field.delim" = ","
)
LOCATION 's3://noaa-ghcn-pds/csv/'
""" % (athena_database, weather_table)

# send the create table query statement to our Athena connection
pd.read_sql(sql_stmt_create_tb, conn)

# confirm that our new table exists
pd.read_sql(("SHOW TABLES IN %s" % athena_database), conn)

### Checkpoint reached: Enabled querying data in S3

We now have the ability to query both our telemetry data and NOAA data directly from S3 via Athena. Let's run a query to see how large these tables are.

In [None]:
# count telemetry records
pd.read_sql("""
SELECT '%s' as tablename, COUNT(*) records FROM %s.%s
UNION ALL
SELECT '%s' as tablename, COUNT(*) records FROM %s.%s
""" % (processed_table, athena_database, processed_table,
 weather_table, athena_database, weather_table
 ), conn)

### Objective: Create dataframe objects for our linear model

Our telemetry data looks pretty manageable, but check out the size of the weather data! We probably don't need the vast majority of it. Let's set up our telemetry data as a pandas dataframe so we can figure out how much weather data we really need.

In [None]:
# create a pandas dataframe with our processed ride telemetry data
df_telemetry = pd.read_sql("SELECT * FROM %s.%s" % (athena_database, processed_table), conn)

# 'statustime' is stored as a string, so let's convert it to a datetime data type,
# describing its exact format so pandas doesn't have to infer it.
df_telemetry['statustime'] = pd.to_datetime(df_telemetry['statustime'], format='%Y-%m-%d %H:%M:%S.%f')

# let's add a date field that doesn't have time so we can easily join to weather data later
df_telemetry['year_date'] = pd.to_datetime(df_telemetry['statustime'].dt.strftime('%Y-%m-%d'))

# here's the descriptive summary of the telemetry dataframe
df_telemetry.describe(include='all')

To help filter down our weather data, the telemetry dataframe summary above tells us:
- There are only a couple of ground station IDs
- Our ride dates start in 2017

Keeping those facts in mind, let's work on creating our weather dataframe next.
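When splicing values such as the ground station IDs into an SQL `IN (...)` clause, relying on Python's `tuple` repr is fragile: a single station renders as `('ID',)`, and the trailing comma is not valid SQL. A minimal sketch of a hypothetical helper, `sql_in_list`, that always produces a well-formed quoted list and escapes embedded single quotes by doubling them (the standard SQL escape):

```python
def sql_in_list(values):
    """Render values as a quoted SQL list, e.g. ('a', 'b'). Hypothetical helper."""
    values = list(values)
    if not values:
        raise ValueError("an SQL IN list needs at least one value")
    # double any single quotes so they survive inside '...' literals
    quoted = ["'%s'" % str(v).replace("'", "''") for v in values]
    return "(" + ", ".join(quoted) + ")"


print(str(tuple(["USW00000001"])))   # ('USW00000001',)  <- trailing comma breaks SQL
print(sql_in_list(["USW00000001"]))  # ('USW00000001')
print(sql_in_list(["A", "O'B"]))     # ('A', 'O''B')
```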

In [None]:
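# first let's get the ground station IDs we're interested in from our telemetry data,
# formatted as a quoted SQL list such as ('GS1', 'GS2'); a plain tuple would render
# a single station as ('GS1',), and that trailing comma is not valid SQL
unique_gs = "(" + ", ".join("'%s'" % gs for gs in df_telemetry.groundstation.unique()) + ")"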

# weather data goes back far in time. let's only grab data for the years we need
start_year = pd.to_datetime(df_telemetry.statustime.min()).year

# let's create our query statement
weather_query = """
SELECT * FROM %s.%s
WHERE q_flag = ''
AND id IN %s
AND year(date_parse(year_date, '%%Y%%m%%d')) >= %s
""" % (athena_database, weather_table, unique_gs, start_year)

# now we'll pass the query to Athena to get back the weather data we're interested in.
# Athena will scan over 2 billion records (90+ GB) in just over 30 seconds.
df_weather = pd.read_sql(weather_query, conn)

# we want to make sure the 'data_value' field is a numeric column
df_weather['data_value'] = pd.to_numeric(df_weather['data_value'])

# let's also make sure 'year_date' is a proper date field (it is stored as YYYYMMDD strings)
df_weather['year_date'] = pd.to_datetime(df_weather['year_date'], format='%Y%m%d')

# here's the descriptive summary of the weather dataframe
df_weather.describe(include='all')

In [None]:
# we want some interesting weather facts as features, so let's pivot the data
df_weather_pivot = pd.DataFrame(df_weather, columns = ['id','year_date','element', 'data_value']) \
 .query('element in ("TMIN", "TMAX", "PRCP")') \
 .pivot_table(index=['id','year_date'], columns='element', values='data_value') \
 .reset_index()

# element definitions from https://docs.opendata.aws/noaa-ghcn-pds/readme.html
## PRCP = Precipitation (tenths of mm)
## TMAX = Maximum temperature (tenths of degrees C)
## TMIN = Minimum temperature (tenths of degrees C)

# now our weather data looks like this
df_weather_pivot.describe(include='all')
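The pivot in the previous cell turns NOAA's long format (one row per station, date, and element) into a wide format with one column per element. A minimal sketch on a made-up three-row frame shows the reshaping, plus an optional conversion from tenths to whole units; that conversion is an extra step the workshop's own cell does not perform.

```python
import pandas as pd

# Made-up long-format weather rows: one row per (station, date, element)
long_df = pd.DataFrame({
    "id": ["GS1", "GS1", "GS1"],
    "year_date": pd.to_datetime(["2017-01-05"] * 3),
    "element": ["TMIN", "TMAX", "PRCP"],
    "data_value": [-21.0, 56.0, 13.0],
})

# One row per (id, year_date), one column per element
wide_df = (long_df
           .pivot_table(index=["id", "year_date"], columns="element", values="data_value")
           .reset_index())

# Optional: convert tenths of degrees C / tenths of mm into degrees C / mm
for col in ["TMIN", "TMAX", "PRCP"]:
    wide_df[col + "_units"] = wide_df[col] / 10

print(wide_df)
```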

### Checkpoint reached: created dataframes as inputs into our linear model

We crafted two dataframes: ride telemetry & pivoted weather facts. Now let's merge them into a single dataframe so we can start preparing our model.
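Because the next cell uses a left join, every telemetry row is kept, but rows whose ground station and date have no matching weather record end up with `NaN` in the `TMIN`, `TMAX`, and `PRCP` columns, and those `NaN`s carry through into the numpy arrays built later. A minimal sketch on made-up frames shows how `indicator=True` can be used to count such rows before training; whether to drop or impute them is a modeling choice the workshop doesn't make.

```python
import pandas as pd

# Made-up telemetry and pivoted weather frames with the same join keys as the workshop
telemetry = pd.DataFrame({
    "groundstation": ["GS1", "GS1", "GS2"],
    "year_date": pd.to_datetime(["2017-01-05", "2017-01-06", "2017-01-05"]),
    "distance": [10.0, 12.5, 7.0],
})
weather = pd.DataFrame({
    "id": ["GS1"],
    "year_date": pd.to_datetime(["2017-01-05"]),
    "TMAX": [56.0],
})

merged = pd.merge(telemetry, weather,
                  left_on=["groundstation", "year_date"],
                  right_on=["id", "year_date"],
                  how="left", indicator=True)

# '_merge' is 'left_only' for telemetry rows with no matching weather row
unmatched = (merged["_merge"] == "left_only").sum()
print("rows without weather:", unmatched)
print(merged[["groundstation", "distance", "TMAX", "_merge"]])
```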

In [None]:
# Let's merge our telemetry data with weather so we can include weather elements as model features
merge_df = pd.merge(df_telemetry, df_weather_pivot
 , left_on=['groundstation','year_date']
 , right_on=['id', 'year_date']
 , how='left'
 )

### Objective: Pre-Processing the Data
Now that we have data ready for modeling, let's shape it into inputs for our model.

Our linear learner's training data will be serialized from numpy arrays. We'll first load the data into numpy arrays, then randomly split it into a training set and a test set with a 90/10 split.
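The split in the next cell shuffles the rows in place and treats the last column (`fieldservice`) as the label. As a minimal sketch, the same 90/10 idea can be wrapped in a hypothetical reusable function, `train_test_split_last_col`; it swaps in numpy's `default_rng` generator and a permutation of row indices instead of the global `np.random.seed`/`shuffle`, so the input array is left untouched.

```python
import numpy as np

def train_test_split_last_col(data, train_frac=0.9, seed=0):
    """Shuffle rows, split by train_frac, and separate the last column as labels.

    Hypothetical helper; `data` is a 2-D array whose final column is the label.
    """
    rng = np.random.default_rng(seed)
    shuffled = data[rng.permutation(data.shape[0])]  # fancy indexing copies; `data` is unchanged
    train_size = int(train_frac * shuffled.shape[0])
    train, test = shuffled[:train_size], shuffled[train_size:]
    return train[:, :-1], train[:, -1], test[:, :-1], test[:, -1]


toy = np.arange(40, dtype=np.float32).reshape(20, 2)  # 20 rows: 1 feature + 1 label
X_train, y_train, X_test, y_test = train_test_split_last_col(toy)
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)  # (18, 1) (18,) (2, 1) (2,)
```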

In [None]:
import numpy as np
import os

processed_subdir = "standardized"
folder = "/home/ec2-user/SageMaker/"
train_features_file = os.path.join(folder, processed_subdir, "train/csv/features.csv")
train_labels_file = os.path.join(folder, processed_subdir, "train/csv/labels.csv")
test_features_file = os.path.join(folder, processed_subdir, "test/csv/features.csv")
test_labels_file = os.path.join(folder, processed_subdir, "test/csv/labels.csv")

raw = merge_df[['distance','healthpoints','magicpoints','TMIN','TMAX','PRCP','fieldservice']].to_numpy(dtype=np.float32)

# split into train/test with a 90/10 split
np.random.seed(0)
np.random.shuffle(raw)
train_size = int(0.9 * raw.shape[0])
train_features = raw[:train_size, :-1]
train_labels = raw[:train_size, -1]
test_features = raw[train_size:, :-1]
test_labels = raw[train_size:, -1]

print('train_features shape = ', train_features.shape)
print('train_labels shape = ', train_labels.shape)
print('test_features shape = ', test_features.shape)
print('test_labels shape = ', test_labels.shape)

Let's write the training data to Amazon S3 in recordio-protobuf format. This allows us to persist the data in case we need to return to this exact training set later.

We first create an in-memory I/O buffer containing the serialized data, and then upload it to Amazon S3. Note that the bucket and prefix should be changed for different users and different datasets.
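Rewinding the buffer with `buf.seek(0)` matters: after writing, the buffer's position sits at the end of the data, and a consumer such as `upload_fileobj` reads from the current position onward. A small standard-library sketch shows the difference; the bytes here are a stand-in for the recordio-protobuf records.

```python
import io

buf = io.BytesIO()
buf.write(b"stand-in for recordio-protobuf bytes")

# The position is now at the end of the written data, so a read returns nothing
before_seek = buf.read()

# Rewind to the start so a consumer sees the full payload
buf.seek(0)
after_seek = buf.read()

print(before_seek)  # b''
print(after_seek)   # b'stand-in for recordio-protobuf bytes'
```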

In [None]:
import sagemaker.amazon.common as smac
import io

train_prefix = 'train'
key = 'recordio-pb-data'

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, train_features, train_labels)
buf.seek(0)

boto3.resource('s3').Bucket(data_bucket).Object(os.path.join(train_prefix, key)).upload_fileobj(buf)
s3_train_data = 's3://{}/{}/{}'.format(data_bucket, train_prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

We should save our test dataset to S3, too. This way, the training job can evaluate the model on the actual test data and report the results in the training logs.

In [None]:
test_prefix = 'test'

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, test_features, test_labels)
buf.seek(0)

boto3.resource('s3').Bucket(data_bucket).Object(os.path.join(test_prefix, key)).upload_fileobj(buf)
s3_test_data = 's3://{}/{}/{}'.format(data_bucket, test_prefix, key)
print('uploaded test data location: {}'.format(s3_test_data))

### Checkpoint reached: data is ready for our linear learner model

### Objective: build and train our linear learner model

Let's take a moment to explain, at a high level, how machine learning training and prediction work in Amazon SageMaker. First, we need to train a model: given a labeled dataset and hyperparameters guiding the training process, training outputs a model. Once training is done, we can set up what is called an **endpoint**. An endpoint is a web service that, given a request containing an unlabeled data point or a mini-batch of data points, returns one or more predictions.

In Amazon SageMaker, training is done via an object called an **estimator**. When setting up the estimator, we specify the location (in Amazon S3) of the training data, the path (again in Amazon S3) to the output directory where the model will be serialized, infrastructure settings such as the instance type and number of instances to use during training, and algorithm-specific hyperparameters such as `feature_dim` and `predictor_type`. Once the estimator is initialized, we can call its **fit** method to do the actual training.
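At prediction time, a trained linear model reduces to a weight vector and a bias. As a minimal numpy sketch of what a linear binary classifier computes conceptually: a weighted sum of the features passed through the logistic (sigmoid) function gives a score between 0 and 1, and a cutoff turns the score into a label. The weights, bias, and the 0.5 cutoff are made-up illustrations; Linear Learner learns its own weights and may use a different decision threshold.

```python
import numpy as np

def predict_binary(features, weights, bias, threshold=0.5):
    """Return (scores, labels) for a linear binary classifier.

    score = sigmoid(features @ weights + bias); label = 1 if score >= threshold.
    """
    logits = features @ weights + bias
    scores = 1.0 / (1.0 + np.exp(-logits))
    labels = (scores >= threshold).astype(int)
    return scores, labels


# Made-up model with 2 features and a mini-batch of 3 data points
w = np.array([0.8, -0.5])
b = -0.1
X = np.array([[1.0, 0.0],
              [0.0, 2.0],
              [0.0, 0.0]])
scores, labels = predict_binary(X, w, b)
print(scores.round(3), labels)
```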

Now that we are ready for training, we start with a convenience function that starts a training job.

In [None]:
import matplotlib.pyplot as plt

import sagemaker
from sagemaker import get_execution_role
from sagemaker.predictor import csv_serializer, json_deserializer
from sagemaker.amazon.amazon_estimator import get_image_uri


def trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):
 """
 Create an Estimator from the given hyperparams, fit to training data, 
 and return a deployed predictor
 
 """
 # set up the estimator
 linear = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "linear-learner"),
 get_execution_role(),
 train_instance_count=1,
 train_instance_type='ml.m5.2xlarge',
 output_path=output_path,
 sagemaker_session=sagemaker.Session())
 linear.set_hyperparameters(**hyperparams)
 
 # train a model. fit_input contains the locations of the train and test data
 fit_input = {'train': s3_train_data}
 if s3_test_data is not None:
 fit_input['test'] = s3_test_data
 linear.fit(fit_input)
 return linear

Now, we run the actual training job. Apart from the three hyperparameters below, we stick to the algorithm's default values.
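The next cell derives two hyperparameters from the training array: `feature_dim` (the number of feature columns) and `mini_batch_size` (10% of the training rows). For very small datasets, `int(0.1 * n)` rounds down to 0, which is not a valid batch size. A minimal sketch of a hypothetical helper, `linear_learner_hyperparams`, that adds a floor of 1:

```python
import numpy as np

def linear_learner_hyperparams(train_features, batch_frac=0.1):
    """Build a Linear Learner hyperparameter dict from a 2-D training array.

    Hypothetical helper; the keys match those used in the workshop cell.
    """
    n_rows, n_cols = train_features.shape
    return {
        "feature_dim": int(n_cols),
        # never let the batch size round down to 0 on tiny datasets
        "mini_batch_size": max(1, int(batch_frac * n_rows)),
        "predictor_type": "binary_classifier",
    }


print(linear_learner_hyperparams(np.zeros((1000, 6))))
print(linear_learner_hyperparams(np.zeros((5, 6))))
```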

In [None]:
hyperparams = {
 'feature_dim': int(train_features.shape[1]),
 'mini_batch_size': int(0.1 * train_features.shape[0]),
 'predictor_type': 'binary_classifier' 
}

output_path = 's3://' + data_bucket
linear_estimator = trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, 
 s3_test_data=s3_test_data)

Notice that we passed a test set to the training job. When a test set is provided, the training job doesn't just produce a model; it also applies the model to the test set and reports metrics such as accuracy. You can view these test metrics in the training logs.
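For a binary classifier, the logged test metrics typically include accuracy alongside related measures such as precision and recall. A minimal numpy sketch of how those three numbers are defined from true and predicted labels (the arrays are made up, and this is not how SageMaker computes them internally):

```python
import numpy as np

def binary_metrics(y_true, y_pred):
    """Return (accuracy, precision, recall) for 0/1 label arrays."""
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)
    tp = int(np.sum((y_pred == 1) & (y_true == 1)))  # true positives
    fp = int(np.sum((y_pred == 1) & (y_true == 0)))  # false positives
    fn = int(np.sum((y_pred == 0) & (y_true == 1)))  # false negatives
    accuracy = float(np.mean(y_pred == y_true))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall


# Made-up labels for 8 test points
y_true = [1, 0, 1, 1, 0, 0, 1, 0]
y_pred = [1, 0, 0, 1, 0, 1, 1, 1]
print(binary_metrics(y_true, y_pred))  # (0.625, 0.6, 0.75)
```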

## Conclusion

We now have a trained model artifact stored in S3. Instead of creating a SageMaker endpoint, we will use AWS Lambda to make inferences against the model.

If you want to test the model using a SageMaker Endpoint before moving on, check out our documentation:
https://docs.aws.amazon.com/sagemaker/latest/dg/ex1-deploy-model.html#ex1-deploy-model-boto

At this point you can close out of the SageMaker notebook and continue with the workshop instructions.