### Overview

This notebook was tested with the SageMaker `Studio SparkMagic - PySpark Kernel`. Please make sure that `PySpark (SparkMagic)` appears in the top right of your notebook.

This notebook does the following:

* Demonstrates how to visually connect the Amazon SageMaker Studio Sparkmagic kernel to an EMR cluster
* Explores and queries data from a Hive table
* Uses the data locally

----------


When you use the PySpark kernel, there is no need to create a SparkContext or a HiveContext; both are created for you automatically when you run the first code cell, and you can watch the progress in the cell output. The contexts are available under the following variable names:
- SparkContext (`sc`)
- HiveContext (`sqlContext`)

----------
### PySpark magics 

The PySpark kernel provides some predefined “magics”, which are special commands that you can call with `%%` (e.g. `%%MAGIC`). The magic command must be the first word in a code cell and allows for multiple lines of content. You can’t put comments before a cell magic.

For more information on magics, see the [IPython magics documentation](http://ipython.readthedocs.org/en/stable/interactive/magics.html).

#### Running locally (%%local)

You can use the `%%local` magic to run your code locally on the Jupyter server without going to Spark. When you use `%%local`, all subsequent lines in the cell are executed locally, so the code in the cell must be valid Python.

----------
### Livy Connection
 
Apache Livy is a service that enables easy interaction with a Spark cluster over REST. It supports submitting Spark jobs or snippets of Spark code, retrieving results synchronously or asynchronously, and managing Spark contexts, either through its REST interface or through an RPC client library.
 
![image](https://livy.incubator.apache.org/assets/images/livy-architecture.png)
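To make the REST interaction more concrete, here is a minimal sketch of the kinds of requests a client sends to Livy to start a PySpark session, run a statement, and poll for its result. It only builds the requests with Python's standard library and does not send them. `LIVY_URL` is a hypothetical endpoint (8998 is Livy's default port) and the helper names are illustrative; the endpoint paths follow the Livy REST API documentation rather than anything specific to this notebook. Sparkmagic takes care of all of this for you.

```python
import json
import urllib.request

# Hypothetical Livy endpoint (assumption); 8998 is Livy's default port.
LIVY_URL = "http://localhost:8998"
HEADERS = {"Content-Type": "application/json"}


def create_session_request(base_url, kind="pyspark"):
    """Build a POST /sessions request that starts an interactive Spark session."""
    body = json.dumps({"kind": kind}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/sessions", data=body, headers=HEADERS, method="POST"
    )


def submit_statement_request(base_url, session_id, code):
    """Build a POST /sessions/{id}/statements request that runs a code snippet."""
    body = json.dumps({"code": code}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/sessions/{session_id}/statements",
        data=body,
        headers=HEADERS,
        method="POST",
    )


def statement_result_request(base_url, session_id, statement_id):
    """Build a GET request that polls one statement for its state and output."""
    return urllib.request.Request(
        f"{base_url}/sessions/{session_id}/statements/{statement_id}", method="GET"
    )


req = submit_statement_request(LIVY_URL, 0, "spark.range(10).count()")
print(req.get_method(), req.full_url)  # POST http://localhost:8998/sessions/0/statements
# Sending it would be urllib.request.urlopen(req), which needs a reachable Livy server.
```

In practice, Livy answers the session request with a JSON body that contains the new session's `id`, and the statement request with a statement `id` that the client polls until its `state` is `available`.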

In [None]:
%%local
print("Demo Notebook")

### Connection to EMR Cluster

The connection code is autogenerated: click the "Cluster" link at the top of the notebook and select your EMR cluster, and Studio populates a cell similar to the code below. "j-xxxxxxxxxxxx" is the ID of the selected cluster.

```
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-xxxxxxxxxxxx --auth-type None 
```

For this workshop, we used a cluster without authentication for simplicity, but the connection works equally well with Kerberos, LDAP, and HTTP authentication mechanisms.

### Spark UI
The connection above generates a presigned URL for the Spark UI, which you can use to monitor and debug the commands run throughout this notebook.

### Let's start by viewing the available Sparkmagic commands

In [None]:
%%help

In the next cell, we use the `sqlContext` that the connection created for us to query Hive and list its databases and tables.

In [None]:
dbs = sqlContext.sql("show databases")
dbs.show()

tables = sqlContext.sql("show tables")
tables.show()

In [None]:
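from pyspark.sql.functions import col

# Load the Hive table and cache it, since it is queried several times below
movie_reviews = sqlContext.sql("select * from movie_reviews").cache()
# Remove the header row, whose 'sentiment' column holds the literal string "sentiment"
movie_reviews = movie_reviews.where(col('sentiment') != "sentiment")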

In [None]:
# Shape
print((movie_reviews.count(), len(movie_reviews.columns)))

# Count of both positive and negative sentiments
movie_reviews.groupBy('sentiment').count().show()

Let's look at the size of the dataset and of each class (positive and negative), and visualize it. You can see that the dataset is balanced, with an equal number of reviews in both classes (25,000 each).

In [None]:
# Count each class on the cluster instead of collecting every review to the driver
pos_count = movie_reviews.filter(movie_reviews.sentiment == 'positive').count()
neg_count = movie_reviews.filter(movie_reviews.sentiment == 'negative').count()

In [None]:
import matplotlib.pyplot as plt

def plot_counts(positive, negative):
    """Draw a bar chart comparing the number of positive and negative reviews."""
    plt.rcParams['figure.figsize'] = (6, 6)
    plt.bar(0, positive, width=0.6, label='Positive Reviews', color='Green')
    plt.bar(2, negative, width=0.6, label='Negative Reviews', color='Red')
    # De-duplicate legend entries by label
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys())
    plt.ylabel('Count')
    plt.xlabel('Type of Review')
    # Hide x-axis ticks and tick labels; the legend identifies the bars
    plt.tick_params(axis='x', which='both', bottom=False, top=False, labelbottom=False)

plot_counts(pos_count, neg_count)
# %matplot renders the figure from the remote Spark driver in this notebook
%matplot plt
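
The balance can also be checked programmatically rather than by eye. The sketch below uses two hypothetical helpers, `class_proportions` and `is_balanced`, written in plain Python so they can run either in the Spark session or under `%%local`. The 5% tolerance is an arbitrary assumption, not something the notebook specifies.

```python
def class_proportions(counts):
    """Return each class's share of the total, e.g. {'positive': 0.5, ...}."""
    total = sum(counts.values())
    if total == 0:
        raise ValueError("counts must contain at least one example")
    return {label: n / total for label, n in counts.items()}


def is_balanced(counts, tolerance=0.05):
    """True if every class is within `tolerance` of a perfectly even split."""
    proportions = class_proportions(counts)
    expected = 1 / len(proportions)
    return all(abs(p - expected) <= tolerance for p in proportions.values())


# Counts as reported above for this dataset (25,000 reviews per class)
counts = {"positive": 25000, "negative": 25000}
print(class_proportions(counts))  # {'positive': 0.5, 'negative': 0.5}
print(is_balanced(counts))        # True
```

In the PySpark session, you could build `counts` from the grouped counts with `{row['sentiment']: row['count'] for row in movie_reviews.groupBy('sentiment').count().collect()}`. Bracket access matters here, because `row.count` refers to the tuple method rather than the `count` column.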

Next, let's inspect the length of each review using the `length` function from the `pyspark.sql.functions` module.

In [None]:
from pyspark.sql.functions import length
reviewlengthDF = movie_reviews.select(length('review').alias('Length of Review')) 
reviewlengthDF.show() 

You can also run Spark SQL queries with the `%%sql` magic and use the `-o` option to store the results in a local pandas DataFrame, which allows for quick data exploration. By default, at most 2,500 rows are returned; you can change this limit with the `-n` argument.

In [None]:
%%sql -o movie_reviews_sparksql_df -n 10
select * from movie_reviews 

You can then access and explore the resulting DataFrame locally:

In [None]:
%%local 
movie_reviews_sparksql_df.head(10)

### Session logs (%%logs)

As an alternative to the Spark UI, you can also retrieve the logs of your current Livy session to debug any issues you encounter.

In [None]:
%%logs

### Session information (%%info)

Livy is an open source REST server for Spark. When you execute a code cell in a Sparkmagic notebook, the code runs in a Livy session, which is created when you run the first cell. The `%%info` magic displays information about the current Livy session.

In [None]:
%%info
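
Both `%%logs` and `%%info` draw on Livy, and comparable information is available directly from the Livy REST API. As a rough sketch, assuming a hypothetical Livy endpoint `LIVY_URL` and illustrative helper names, the requests below list the active sessions (`GET /sessions`) and fetch a page of one session's log lines (`GET /sessions/{id}/log`, paged with the `from` and `size` parameters). They are only built, not sent.

```python
import urllib.parse
import urllib.request

# Hypothetical Livy endpoint (assumption); replace with your cluster's Livy URL.
LIVY_URL = "http://localhost:8998"


def list_sessions_request(base_url):
    """GET /sessions: all active sessions on this Livy server."""
    return urllib.request.Request(f"{base_url}/sessions", method="GET")


def session_log_request(base_url, session_id, start=0, size=100):
    """GET /sessions/{id}/log: up to `size` log lines, starting at offset `start`."""
    query = urllib.parse.urlencode({"from": start, "size": size})
    return urllib.request.Request(
        f"{base_url}/sessions/{session_id}/log?{query}", method="GET"
    )


req = session_log_request(LIVY_URL, 0, size=50)
print(req.full_url)  # http://localhost:8998/sessions/0/log?from=0&size=50
```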

### Terminating all Livy sessions

You can terminate all Livy sessions with the `%%cleanup -f` command. Keep in mind that this terminates every session on the cluster, including those used by other users or jobs connected to it. To end only your own Livy session, shut down the notebook kernel instead.

In [None]:
%%cleanup -f
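
If you want to end a single session without touching the others, Livy's REST API also exposes `DELETE /sessions/{id}`. The sketch below only builds that request; `LIVY_URL` is a hypothetical endpoint, `delete_session_request` is an illustrative helper, and the session ID would come from the `%%info` output.

```python
import urllib.request

# Hypothetical Livy endpoint (assumption); use the session ID reported by %%info.
LIVY_URL = "http://localhost:8998"


def delete_session_request(base_url, session_id):
    """DELETE /sessions/{id}: kill one Livy session, leaving other sessions running."""
    return urllib.request.Request(f"{base_url}/sessions/{session_id}", method="DELETE")


req = delete_session_request(LIVY_URL, 0)
print(req.get_method(), req.full_url)  # DELETE http://localhost:8998/sessions/0
# urllib.request.urlopen(req) would send it (requires network access to Livy).
```

Unlike `%%cleanup -f`, this targets only the session whose ID you pass.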