# Model Training with MLlib and Hosting on SageMaker

This notebook was tested using the `Studio SparkMagic - PySpark Kernel` running on an `ml.t3.medium` instance, connected to an EMR cluster with an `m5.xlarge` master node and two `m5.xlarge` core nodes. Please ensure that you see `PySpark (SparkMagic)` in the top right of your notebook.


In [None]:
%%local
# Upgrade the SageMaker Python SDK in the local (Studio) kernel, which is where %%local cells run.
# If `sagemaker` was already imported in this kernel, restart it so the upgraded version is loaded.
!pip install -U -q sagemaker

### Connection to EMR Cluster

In the cell below, the code block is autogenerated. You can generate this code by clicking the "Cluster" link at the top of the notebook and selecting the EMR cluster. 

For this workshop we will be passing our SageMaker execution role to the cluster, but this works equally well with Kerberos, LDAP, and HTTP authentication mechanisms.
![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)

## MLeap Dependency

In [None]:
%%configure -f
{ "conf":{
 "spark.jars.packages": "ml.combust.mleap:mleap-spark_2.12:0.20.0,ml.combust.mleap:mleap-spark-base_2.12:0.20.0",
 "spark.pyspark.python": "python3",
 "spark.pyspark.virtualenv.enabled": "true",
 "spark.pyspark.virtualenv.type":"native",
 "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
 }
}

## Overview

In this notebook, we'll use a remote EMR cluster to build an ML model with Spark MLlib and then host that model on SageMaker.


In [None]:
%%local
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()
region = sess.boto_region_name

In [None]:
%%send_to_spark -i bucket -t str -n bucket


In [None]:
%%send_to_spark -i region -t str -n region

In [None]:
# Install the SageMaker SDK into the notebook-scoped Python virtualenv on the EMR cluster
# (virtualenv support is switched on in the %%configure cell); the Experiments cell imports it remotely.
sc.install_pypi_package("sagemaker")

In [None]:
# Install the MLeap Python bindings on the cluster. Keep this version in step with the
# ml.combust.mleap Spark jars (0.20.0) requested in the %%configure cell.
sc.install_pypi_package("mleap==0.20.0")

In [None]:
# Imported for its side effect: per MLeap's PySpark docs, this module enables
# `serializeToBundle` on fitted Spark models (used when saving the model below).
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

# The abalone CSV is read without a header, so this schema names and types every column:
# one categorical string column followed by numeric measurements, with `rings` (the label) last.
numeric_columns = [
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
    "rings",
]
schema = StructType(
    [StructField("sex", StringType(), True)]
    + [StructField(column_name, DoubleType(), True) for column_name in numeric_columns]
)

In [None]:
# Load the UCI abalone dataset from the public SageMaker sample bucket. The file has no
# header row, so the explicit `schema` defined above supplies column names and types.
total_df = spark.read.csv(
    "s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv", header=False, schema=schema
)
total_df.show(5)

# 80/20 train/validation split. The fixed seed makes the split, and therefore the reported
# RMSEs and the serialized model, reproducible across re-runs; without it every run samples differently.
(train_df, validation_df) = total_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.feature import (
    IndexToString,
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)

# `sex` is categorical: map its string labels to numeric indices, then one-hot encode the index.
sex_indexer = StringIndexer(outputCol="indexed_sex", inputCol="sex")
sex_encoder = OneHotEncoder(outputCols=["sex_vec"], inputCols=["indexed_sex"])

# Concatenate the encoded sex vector and the seven numeric measurements into one `features` vector.
# `rings` is deliberately excluded because it is the prediction target.
feature_columns = ["sex_vec"] + [
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

# Random forest of 18 trees, each at most 6 levels deep, regressing `rings` on the assembled features.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="rings",
    numTrees=18,
    maxDepth=6,
)

# Chaining the feature stages with the regressor yields a single fitted PipelineModel,
# which is what gets serialized and hosted later.
pipeline_stages = [sex_indexer, sex_encoder, assembler, rf]
pipeline = Pipeline(stages=pipeline_stages)
model = pipeline.fit(train_df)

In [None]:
# Score both splits with the fitted pipeline. The transformed validation frame is reused
# later as the sample DataFrame passed to the MLeap serializer.
transformed_train_df, transformed_validation_df = (
    model.transform(split_df) for split_df in (train_df, validation_df)
)
transformed_validation_df.select("prediction").show(5)

------

## Optionally record your experiment using SageMaker Experiments. 

Note: This cell runs on the EMR cluster, so the cluster's network must be configured to reach the SageMaker API. If you're running this as part of an AWS-provided workshop, you're all set.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error between predicted and actual ring counts, on each split.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rings", predictionCol="prediction")
train_rmse, validation_rmse = (
    evaluator.evaluate(scored_df) for scored_df in (transformed_train_df, transformed_validation_df)
)
print(f"Train RMSE = {train_rmse:g}")
print(f"Validation RMSE = {validation_rmse:g}")

In [None]:
import boto3
from sagemaker.session import Session
from sagemaker.experiments import Run

# Build a SageMaker session on the cluster, pinned to the region sent over from the local kernel.
boto_session = boto3.session.Session(region_name=region)
sagemaker_session = Session(boto_session=boto_session)

# Record both RMSEs in a run of the "mllib-experiment" experiment. A run name can be passed
# as `Run(run_name=...)`; when it is omitted, the SDK generates one.
with Run(experiment_name="mllib-experiment", sagemaker_session=sagemaker_session) as run:
    run.log_metric(name="Train RMSE", value=train_rmse)
    run.log_metric(name="Val RMSE", value=validation_rmse)

print("Done!")

------
## Save Model to S3 then host on SageMaker

In [None]:
import os

# Serialize the fitted pipeline to an MLeap bundle (a zip on the Spark driver's local disk).
# The transformed validation DataFrame is handed to MLeap as the sample dataset it inspects
# while writing the bundle (presumably to capture the schema; see the MLeap PySpark docs).
bundle_path = "/tmp/model.zip"
# Remove a bundle left by a previous run so re-executing this cell starts from a clean file
# instead of writing into an existing zip.
if os.path.exists(bundle_path):
    os.remove(bundle_path)
model.serializeToBundle("jar:file:" + bundle_path, transformed_validation_df)

In [None]:
import shutil
import tarfile
import zipfile

# Repackage the MLeap zip as model.tar.gz with `bundle.json` and `root/` at the archive's
# top level, the layout the SageMaker SparkML Serving container loads.
extract_dir = "/tmp/model"

# Start from an empty extraction directory: extractall() only overwrites matching names, so
# files left over from an earlier run would otherwise be packed into the new tarball.
shutil.rmtree(extract_dir, ignore_errors=True)
with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall(extract_dir)

with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add(extract_dir + "/bundle.json", arcname="bundle.json")
    tar.add(extract_dir + "/root", arcname="root")

In [None]:
# Upload the packaged model to S3. `bucket` is the SageMaker default bucket pushed to the
# cluster with %%send_to_spark; change it here if you want the artifact stored elsewhere.
# The key must match the S3 URI later passed to SparkMLModel as `model_data`.
import os

# Imported here as well because the cell that first imports boto3 (Experiments) is optional.
import boto3

s3 = boto3.resource("s3")
file_name = os.path.join("emr/abalone/mleap", "model.tar.gz")
s3.Bucket(bucket).upload_file("/tmp/model.tar.gz", file_name)

In [None]:
%%local
import json

schema = {
 "input": [
 {"name": "sex", "type": "string"},
 {"name": "length", "type": "double"},
 {"name": "diameter", "type": "double"},
 {"name": "height", "type": "double"},
 {"name": "whole_weight", "type": "double"},
 {"name": "shucked_weight", "type": "double"},
 {"name": "viscera_weight", "type": "double"},
 {"name": "shell_weight", "type": "double"},
 ],
 "output": {"name": "prediction", "type": "double"},
}
schema_json = json.dumps(schema, indent=2)


In [None]:
%%local
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sparkml.model import SparkMLModel

boto3_session = boto3.session.Session()
sagemaker_client = boto3.client("sagemaker")
sagemaker_runtime_client = boto3.client("sagemaker-runtime")

# Initialize sagemaker session
session = sagemaker.Session(
 boto_session=boto3_session,
 sagemaker_client=sagemaker_client,
 sagemaker_runtime_client=sagemaker_runtime_client,
)

role = get_execution_role()

In [None]:
%%local
# S3 location of where you uploaded your trained and serialized SparkML model
sparkml_data = "s3://{}/{}/{}".format(
 bucket, "emr/abalone/mleap", "model.tar.gz"
)
model_name = "sparkml-abalone-" + timestamp_prefix
sparkml_model = SparkMLModel(
 model_data=sparkml_data,
 role=role,
 spark_version='3.3',
 sagemaker_session=session,
 name=model_name,
 # passing the schema defined above by using an environment
 # variable that sagemaker-sparkml-serving understands
 env={"SAGEMAKER_SPARKML_SCHEMA": schema_json},
)

In [None]:
%%local
endpoint_name = "sparkml-abalone-ep-" + timestamp_prefix
sparkml_model.deploy(
 initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name
)

In [None]:
%%local
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer, JSONSerializer
from sagemaker.deserializers import JSONDeserializer


payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"

predictor = Predictor(
 endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer()
)
print(predictor.predict(payload))

In [None]:
%%local
payload = {"data": ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]}

predictor = Predictor(
 endpoint_name=endpoint_name, sagemaker_session=session, serializer=JSONSerializer()
)
print(predictor.predict(payload))

In [None]:
%%local
payload = {
 "schema": {
 "input": [
 {"name": "length", "type": "double"},
 {"name": "sex", "type": "string"},
 {"name": "diameter", "type": "double"},
 {"name": "height", "type": "double"},
 {"name": "whole_weight", "type": "double"},
 {"name": "shucked_weight", "type": "double"},
 {"name": "viscera_weight", "type": "double"},
 {"name": "shell_weight", "type": "double"},
 ],
 "output": {"name": "prediction", "type": "double"},
 },
 "data": [0.515, "F", 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255],
}

predictor = Predictor(
 endpoint_name=endpoint_name, sagemaker_session=session, serializer=JSONSerializer()
)
print(predictor.predict(payload))

## Clean Up

In [None]:
%%cleanup -f

In [None]:
%%local
predictor.delete_endpoint()