{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Overview\n", "\n", "This notebook is tested using SageMaker `Studio SparkMagic - PySpark Kernel`. Please ensure that you see `PySpark (SparkMagic)` in the top right on your notebook.\n", "\n", "This notebook does the following:\n", "\n", "* Demonstrates how you can visually connect Amazon SageMaker Studio Sparkmagic kernel to an EMR cluster\n", "* Explore and query data from a Hive table \n", "* Use the data locally\n", "\n", "----------\n", "\n", "\n", "When using PySpark kernel notebooks, there is no need to create a SparkContext or a HiveContext; those are all created for you automatically when you run the first code cell, and you'll be able to see the progress printed. The contexts are created with the following variable names:\n", "- SparkContext \n", "- sqlContext " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connection to EMR Cluster\n", "\n", "In the cell below, the code block is autogenerated. You can generate this code by clicking on the \"Cluster\" link on the top of the notebook and select the EMR cluster. \n", "\n", "For our workshop we be passing our SageMaker execution role to the cluster, but this works equally well for Kerberos, LDAP and HTTP auth mechanisms\n", "![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SparkUI\n", "The above connection generates a presigned url for viewing the SparkUI and debugging commands throughout this notebook" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----------\n", "### PySpark magics \n", "\n", "The PySpark kernel provides some predefined “magics”, which are special commands that you can call with `%%` (e.g. `%%MAGIC` ). The magic command must be the first word in a code cell and allow for multiple lines of content. You can’t put comments before a cell magic.\n", "\n", "For more information on magics, see [here](http://ipython.readthedocs.org/en/stable/interactive/magics.html).\n", "\n", "#### Running locally (%%local)\n", "\n", "You can use the `%%local` magic to run your code locally on the Jupyter server without going to Spark. When you use %%local all subsequent lines in the cell will be executed locally. The code in the cell must be valid Python code." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Lets start by viewing the available SparkMagic Commands" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%help" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the next cell, we will use the sqlContext that was return to use through the connection to query Hive and look at the databases and tables" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "dbs = sqlContext.sql(\"show databases\")\n", "dbs.show()\n", "\n", "tables = sqlContext.sql(\"show tables\")\n", "tables.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from pyspark.sql.functions import regexp_replace, col, concat, lit\n", "movie_reviews = sqlContext.sql(\"select * from movie_reviews\").cache()\n", "movie_reviews= movie_reviews.where(col('sentiment') != \"sentiment\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Shape\n", "print((movie_reviews.count(), len(movie_reviews.columns)))\n", "\n", "# Count of both positive and negative sentiments\n", "movie_reviews.groupBy('sentiment').count().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at the data size and size of each class (positive and negative) and visualize it. You can see that we have a balanced dataset with equal number on both classes (25000 each)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "pos_reviews = movie_reviews.filter(movie_reviews.sentiment == 'positive').collect()\n", "neg_reviews = movie_reviews.filter(movie_reviews.sentiment == 'negative').collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "def plot_counts(positive,negative):\n", " plt.rcParams['figure.figsize']=(6,6)\n", " plt.bar(0,positive,width=0.6,label='Positive Reviews',color='Green')\n", " plt.bar(2,negative,width=0.6,label='Negative Reviews',color='Red')\n", " handles, labels = plt.gca().get_legend_handles_labels()\n", " by_label = dict(zip(labels, handles))\n", " plt.legend(by_label.values(), by_label.keys())\n", " plt.ylabel('Count')\n", " plt.xlabel('Type of Review')\n", " plt.tick_params(\n", " axis='x', \n", " which='both', \n", " bottom=False, \n", " top=False, \n", " labelbottom=False) \n", " plt.show()\n", " \n", "plot_counts(len(pos_reviews),len(neg_reviews))\n", "%matplot plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, Let's inspect length of reviews using the pyspark.sql.functions module" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from pyspark.sql.functions import length\n", "reviewlengthDF = movie_reviews.select(length('review').alias('Length of Review')) \n", "reviewlengthDF.show() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also execute SparkSQL queries using the %%sql magic and pass results to a local data frame using the `-o` option. This allows for quick data exploration. Max rows returned by default is 2500. You can set the max rows by using the `-n` argument. 
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can access and explore the data in the dataframe locally" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "----------\n", "### Livy Connection\n", " \n", "Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. It enables easy submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as Spark Context management, all via a simple REST interface or an RPC client library. \n", " \n", "![image](https://user-images.githubusercontent.com/18154355/216506704-41eb8a56-d9a8-4935-b112-97d3e28f48d0.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "local_var = \"This variable exists on Studio Notebook kernel\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%send_to_spark -i local_var -t str -n var_on_cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "print(var_on_cluster)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean Up" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%cleanup -f" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----------\n", "## Schedule Notebook\n", "\n", "With SageMaker notebook jobs, you can now run your notebooks as is or in a parameterized fashion with just a few simple clicks from the Studio interface. You can run these notebooks on a schedule or immediately. There's no need for the end-user to modify their existing notebook code. When the job is complete, you can view the populated notebook cells, including any visualizations!\n", "\n", "![image](https://camo.githubusercontent.com/0f61ebabd0b7ed4376a964fab4d4fbe81fbb9ba47a08c33b99a8fb5cef3e33be/68747470733a2f2f736167656d616b65722d73616d706c652d66696c65732e73332e616d617a6f6e6177732e636f6d2f696d616765732f736167656d616b65722d73747564696f2d7363686564756c696e672f6f766572766965772e706e67)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To run this notebook as a scheduled job:\n", "\n", "1. Goto \"Kernel\" -> \"Restart kernel and clear outputs\"\n", "2. Save this notebook file now that EMR connection cell is populated\n", "3. Right click this notebook in the file browser and select \"Create Notebook Job\"\n", "4. Choose \"Run Now\" or set a schedule. \n", "5. 
Modify \"Additional Options\" as necessary\n", "\n", "For more information on scheduled notebooks, such as notebook parameterization view the docs:\n", "\n", "[https://docs.aws.amazon.com/sagemaker/latest/dg/create-manage-notebook-auto-run.html](https://docs.aws.amazon.com/sagemaker/latest/dg/create-manage-notebook-auto-run.html)\n", "\n", "![img](https://camo.githubusercontent.com/c0b0b4fa49b9d048a4c4b3374f95f011de3c125583fdf9b9a76c695a5202a31b/68747470733a2f2f736167656d616b65722d73616d706c652d66696c65732e73332e616d617a6f6e6177732e636f6d2f696d616765732f736167656d616b65722d73747564696f2d7363686564756c696e672f6372656174655f6a6f622e706e67)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }