# Bring Your Own Model with Amazon Redshift ML
_**On a Redshift Cluster, Deploy and Run Predictions on a Model Trained and Validated on Amazon SageMaker**_

---

## Contents
1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Training the XGBOOST Model on Sagemaker](#Training-the-XGBOOST-Model-on-Sagemaker)   
4. [Create Model and run predictions on Redshift](#Create-Model-and-run-predictions-on-Redshift)
    1. [Configure Redshift Data Api](#Configure-Redshift-Data-Api)
    2. [Create BYOM Model](#Create-BYOM-Model)
    3. [Data Preparation](#Data-preparation)
    4. [Inference](#Inference)
    5. [Evaluation](#Evaluation)
5. [Cleanup](#Cleanup)

---
## Introduction

In this notebook, we walk through a working example of "bring your own model" (BYOM) with Amazon Redshift ML. BYOM uses a model that was trained and validated outside Redshift, here on Amazon SageMaker. In this use case, the model is an XGBoost regression model, which predicts a numerical outcome such as the price of a house, how many people will use a city's bike rental service, or the age of an abalone.

The notebook shows how to train and validate an XGBoost regression model on Amazon SageMaker, import the trained model into Redshift ML, and run inference locally on the Redshift cluster.

With BYOM, you can bring XGBoost and MLP models to Redshift ML. Once a pre-trained model is deployed to Redshift ML, inference runs locally on Redshift, without a SageMaker endpoint or SageMaker Studio. This makes it easy for data analysts to run inference on new data against models created outside of Redshift, without needing access to SageMaker services or endpoints. It also helps data scientists quickly deliver machine learning models built outside of Redshift to the data team. Because Redshift ML is driven by native Redshift SQL, the data team's experience stays consistent with the rest of their analysis work on the data warehouse.

The notebook has two parts:

  * Train and validate an XGBoost model on SageMaker
  
  * Create a BYOM model for local inference on the Redshift cluster

--------
## Use Case
We use the [Abalone data](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html) originally from the UCI data repository [1]. More details about the original dataset can be found [here](https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.names). In the libsvm-converted [version](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html), the nominal feature (Male/Female/Infant) has been converted into a real-valued feature. The age of an abalone is predicted from eight features: sex and seven physical measurements. The dataset has already been processed and stored in S3. The scripts used to process the data, which download it, split it into train, validation, and test sets, and upload them to an S3 bucket, are in the [Appendix](#Appendix). These steps do not need to be run again; they are provided for reference.

>[1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.
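In libsvm format, each line holds the label followed by sparse `index:value` feature pairs. A minimal sketch of how one line decomposes, using a hypothetical helper `parse_libsvm_line` and a made-up illustrative line (not an actual row of the dataset):

```python
def parse_libsvm_line(line):
    """Split a libsvm-formatted line into (label, {feature_index: value})."""
    tokens = line.split()
    label = float(tokens[0])
    features = {}
    for token in tokens[1:]:
        # Each remaining token is "index:value"
        index, value = token.split(":", 1)
        features[int(index)] = float(value)
    return label, features


# Illustrative line only; the values are invented for this example.
label, features = parse_libsvm_line("9 1:1 2:0.5 3:0.4 4:0.1 5:0.6 6:0.25 7:0.12 8:0.2")
print(label)          # 9.0
print(len(features))  # 8
```

Sparse formats like this omit zero-valued features, which is why the feature index is stored explicitly with each value.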

-------
## Setup

This notebook was tested in Amazon SageMaker Studio on an ml.t3.medium instance with the Python 3 (Data Science) kernel, against a 2-node ra3.xlplus Redshift cluster. The notebook connects to the Redshift cluster through the Redshift Data API.

Let's start by specifying:

1. S3_BUCKET: The S3 bucket used to save the model artifacts. It should be in the same region as the notebook instance, training, and hosting. Redshift ML also stores its intermediate results in this bucket.

1. REDSHIFT_IAM_ROLE: The ARN of the IAM role attached to the Redshift cluster.

1. REDSHIFT_USER: The database user that runs the SQL commands.

1. REDSHIFT_ENDPOINT: The Redshift cluster endpoint, in the form `<host>:<port>/<database>`.

1. CUSTOMER_PRETRAINED_MODEL_DATA: If you plan to use your own pre-trained XGBoost model, set this to the S3 path of its `model.tar.gz` file.

**NOTE**: The first four parameters are listed on the Outputs tab of the CloudFormation stack. Copy and paste them into the cell below.


In [None]:
REDSHIFT_ENDPOINT = 'redshift-cluster-1.cwt0y36legsj.us-east-1.redshift.amazonaws.com:5439/dev'
REDSHIFT_USER="demo"
REDSHIFT_IAM_ROLE='arn:aws:iam::822469723147:role/byom-2-RedshiftMLIAMRole-822469723147'
S3_BUCKET='byom-2-redshiftmlbucket-hwcr2e0x844p'
CUSTOMER_PRETRAINED_MODEL_DATA = '' #<<S3 path to your model.tar.gz file>>. Use this if you are skipping part 1
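`REDSHIFT_ENDPOINT` packs the cluster host, port, and database name into one string, and the notebook later derives the cluster identifier and database from it with `str.split`. A minimal sketch that makes the expected `<host>:<port>/<database>` format explicit, using a hypothetical helper `parse_redshift_endpoint`; it assumes the cluster identifier is the first dot-separated label of the host, as in the example value above:

```python
def parse_redshift_endpoint(endpoint):
    """Split '<host>:<port>/<database>' into its parts (hypothetical helper)."""
    host_port, database = endpoint.split("/", 1)
    host, port = host_port.split(":", 1)
    # Assumption: the cluster identifier is the first label of the host name
    cluster_identifier = host.split(".", 1)[0]
    return {
        "cluster_identifier": cluster_identifier,
        "host": host,
        "port": int(port),
        "database": database,
    }


parts = parse_redshift_endpoint(
    "redshift-cluster-1.cwt0y36legsj.us-east-1.redshift.amazonaws.com:5439/dev"
)
print(parts["cluster_identifier"])  # redshift-cluster-1
print(parts["database"])            # dev
```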


In [None]:
%%time

import os
import boto3
import re
import sagemaker
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# S3 bucket where the training data is located.
# Feel free to specify a different bucket and prefix
data_bucket = f"jumpstart-cache-prod-{region}"
data_prefix = "1p-notebooks-datasets/abalone/libsvm"
data_bucket_path = f"s3://{data_bucket}"

# S3 bucket for saving code and model artifacts.
# Feel free to specify a different bucket and prefix
#output_bucket = sagemaker.Session().default_bucket()
output_bucket=S3_BUCKET
output_prefix = "sagemaker/DEMO-xgboost-abalone-default"
output_bucket_path = f"s3://{output_bucket}"


## Training the XGBOOST Model on Sagemaker

After setting the training parameters, we start the training job and poll its status until it finishes. In this example, training takes 5 to 6 minutes.

In [None]:
from sagemaker import image_uris

# Look up the SageMaker built-in XGBoost container image for this region
container = image_uris.retrieve(framework="xgboost", region=region, version="1.0-1")


In [None]:
%%time
import boto3
from time import gmtime, strftime

job_name = f"DEMO-xgboost-regression-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
print("Training job", job_name)

# The "train" and "validation" channels below point at the pre-processed libsvm data under data_bucket_path (see the Appendix for how such a split is produced).

create_training_params = {
    "AlgorithmSpecification": {"TrainingImage": container, "TrainingInputMode": "File"},
    "RoleArn": role,
    "OutputDataConfig": {"S3OutputPath": f"{output_bucket_path}/{output_prefix}/single-xgboost"},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m5.2xlarge", "VolumeSizeInGB": 5},
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth": "5",
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "subsample": "0.7",
        "silent": "0",
        "objective": "reg:linear",
        "num_round": "50",
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{data_bucket_path}/{data_prefix}/train",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{data_bucket_path}/{data_prefix}/validation",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
    ],
}


client = boto3.client("sagemaker", region_name=region)
client.create_training_job(**create_training_params)

import time

# Poll until the job reaches a terminal state; "Stopped" is terminal too,
# so checking only "Completed"/"Failed" could loop forever.
status = client.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
print(status)
while status not in ("Completed", "Failed", "Stopped"):
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
    print(status)
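The polling loop above can be factored into a reusable helper that also enforces a timeout, which the notebook's loops do not have. A minimal sketch, assuming a hypothetical `wait_for_terminal_status` that accepts any zero-argument callable returning a status string; the commented usage wraps `describe_training_job`, whose terminal statuses are `Completed`, `Failed`, and `Stopped`. The `sleep` parameter is injectable so the helper can be exercised without waiting.

```python
import time


def wait_for_terminal_status(get_status, terminal_statuses, poll_seconds=60,
                             timeout_seconds=3600, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout expires."""
    waited = 0
    status = get_status()
    while status not in terminal_statuses:
        if waited >= timeout_seconds:
            raise TimeoutError(f"Still {status!r} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Example usage with the SageMaker client from the cell above:
# status = wait_for_terminal_status(
#     lambda: client.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"],
#     {"Completed", "Failed", "Stopped"},
# )
```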

Note that the "validation" channel is configured as well. The SageMaker XGBoost algorithm computes the RMSE on the data passed to the "validation" channel and writes it to the CloudWatch logs.

## Set up hosting for the model
To set up hosting, we register the model artifact produced by training with SageMaker hosting.

### Import model into hosting

Register the model with hosting. This allows the flexibility of importing models trained elsewhere.

In [None]:
%%time
import boto3
from time import gmtime, strftime

model_name = f"{job_name}-model"
print(model_name)

info = client.describe_training_job(TrainingJobName=job_name)
model_data = info["ModelArtifacts"]["S3ModelArtifacts"]
print(model_data)

primary_container = {"Image": container, "ModelDataUrl": model_data}

create_model_response = client.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response["ModelArn"])

Make sure the model artifact was created. The `model_data` variable holds the S3 location of the model artifact; this is the location passed to the CREATE MODEL statement later in the notebook.

In [None]:
# Print the model artifact location, which is used later in the notebook
print(model_data)


### Create endpoint configuration

SageMaker supports configuring REST endpoints in hosting with multiple models, e.g. for A/B testing. To support this, you create an endpoint configuration that describes how traffic is distributed across the models, whether split, shadowed, or sampled in some way. The endpoint configuration also specifies the instance type used for model deployment.

In [None]:
from time import gmtime, strftime

endpoint_config_name = f"DEMO-XGBoostEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
print(endpoint_config_name)
create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.m5.xlarge",
            "InitialVariantWeight": 1,
            "InitialInstanceCount": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

print(f"Endpoint Config Arn: {create_endpoint_config_response['EndpointConfigArn']}")
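The endpoint configuration above has a single variant with `InitialVariantWeight` 1, so it receives all traffic. With several variants, SageMaker routes requests to each variant in proportion to its weight divided by the sum of all weights. A small sketch of that arithmetic, using a hypothetical helper `traffic_shares` and a made-up two-variant configuration (`VariantA`, `VariantB`, `model-a`, and `model-b` are placeholders):

```python
def traffic_shares(production_variants):
    """Return each variant's traffic share: its weight divided by the sum of all weights."""
    total = sum(v["InitialVariantWeight"] for v in production_variants)
    return {v["VariantName"]: v["InitialVariantWeight"] / total for v in production_variants}


# Hypothetical A/B configuration with weights 3 and 1
variants = [
    {"VariantName": "VariantA", "ModelName": "model-a", "InitialVariantWeight": 3},
    {"VariantName": "VariantB", "ModelName": "model-b", "InitialVariantWeight": 1},
]
print(traffic_shares(variants))  # {'VariantA': 0.75, 'VariantB': 0.25}
```

Weights are relative, so `3` and `1` route the same traffic split as `0.75` and `0.25`.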

### Create endpoint
Finally, create the endpoint that serves the model by specifying the name and the endpoint configuration defined above. The result is an endpoint that can be validated and incorporated into production applications. This step takes 9 to 11 minutes.

In [None]:
%%time
import time

endpoint_name = f'DEMO-XGBoostEndpoint-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'
print(endpoint_name)
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(create_endpoint_response["EndpointArn"])

resp = client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
while status == "Creating":
    print(f"Status: {status}")
    time.sleep(60)
    resp = client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]

print(f"Arn: {resp['EndpointArn']}")
print(f"Status: {status}")

## Validate the model for use
Now validate the model. Create a SageMaker runtime client, then use it to invoke the endpoint created above and generate predictions from the trained model.


In [None]:
runtime_client = boto3.client("runtime.sagemaker", region_name=region)

Download test data

In [None]:
FILE_TEST = "abalone.test"
s3 = boto3.client("s3")
s3.download_file(data_bucket, f"{data_prefix}/test/{FILE_TEST}", FILE_TEST)

Start with a single prediction.

In [None]:
!head -1 abalone.test > abalone.single.test

In [None]:
%%time
import math

file_name = "abalone.single.test"  # customize to your test file
with open(file_name, "r") as f:
    payload = f.read().strip()
response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="text/x-libsvm", Body=payload
)
result = response["Body"].read()
result = result.decode("utf-8")
result = result.split(",")
result = [math.ceil(float(i)) for i in result]
label = payload.strip(" ").split()[0]
print(f"Label: {label}\nPrediction: {result[0]}")

In [None]:
print(endpoint_name)

A single prediction works. Now run the whole test set in batches to see how accurate the predictions are.

In [None]:
import sys
import math


def do_predict(data, endpoint_name, content_type):
    """Send a batch of libsvm lines to the endpoint and return rounded-up predictions."""
    payload = "\n".join(data)
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name, ContentType=content_type, Body=payload
    )
    result = response["Body"].read().decode("utf-8")
    return [math.ceil(float(num)) for num in result.split(",")]


def batch_predict(data, batch_size, endpoint_name, content_type):
    """Invoke the endpoint batch_size records at a time and concatenate the predictions."""
    preds = []
    for offset in range(0, len(data), batch_size):
        # Slicing past the end is safe, so the last (shorter) batch needs no special case
        preds.extend(do_predict(data[offset : offset + batch_size], endpoint_name, content_type))
        sys.stdout.write(".")
    return preds

The following helps us calculate the Median Absolute Percent Error (MdAPE) on the batch dataset. 

In [None]:
%%time
import json
import numpy as np

with open(FILE_TEST, "r") as f:
    payload = f.read().strip()

labels = [int(line.split(" ")[0]) for line in payload.split("\n")]
test_data = [line for line in payload.split("\n")]
preds = batch_predict(test_data, 100, endpoint_name, "text/x-libsvm")

print(
    "\n Median Absolute Percent Error (MdAPE) = ",
    np.median(np.abs(np.array(labels) - np.array(preds)) / np.array(labels)),
)
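MdAPE is the median, over all test records, of |label - prediction| / label. A worked example of the same computation as the cell above, on toy numbers and without NumPy, using a hypothetical helper `mdape`; like the original formula, it assumes every label is non-zero.

```python
from statistics import median


def mdape(labels, predictions):
    """Median Absolute Percent Error as a fraction (0.1 means 10%); labels must be non-zero."""
    errors = [abs(y - y_hat) / y for y, y_hat in zip(labels, predictions)]
    return median(errors)


# Per-record errors are 1/10 = 0.1, 2/20 = 0.1 and 0/5 = 0.0, so the median is 0.1
print(mdape([10, 20, 5], [11, 18, 5]))  # 0.1
```

Using the median rather than the mean keeps a few badly mispredicted records from dominating the summary.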

This completes Part 1: we created an XGBoost model with SageMaker and validated it. In Part 2, we show how to deploy the already trained model to Redshift ML and run predictions there.

------
# Create Model and run predictions on Redshift
## Configure Redshift Data Api

In this step, we define the function `run_sql`, which runs SQL through the Redshift Data API and returns query output directly as a pandas DataFrame. We also use it to run DDL statements.

In [None]:
import boto3
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

session = boto3.session.Session()
region = session.region_name


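def run_sql(sql_text):
    """Run SQL on the cluster through the Redshift Data API.

    Returns a pandas DataFrame when the statement produces rows; otherwise returns the
    statement Id. REDSHIFT_ENDPOINT is expected in the form '<host>:<port>/<database>'.
    """
    client = boto3.client("redshift-data")
    res = client.execute_statement(
        Database=REDSHIFT_ENDPOINT.split("/")[1],
        DbUser=REDSHIFT_USER,
        Sql=sql_text,
        ClusterIdentifier=REDSHIFT_ENDPOINT.split(".")[0],
    )
    query_id = res["Id"]

    # Poll until the statement reaches a terminal state (FINISHED, FAILED or ABORTED)
    while True:
        time.sleep(1)
        status_description = client.describe_statement(Id=query_id)
        status = status_description["Status"]
        if status in ("FAILED", "ABORTED"):
            raise Exception(f"SQL query {status}: {query_id}: {status_description.get('Error', '')}")
        if status == "FINISHED":
            break

    if status_description.get("ResultRows", 0) <= 0:
        # DDL statements and queries without rows return the statement Id
        return query_id

    # Fetch every page of the result set; NextToken is present while more pages remain
    column_labels, records = None, []
    request = {"Id": query_id}
    while True:
        results = client.get_statement_result(**request)
        if column_labels is None:
            column_labels = [col["label"] for col in results["ColumnMetadata"]]
        for record in results["Records"]:
            # Each field is a one-key dict such as {"longValue": 5} or {"isNull": True}
            records.append([None if field.get("isNull") else list(field.values())[0] for field in record])
        if not results.get("NextToken"):
            break
        request["NextToken"] = results["NextToken"]
    return pd.DataFrame(records, columns=column_labels)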
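`run_sql` turns each Data API record, a list of one-key field dicts such as `{"longValue": 5}` or `{"doubleValue": 1.5}`, into a plain row. A standalone sketch of that conversion, using a hypothetical helper `records_to_rows` that maps `{"isNull": True}` fields to `None`; it works on the `ColumnMetadata` and `Records` parts of a `get_statement_result` response, and the sample response in the test is made up.

```python
def records_to_rows(column_metadata, records):
    """Convert Redshift Data API result records into (column_labels, rows).

    Each field is a one-key dict such as {"longValue": 5}, {"doubleValue": 1.5},
    {"stringValue": "x"}, {"booleanValue": True} or {"isNull": True}.
    """
    columns = [col["label"] for col in column_metadata]
    rows = []
    for record in records:
        row = []
        for field in record:
            if field.get("isNull"):
                row.append(None)
            else:
                # Take the single typed value regardless of its key
                row.append(next(iter(field.values())))
        rows.append(row)
    return columns, rows
```

With pandas, `pd.DataFrame(rows, columns=columns)` then keeps a separate dtype per column instead of forcing all values through a single `np.array` dtype.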

## Create BYOM Model

### Create Model
Run the CREATE MODEL command below to import the trained model into Redshift ML. Note the FROM clause: it takes the S3 path of the model artifact created by SageMaker (or of your own pre-trained model). The command takes about 5 minutes to run. BYOM CREATE MODEL also requires the Redshift IAM role and an S3 bucket; the bucket is the Amazon S3 location that Redshift ML uses to store intermediate results.

**NOTE** 
#### Use your own pre-trained model 
* Set the CUSTOMER_PRETRAINED_MODEL_DATA parameter in the first cell of this notebook.
* Update the feature list and data types in the FUNCTION clause accordingly.
* Update the table name and the columns passed to the prediction function in the SELECT statements accordingly.

In [None]:
# Use the customer's pre-trained model if one was provided; otherwise use the model trained above
if CUSTOMER_PRETRAINED_MODEL_DATA != '':
    MODEL_PATH = CUSTOMER_PRETRAINED_MODEL_DATA
else:
    MODEL_PATH = model_data
print(MODEL_PATH)

In [None]:
sql_text = """drop model if exists predict_abalone_age;
CREATE MODEL predict_abalone_age
FROM '{}'
FUNCTION predict_abalone_age (int, int, float, float, float, float, float, float, float)
RETURNS int
IAM_ROLE '{}'
SETTINGS (S3_BUCKET '{}');
"""
# Pass MODEL_PATH (not model_data) so a customer-provided pre-trained model is honored
df = run_sql(sql_text.format(MODEL_PATH, REDSHIFT_IAM_ROLE, S3_BUCKET))
print(df)
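When you bring your own model, the FUNCTION argument types must be adapted to its features. A minimal sketch that assembles the same kind of BYOM CREATE MODEL statement from a list of argument types, using a hypothetical helper `build_create_model_sql`; the S3 path, role ARN, and bucket in the usage lines are placeholders. It does no quoting or escaping, so use it only with trusted values.

```python
def build_create_model_sql(model_name, model_path, arg_types, return_type,
                           iam_role, s3_bucket, function_name=None):
    """Assemble a Redshift ML BYOM CREATE MODEL statement (hypothetical helper)."""
    function_name = function_name or model_name
    args = ", ".join(arg_types)
    return (
        f"CREATE MODEL {model_name} "
        f"FROM '{model_path}' "
        f"FUNCTION {function_name} ({args}) "
        f"RETURNS {return_type} "
        f"IAM_ROLE '{iam_role}' "
        f"SETTINGS (S3_BUCKET '{s3_bucket}');"
    )


# Same signature as the abalone model above; path, role and bucket are placeholders
abalone_types = ["int", "int"] + ["float"] * 7
print(build_create_model_sql("predict_abalone_age", "s3://<your-bucket>/model.tar.gz",
                             abalone_types, "int", "<your-role-arn>", "<your-bucket>"))
```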

Once the CREATE MODEL statement finishes, use the SHOW MODEL command to check the model status.

In [None]:
df = run_sql("show model predict_abalone_age;")
df

## Data preparation
Load test data from S3 bucket to Redshift Table.  

In [None]:
sql_text = """drop table if exists abalone_test;
create table abalone_test
(Rings int, sex int, Length_ float, Diameter float, Height float,
WholeWeight float, ShuckedWeight float, VisceraWeight float, ShellWeight float);
copy abalone_test
from 's3://{}/1p-notebooks-datasets/abalone/text-csv/test/'
IAM_ROLE '{}'
csv;
"""

# data_bucket (set in the Setup cell) is the JumpStart bucket for the notebook's region
df = run_sql(sql_text.format(data_bucket, REDSHIFT_IAM_ROLE))
print(df)

Sample the test table to make sure data is loaded.

In [None]:

sql_text = "select * from abalone_test limit 10;"

df = run_sql(sql_text)
print(df)


## Inference
Now call the prediction function that CREATE MODEL created.

In [None]:
sql_text="""Select original_age, predicted_age, original_age-predicted_age as Error 
From( 
select predict_abalone_age(Rings,sex, 
Length_ , 
Diameter ,  
Height , 
WholeWeight ,
ShuckedWeight ,  
VisceraWeight , 
ShellWeight ) predicted_age, rings as original_age 
from abalone_test ); """

df=run_sql(sql_text)
print(df.head(10))
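For your own model, the query above changes only in the function name, the argument columns (which must follow the order of the FUNCTION signature declared in CREATE MODEL), the label column, and the table. A minimal sketch that assembles a query of the same shape, using a hypothetical helper `build_predict_sql`; the abalone column list mirrors the arguments the query above passes, with `rings` first.

```python
def build_predict_sql(function_name, argument_columns, label_column, table):
    """Assemble a query returning actual value, prediction and error (hypothetical helper)."""
    args = ", ".join(argument_columns)
    return (
        "SELECT actual, predicted, actual - predicted AS error FROM ("
        f"SELECT {function_name}({args}) AS predicted, {label_column} AS actual "
        f"FROM {table});"
    )


# Argument order must match the FUNCTION signature declared in CREATE MODEL
abalone_columns = ["rings", "sex", "length_", "diameter", "height",
                   "wholeweight", "shuckedweight", "visceraweight", "shellweight"]
print(build_predict_sql("predict_abalone_age", abalone_columns, "rings", "abalone_test"))
```

The result can be passed straight to `run_sql`.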

## Evaluation 
To evaluate the predictions, you can compute the coefficient of determination (R<sup>2</sup>) and the root-mean-square error (RMSE) with the following queries.


In [None]:
df=run_sql("SELECT 1-(SUM(POWER(rings - (predict_abalone_age(Rings,sex, \
Length_ , \
Diameter , \
Height , \
WholeWeight , \
ShuckedWeight , \
VisceraWeight , \
ShellWeight )),2)))/(SUM(POWER(rings - (SELECT AVG(rings) FROM abalone_test),2))) R2_Value FROM abalone_test; \
")
print(df)

In [None]:
sql_text="""SELECT SQRT(Avg( POWER(rings - (predict_abalone_age(Rings,sex, 
Length_ , 
Diameter , 
Height , 
WholeWeight , 
ShuckedWeight ,  
VisceraWeight ,  
ShellWeight )) , 2) ) ) as abalone_age_rmse FROM abalone_test;"""

df=run_sql(sql_text)
print(df.head(10))
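The two queries above compute R<sup>2</sup> = 1 - SS<sub>res</sub>/SS<sub>tot</sub> and RMSE = sqrt(SS<sub>res</sub>/n), where SS<sub>res</sub> sums the squared prediction errors and SS<sub>tot</sub> sums the squared deviations of the labels from their mean. As a cross-check, the same formulas can be computed client-side. A minimal sketch with a hypothetical helper `r2_and_rmse`; the usage comment assumes the `original_age` and `predicted_age` aliases from the inference query.

```python
import math


def r2_and_rmse(actual, predicted):
    """Return (R^2, RMSE) for paired actual and predicted values."""
    actual, predicted = list(actual), list(predicted)
    n = len(actual)
    mean_actual = sum(actual) / n
    ss_res = sum((y - y_hat) ** 2 for y, y_hat in zip(actual, predicted))
    ss_tot = sum((y - mean_actual) ** 2 for y in actual)
    return 1 - ss_res / ss_tot, math.sqrt(ss_res / n)


# Toy example: SS_res = 2 and SS_tot = 2, so R^2 = 0.0 and RMSE = sqrt(2/3)
print(r2_and_rmse([1, 2, 3], [2, 2, 2]))

# With the DataFrame returned by the inference query:
# r2, rmse = r2_and_rmse(df["original_age"].astype(float), df["predicted_age"].astype(float))
```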

# Cleanup

#### Drop Model and Table
Clean up objects created on your Redshift Cluster

In [None]:
sql_text = """drop table if exists abalone_test;
drop model if exists predict_abalone_age;
"""

df = run_sql(sql_text)
print(df)

#### Delete Endpoint
Once you are done using the endpoint, you can use the following to delete it. 

In [None]:
client.delete_endpoint(EndpointName=endpoint_name)
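The cell above deletes only the endpoint; the endpoint configuration and the SageMaker model registered earlier remain. A minimal sketch of a fuller cleanup, using a hypothetical helper `delete_hosting_resources` that calls the boto3 SageMaker client methods `delete_endpoint`, `delete_endpoint_config`, and `delete_model`. Run it instead of the cell above, since calling `delete_endpoint` twice fails once the endpoint is gone. None of these calls remove the model artifact stored in S3.

```python
def delete_hosting_resources(sagemaker_client, endpoint_name, endpoint_config_name, model_name):
    """Delete the endpoint, then its endpoint configuration, then the SageMaker model."""
    sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
    sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sagemaker_client.delete_model(ModelName=model_name)


# Example usage with the names created earlier in this notebook:
# delete_hosting_resources(client, endpoint_name, endpoint_config_name, model_name)
```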

# Appendix
The cells in this appendix are provided for reference only and do not need to be run.

### Data split and upload

Following methods split the data into train/test/validation datasets and upload files to S3.

In [None]:
import random

import boto3


def data_split(
    FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST
):
    """Randomly split the lines of FILE_DATA into train/validation/test files."""
    with open(FILE_DATA, "r") as f:
        data = f.readlines()

    num_of_data = len(data)
    num_train = int((PERCENT_TRAIN / 100.0) * num_of_data)
    num_valid = int((PERCENT_VALIDATION / 100.0) * num_of_data)
    num_tests = int((PERCENT_TEST / 100.0) * num_of_data)

    data_fractions = [num_train, num_valid, num_tests]
    split_data = [[], [], []]

    # Draw lines at random, without replacement, for each split in turn
    for split_ind, fraction in enumerate(data_fractions):
        for _ in range(fraction):
            rand_data_ind = random.randint(0, len(data) - 1)
            split_data[split_ind].append(data.pop(rand_data_ind))

    for filename, lines in zip((FILE_TRAIN, FILE_VALIDATION, FILE_TEST), split_data):
        with open(filename, "w") as out:
            out.writelines(lines)


def write_to_s3(fobj, bucket, key):
    return (
        boto3.Session(region_name=region).resource("s3").Bucket(bucket).Object(key).upload_fileobj(fobj)
    )


def upload_to_s3(bucket, channel, filename):
    # Include the file name in the key so the object lands at the URL that is printed
    key = f"{prefix}/{channel}/{filename}"
    url = f"s3://{bucket}/{key}"
    print(f"Writing to {url}")
    with open(filename, "rb") as fobj:
        write_to_s3(fobj, bucket, key)
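`data_split` draws from the module-level random generator without a seed, so every run yields a different split. A minimal sketch of a reproducible alternative, using a hypothetical helper `split_lines` that shuffles a copy of the lines with a seeded `random.Random` and slices it; it truncates the split sizes the same way `data_split` does (for integer percentages) and writes no files.

```python
import random


def split_lines(lines, percent_train, percent_validation, percent_test, seed=42):
    """Shuffle a copy of lines with a fixed seed and cut it into train/validation/test lists."""
    shuffled = list(lines)
    random.Random(seed).shuffle(shuffled)
    n = len(shuffled)
    # Integer arithmetic truncates like int(percent / 100.0 * n) for integer percentages
    n_train = n * percent_train // 100
    n_valid = n * percent_validation // 100
    n_test = n * percent_test // 100
    train = shuffled[:n_train]
    valid = shuffled[n_train:n_train + n_valid]
    test = shuffled[n_train + n_valid:n_train + n_valid + n_test]
    return train, valid, test


train, valid, test = split_lines([f"line {i}" for i in range(100)], 70, 15, 15)
print(len(train), len(valid), len(test))  # 70 15 15
```

Fixing the seed makes the train, validation, and test sets identical across runs, which keeps evaluation numbers comparable.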

### Data ingestion

Next, we read the dataset from the existing repository into memory, for preprocessing prior to training. This processing could be done *in situ* by Amazon Athena, Apache Spark in Amazon EMR, Amazon Redshift, etc., assuming the dataset is present in the appropriate location. Then, the next step would be to transfer the data to S3 for use in training. For small datasets, such as this one, reading into memory isn't onerous, though it would be for larger datasets.

In [None]:
%%time
import urllib.request

bucket = sagemaker.Session().default_bucket()
prefix = "sagemaker/DEMO-xgboost-abalone-default"
# Load the dataset
FILE_DATA = "abalone"
urllib.request.urlretrieve(
    "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/abalone", FILE_DATA
)

# split the downloaded data into train/test/validation files
FILE_TRAIN = "abalone.train"
FILE_VALIDATION = "abalone.validation"
FILE_TEST = "abalone.test"
PERCENT_TRAIN = 70
PERCENT_VALIDATION = 15
PERCENT_TEST = 15
data_split(
    FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST
)

# upload the files to the S3 bucket
upload_to_s3(bucket, "train", FILE_TRAIN)
upload_to_s3(bucket, "validation", FILE_VALIDATION)
upload_to_s3(bucket, "test", FILE_TEST)