# Visualize full Amazon Customer Reviews Dataset with Spark and AWS Glue Interactive Sessions

In this notebook, we will query the full reviews data using AWS Glue interactive sessions with PySpark. With [AWS Glue](https://aws.amazon.com/glue/) interactive sessions, you can rapidly build, test, and run data preparation and analytics applications. Interactive sessions provide a programmatic interface for building and testing extract, transform, and load (ETL) scripts for data preparation. This is a powerful tool for data engineers and scientists doing interactive development on large datasets that require distributed computing, because they do not need to manage infrastructure for Spark clusters.

In the data science lifecycle, exploratory data analysis (EDA) is an important step for data scientists and engineers to gain an intuitive understanding of their datasets. This understanding then guides decisions later in the ML lifecycle, such as model training, deployment, and monitoring. In this notebook, we will use PySpark and Spark SQL to do EDA and associated visualizations of the Amazon reviews dataset.

**Please note: if you are running this workshop in your own AWS account (not an AWS-led workshop), you will need to update your SageMaker execution role IAM permissions as described in the documentation [here](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-notebooks-glue.html).**

![glue-interactive-sessions](./img/gis.png)

<a name='1'></a>
## Set up Kernel and Required Dependencies

First, check that the correct kernel is chosen.

<img src="img/gis_kernel_set_up.png" width="300"/>

Click the kernel name to check the details of the image, kernel, and instance type.

<img src="img/gis_kernel_and_instance_type.png" width="600"/>

# Dataset Column Descriptions

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.
- `year`: The year derived from the review date.

In [None]:
%stop_session

In [None]:
%additional_python_modules seaborn,psutil
%number_of_workers 10

In [None]:
spark

# Set Seaborn Parameters

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns

import matplotlib.pyplot as plt

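# Apply the whitegrid style together with the rc overrides below
# (assigning to sns.set_style would overwrite the function instead of calling it)
sns.set(
    style="whitegrid",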
    rc={
        "font.style": "normal",
        "axes.facecolor": "white",
        "grid.color": ".8",
        "grid.linestyle": "-",
        "figure.facecolor": "white",
        "figure.titlesize": 20,
        "text.color": "black",
        "xtick.color": "black",
        "ytick.color": "black",
        "axes.labelcolor": "black",
        "axes.grid": True,
        "axes.labelsize": 10,
        "xtick.labelsize": 10,
        "font.size": 10,
        "ytick.labelsize": 10,
    }
)

# Helper Code to Display Values on Bars

In [None]:
def show_values_barplot(axs, space):
    def _show_on_plot(ax):
        for p in ax.patches:
            _x = p.get_x() + p.get_width() + float(space)
            _y = p.get_y() + p.get_height()
            value = round(float(p.get_width()), 2)
            ax.text(_x, _y, value, ha="left")

    if isinstance(axs, np.ndarray):
        for idx, ax in np.ndenumerate(axs):
            _show_on_plot(ax)
    else:
        _show_on_plot(axs)

# Set Glue Database and Table names to query

In [None]:
database_name = "default"
table_name = "amazon_reviews_parquet"

# 1. Which Product Categories are Highest Rated by Average Rating?
## _This query takes a minute or two.  Please be patient._

In [None]:
df = spark.sql("""SELECT product_category, AVG(star_rating) AS avg_star_rating 
    FROM {}.{} 
    GROUP BY product_category 
    ORDER BY avg_star_rating DESC""".format(database_name, table_name)
)
df.show(50, truncate=False)

In [None]:
# Convert to pandas once; each toPandas() call re-runs the Spark query
# Store average star ratings for later
average_star_ratings = df.toPandas()

# Store number of categories for later
num_categories = average_star_ratings.shape[0]

## Visualization

In [None]:
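plt.clf()

# Size the figure before plotting so the setting applies to this chart
if num_categories < 10:
    plt.gcf().set_size_inches(10.0, 5.0)

# Create plot from the pandas DataFrame stored earlier
barplot = sns.barplot(y="product_category", x="avg_star_rating", data=average_star_ratings, saturation=1)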

# Set title and x-axis ticks
plt.title("Average Rating by Product Category")
plt.xticks([1, 2, 3, 4, 5], ["1-Star", "2-Star", "3-Star", "4-Star", "5-Star"])

# Helper code to show actual values after the bars
show_values_barplot(barplot, 0.1)

plt.xlabel("Average Rating")
plt.ylabel("Product Category")

# Export plot if needed
plt.tight_layout()
# plt.savefig('avg_ratings_per_category.png', dpi=300)

# Show graphic
plt.show()

%matplot plt

# 2. Which Product Categories Have the Most Reviews?
## _This query takes a minute or two.  Please be patient._

Note that in this cell, you are using PySpark syntax to process the data in your Glue table while the previous section was using Spark SQL.

In [None]:
from pyspark.sql import functions as f
df_raw = spark.sql("SELECT * FROM {}.{}".format(database_name, table_name))
df = (
    df_raw
    .select("product_category", "star_rating")
    .groupBy("product_category")
    .agg(f.count("star_rating").alias("count_star_rating"))
    .orderBy("count_star_rating", ascending=False)
)

df.show(50, truncate=False)
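
For comparison, the DataFrame aggregation above could also be written as a Spark SQL statement, in the style of section 1. The sketch below only builds the query string, so it runs without a Spark session; passing it to `spark.sql` is left commented out. The variable name `count_query` is illustrative and not part of the original notebook.

```python
# Same names as set earlier in this notebook
database_name = "default"
table_name = "amazon_reviews_parquet"

# COUNT(star_rating) counts non-null ratings, mirroring f.count("star_rating")
count_query = """
    SELECT product_category, COUNT(star_rating) AS count_star_rating
    FROM {}.{}
    GROUP BY product_category
    ORDER BY count_star_rating DESC
""".format(database_name, table_name)

print(count_query)

# In a Glue interactive session, the query could then be run with:
# df = spark.sql(count_query)
```

Either form should describe the same aggregation; which one to use is mostly a matter of preference.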

In [None]:
# Store counts for later
count_ratings = df.toPandas()["count_star_rating"]

# Store max ratings for later (reuses the pandas Series instead of re-running the query)
max_ratings = count_ratings.max()

## Visualization

In [None]:
plt.clf()

# Size the figure before plotting so the setting applies to this chart
if num_categories < 10:
    plt.gcf().set_size_inches(10.0, 5.0)

# Create Seaborn barplot
barplot = sns.barplot(y="product_category", x="count_star_rating", data=df.toPandas(), saturation=1)

# Set title
plt.title("Number of Ratings per Product Category for Subset of Product Categories")

# Set x-axis ticks to match scale
if max_ratings > 200000:
    plt.xticks([100000, 1000000, 5000000, 10000000, 15000000, 20000000], ["100K", "1M", "5M", "10M", "15M", "20M"])
    plt.xlim(0, 20000000)
else:
    plt.xticks([50000, 100000, 150000, 200000], ["50K", "100K", "150K", "200K"])
    plt.xlim(0, 200000)

plt.xlabel("Number of Ratings")
plt.ylabel("Product Category")

plt.tight_layout()

# Export plot if needed
# plt.savefig('ratings_per_category.png', dpi=300)

# Show the barplot
plt.show()

%matplot plt

# 3. When did each product category become available in the Amazon catalog based on the date of the first review?
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT product_category, MIN(year) AS first_review_year
    FROM {}.{}
    WHERE year >= 1995
    GROUP BY product_category
    ORDER BY first_review_year 
""".format(database_name, table_name)
)

df.show(50, truncate=False)

In [None]:
def get_x_y(df):
    """ Get X and Y coordinates; return tuple """
    series = df["first_review_year"].value_counts().sort_index()
    # new_series = series.reindex(range(1,21)).fillna(0).astype(int)
    return series.index, series.values

In [None]:
X, Y = get_x_y(df.toPandas())
print(X)
print(Y)
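
To make the counting step in `get_x_y` concrete, here is a plain-Python sketch of the same idea: count how many categories share each first-review year, then sort by year. The list `first_review_years` is made-up sample data, not output from the query.

```python
from collections import Counter

# Hypothetical first-review years, one entry per product category
first_review_years = [1995, 1998, 1995, 2001, 1998, 1995]

# Count categories per year, then sort by year
# (the pandas version uses value_counts() followed by sort_index())
counts_by_year = sorted(Counter(first_review_years).items())

years = [year for year, _ in counts_by_year]
counts = [count for _, count in counts_by_year]

print(years)
print(counts)
```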

## Visualization

In [None]:
plt.clf()

fig = plt.figure(figsize=(12, 5))
ax = plt.gca()

ax.set_title("Number Of First Product Category Reviews Per Year for Subset of Categories")
ax.set_xlabel("Year")
ax.set_ylabel("Count")


ax.plot(X, Y, color="black", linewidth=2, marker="o")
ax.fill_between(X, [0] * len(X), Y, facecolor="lightblue")

ax.locator_params(integer=True)

ax.set_xticks(range(1995, 2016, 1))
ax.set_yticks(range(0, max(Y) + 2, 1))

plt.xticks(rotation=45)

# fig.savefig('first_reviews_per_year.png', dpi=300)
plt.show()

%matplot plt

# 4. What is the breakdown of ratings (1-5) per product category?
## _This query takes a minute or two.  Please be patient._


In [None]:
# SQL statement
df = spark.sql("""
    SELECT product_category, star_rating, COUNT(*) AS count_reviews
    FROM {}.{}
    GROUP BY  product_category, star_rating
    ORDER BY  product_category ASC, star_rating DESC, count_reviews
""".format(database_name, table_name)
)

df.show(50, truncate=False)

## Prepare for Stacked Percentage Horizontal Bar Plot Showing Proportion of Star Ratings per Product Category

In [None]:
# Convert to pandas once and reuse the result for each grouping
df_pandas = df.toPandas()

# Create grouped DataFrames by category and by star rating
grouped_category = df_pandas.groupby("product_category")
grouped_star = df_pandas.groupby("star_rating")

# Create sum of review counts per star rating (numeric column only)
df_sum = df_pandas.groupby(["star_rating"])[["count_reviews"]].sum()

# Calculate total number of star ratings
total = df_sum["count_reviews"].sum()
print(total)

In [None]:
# Create dictionary of product categories and array of star rating distribution per category
distribution = {}

for category, ratings in grouped_category:
    # Order counts from 5 stars down to 1 star, independent of the row index
    ratings_sorted = ratings.sort_values("star_rating", ascending=False)
    distribution[category] = ratings_sorted["count_reviews"].tolist()

# Check if distribution has been created successfully
print(distribution)

In [None]:
# Sort distribution by average rating per category
sorted_distribution = {}

# average_star_ratings is already ordered by average rating (highest first)
for _, category in average_star_ratings["product_category"].items():
    sorted_distribution[category] = distribution[category]

In [None]:
df_sorted_distribution_pct = pd.DataFrame(sorted_distribution).transpose().apply(
    lambda num_ratings: num_ratings/sum(num_ratings)*100, axis=1
)
df_sorted_distribution_pct.columns=['5', '4', '3', '2', '1']
df_sorted_distribution_pct
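
The row-wise normalization above turns each category's review counts into percentages that sum to 100. A minimal plain-Python sketch of that arithmetic, using made-up counts and a hypothetical helper `to_percentages`:

```python
# Hypothetical review counts per category, ordered 5-star down to 1-star
counts = {
    "Books": [60, 20, 10, 5, 5],
    "Toys": [30, 30, 20, 10, 10],
}


def to_percentages(values):
    """Scale a list of counts so that the entries sum to 100."""
    total = sum(values)
    return [value / total * 100 for value in values]


percentages = {category: to_percentages(values) for category, values in counts.items()}

for category, row in percentages.items():
    print(category, [round(pct, 1) for pct in row])
```

Because every row sums to 100, the stacked bars in the next chart all have the same length, which makes the rating mix comparable across categories of very different sizes.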

## Visualization

In [None]:
plt.clf()

categories = df_sorted_distribution_pct.index

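# Choose the figure size, then plot bars
# (DataFrame.plot creates its own figure, so the size is passed to it directly)
if len(categories) > 10:
    figsize = (10, 10)
else:
    figsize = (10, 5)

df_sorted_distribution_pct.plot(kind="barh",
                                figsize=figsize,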
                                stacked=True, 
                                edgecolor='white',
                                width=1.0,
                                color=['green', 
                                       'orange', 
                                       'blue', 
                                       'purple', 
                                       'red'])

plt.title("Distribution of Reviews Per Rating Per Category", 
          fontsize='16')

plt.legend(bbox_to_anchor=(1.04,1), 
           loc="upper left",
           labels=['5-Star Ratings', 
                   '4-Star Ratings', 
                   '3-Star Ratings', 
                   '2-Star Ratings', 
                   '1-Star Ratings'])

plt.xlabel("% Breakdown of Star Ratings", fontsize='14')
plt.gca().invert_yaxis()
plt.tight_layout()

plt.show()

%matplot plt

# 5. How Many Reviews per Star Rating? (5, 4, 3, 2, 1)
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT star_rating, COUNT(*) AS count_reviews
    FROM {}.{}
    GROUP BY star_rating
    ORDER BY star_rating DESC, count_reviews 
""".format(database_name, table_name)
)

df.show()

In [None]:
plt.clf()

chart = df.toPandas().plot.bar(
    x="star_rating", y="count_reviews", rot=0, figsize=(10, 5), title="Review Count by Star Ratings", legend=False
)

plt.xlabel("Star Rating")
plt.ylabel("Review Count")

plt.show()

%matplot plt

# 6. How Did Star Ratings Change Over Time?
Is there a drop-off point for certain product categories throughout the year?

## Average Star Rating Across All Product Categories
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT year, ROUND(AVG(star_rating),4) AS avg_rating
    FROM {}.{}
    WHERE year >= 1995
    GROUP BY year
    ORDER BY year
""".format(database_name, table_name)
)

df.show(50, truncate=False)

In [None]:
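# Cast year to an integer on the Spark DataFrame so later toPandas() calls pick it up
# (assigning to a column of df.toPandas() only changes a temporary copy)
df = df.withColumn("year", df["year"].cast("int"))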

## Visualization

In [None]:
plt.clf()

fig = plt.gcf()
fig.set_size_inches(12, 5)

fig.suptitle("Average Star Rating Over Time (Across Subset of Product Categories)")

ax = plt.gca()
# ax = plt.gca().set_xticks(df['year'])
ax.locator_params(integer=True)
# Convert to pandas once and reuse the result for ticks and plotting
df_pandas = df.toPandas()
ax.set_xticks(df_pandas["year"].unique())

df_pandas.plot(kind="line", x="year", y="avg_rating", color="red", ax=ax)

# plt.xticks(range(1995, 2016, 1))
# plt.yticks(range(0,6,1))
plt.xlabel("Years")
plt.ylabel("Average Star Rating")
plt.xticks(rotation=45)

# fig.savefig('average-rating.png', dpi=300)
plt.show()

%matplot plt

## Average Star Rating Per Product Categories Across Time
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT product_category, year, ROUND(AVG(star_rating), 4) AS avg_rating_category
    FROM {}.{}
    WHERE year >= 1995
    GROUP BY product_category, year
    ORDER BY year 
""".format(database_name, table_name)
)

df.show(50, truncate=False)

## Visualization

In [None]:
def plot_categories(df, ax=None):
    """Plot one line per product category on the given axes."""
    # Fall back to the current axes when none is passed in
    if ax is None:
        ax = plt.gca()
    for category in df["product_category"].unique():
        df_plot = df.loc[df["product_category"] == category]
        df_plot.plot(
            kind="line",
            x="year",
            y="avg_rating_category",
            c=np.random.rand(3),  # random RGB color per category
            ax=ax,
            label=category,
        )

In [None]:
plt.clf()

fig = plt.gcf()
fig.set_size_inches(12, 5)

fig.suptitle("Average Star Rating Over Time Across Subset Of Categories")

ax = plt.gca()

ax.locator_params(integer=True)
ax.set_xticks(df.toPandas()["year"].unique())

plot_categories(df.toPandas())

plt.xlabel("Year")
plt.ylabel("Average Star Rating")
plt.legend(bbox_to_anchor=(0, -0.15, 1, 0), loc=2, ncol=2, mode="expand", borderaxespad=0)

# fig.savefig('average_rating_category_all_data.png', dpi=300)
plt.show()

%matplot plt

# 7. Which Star Ratings (1-5) are Most Helpful?
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT star_rating, AVG(helpful_votes) AS avg_helpful_votes
    FROM {}.{}
    GROUP BY  star_rating
    ORDER BY  star_rating ASC
""".format(database_name, table_name)
)

df.show()

## Visualization

In [None]:
plt.clf()

chart = df.toPandas().plot.bar(
    x="star_rating", y="avg_helpful_votes", rot=0, figsize=(10, 5), title="Helpfulness Of Star Ratings", legend=False
)

plt.xlabel("Star Rating")
plt.ylabel("Average Helpful Votes")

plt.show()

%matplot plt

# 8. Which Products Have the Most Helpful Reviews?  How Long are the Most Helpful Reviews?
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT product_title, helpful_votes, star_rating,
           LENGTH(review_body) AS review_body_length,
           SUBSTR(review_body, 1, 100) AS review_body_substr
    FROM {}.{}
    ORDER BY helpful_votes DESC LIMIT 10 
""".format(database_name, table_name)
)

df.show(50, truncate=False)

# 9. What is the Ratio of Positive (5, 4) to Negative (3, 2, 1) Reviews?
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT (CAST(positive_review_count AS DOUBLE) / CAST(negative_review_count AS DOUBLE)) AS positive_to_negative_sentiment_ratio
    FROM (
      SELECT count(*) AS positive_review_count
      FROM {}.{}
      WHERE star_rating >= 4

    ), (
      SELECT count(*) AS negative_review_count
      FROM {}.{}
      WHERE star_rating < 4 
    )
""".format(database_name, table_name, database_name, table_name)
)

df.show()
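
As a worked illustration of the ratio, the plain-Python sketch below applies the same thresholds to a small made-up list of ratings. The list `star_ratings` is hypothetical, and the zero check is an added safeguard that is not part of the SQL above.

```python
# Hypothetical star ratings standing in for the star_rating column
star_ratings = [5, 4, 4, 3, 1, 5, 2, 5]

# Same thresholds as the SQL above: 4 and 5 are positive, 1 to 3 are negative
positive_review_count = sum(1 for rating in star_ratings if rating >= 4)
negative_review_count = sum(1 for rating in star_ratings if rating < 4)

# Guard against division by zero when there are no negative reviews
if negative_review_count:
    ratio = positive_review_count / negative_review_count
else:
    ratio = None

print(positive_review_count, negative_review_count, ratio)
```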

# 10. Which Customers are Abusing the Review System by Repeatedly Reviewing the Same Product?  What Was Their Average Star Rating for Each Product?
## _This query takes a minute or two.  Please be patient._

In [None]:
# SQL statement
df = spark.sql("""
    SELECT customer_id, product_category, product_title, 
    ROUND(AVG(star_rating),4) AS avg_star_rating, COUNT(*) AS review_count 
    FROM {}.{} 
    GROUP BY customer_id, product_category, product_title 
    HAVING COUNT(*) > 1 
    ORDER BY review_count DESC
    LIMIT 50
""".format(database_name, table_name)
)

df.show(50, truncate=False)
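
The `GROUP BY ... HAVING COUNT(*) > 1` pattern can be sketched in plain Python with a `Counter`. The data below is made up, and the grouping key is simplified to (customer, product title) pairs rather than the three columns used in the query.

```python
from collections import Counter

# Hypothetical (customer_id, product_title) pairs, one per review
reviews = [
    ("c1", "Widget"),
    ("c1", "Widget"),
    ("c2", "Widget"),
    ("c1", "Gadget"),
    ("c3", "Gadget"),
    ("c3", "Gadget"),
    ("c3", "Gadget"),
]

# GROUP BY customer and product, counting reviews per pair
review_counts = Counter(reviews)

# HAVING COUNT(*) > 1, then ORDER BY review_count DESC
repeat_reviews = sorted(
    ((pair, count) for pair, count in review_counts.items() if count > 1),
    key=lambda item: item[1],
    reverse=True,
)

for (customer_id, product_title), count in repeat_reviews:
    print(customer_id, product_title, count)
```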

In [None]:
%stop_session

# Conclusions

Through this notebook, we have seen how you can use AWS Glue interactive sessions in SageMaker Studio to process large datasets at scale for ETL and EDA tasks. Benefits of this development paradigm include:

1. Natively distributed processing that scales with your datasets
2. No need to manage cluster infrastructure
3. Integration with SageMaker Studio as a one-stop shop for the ML lifecycle

# Release Resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>