### Overview

This notebook does the following:

* Demonstrates how to visually connect an Amazon SageMaker Studio Python 3 (Data Science) kernel to an Amazon EMR cluster
* Explores and queries data from a Hive table using the PyHive library
* Demonstrates how to use the data for machine learning

### Connection to EMR Cluster

The code in the cell below is autogenerated. You can generate it by choosing the "Cluster" link at the top of the notebook and selecting your EMR cluster. "j-xxxxxxxxxxxx" is the ID of the selected cluster.

For the example in our blog post, we used a cluster with no authentication for simplicity, but this approach works equally well with Kerberos, LDAP, and HTTP authentication.

In [None]:
# %load_ext sagemaker_studio_analytics_extension.magics
# %sm_analytics emr connect --cluster-id j-xxxxxxxxxxx --auth_type None --language python

First, we import the hive module from the PyHive library.

In [None]:
from pyhive import hive

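Next, we connect to HiveServer2 on the EMR primary node, which listens on port 10000 by default. In the following code, replace the empty host string with the DNS name of your cluster's primary node (we used the private DNS name). You can find this information on the Amazon EMR console: choose the cluster name and look for the primary node DNS name (Master public DNS) in the Summary section.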

In [None]:
# Replace "" with the DNS name of your EMR primary node
conn = hive.Connection(host="", port=10000)
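
The connection above uses no authentication. For clusters that use LDAP or Kerberos, PyHive's hive.Connection accepts auth, username, password, and kerberos_service_name arguments. The sketch below is a hypothetical helper (not part of PyHive) that assembles those keyword arguments; it assumes your HiveServer2 is configured for the chosen mode and that the Kerberos service principal is named "hive", a common default. HTTP transport is not covered here.

```python
# Hypothetical helper, not part of PyHive: builds keyword arguments for
# pyhive.hive.Connection for a few authentication modes.
def hive_connection_kwargs(host, port=10000, auth="NONE", username=None,
                           password=None, kerberos_service_name="hive"):
    auth = auth.upper()
    kwargs = {"host": host, "port": port, "auth": auth}
    if username is not None:
        kwargs["username"] = username
    if auth == "LDAP":
        # PyHive only accepts a password together with LDAP or CUSTOM auth.
        if password is None:
            raise ValueError("LDAP authentication requires a password")
        kwargs["password"] = password
    elif auth == "KERBEROS":
        # Service name of the HiveServer2 Kerberos principal (assumed "hive").
        kwargs["kerberos_service_name"] = kerberos_service_name
    elif auth != "NONE":
        raise ValueError("unsupported auth mode: {}".format(auth))
    return kwargs
```

Usage would look like conn = hive.Connection(**hive_connection_kwargs(host, auth="LDAP", username="...", password="...")), with your own credentials filled in.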

Next, we list the available databases and tables, query the movie_reviews table into a pandas DataFrame, and visualize the sentiment distribution.

In [None]:
cursor = conn.cursor()
cursor.execute("show databases")
cursor.fetchall()

In [None]:
cursor.execute("show tables")
cursor.fetchall()

In [None]:
import pandas as pd

movie_reviews = pd.read_sql("select review, sentiment from movie_reviews", conn)
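
pandas documents pd.read_sql for SQLAlchemy connectables and sqlite3; other DB-API 2.0 connections such as PyHive's generally work, but recent pandas versions emit a UserWarning. A minimal alternative, sketched below as a hypothetical helper, builds the DataFrame directly from a cursor. It optionally strips a "table." prefix from column names, on the assumption that some HiveServer2 configurations (see hive.resultset.use.unique.column.names) return qualified names.

```python
import pandas as pd


def cursor_to_dataframe(cursor, strip_table_prefix=True):
    """Build a DataFrame from a DB-API 2.0 cursor that has executed a query."""
    # cursor.description holds one sequence per column; item 0 is the column name.
    columns = [col[0] for col in cursor.description]
    if strip_table_prefix:
        # Turn "movie_reviews.review" into "review".
        columns = [name.split(".")[-1] for name in columns]
    return pd.DataFrame(cursor.fetchall(), columns=columns)
```

With the cursor from above, you would run cursor.execute("select review, sentiment from movie_reviews") and then movie_reviews = cursor_to_dataframe(cursor).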

In [None]:
movie_reviews.head()

In [None]:
# Use boolean indexing; DataFrame.filter selects by label, not by condition
pos_reviews = movie_reviews[movie_reviews.sentiment == "positive"]
neg_reviews = movie_reviews[movie_reviews.sentiment == "negative"]
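
In pandas, DataFrame.filter keeps rows or columns whose labels match (items, like, regex); it does not evaluate a boolean condition the way a Spark filter does. Boolean indexing selects matching rows, and value_counts returns every class count in one pass. The toy DataFrame below is made up purely for illustration.

```python
import pandas as pd

# Toy data for illustration only.
df = pd.DataFrame(
    {
        "review": ["loved it", "boring", "great cast"],
        "sentiment": ["positive", "negative", "positive"],
    }
)

# Boolean indexing keeps only the rows where the condition is True.
pos = df[df["sentiment"] == "positive"]
neg = df[df["sentiment"] == "negative"]

# value_counts gives the size of each class at once.
counts = df["sentiment"].value_counts()
```

On the real data, plot_counts(counts.get("positive", 0), counts.get("negative", 0)) would produce the same chart as the per-class DataFrames.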

In [None]:
import matplotlib.pyplot as plt


def plot_counts(positive, negative):
    plt.rcParams["figure.figsize"] = (6, 6)
    plt.bar(0, positive, width=0.6, label="Positive Reviews", color="Green")
    plt.bar(2, negative, width=0.6, label="Negative Reviews", color="Red")
    # De-duplicate legend entries by label
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys())
    plt.ylabel("Count")
    plt.xlabel("Type of Review")
    plt.tick_params(axis="x", which="both", bottom=False, top=False, labelbottom=False)
    plt.show()


plot_counts(len(pos_reviews), len(neg_reviews))

Next, we will preprocess the data with SageMaker Processing, use SageMaker Experiments, trials, and an estimator to train a model, and deploy the model to a SageMaker real-time endpoint.

In the next cell, we install the required libraries.

In [None]:
import sys

!{sys.executable} -m pip install sagemaker-experiments
!{sys.executable} -m pip show sagemaker

Next, we will import libraries and set global definitions

In [None]:
import sagemaker
import boto3
import botocore
from botocore.exceptions import ClientError
from time import strftime, gmtime
import json
from sagemaker import get_execution_role

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

In [None]:
sess = boto3.Session()
region_name = sess.region_name
role = sagemaker.get_execution_role()
sm_runtime = boto3.Session().client("sagemaker-runtime")

In the next cell, we create an S3 bucket that stores the data exported from Hive and the training output.

In [None]:
stsclient = boto3.client("sts", region_name=region_name)
s3client = boto3.client("s3", region_name=region_name)

aws_account_id = stsclient.get_caller_identity()["Account"]
bucket = "sagemaker-studio-pyhive-{}-{}".format(region_name, aws_account_id)
key = "sentiment/movie_reviews.csv"
smprocessing_input = "s3://{}/{}".format(bucket, key)

try:
    if region_name == "us-east-1":
        # us-east-1 does not accept a LocationConstraint
        s3client.create_bucket(Bucket=bucket)
    else:
        s3client.create_bucket(
            Bucket=bucket, CreateBucketConfiguration={"LocationConstraint": region_name}
        )
except ClientError as e:
    error_code = e.response["Error"]["Code"]
    message = e.response["Error"]["Message"]
    if error_code == "BucketAlreadyOwnedByYou":
        print(
            "A bucket with the same name already exists in your account - using the same bucket."
        )
    else:
        print("Error->{}:{}".format(error_code, message))
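
The cell above has to special-case us-east-1, because S3 rejects a LocationConstraint of "us-east-1" and expects the configuration to be omitted there. The sketch below factors that rule into a hypothetical helper and adds a check against a subset of the general-purpose bucket naming rules (3 to 63 characters; lowercase letters, digits, hyphens, and dots; beginning and ending with a letter or digit; no consecutive dots). It is not an exhaustive validator.

```python
import re


def is_valid_bucket_name(name):
    # Subset of S3 naming rules: 3-63 chars, lowercase letters, digits, '.', '-',
    # starting and ending with a letter or digit, and no ".." sequences.
    pattern = r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]"
    return re.fullmatch(pattern, name) is not None and ".." not in name


def create_bucket_kwargs(bucket, region_name):
    kwargs = {"Bucket": bucket}
    # us-east-1 is the default location and must not be sent as a constraint.
    if region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs
```

With these helpers, the try block reduces to s3client.create_bucket(**create_bucket_kwargs(bucket, region_name)).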

Next, upload the data to the S3 bucket.

In [None]:
import boto3
from io import StringIO

csv_buffer = StringIO()
movie_reviews.to_csv(csv_buffer)
s3_resource = boto3.resource("s3")
s3_resource.Object(bucket, key).put(Body=csv_buffer.getvalue())

### Pre-process data and feature engineering

#### Amazon SageMaker Processing jobs using the Scikit-learn Processor

Amazon SageMaker Processing jobs provide a simplified, managed experience for running data pre- or post-processing and model evaluation workloads on the Amazon SageMaker platform.

A processing job downloads input from Amazon Simple Storage Service (Amazon S3), then uploads outputs to Amazon S3 during or after the processing job.

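The cell below shows how to preprocess the data by running a scikit-learn script, preprocessing.py, in a Docker image provided and maintained by SageMaker. The script path is resolved relative to this notebook's working directory.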

Note: We use an "ml.m5.xlarge" instance for SageMaker Processing, training, and model hosting. If you don't have access to this instance type and see a "ResourceLimitExceeded" error, use another instance type that you have access to. You can also request a service limit increase through the AWS Support Center.
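
The preprocessing.py script itself is not included in this notebook. The following is a hypothetical sketch of what such a script could do, assuming it converts each row into BlazingText's supervised input format ("__label__" plus the label, followed by space-separated tokens, which matches the 9-character prefix stripped from predictions later) and splits the rows using --train-test-split-ratio. The input file path, the output file names train.txt and validation.txt, the lowercase whitespace tokenization, and the fixed seed are all assumptions.

```python
import argparse
import csv
import random


def to_blazingtext_line(review, sentiment):
    # BlazingText supervised mode: "__label__<label> <space-separated tokens>".
    tokens = review.lower().split()
    return "__label__{} {}".format(sentiment, " ".join(tokens))


def split_lines(lines, test_ratio, seed=42):
    # Shuffle deterministically, then hold out test_ratio of the lines.
    lines = list(lines)
    random.Random(seed).shuffle(lines)
    n_test = int(len(lines) * test_ratio)
    return lines[n_test:], lines[:n_test]  # (train, validation)


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    args = parser.parse_args(argv)
    # Assumed location: ProcessingInput destination plus the S3 object's file name.
    with open("/opt/ml/processing/input/movie_reviews.csv", newline="") as f:
        lines = [
            to_blazingtext_line(row["review"], row["sentiment"])
            for row in csv.DictReader(f)
        ]
    train, validation = split_lines(lines, args.train_test_split_ratio)
    for path, subset in [
        ("/opt/ml/processing/train/train.txt", train),
        ("/opt/ml/processing/validation/validation.txt", validation),
    ]:
        with open(path, "w") as out:
            out.write("\n".join(subset) + "\n")
```

In a real script, main() would be called under an if __name__ == "__main__": guard. Because the CSV was written with its index, csv.DictReader sees an extra unnamed first column, which this sketch simply ignores.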

In [None]:
instance_type_smprocessing = "ml.m5.xlarge"
instance_type_smtraining = "ml.m5.xlarge"
instance_type_smendpoint = "ml.m5.xlarge"

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
 framework_version="0.20.0",
 role=role,
 instance_type=instance_type_smprocessing,
 instance_count=1,
)

In [None]:
print(smprocessing_input)
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source=smprocessing_input, destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
        ProcessingOutput(
            output_name="validation_data", source="/opt/ml/processing/validation"
        ),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

# Look up the S3 locations of the train and validation outputs
output_config = preprocessing_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        preprocessed_training_data = output["S3Output"]["S3Uri"]
    if output["OutputName"] == "validation_data":
        preprocessed_validation_data = output["S3Output"]["S3Uri"]
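
The loop above walks the ProcessingOutputConfig section of the job description. A more compact sketch, using a hypothetical helper, maps every output name to its S3 URI in one dictionary; a missing output then fails immediately with a KeyError naming it, rather than as a NameError in a later cell. The sample description in the test uses placeholder URIs.

```python
def outputs_by_name(job_description):
    """Map each ProcessingOutput name to its S3 URI."""
    outputs = job_description["ProcessingOutputConfig"]["Outputs"]
    return {o["OutputName"]: o["S3Output"]["S3Uri"] for o in outputs}
```

For example, uris = outputs_by_name(preprocessing_job_description) followed by preprocessed_training_data = uris["train_data"].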

In [None]:
print(preprocessed_training_data)
print(preprocessed_validation_data)

In [None]:
prefix = "blazingtext/supervised"
s3_train_data = preprocessed_training_data
s3_validation_data = preprocessed_validation_data
s3_output_location = "s3://{}/{}/output".format(bucket, prefix)

### Train a SageMaker model
#### Amazon SageMaker Experiments

Amazon SageMaker Experiments allows us to keep track of model training; organize related models together; and log model configuration, parameters, and metrics to reproduce and iterate on previous models and compare models. 
Let's create the experiment and trial, and then train the model. To reduce cost, you can train on Spot Instances by setting train_use_spot_instances to True in the training cell below; it is set to False by default.

In [None]:
sm_session = sagemaker.session.Session()

create_date = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sentiment_experiment = Experiment.create(
    experiment_name="sentimentdetection-{}".format(create_date),
    description="Detect sentiment in text",
    sagemaker_boto_client=boto3.client("sagemaker"),
)

trial = Trial.create(
    trial_name="sentiment-trial-blazingtext-{}".format(
        strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    ),
    experiment_name=sentiment_experiment.experiment_name,
    sagemaker_boto_client=boto3.client("sagemaker"),
)

# get_image_uri is deprecated in SageMaker Python SDK v2; use image_uris.retrieve
container = sagemaker.image_uris.retrieve("blazingtext", region_name, "1")
print("Using SageMaker BlazingText container: {} ({})".format(container, region_name))

In [None]:
train_use_spot_instances = False
train_max_run = 3600
train_max_wait = 3600 if train_use_spot_instances else None

bt_model = sagemaker.estimator.Estimator(
 container,
 role,
 instance_count=1,
 instance_type=instance_type_smtraining,
 volume_size=30,
 input_mode="File",
 output_path=s3_output_location,
 sagemaker_session=sm_session,
 use_spot_instances=train_use_spot_instances,
 max_run=train_max_run,
 max_wait=train_max_wait,
)
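
With managed Spot training, max_wait covers both the time spent waiting for Spot capacity and the training time, so SageMaker requires it to be greater than or equal to max_run; without Spot, max_wait is left unset. The cell above sets both to 3600 seconds when Spot is enabled. The hypothetical helper below encodes that rule so a larger max_wait (giving more time to obtain capacity) can be chosen safely.

```python
def training_time_limits(use_spot_instances, max_run=3600, max_wait=None):
    """Return Estimator keyword arguments for run and wait limits (seconds)."""
    if not use_spot_instances:
        # On-demand training: max_wait is not used.
        return {"use_spot_instances": False, "max_run": max_run, "max_wait": None}
    if max_wait is None:
        max_wait = max_run
    if max_wait < max_run:
        raise ValueError("max_wait must be greater than or equal to max_run")
    return {"use_spot_instances": True, "max_run": max_run, "max_wait": max_wait}
```

These values could be passed to sagemaker.estimator.Estimator with **training_time_limits(True, 3600, 7200) in place of the three explicit arguments.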

In [None]:
bt_model.set_hyperparameters(
 mode="supervised",
 epochs=10,
 min_count=2,
 learning_rate=0.005328,
 vector_dim=286,
 early_stopping=True,
 patience=4,
 min_epochs=5,
 word_ngrams=2,
)
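
The word_ngrams=2 hyperparameter asks BlazingText to use word bigrams as features in addition to single words, which helps with phrases such as "not good" whose meaning differs from their individual words. The sketch below only illustrates which features n=2 produces for a short token list; fastText-style implementations hash n-grams into buckets rather than storing strings, so this is a conceptual illustration, not BlazingText's implementation.

```python
def word_ngrams(tokens, n):
    """Return all contiguous word n-grams of length 1..n, joined with spaces."""
    features = []
    for size in range(1, n + 1):
        for i in range(len(tokens) - size + 1):
            features.append(" ".join(tokens[i:i + size]))
    return features


features = word_ngrams(["not", "a", "good", "movie"], 2)
```

Here features holds the four unigrams followed by "not a", "a good", and "good movie".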

In [None]:
train_data = sagemaker.inputs.TrainingInput(
 s3_train_data,
 distribution="FullyReplicated",
 content_type="text/plain",
 s3_data_type="S3Prefix",
)
validation_data = sagemaker.inputs.TrainingInput(
 s3_validation_data,
 distribution="FullyReplicated",
 content_type="text/plain",
 s3_data_type="S3Prefix",
)
data_channels = {"train": train_data, "validation": validation_data}

In [None]:
%%time

bt_model.fit(
    data_channels,
    experiment_config={
        "ExperimentName": sentiment_experiment.experiment_name,
        "TrialName": trial.trial_name,
        "TrialComponentDisplayName": "BlazingText-Training",
    },
    logs=False,
)

### Deploy the model and get predictions

In [None]:
text_classifier = bt_model.deploy(
 initial_instance_count=1, instance_type=instance_type_smendpoint
)

In [None]:
review = [
    "please give this one a miss br br kristy swanson and the rest of the cast "
    "rendered terrible performances the show is flat flat flat br br "
    "i don't know how michael madison could have allowed this one on his plate "
    "he almost seemed to know this wasn't going to work out "
    "and his performance was quite lacklustre so all you madison fans give this a miss"
]
# BlazingText expects space-separated tokens; this review is already tokenized
tokenized_review = [" ".join(t.split()) for t in review]
# For retrieving the top k predictions, you can set k in the configuration
payload = {"instances": tokenized_review}
bt_endpoint_name = text_classifier.endpoint_name
response = sm_runtime.invoke_endpoint(
 EndpointName=bt_endpoint_name,
 ContentType="application/json",
 Body=json.dumps(payload),
)
output = json.loads(response["Body"].read().decode("utf-8"))

# Make the output readable: strip the 9-character "__label__" prefix and upper-case the label
import copy

predictions = copy.deepcopy(output)
for prediction in predictions:
    prediction["label"] = prediction["label"][0][9:].upper()
print(predictions)
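
BlazingText returns one object per instance, each with a "label" list (entries prefixed with "__label__") and a matching "prob" list; adding "configuration": {"k": k} to the request returns the top k labels. The sketch below builds such a payload and pairs every label with its probability instead of keeping only the first label. The regex tokenizer is an assumption: it should be replaced by whatever tokenization the training data received. The sample response in the test is constructed, not real endpoint output.

```python
import json
import re


def build_payload(reviews, k=1):
    # Hypothetical tokenizer: lowercase, split words and punctuation apart.
    instances = [" ".join(re.findall(r"[\w']+|[^\w\s]", r.lower())) for r in reviews]
    return json.dumps({"instances": instances, "configuration": {"k": k}})


def parse_predictions(output, prefix="__label__"):
    """Turn BlazingText responses into lists of (LABEL, probability) pairs."""
    parsed = []
    for pred in output:
        labels = [
            label[len(prefix):].upper() if label.startswith(prefix) else label.upper()
            for label in pred["label"]
        ]
        parsed.append(list(zip(labels, pred["prob"])))
    return parsed
```

You would send Body=build_payload(review, k=2) to invoke_endpoint and call parse_predictions on the decoded JSON response.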

### Clean up 

In [None]:
# Clean up resources created as part of this notebook
# delete endpoint
text_classifier.delete_endpoint()
# empty s3 bucket we created
s3_bucket_to_remove = "s3://{}".format(bucket)
!aws s3 rm {s3_bucket_to_remove} --recursive