{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Overview\n", "\n", "This notebook is tested using SageMaker `Studio SparkMagic - PySpark Kernel`. Please ensure that you see `PySpark (SparkMagic)` in the top right on your notebook.\n", "\n", "This notebook does the following:\n", "\n", "* Demonstrates how you can visually connect Amazon SageMaker Studio Sparkmagic kernel to an EMR cluster\n", "* Explore and query data from a Hive table\n", "* Use the data locally\n", "* Provides resources that demonstrate how to use the local data for ML including using SageMaker Processing.\n", "\n", "\n", "\n", "\n", "----------\n", "\n", "\n", "When using PySpark kernel notebooks, there is no need to create a SparkContext or a HiveContext; those are all created for you automatically when you run the first code cell, and you'll be able to see the progress printed. The contexts are created with the following variable names:\n", "- SparkContext (sc)\n", "- HiveContext (sqlContext)\n", "\n", "----------\n", "### PySpark magics\n", "\n", "The PySpark kernel provides some predefined “magics”, which are special commands that you can call with `%%` (e.g. `%%MAGIC` ). The magic command must be the first word in a code cell and allow for multiple lines of content. You can’t put comments before a cell magic.\n", "\n", "For more information on magics, see [here](http://ipython.readthedocs.org/en/stable/interactive/magics.html).\n", "\n", "#### Running locally (%%local)\n", "\n", "You can use the `%%local` magic to run your code locally on the Jupyter server without going to Spark. When you use %%local all subsequent lines in the cell will be executed locally. The code in the cell must be valid Python code.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "print(\"Demo Notebook\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connection to EMR Cluster\n", "\n", "In the cell below, the code block is autogenerated. You can generate this code by clicking on the \"Cluster\" link on the top of the notebook and select the EMR cluster. The \"j-xxxxxxxxxxxx\" is the cluster id of the cluster selected. \n", "\n", "For this workshop, we use a no-auth cluster for simplicity, but this works equally well for Kerberos, LDAP and HTTP auth mechanisms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# %load_ext sagemaker_studio_analytics_extension.magics\n", "# %sm_analytics emr connect --cluster-id j-xxxxxxxxxxxx --auth-type None " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will query the movie_reviews table and get the data into a spark dataframe. You can visualize the data from the remote cluster locally in the notebook " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import regexp_replace, col, concat, lit\n", "movie_reviews = sqlContext.sql(\"select * from movie_reviews\").cache()\n", "movie_reviews= movie_reviews.where(col('sentiment') != \"sentiment\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using the SageMaker Studio sparkmagic kernel, you can train machine learning models in the Spark cluster using the *SageMaker Spark library*. SageMaker Spark is an open source Spark library for Amazon SageMaker. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Using the SageMaker Studio sparkmagic kernel, you can train machine learning models in the Spark cluster with the *SageMaker Spark library*, an open source Spark library for Amazon SageMaker. For examples, see [here](https://github.com/aws/sagemaker-spark#example-using-sagemaker-spark-with-any-sagemaker-algorithm).\n", "\n", "In this notebook, however, we use SageMaker Experiments, Trials, and an Estimator to train a model, and then deploy it to a SageMaker real-time endpoint.\n", "\n", "The next cell installs the necessary libraries." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "%pip install -q sagemaker-experiments" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we import libraries and set global definitions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "import sagemaker\n", "import boto3\n", "import botocore\n", "from botocore.exceptions import ClientError\n", "from time import strftime, gmtime\n", "import json\n", "from sagemaker import get_execution_role\n", "\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "\n", "# Despite the names, these are full S3 URI prefixes, not bucket names\n", "train_bucket = f\"s3://{bucket}/reviews/train\"\n", "val_bucket = f\"s3://{bucket}/reviews/val\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Send the following variables to Spark:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i train_bucket -t str -n train_bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i val_bucket -t str -n val_bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Confirm the variable arrived on the Spark side\n", "val_bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pre-process data and feature engineering" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import regexp_replace, col, concat, lit\n", "\n", "# Map sentiment values to the BlazingText label format\n", "movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'positive', '__label__positive'))\n", "movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'negative', '__label__negative'))\n", "\n", "# Remove all special characters\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"\W\", \" \"))\n", "\n", "# Remove all single characters\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"\s+[a-zA-Z]\s+\", \" \"))\n", "\n", "# Remove single characters from the start\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"^[a-zA-Z]\s+\", \" \"))\n", "\n", "# Substitute multiple spaces with a single space\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"\s+\", \" \"))\n", "\n", "# Remove the prefixed 'b'\n", "movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r\"^b\s+\", \"\"))\n", "\n", "movie_reviews.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Merge columns into the BlazingText input format:\n", "# https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html\n", "movie_reviews = movie_reviews.select(concat(col(\"sentiment\"), lit(\" \"), col(\"review\")).alias(\"record\"))\n", "movie_reviews.show()" ] }, 
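{ "cell_type": "markdown", "metadata": {}, "source": [ "BlazingText's supervised mode expects each line to start with `__label__<label>` followed by the text. As an optional sanity check (a sketch, not part of the original flow), you can confirm that no records are missing the label prefix:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional check: count records that do NOT carry a __label__ prefix (expect 0)\n", "movie_reviews.filter(~col(\"record\").startswith(\"__label__\")).count()" ] }, 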
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set flag so that _SUCCESS meta files are not written to S3\n", "spark.conf.set(\"mapreduce.fileoutputcommitter.marksuccessfuljobs\", \"false\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df, val_df = movie_reviews.randomSplit([0.8, 0.2], seed=42)\n", "train_df.coalesce(1).write.csv(train_bucket, mode='overwrite') \n", "val_df.coalesce(1).write.csv(val_bucket, mode='overwrite') \n", "\n", "print(train_bucket)\n", "print(val_bucket)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "instance_type_smtraining=\"ml.m5.xlarge\"\n", "instance_type_smendpoint=\"ml.m5.xlarge\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "prefix = 'blazingtext/supervised' \n", "output_location = 's3://{}/{}/output'.format(bucket, prefix)\n", "\n", "print(train_bucket)\n", "print(val_bucket)\n", "print(output_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a SageMaker model\n", "#### Amazon SageMaker Experiments\n", "\n", "Amazon SageMaker Experiments allows us to keep track of model training; organize related models together; and log model configuration, parameters, and metrics to reproduce and iterate on previous models and compare models. \n", "Let's create the experiment, trial, and train the model. To reduce cost, the training code below has a variable to utilize spot instances." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "import boto3\n", "region_name = boto3.Session().region_name\n", "\n", "sm_session = sagemaker.session.Session()\n", "\n", "create_date = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "sentiment_experiment = Experiment.create(experiment_name=\"sentimentdetection-{}\".format(create_date), \n", " description=\"Detect sentiment in text\", \n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "\n", "trial = Trial.create(trial_name=\"sentiment-trial-blazingtext-{}\".format(strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())), \n", " experiment_name=sentiment_experiment.experiment_name,\n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "\n", "container = sagemaker.amazon.amazon_estimator.get_image_uri(region_name, \"blazingtext\", \"latest\")\n", "print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "train_use_spot_instances = False\n", "train_max_run=3600\n", "train_max_wait = 3600 if train_use_spot_instances else None\n", "\n", "bt_model = sagemaker.estimator.Estimator(container,\n", " role=sagemaker.get_execution_role(), \n", " instance_count=1, \n", " instance_type=instance_type_smtraining,\n", " volume_size = 30,\n", " input_mode= 'File',\n", " output_path=output_location,\n", " sagemaker_session=sm_session,\n", " use_spot_instances=train_use_spot_instances,\n", " max_run=train_max_run,\n", " max_wait=train_max_wait)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "bt_model.set_hyperparameters(mode=\"supervised\",\n", " epochs=10,\n", " min_count=2,\n", " learning_rate=0.005328,\n", " vector_dim=286,\n", " early_stopping=True,\n", " patience=4,\n", " min_epochs=5,\n", " word_ngrams=2)" ] }, { "cell_type": "code", "execution_count": 
null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "train_data = sagemaker.inputs.TrainingInput(train_bucket, distribution='FullyReplicated', \n", " content_type='text/plain', s3_data_type='S3Prefix')\n", "\n", "validation_data = sagemaker.inputs.TrainingInput(val_bucket, distribution='FullyReplicated', \n", " content_type='text/plain', s3_data_type='S3Prefix')\n", "\n", "\n", "data_channels = {'train': train_data, 'validation': validation_data}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "%%time\n", "\n", "bt_model.fit(data_channels, \n", " experiment_config={\n", " \"ExperimentName\": sentiment_experiment.experiment_name, \n", " \"TrialName\": trial.trial_name,\n", " \"TrialComponentDisplayName\": \"BlazingText-Training\",\n", " },\n", " logs=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the model and get predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "from sagemaker.serializers import JSONSerializer\n", "\n", "text_classifier = bt_model.deploy(initial_instance_count = 1, instance_type = instance_type_smendpoint, serializer=JSONSerializer())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "import json\n", "\n", "review = [\"please give this one a miss br br kristy swanson and the rest of the cast\"\n", " \"rendered terrible performances the show is flat flat flat br br\"\n", " \"i don't know how michael madison could have allowed this one on his plate\"\n", " \"he almost seemed to know this wasn't going to work out\"\n", " \"and his performance was quite lacklustre so all you madison fans give this a miss\"]\n", "\n", "payload = {\"instances\" : review}\n", "output = json.loads(text_classifier.predict(payload).decode('utf-8'))\n", "classification = output[0]['label'][0].split('__')[-1]\n", "\n", "print(\"Sentiment:\", classification.upper())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "# Delete endpoint\n", "text_classifier.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%cleanup -f" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-1:742091327244:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }