{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Overview\n", "\n", "This notebook was tested with the SageMaker `Studio SparkMagic - PySpark Kernel`. Make sure that `PySpark (SparkMagic)` is shown in the top right of your notebook.\n", "\n", "\n", "This notebook does the following:\n", "\n", "* Demonstrates how to visually connect the Amazon SageMaker Studio SparkMagic kernel to an EMR cluster\n", "* Explores and queries data from a Hive table\n", "* Demonstrates how to save processed data to S3 and use the built-in SageMaker algorithm (BlazingText) for sentiment analysis\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "print(\"Demo Notebook\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connection to EMR Cluster\n", "\n", "The code in the cell below is autogenerated. You can generate it by clicking the \"Cluster\" link at the top of the notebook and selecting the EMR cluster.\n", "\n", "For this workshop we will pass our SageMaker execution role to the cluster, but this works equally well with Kerberos, LDAP, and HTTP authentication mechanisms.\n", "![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will query the movie_reviews table and load the data into a Spark DataFrame. 
You can visualize the data from the remote cluster locally in the notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import regexp_replace, col, concat, lit\n", "\n", "# Load the Hive table into a cached Spark DataFrame\n", "movie_reviews = sqlContext.sql(\"select * from movie_reviews\").cache()\n", "# Drop rows whose sentiment value is the literal string \"sentiment\" (for example, a header row read as data)\n", "movie_reviews = movie_reviews.where(col('sentiment') != \"sentiment\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using the SageMaker Studio SparkMagic kernel, you can train machine learning models in the Spark cluster using the *SageMaker Spark library*. SageMaker Spark is an open source Spark library for Amazon SageMaker. For examples, \n", "see [here](https://github.com/aws/sagemaker-spark#example-using-sagemaker-spark-with-any-sagemaker-algorithm).\n", "\n", "In this notebook, however, we will use a SageMaker experiment, trial, and estimator to train a model, and then deploy it to a SageMaker real-time endpoint.\n", "\n", "In the next cell, we will install the necessary libraries." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "%pip install -q sagemaker-experiments " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will import libraries and set global definitions. 
\n", "If you see warnings about **PkgResourceDeprecation** or **invalid version** when you run the cell below, please proceed \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "import sagemaker\n", "import boto3\n", "import botocore\n", "from botocore.exceptions import ClientError\n", "from time import strftime, gmtime\n", "import json\n", "from sagemaker import get_execution_role\n", "\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "\n", "train_bucket = f\"s3://{bucket}/reviews/train\"\n", "val_bucket = f\"s3://{bucket}/reviews/val\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Send the following variables to spark" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i train_bucket -t str -n train_bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i val_bucket -t str -n val_bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val_bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pre-process data and feature engineering" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import regexp_replace, col, concat, lit\n", "\n", "movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'positive', '__label__positive'))\n", "movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'negative', '__label__negative'))\n", "\n", "# Remove all the special characters\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', '\\W', 
\" \"))\n", "\n", "# Remove single characters surrounded by whitespace\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"\\s+[a-zA-Z]\\s+\", \" \"))\n", "\n", "# Remove a single character at the start of the review\n", "# (the ^ anchor must stay unescaped; an escaped caret would only match a literal '^')\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"^[a-zA-Z]\\s+\", \" \"))\n", "\n", "# Replace runs of whitespace with a single space\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"\\s+\", \" \"))\n", "\n", "# Remove a prefixed 'b'\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"^b\\s+\", \" \"))\n", "\n", "movie_reviews.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Merge columns for BlazingText input format:\n", "# https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html\n", "movie_reviews = movie_reviews.select(concat(col(\"sentiment\"), lit(\" \"), col(\"review\")).alias(\"record\"))\n", "movie_reviews.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set flag so that _SUCCESS meta files are not written to S3\n", "spark.conf.set(\"mapreduce.fileoutputcommitter.marksuccessfuljobs\", \"false\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Split into training (80%) and validation (20%) sets and write each as a single CSV file to S3\n", "train_df, val_df = movie_reviews.randomSplit([0.8, 0.2], seed=42)\n", "train_df.coalesce(1).write.csv(train_bucket, mode='overwrite') \n", "val_df.coalesce(1).write.csv(val_bucket, mode='overwrite') \n", "\n", "print(train_bucket)\n", "print(val_bucket)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "instance_type_smtraining=\"ml.m5.xlarge\"\n", "instance_type_smendpoint=\"ml.m5.xlarge\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "prefix = 'blazingtext/supervised' \n", "output_location = 
's3://{}/{}/output'.format(bucket, prefix)\n", "\n", "print(train_bucket)\n", "print(val_bucket)\n", "print(output_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a SageMaker model\n", "#### Amazon SageMaker Experiments\n", "\n", "Amazon SageMaker Experiments allows us to keep track of model training; organize related models together; and log model configuration, parameters, and metrics to reproduce and iterate on previous models and compare models. \n", "Let's create the experiment, trial, and train the model. To reduce cost, the training code below has a variable to utilize spot instances." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "import boto3\n", "region_name = boto3.Session().region_name\n", "\n", "sm_session = sagemaker.session.Session()\n", "\n", "create_date = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "sentiment_experiment = Experiment.create(experiment_name=\"sentimentdetection-{}\".format(create_date), \n", " description=\"Detect sentiment in text\", \n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "\n", "trial = Trial.create(trial_name=\"sentiment-trial-blazingtext-{}\".format(strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())), \n", " experiment_name=sentiment_experiment.experiment_name,\n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "\n", "container = sagemaker.amazon.amazon_estimator.get_image_uri(region_name, \"blazingtext\", \"latest\")\n", "print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "train_use_spot_instances = False\n", "train_max_run=3600\n", "train_max_wait = 3600 if train_use_spot_instances else None\n", "\n", "bt_model = sagemaker.estimator.Estimator(container,\n", " role=sagemaker.get_execution_role(), \n", " instance_count=1, \n", " 
instance_type=instance_type_smtraining,\n", " volume_size = 30,\n", " input_mode= 'File',\n", " output_path=output_location,\n", " sagemaker_session=sm_session,\n", " use_spot_instances=train_use_spot_instances,\n", " max_run=train_max_run,\n", " max_wait=train_max_wait)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "bt_model.set_hyperparameters(mode=\"supervised\",\n", " epochs=10,\n", " min_count=2,\n", " learning_rate=0.005328,\n", " vector_dim=286,\n", " early_stopping=True,\n", " patience=4,\n", " min_epochs=5,\n", " word_ngrams=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "train_data = sagemaker.inputs.TrainingInput(train_bucket, distribution='FullyReplicated', \n", " content_type='text/plain', s3_data_type='S3Prefix')\n", "\n", "validation_data = sagemaker.inputs.TrainingInput(val_bucket, distribution='FullyReplicated', \n", " content_type='text/plain', s3_data_type='S3Prefix')\n", "\n", "\n", "data_channels = {'train': train_data, 'validation': validation_data}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "%%time\n", "\n", "bt_model.fit(data_channels, \n", " experiment_config={\n", " \"ExperimentName\": sentiment_experiment.experiment_name, \n", " \"TrialName\": trial.trial_name,\n", " \"TrialComponentDisplayName\": \"BlazingText-Training\",\n", " },\n", " logs=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the model and get predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "from sagemaker.serializers import JSONSerializer\n", "\n", "text_classifier = bt_model.deploy(initial_instance_count = 1, instance_type = instance_type_smendpoint, serializer=JSONSerializer())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ 
"%%local \n", "import json\n", "\n", "review = [\"please give this one a miss br br kristy swanson and the rest of the cast\"\n", " \"rendered terrible performances the show is flat flat flat br br\"\n", " \"i don't know how michael madison could have allowed this one on his plate\"\n", " \"he almost seemed to know this wasn't going to work out\"\n", " \"and his performance was quite lacklustre so all you madison fans give this a miss\"]\n", "\n", "payload = {\"instances\" : review}\n", "output = json.loads(text_classifier.predict(payload).decode('utf-8'))\n", "classification = output[0]['label'][0].split('__')[-1]\n", "\n", "print(\"Sentiment:\", classification.upper())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "# Delete endpoint\n", "text_classifier.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%cleanup -f" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }