### Overview

This notebook was tested with the SageMaker `Studio SparkMagic - PySpark Kernel`. Please make sure that `PySpark (SparkMagic)` appears in the top right of your notebook.


This notebook does the following:

* Demonstrates how to visually connect the Amazon SageMaker Studio SparkMagic kernel to an Amazon EMR cluster
* Explores and queries data from a Hive table
* Demonstrates how to save processed data to Amazon S3 and use the built-in SageMaker BlazingText algorithm for sentiment analysis


In [None]:
%%local
print("Demo Notebook")

### Connection to EMR Cluster

The code that connects the notebook to the cluster is autogenerated. To generate it, click the "Cluster" link at the top of the notebook and select your EMR cluster.

For this workshop, we pass the SageMaker execution role to the cluster, but the same approach works for Kerberos, LDAP, and HTTP authentication.
![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)

Next, we query the `movie_reviews` table and load the data into a Spark DataFrame. You can visualize data from the remote cluster locally in the notebook.

In [None]:
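from pyspark.sql.functions import regexp_replace, col, concat, lit
# Query the Hive table on the EMR cluster and cache the result for reuse
movie_reviews = sqlContext.sql("select * from movie_reviews").cache()
# Drop the header row, which was loaded into the table as a data row
movie_reviews = movie_reviews.where(col('sentiment') != "sentiment")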

Using the SageMaker Studio SparkMagic kernel, you can also train machine learning models on the Spark cluster with the *SageMaker Spark library*, an open source Spark library for Amazon SageMaker. For an example, see [here](https://github.com/aws/sagemaker-spark#example-using-sagemaker-spark-with-any-sagemaker-algorithm).

In this notebook, however, we use SageMaker Experiments (an experiment and a trial) and a SageMaker estimator to train a model, and then deploy the model to a SageMaker real-time endpoint.

In the next cell, we install the `sagemaker-experiments` library.

In [None]:
%%local
%pip install -q sagemaker-experiments 

Next, we import the libraries and set global definitions.
If you see **PkgResourceDeprecation** or **invalid version** warnings when you run the cell below, you can ignore them and proceed.


In [None]:
%%local
import sagemaker
import boto3
import botocore
from botocore.exceptions import ClientError
from time import strftime, gmtime
import json
from sagemaker import get_execution_role

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

In [None]:
%%local 
sess = sagemaker.Session()
bucket = sess.default_bucket()

train_bucket = f"s3://{bucket}/reviews/train"
val_bucket = f"s3://{bucket}/reviews/val"

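Send the following variables from the local kernel to the Spark session on the cluster. In `%%send_to_spark`, `-i` is the local variable, `-t` its type, and `-n` the name it gets in the Spark session: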

In [None]:
%%send_to_spark -i train_bucket -t str -n train_bucket

In [None]:
%%send_to_spark -i val_bucket -t str -n val_bucket

In [None]:
val_bucket

### Pre-process data and feature engineering

In [None]:
from pyspark.sql.functions import regexp_replace, col, concat, lit

movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'positive', '__label__positive'))
movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'negative', '__label__negative'))

# Remove all the special characters
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\W", " "))

# Remove all single characters
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\s+[a-zA-Z]\s+", " "))

# Remove a single character at the start (^ anchors to the start; \^ would match a literal caret)
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"^[a-zA-Z]\s+", " "))

# Substituting multiple spaces with single space
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\s+", " "))

# Removing prefixed 'b'
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"^b\s+", " "))

movie_reviews.show()
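To see what the cleaning steps do to a single review, you can reproduce them locally with Python's `re` module. This is an approximation rather than the code Spark runs: `regexp_replace` uses Java regular expressions, so the sketch passes `re.ASCII` to make `\W` and `\s` match the same ASCII-only classes as Java's defaults. The `clean_review` helper and the sample sentence are hypothetical, and the leading-character step uses the `^` start-of-string anchor that its comment describes.

```python
import re

# Each (pattern, replacement) pair mirrors one regexp_replace call on 'review'.
# re.ASCII keeps \W and \s ASCII-only, like Java's default character classes.
CLEANING_STEPS = [
    (r"\W", " "),              # special characters -> space
    (r"\s+[a-zA-Z]\s+", " "),  # isolated single characters
    (r"^[a-zA-Z]\s+", " "),    # a single character at the start
    (r"\s+", " "),             # collapse runs of whitespace
    (r"^b\s+", " "),           # a leading 'b' prefix
]


def clean_review(text: str) -> str:
    """Apply the cleaning steps in order and return the cleaned text."""
    for pattern, replacement in CLEANING_STEPS:
        text = re.sub(pattern, replacement, text, flags=re.ASCII)
    return text


if __name__ == "__main__":
    sample = "I didn't like it -- a 2/10 film!"
    print(repr(clean_review(sample)))
```

Running the sample shows two side effects worth knowing about: the apostrophe split leaves `didn` without its `t`, and the result can keep a single leading space.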

In [None]:
# Merge columns for BlazingText input format:
# https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html
movie_reviews = movie_reviews.select(concat(col("sentiment"), lit(" "), col("review")).alias("record"))
movie_reviews.show()
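In supervised mode, BlazingText expects one record per line: the label, prefixed with `__label__`, followed by the space-separated tokens of the sentence. The hypothetical helpers below sketch that format in plain Python, which can be handy for spot-checking a few lines of the written output; they are not part of the SageMaker SDK.

```python
from typing import List, Tuple

LABEL_PREFIX = "__label__"


def to_blazingtext_line(label: str, text: str) -> str:
    """Build one supervised-mode record: '__label__<label> <token> <token> ...'."""
    # str.split() with no argument drops leading, trailing, and repeated spaces
    tokens = text.split()
    return " ".join([LABEL_PREFIX + label] + tokens)


def parse_blazingtext_line(line: str) -> Tuple[List[str], str]:
    """Split a record back into its labels (without the prefix) and its text."""
    tokens = line.split()
    labels = [t[len(LABEL_PREFIX):] for t in tokens if t.startswith(LABEL_PREFIX)]
    words = [t for t in tokens if not t.startswith(LABEL_PREFIX)]
    return labels, " ".join(words)


if __name__ == "__main__":
    line = to_blazingtext_line("positive", " great film with a strong cast ")
    print(line)
    print(parse_blazingtext_line(line))
```

Using `str.split()` means the leading space that the cleaning steps can leave on a review does not turn into an empty token.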

In [None]:
# Don't write _SUCCESS marker files, so each S3 prefix holds only the data files
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [None]:
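# Split the records 80/20 into training and validation sets (seeded)
train_df, val_df = movie_reviews.randomSplit([0.8, 0.2], seed=42)
# coalesce(1) writes each split as a single CSV file under its S3 prefix
train_df.coalesce(1).write.csv(train_bucket, mode='overwrite')
val_df.coalesce(1).write.csv(val_bucket, mode='overwrite')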

print(train_bucket)
print(val_bucket)

In [None]:
%%local
instance_type_smtraining="ml.m5.xlarge"
instance_type_smendpoint="ml.m5.xlarge"

In [None]:
%%local
prefix = 'blazingtext/supervised' 
output_location = 's3://{}/{}/output'.format(bucket, prefix)

print(train_bucket)
print(val_bucket)
print(output_location)

### Train a SageMaker model
#### Amazon SageMaker Experiments

Amazon SageMaker Experiments lets us track model training, organize related models together, and log model configuration, parameters, and metrics so that we can reproduce, iterate on, and compare models.
Let's create the experiment and the trial, and then train the model. To reduce cost, you can set `train_use_spot_instances = True` in the training cell to use managed Spot Instances.
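When `use_spot_instances` is enabled, SageMaker needs a maximum wait time (`max_wait`) that is at least as long as the maximum run time (`max_run`), because the wait time covers both waiting for Spot capacity and the training run itself. The hypothetical helper below sketches that rule so the Spot flag and both timeouts stay consistent. Defaulting `max_wait` to `max_run` mirrors the notebook's 3600-second values and is an assumption, not an SDK default.

```python
from typing import Optional, Tuple


def training_timeouts(use_spot: bool, max_run: int,
                      max_wait: Optional[int] = None) -> Tuple[int, Optional[int]]:
    """Return (max_run, max_wait) in seconds for a SageMaker Estimator.

    Without Spot, max_wait is not used and is returned as None.
    With Spot, max_wait defaults to max_run (an assumption of this sketch)
    and must be greater than or equal to max_run.
    """
    if not use_spot:
        return max_run, None
    if max_wait is None:
        max_wait = max_run
    if max_wait < max_run:
        raise ValueError("max_wait must be greater than or equal to max_run")
    return max_run, max_wait


if __name__ == "__main__":
    print(training_timeouts(False, 3600))
    print(training_timeouts(True, 3600, 7200))
```

In the training cell you could then write `train_max_run, train_max_wait = training_timeouts(train_use_spot_instances, 3600)`. Passing a `max_wait` larger than `max_run` leaves room to wait for Spot capacity.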

In [None]:
%%local
import boto3
region_name = boto3.Session().region_name

sm_session = sagemaker.session.Session()
sm_client = boto3.client('sagemaker')

create_date = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sentiment_experiment = Experiment.create(experiment_name="sentimentdetection-{}".format(create_date),
                                         description="Detect sentiment in text",
                                         sagemaker_boto_client=sm_client)

trial = Trial.create(trial_name="sentiment-trial-blazingtext-{}".format(strftime("%Y-%m-%d-%H-%M-%S", gmtime())),
                     experiment_name=sentiment_experiment.experiment_name,
                     sagemaker_boto_client=sm_client)

# Look up the BlazingText image for this region (get_image_uri is deprecated in SageMaker Python SDK v2)
container = sagemaker.image_uris.retrieve("blazingtext", region_name, "1")
print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))

In [None]:
%%local 
train_use_spot_instances = False
train_max_run = 3600
train_max_wait = 3600 if train_use_spot_instances else None

bt_model = sagemaker.estimator.Estimator(container,
                                         role=sagemaker.get_execution_role(),
                                         instance_count=1,
                                         instance_type=instance_type_smtraining,
                                         volume_size=30,
                                         input_mode='File',
                                         output_path=output_location,
                                         sagemaker_session=sm_session,
                                         use_spot_instances=train_use_spot_instances,
                                         max_run=train_max_run,
                                         max_wait=train_max_wait)

In [None]:
%%local 
bt_model.set_hyperparameters(mode="supervised",
                             epochs=10,
                             min_count=2,
                             learning_rate=0.005328,
                             vector_dim=286,
                             early_stopping=True,
                             patience=4,
                             min_epochs=5,
                             word_ngrams=2)
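With `early_stopping=True`, BlazingText monitors accuracy on the validation channel: it trains for at least `min_epochs` epochs and stops when validation accuracy has not improved for `patience` epochs, up to `epochs` in total. The function below is a simplified, hypothetical model of that documented behavior, meant only for reasoning about how the values above interact; it is not BlazingText's implementation, and the accuracy values in the example are made up.

```python
from typing import Sequence


def stopping_epoch(val_accuracy: Sequence[float], epochs: int,
                   min_epochs: int, patience: int) -> int:
    """Return the (1-based) epoch at which training would stop.

    Simplified rule (an assumption, not BlazingText's code): once at least
    `min_epochs` epochs have run, stop as soon as validation accuracy has
    failed to beat its best value for `patience` consecutive epochs.
    """
    best = float("-inf")
    epochs_without_improvement = 0
    for epoch, accuracy in enumerate(val_accuracy[:epochs], start=1):
        if accuracy > best:
            best = accuracy
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
        if epoch >= min_epochs and epochs_without_improvement >= patience:
            return epoch
    # No early stop: training runs for every epoch that was simulated
    return min(epochs, len(val_accuracy))


if __name__ == "__main__":
    # Made-up validation accuracies, one per epoch
    history = [0.80, 0.84, 0.86, 0.87, 0.87, 0.86, 0.86, 0.85, 0.88, 0.89]
    # Same values as the set_hyperparameters cell: epochs=10, min_epochs=5, patience=4
    print(stopping_epoch(history, epochs=10, min_epochs=5, patience=4))
```

For this made-up history the sketch stops at epoch 8: accuracy last improved at epoch 4 and then fails to improve for four consecutive epochs, so the later gains at epochs 9 and 10 are never reached.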

In [None]:
%%local
train_data = sagemaker.inputs.TrainingInput(train_bucket, distribution='FullyReplicated',
                                            content_type='text/plain', s3_data_type='S3Prefix')

validation_data = sagemaker.inputs.TrainingInput(val_bucket, distribution='FullyReplicated',
                                                 content_type='text/plain', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

In [None]:
%%local
%%time

bt_model.fit(data_channels,
             experiment_config={
                 "ExperimentName": sentiment_experiment.experiment_name,
                 "TrialName": trial.trial_name,
                 "TrialComponentDisplayName": "BlazingText-Training",
             },
             logs=False)

### Deploy the model and get predictions

In [None]:
%%local 
from sagemaker.serializers import JSONSerializer

text_classifier = bt_model.deploy(initial_instance_count=1, instance_type=instance_type_smendpoint, serializer=JSONSerializer())

In [None]:
%%local 
import json

# One review split across lines; the trailing spaces keep words from running together
review = ["please give this one a miss br br kristy swanson and the rest of the cast "
          "rendered terrible performances the show is flat flat flat br br "
          "i don't know how michael madison could have allowed this one on his plate "
          "he almost seemed to know this wasn't going to work out "
          "and his performance was quite lacklustre so all you madison fans give this a miss"]

payload = {"instances" : review}
output = json.loads(text_classifier.predict(payload).decode('utf-8'))
classification = output[0]['label'][0].split('__')[-1]

print("Sentiment:", classification.upper())
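The endpoint returns a JSON list with one object per input sentence, holding parallel `label` and `prob` lists; by default each list has a single entry, the highest-probability prediction. The hypothetical helper below extracts that label and its probability, and removes only the leading `__label__` prefix instead of splitting on `__`, so a class name that itself contains `__` stays intact. The response bytes in the example are illustrative, shaped like BlazingText output, not real endpoint results.

```python
import json
from typing import Tuple

LABEL_PREFIX = "__label__"


def top_prediction(response_body: bytes, index: int = 0) -> Tuple[str, float]:
    """Return (label, probability) for the input sentence at position `index`.

    Assumes the BlazingText response shape:
    [{"label": ["__label__<class>", ...], "prob": [<float>, ...]}, ...]
    """
    predictions = json.loads(response_body.decode("utf-8"))
    prediction = predictions[index]
    label = prediction["label"][0]
    # Remove only the leading prefix; the class name itself is left untouched
    if label.startswith(LABEL_PREFIX):
        label = label[len(LABEL_PREFIX):]
    return label, float(prediction["prob"][0])


if __name__ == "__main__":
    # Illustrative response body with the expected shape (not real endpoint output)
    sample = b'[{"label": ["__label__negative"], "prob": [0.9]}]'
    label, probability = top_prediction(sample)
    print(label.upper(), probability)
```

Against the deployed endpoint, the call would be `label, probability = top_prediction(text_classifier.predict(payload))`.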

### Clean up 

In [None]:
%%local
# Delete endpoint
text_classifier.delete_endpoint()

In [None]:
%%cleanup -f