{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Overview\n", "\n", "This notebook does the following:\n", "\n", "* Demonstrates how you can visually connect Amazon SageMaker Studio Python 3 (Data Science) kernel to an EMR Cluster\n", "* Explore and query data from a Hive table using the pyhive library\n", "* Demonstrates how to use the data for Machine Learning\n", "\n", "\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connection to EMR Cluster\n", "\n", "In the cell below, the code block is autogenerated. You can generate this code by clicking on the \"Cluster\" link on the top of the notebook and select the EMR cluster. The \"j-xxxxxxxxxxxx\" is the cluster id of the cluster selected. \n", "\n", "For the example in our blog, we used a no-auth cluster for simplicity, but this works equally well for Kerberos, LDAP and HTTP auth mechanisms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# %load_ext sagemaker_studio_analytics_extension.magics\n", "# %sm_analytics emr connect --cluster-id j-xxxxxxxxxxx --auth_type None --language python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we will import hive module from the pyhive library" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyhive import hive" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, We use the private DNS name of the EMR primary in the following code. Replace the host with the correct DNS name. You can find this information on the Amazon EMR console (expand the cluster name and locate Master public DNS under in summary section)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "conn = hive.Connection(host=''', port=10000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will query the movie_reviews table and get the data into a pandas dataframe. You can visualize the data using the code below" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor = conn.cursor()\n", "cursor.execute(\"show databases\")\n", "cursor.fetchall()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor.execute(\"show tables\")\n", "cursor.fetchall()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "movie_reviews = pd.read_sql(\"select review, sentiment from movie_reviews\", conn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "movie_reviews.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pos_reviews = movie_reviews.filter(movie_reviews.sentiment == \"positive\")\n", "neg_reviews = movie_reviews.filter(movie_reviews.sentiment == \"negative\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "\n", "def plot_counts(positive, negative):\n", " plt.rcParams[\"figure.figsize\"] = (6, 6)\n", " plt.bar(0, positive, width=0.6, label=\"Positive Reviews\", color=\"Green\")\n", " plt.bar(2, negative, width=0.6, label=\"Negative Reviews\", color=\"Red\")\n", " handles, labels = plt.gca().get_legend_handles_labels()\n", " by_label = dict(zip(labels, handles))\n", " plt.legend(by_label.values(), by_label.keys())\n", " plt.ylabel(\"Count\")\n", " plt.xlabel(\"Type of Review\")\n", " plt.tick_params(axis=\"x\", which=\"both\", bottom=False, top=False, labelbottom=False)\n", " plt.show()\n", "\n", "\n", "plot_counts(len(pos_reviews), len(neg_reviews))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "next, we will use SageMaker experiments, trial and estimator to train a model and deploy the model using SageMaker realtime endpoint hosting\n", "\n", "In the next cell, we will install the necessary libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "\n", "!{sys.executable} -m pip install sagemaker-experiments\n", "!{sys.executable} -m pip show sagemaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will import libraries and set global definitions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import boto3\n", "import botocore\n", "from botocore.exceptions import ClientError\n", "from time import strftime, gmtime\n", "import json\n", "from sagemaker import get_execution_role\n", "\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess = boto3.Session()\n", "region_name = sess.region_name\n", "role = sagemaker.get_execution_role()\n", "sm_runtime = boto3.Session().client(\"sagemaker-runtime\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the next cell, we will create a new S3 bucket that will be used for storing the training and validation data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stsclient = boto3.client(\"sts\", region_name=region_name)\n", "s3client = boto3.client(\"s3\", region_name=region_name)\n", "\n", "aws_account_id = stsclient.get_caller_identity()[\"Account\"]\n", "bucket = \"sagemaker-studio-pyhive-{}-{}\".format(region_name, aws_account_id)\n", "key = \"sentiment/movie_reviews.csv\"\n", "smprocessing_input = \"s3://{}/{}\".format(bucket, key)\n", "\n", "try:\n", " if region_name == \"us-east-1\":\n", " s3client.create_bucket(Bucket=bucket)\n", " else:\n", " s3client.create_bucket(\n", " Bucket=bucket, CreateBucketConfiguration={\"LocationConstraint\": region_name}\n", " )\n", "except ClientError as e:\n", " error_code = e.response[\"Error\"][\"Code\"]\n", " message = e.response[\"Error\"][\"Message\"]\n", " if error_code == \"BucketAlreadyOwnedByYou\":\n", " print(\n", " \"A bucket with the same name already exists in your account - using the same bucket.\"\n", " )\n", " pass\n", " else:\n", " print(\"Error->{}:{}\".format(error_code, message))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the data to the S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "from io import StringIO\n", "\n", "csv_buffer = StringIO()\n", "movie_reviews.to_csv(csv_buffer)\n", "s3_resource = boto3.resource(\"s3\")\n", "s3_resource.Object(bucket, key).put(Body=csv_buffer.getvalue())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pre-process data and feature engineering\n", "\n", "#### Amazon SageMaker Processing jobs using the Scikit-learn Processor" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pre-process data and feature engineering\n", "Amazon SageMaker Processing jobs using the Scikit-learn Processor\n", "With Amazon SageMaker Processing jobs, you can leverage a simplified, managed experience to run data pre- or post-processing and model evaluation workloads on the Amazon SageMaker platform.\n", "\n", "A processing job downloads input from Amazon Simple Storage Service (Amazon S3), then uploads outputs to Amazon S3 during or after the processing job.\n", "\n", "The cell below shows how to run scikit-learn scripts using a Docker image provided and maintained by SageMaker to preprocess data.\n", "\n", "Note: We will use a \"ml.m5.xlarge\" instance as the instance type for sagemaker processing, training and model hosting. If you don't have access to this instance type and see a \"ResourceLimitExceeded\" error, use another instance type that you have access to. You can also request a service limit increase using AWS Support Center" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "instance_type_smprocessing = \"ml.m5.xlarge\"\n", "instance_type_smtraining = \"ml.m5.xlarge\"\n", "instance_type_smendpoint = \"ml.m5.xlarge\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.20.0\",\n", " role=role,\n", " instance_type=instance_type_smprocessing,\n", " instance_count=1,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(smprocessing_input)\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "\n", "sklearn_processor.run(\n", " code=\"preprocessing.py\",\n", " inputs=[\n", " ProcessingInput(\n", " source=smprocessing_input, destination=\"/opt/ml/processing/input\"\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"train_data\", source=\"/opt/ml/processing/train\"),\n", " ProcessingOutput(\n", " output_name=\"validation_data\", source=\"/opt/ml/processing/validation\"\n", " ),\n", " ],\n", " arguments=[\"--train-test-split-ratio\", \"0.2\"],\n", ")\n", "\n", "preprocessing_job_description = sklearn_processor.jobs[-1].describe()\n", "\n", "output_config = preprocessing_job_description[\"ProcessingOutputConfig\"]\n", "for output in output_config[\"Outputs\"]:\n", " if output[\"OutputName\"] == \"train_data\":\n", " preprocessed_training_data = output[\"S3Output\"][\"S3Uri\"]\n", " if output[\"OutputName\"] == \"validation_data\":\n", " preprocessed_validation_data = output[\"S3Output\"][\"S3Uri\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(preprocessed_training_data)\n", "print(preprocessed_validation_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prefix = \"blazingtext/supervised\"\n", "s3_train_data = preprocessed_training_data\n", "s3_validation_data = preprocessed_validation_data\n", "s3_output_location = \"s3://{}/{}/output\".format(bucket, prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a SageMaker model\n", "#### Amazon SageMaker Experiments\n", "\n", "Amazon SageMaker Experiments allows us to keep track of model training; organize related models together; and log model configuration, parameters, and metrics to reproduce and iterate on previous models and compare models. \n", "Let's create the experiment, trial, and train the model. To reduce cost, the training code below uses spot instances." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_session = sagemaker.session.Session()\n", "\n", "create_date = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "sentiment_experiment = Experiment.create(\n", " experiment_name=\"sentimentdetection-{}\".format(create_date),\n", " description=\"Detect sentiment in text\",\n", " sagemaker_boto_client=boto3.client(\"sagemaker\"),\n", ")\n", "\n", "trial = Trial.create(\n", " trial_name=\"sentiment-trial-blazingtext-{}\".format(\n", " strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", " ),\n", " experiment_name=sentiment_experiment.experiment_name,\n", " sagemaker_boto_client=boto3.client(\"sagemaker\"),\n", ")\n", "\n", "container = sagemaker.amazon.amazon_estimator.get_image_uri(\n", " region_name, \"blazingtext\", \"latest\"\n", ")\n", "print(\"Using SageMaker BlazingText container: {} ({})\".format(container, region_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_use_spot_instances = False\n", "train_max_run = 3600\n", "train_max_wait = 3600 if train_use_spot_instances else None\n", "\n", "bt_model = sagemaker.estimator.Estimator(\n", " container,\n", " role,\n", " instance_count=1,\n", " instance_type=instance_type_smtraining,\n", " volume_size=30,\n", " input_mode=\"File\",\n", " output_path=s3_output_location,\n", " sagemaker_session=sm_session,\n", " use_spot_instances=train_use_spot_instances,\n", " max_run=train_max_run,\n", " max_wait=train_max_wait,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bt_model.set_hyperparameters(\n", " mode=\"supervised\",\n", " epochs=10,\n", " min_count=2,\n", " learning_rate=0.005328,\n", " vector_dim=286,\n", " early_stopping=True,\n", " patience=4,\n", " min_epochs=5,\n", " word_ngrams=2,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_data = sagemaker.inputs.TrainingInput(\n", " s3_train_data,\n", " distribution=\"FullyReplicated\",\n", " content_type=\"text/plain\",\n", " s3_data_type=\"S3Prefix\",\n", ")\n", "validation_data = sagemaker.inputs.TrainingInput(\n", " s3_validation_data,\n", " distribution=\"FullyReplicated\",\n", " content_type=\"text/plain\",\n", " s3_data_type=\"S3Prefix\",\n", ")\n", "data_channels = {\"train\": train_data, \"validation\": validation_data}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "bt_model.fit(\n", " data_channels,\n", " experiment_config={\n", " \"ExperimentName\": sentiment_experiment.experiment_name,\n", " \"TrialName\": trial.trial_name,\n", " \"TrialComponentDisplayName\": \"BlazingText-Training\",\n", " },\n", " logs=False,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the model and get predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "text_classifier = bt_model.deploy(\n", " initial_instance_count=1, instance_type=instance_type_smendpoint\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "review = [\n", " \"please give this one a miss br br kristy swanson and the rest of the cast\"\n", " \"rendered terrible performances the show is flat flat flat br br\"\n", " \"i don't know how michael madison could have allowed this one on his plate\"\n", " \"he almost seemed to know this wasn't going to work out\"\n", " \"and his performance was quite lacklustre so all you madison fans give this a miss\"\n", "]\n", "tokenized_review = [\" \".join(t.split(\" \")) for t in review]\n", "# For retrieving the top k predictions, you can set k in the configuration\n", "payload = {\"instances\": tokenized_review}\n", "bt_endpoint_name = text_classifier.endpoint_name\n", "response = sm_runtime.invoke_endpoint(\n", " EndpointName=bt_endpoint_name,\n", " ContentType=\"application/json\",\n", " Body=json.dumps(payload),\n", ")\n", "output = json.loads(response[\"Body\"].read().decode(\"utf-8\"))\n", "# make the output readable\n", "import copy\n", "\n", "predictions = copy.deepcopy(output)\n", "for output in predictions:\n", " output[\"label\"] = output[\"label\"][0][9:].upper()\n", "print(predictions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Clean up resources created as part of this notebook\n", "# delete endpoint\n", "text_classifier.delete_endpoint()\n", "# empty s3 bucket we created\n", "s3_bucket_to_remove = \"s3://{}\".format(bucket)\n", "!aws s3 rm {s3_bucket_to_remove} --recursive" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-1:742091327244:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }