{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Building a Data Lake on AWS\n", "\n", "\n", "### Introduction\n", "In this workshop we are going to be get visualize sentiment of Yelp reviews from 2015 using a data set from the AWS Open Data Registry [Yelp Reviews NLP Fast.ai](https://registry.opendata.aws/fast-ai-nlp/). This will give you experience taking raw data and transforming it to provide new insights, optimize data sets in your data lake, and serverlessly visualize the results. We will start by uploading the Yelp review dataset and uploading it to S3, registering the raw data, and once registered transform the data to get only the columns necessary to run an NLP job on the reviews to get sentiment. We could build our own NLP model by leveraging [Amazon SageMaker](https://aws.amazon.com/sagemaker/), but in this workshop we will be using [Amazon Comprehend](https://aws.amazon.com/comprehend/) to get the sentiment of the reviews as an example of using the built-in APIs available from AWS. \n", "\n", "![Data Lake](../../docs/assets/images/yelp_dl.png)\n", "\n", "### Goals of this workshop\n", "1. Download review dataset from [Yelp Reviews NLP Fast.ai](https://registry.opendata.aws/fast-ai-nlp/).\n", "2. Register raw data set as a table with the [AWS Glue Data Catalog](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html).\n", "3. Run a pyspark [AWS Glue Job] to convert data set into [Parquet](https://parquet.apache.org/) and get review sentiment with [Amazon Comprehend](https://aws.amazon.com/comprehend/).\n", "4. Store transformed results in a new curated data set.\n", "5. Serverless query the optimzied data set with [Amazon Athena](https://aws.amazon.com/athena/)\n", "6. Provide visual insights of the results with [Amazon QuickSight](https://aws.amazon.com/quicksight/) or [Bokeh](https://bokeh.pydata.org/en/latest/)\n", "\n", "This notebook is inspired by the blog [How to scale sentiment analysis using Amazon Comprehend, AWS Glue and Amazon Athena\n", "](https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import botocore\n", "import json\n", "import time\n", "import os\n", "\n", "import project_path\n", "from lib import workshop\n", "\n", "glue = boto3.client('glue')\n", "s3 = boto3.resource('s3')\n", "s3_client = boto3.client('s3')\n", "\n", "session = boto3.session.Session()\n", "region = session.region_name\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "database_name = 'yelp' # AWS Glue Data Catalog Database Name\n", "raw_table_name = 'raw_reviews' # AWS Glue Data Catalog raw table name\n", "parquet_table_name = 'parq_reviews' # AWS Glue Data Catalog parquet table name\n", "open_data_bucket = 'fast-ai-nlp'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Download Yelp Reviews](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-download-file.html) \n", "\n", "We will download the reviews from the Fast.ai NLP dataset available on the [AWS Open Data Registry](https://registry.opendata.aws/fast-ai-nlp/)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " s3.Bucket(open_data_bucket).download_file('yelp_review_full_csv.tgz', 'yelp_review_full_csv.tgz')\n", "except botocore.exceptions.ClientError as e:\n", " if e.response['Error']['Code'] == \"404\":\n", " print(\"The object does not exist.\")\n", " else:\n", " raise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Untar Yelp Reviews\n", "\n", "There are two `csv` files in the tarball. One is called `train.csv`, the other is `test.csv`. If we were to leverage this data set to build an new AI/ML model we would have cleaned and split data sets for both train and test. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!tar -xvzf yelp_review_full_csv.tgz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For those interested, The `readme.txt` file describes in more details the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cat yelp_review_full_csv/readme.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View raw csv file\n", "\n", "We will use [Pandas](https://pandas.pydata.org/) to read the csv and view the data set. You will notice the data contains 2 unnamed columns for rating and review. The rating is between 1-5 and the review is a free form text field." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "pd.set_option('display.max_colwidth', -1)\n", "\n", "df = pd.read_csv('yelp_review_full_csv/train.csv', header=None)\n", "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)\n", "\n", "We will create an S3 bucket that will be used throughout the workshop for storing our data.\n", "\n", "[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = workshop.create_bucket(region, session, 'datalake-')\n", "print(bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Upload to S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)\n", "\n", "Next, we will upload the json file created above to S3 to be used later in the workshop.\n", "\n", "[s3.upload_file](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "file_name = 'train.csv'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('yelp', 'raw', file_name)).upload_file('yelp_review_full_csv/'+file_name)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover the data in your Data Lake\n", "\n", "In this next section we will be using [AWS Glue](https://aws.amazon.com/glue/) to discover, catalog, and transform your data. Glue currently only supports `Python 2.7`, hence we'll write the script in `Python 2.7`.\n", "\n", "### Permission setup for invoking AWS Glue from this Notebook\n", "In order to enable this Notebook to run AWS Glue jobs, we need to add one additional permission to the default execution role of this notebook. 
We will be using SageMaker Python SDK to retrieve the default execution role and then you have to go to [IAM Dashboard](https://console.aws.amazon.com/iam/home) to edit the Role to add AWS Glue specific permission. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finding out the current execution role of the Notebook\n", "We are using SageMaker Python SDK to retrieve the current role for this Notebook which needs to be enhanced to support the functionality in AWS Glue." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import SageMaker Python SDK to get the Session and execution_role\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "sess = sagemaker.Session()\n", "role = get_execution_role()\n", "role_name = role[role.rfind('/') + 1:]\n", "print(role_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Adding AWS Glue as an additional trusted entity to this role\n", "This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. \n", "\n", "If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. \n", "\n", "Click on **Edit trust relationship** and replace the JSON with this JSON.\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": [\n", " \"sagemaker.amazonaws.com\",\n", " \"glue.amazonaws.com\"\n", " ]\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "```\n", "Once this is complete, click on **Update Trust Policy** and you are done.\n", "\n", "![IAM Roles](../../docs/assets/images/iam_roles_hl.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}\".format(region, role_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)\n", "\n", "When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.\n", "\n", "There is a central Glue Catalog for each AWS account. 
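", "\n", "\n", "The `workshop.create_db` helper called in the next cell wraps the Glue `create_database` API for convenience. A minimal sketch of an equivalent direct call is shown below; the helper's exact behavior is assumed, not shown.\n", "\n", "```python\n", "# Sketch: roughly what workshop.create_db is assumed to do under the hood.\n", "glue.create_database(\n", "    CatalogId=account_id,\n", "    DatabaseInput={\n", "        'Name': database_name,\n", "        'Description': 'Database for Yelp Reviews'\n", "    }\n", ")\n", "```\n", "\n", "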
When creating the database you will use your account id declared above as `account_id`.\n", "\n", "[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workshop.create_db(glue, account_id, database_name, 'Database for Yelp Reviews')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create the Raw table in Glue](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html)\n", "\n", "When you define a table in AWS Glue, you also specify the value of a classification field that indicates the type and format of the data that's stored in that table. If a crawler creates the table, these classifications are determined by either a built-in classifier or a custom classifier. If you create a table manually in the console or by using an API, you specify the classification when you define the table. For more information about creating a table using the AWS Glue console, see [Working with Tables on the AWS Glue Console](https://docs.aws.amazon.com/glue/latest/dg/console-tables.html).\n", "\n", "[glue.create_table](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_table)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "location = 's3://{0}/yelp/raw'.format(bucket)\n", "\n", "response = glue.create_table(\n", "    CatalogId=account_id,\n", "    DatabaseName=database_name,\n", "    TableInput={\n", "        'Name': raw_table_name,\n", "        'Description': 'Raw Yelp reviews dataset',\n", "        'StorageDescriptor': {\n", "            'Columns': [\n", "                {\n", "                    'Name': 'rating',\n", "                    'Type': 'tinyint',\n", "                    'Comment': 'Rating from the Yelp review'\n", "                },\n", "                {\n", "                    'Name': 'review',\n", "                    'Type': 'string',\n", "                    'Comment': 'Review text from the Yelp review'\n", "                }\n", "            ],\n", "            'Location': location,\n", "            'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',\n", "            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',\n", "            'SerdeInfo': {\n", "                'SerializationLibrary': 'org.apache.hadoop.hive.serde2.OpenCSVSerde',\n", "                'Parameters': {\n", "                    'escapeChar': '\\\\',\n", "                    'separatorChar': ',',\n", "                    'serialization.format': '1'\n", "                }\n", "            },\n", "        },\n", "        'TableType': 'EXTERNAL_TABLE',\n", "        'Parameters': {\n", "            'classification': 'csv'\n", "        }\n", "    }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View Yelp Raw Reviews Data \n", "\n", "To see the raw Yelp reviews we will install a Python library for querying the data in the Glue Data Catalog with Athena. More information is available in the [PyAthena](https://pypi.org/project/PyAthena/) documentation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install PyAthena" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyathena import connect\n", "from pyathena.util import as_pandas\n", "\n", "cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/yelp/temp').cursor()\n", "cursor.execute('select * from ' + database_name + '.' 
+ raw_table_name + ' limit 10')\n", "\n", "df = as_pandas(cursor)\n", "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Transform Raw data to provide insights and visualization\n", "\n", "### [Detect Sentiment](https://docs.aws.amazon.com/comprehend/latest/dg/how-sentiment.html)\n", "We are now going to transform the raw data using [PySpark](http://spark.apache.org/docs/latest/api/python/index.html) in an AWS Glue job to call Amazon Comprehend APIs to get sentiment analysis on the review, convert the data into parquet, and [partition](https://docs.aws.amazon.com/athena/latest/ug/partitions.html) by sentiment. This will allow us to optimize analytics queries when viewing data by sentiment and returning just the values we need leveraging the columnar format of parquet.\n", "\n", "The example below is the API call required to pass text into the Comprehend API to detect sentiment and the result returned.\n", "\n", "[client.detect_sentiment](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/comprehend.html#Comprehend.Client.detect_sentiment)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pprint as pp\n", "\n", "client = boto3.client('comprehend', region_name=region)\n", "\n", "response = client.detect_sentiment(Text='This API call is awesome!!! So easy to get sentiment of text!', LanguageCode='en')\n", "pp.pprint(response['SentimentScore'])\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Uploading the code and other dependencies to S3 for AWS Glue\n", "In order to run your code in AWS Glue, we need to upload the code and dependencies directly to S3 and pass those locations while invoking the Glue job. We will write the ETL job using Jupyter Notebooks cell magic [%%writefile](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-writefile)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get Sentiment of Yelp Review Data\n", "\n", "We will create a Pyspark job to add primary key and run a batch of reviews through [Amazon Comprehend](https://aws.amazon.com/comprehend/) to get sentiment analysis of the reviews. 
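", "\n", "\n", "The job calls the batch variant of the API demonstrated above, [client.batch_detect_sentiment](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/comprehend.html#Comprehend.Client.batch_detect_sentiment), which scores up to 25 texts per request. A quick standalone sketch (the sample texts are made up):\n", "\n", "```python\n", "batch = client.batch_detect_sentiment(\n", "    TextList=['The food was great!', 'Service was painfully slow.'],\n", "    LanguageCode='en')\n", "for r in batch['ResultList']:\n", "    print(r['Index'], r['Sentiment'])\n", "```\n", "\n", "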
The AWS region is passed to the job at run time through the `REGION` argument.\n", "\n", "To keep the workshop timely, the job limits the number of rows it converts, but this code could be modified to run over the entire data set.\n", "\n", "The key points in this code show how easy it is to access the AWS Glue Data Catalog by leveraging the [Glue libraries](https://github.com/awslabs/aws-glue-libs):\n", "\n", "* [`glueContext.create_dynamic_frame.from_catalog`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog) - Read table metadata from the Glue Data Catalog using the Glue libs to load tables into the job.\n", "* `yelpDF = yelp.toDF()` - Easy conversion from a [Glue DynamicFrame](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html) to a [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html) and vice-versa `transformedsink = DynamicFrame.fromDF(transformed, glueContext, \"joined\")`.\n", "* Writing back to S3 with [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) using options to:\n", "    * [Partition](https://docs.aws.amazon.com/athena/latest/ug/partitions.html) the data based on columns `connection_options = {\"path\": parquet_output_path, \"partitionKeys\": [\"sentiment\"]}`\n", "    * Convert the data to a [columnar format](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) `format=\"parquet\"`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile yelp_etl.py\n", "\n", "import os\n", "import sys\n", "import boto3\n", "\n", "from awsglue.transforms import *\n", "from awsglue.utils import getResolvedOptions\n", "from pyspark.context import SparkContext\n", "from awsglue.context import GlueContext\n", "from awsglue.job import Job\n", "from awsglue.dynamicframe import DynamicFrame\n", "\n", "import pyspark.sql.functions as F\n", "from pyspark.sql import Row, Window, SparkSession\n", "from pyspark.sql.types import *\n", "from pyspark.conf import SparkConf\n", "from pyspark.sql.functions import *\n", "\n", "args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'TABLE_NAME', 'REGION'])\n", "\n", "sc = SparkContext()\n", "glueContext = GlueContext(sc)\n", "spark = glueContext.spark_session\n", "job = Job(glueContext)\n", "job.init(args['JOB_NAME'], args)\n", "\n", "# Convert the Glue DynamicFrame to a Spark DataFrame\n", "yelp = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name=args['TABLE_NAME'], transformation_ctx = \"datasource0\")\n", "yelpDF = yelp.toDF().select('rating', 'review')\n", "\n", "MIN_SENTENCE_LENGTH_IN_CHARS = 10\n", "MAX_SENTENCE_LENGTH_IN_CHARS = 4500\n", "COMPREHEND_BATCH_SIZE = 25  ## This batch size results in groups no larger than 25 items\n", "NUMBER_OF_BATCHES = 40\n", "ROW_LIMIT = 10000  # Number of reviews we will process for this workshop\n", "\n", "## Each task handles 25*40 records, so there should be 10 partitions overall to process 10000 records.\n", "ComprehendRow = Row(\"review\", \"rating\", \"sentiment\")\n", "\n", "def getBatchComprehend(input_list):\n", "    arr = []\n", "    bodies = [i[0] for i in 
input_list]\n", "    client = boto3.client('comprehend', region_name=args['REGION'])\n", "\n", "    def callApi(text_list):\n", "        response = client.batch_detect_sentiment(TextList = text_list, LanguageCode = 'en')\n", "        return response\n", "\n", "    for i in range(NUMBER_OF_BATCHES):\n", "        text_list = bodies[COMPREHEND_BATCH_SIZE * i : COMPREHEND_BATCH_SIZE * (i+1)]\n", "        response = callApi(text_list)\n", "        for r in response['ResultList']:\n", "            idx = COMPREHEND_BATCH_SIZE * i + r['Index']\n", "            arr.append(ComprehendRow(input_list[idx][0], input_list[idx][1], r['Sentiment']))\n", "\n", "    return arr\n", "\n", "# Grab a sample set of records with review size under the Comprehend limits\n", "yelpDF = yelpDF \\\n", "    .withColumn('review_len', F.length('review')) \\\n", "    .filter(F.col('review_len') > MIN_SENTENCE_LENGTH_IN_CHARS) \\\n", "    .filter(F.col('review_len') < MAX_SENTENCE_LENGTH_IN_CHARS) \\\n", "    .limit(ROW_LIMIT)\n", "\n", "record_count = yelpDF.count()\n", "print('record count=' + str(record_count))\n", "\n", "yelpDF = yelpDF.repartition(record_count/(NUMBER_OF_BATCHES*COMPREHEND_BATCH_SIZE))\n", "\n", "## Group the (review, rating) tuples into arrays of similar size\n", "group_rdd = yelpDF.rdd.map(lambda l: (l.review.encode(\"utf-8\"), l.rating)).glom()\n", "\n", "transformed = group_rdd \\\n", "    .map(lambda l: getBatchComprehend(l)) \\\n", "    .flatMap(lambda x: x) \\\n", "    .toDF()\n", "\n", "print(\"transformed count=\" + str(transformed.count()))\n", "\n", "transformedsink = DynamicFrame.fromDF(transformed, glueContext, \"joined\")\n", "parquet_output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])\n", "print(parquet_output_path)\n", "datasink5 = glueContext.write_dynamic_frame.from_options(frame = transformedsink, connection_type = \"s3\", connection_options = {\"path\": parquet_output_path, \"partitionKeys\": [\"sentiment\"]}, format=\"parquet\", transformation_ctx=\"datasink5\")\n", "\n", "job.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the Yelp ETL script to S3\n", "We will now upload the `yelp_etl.py` script to S3 so that Glue can use it to run the PySpark job. You can replace it with your own script if needed. If your code has multiple files, you need to zip those files and upload the archive to S3 instead of uploading a single file as is done here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "script_location = sess.upload_data(path='yelp_etl.py', bucket=bucket, key_prefix='yelp/codes')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Output location of the transformed data. The Glue job will write the\n", "# partitioned Parquet results under this prefix.\n", "s3_output_key_prefix = 'yelp/parquet/'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Calling Glue APIs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll use the Glue client created with boto3 earlier to invoke the `create_job` API. The `create_job` API creates a job definition which can be used to execute your jobs in Glue. The job definition created here is mutable. While creating the job, we also pass the code location to Glue.\n", "\n", "The `AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. 
It is measured in DPUs (Data Processing Units). For more information on DPUs, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).\n", "\n", "[glue.create_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "import time\n", "\n", "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "\n", "job_name = 'yelp-etl-' + timestamp_prefix\n", "response = glue.create_job(\n", "    Name=job_name,\n", "    Description='PySpark job to extract Yelp review sentiment analysis',\n", "    Role=role,  # you can pass your existing AWS Glue role here if you have used Glue before\n", "    ExecutionProperty={\n", "        'MaxConcurrentRuns': 1\n", "    },\n", "    Command={\n", "        'Name': 'glueetl',\n", "        'ScriptLocation': script_location\n", "    },\n", "    DefaultArguments={\n", "        '--job-language': 'python',\n", "        '--job-bookmark-option': 'job-bookmark-disable'\n", "    },\n", "    AllocatedCapacity=5,\n", "    Timeout=60,\n", ")\n", "glue_job_name = response['Name']\n", "print(glue_job_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The job defined above will now be executed by calling the `start_job_run` API. This API creates an immutable run/execution corresponding to the job definition created above. We will need the `job_run_id` of this particular execution to check its status. We pass the data locations and other parameters as job execution arguments.\n", "\n", "[glue.start_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_id = glue.start_job_run(JobName=job_name,\n", "                                Arguments = {\n", "                                    '--S3_OUTPUT_BUCKET': bucket,\n", "                                    '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,\n", "                                    '--DATABASE_NAME': database_name,\n", "                                    '--TABLE_NAME': raw_table_name,\n", "                                    '--REGION': region\n", "                                })['JobRunId']\n", "print(job_run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue Job status" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will check the job status to see whether it has `SUCCEEDED`, `FAILED` or `STOPPED`. Once the job has succeeded, we have the transformed data in S3 in Parquet format, which we will use to query with Athena and visualize with QuickSight. 
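", "\n", "\n", "The failure reason, if any, is also returned by the `get_job_run` call used below; a small sketch that prints it (the `ErrorMessage` field only appears on failed runs):\n", "\n", "```python\n", "run = glue.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']\n", "print(run['JobRunState'], run.get('ErrorMessage', ''))\n", "```\n", "\n", "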
If the job fails, you can go to AWS Glue console, click on **Jobs** tab on the left, and from the page, click on this particular job and you will be able to find the CloudWatch logs (the link under **Logs**) link for these jobs which can help you to see what exactly went wrong in the job execution.\n", "\n", "[glue.get_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", "while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED'):\n", " job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", " print (job_run_status)\n", " time.sleep(60)\n", "print(job_run_status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) to Discover the transformed data\n", "\n", "You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.\n", "\n", "A crawler can crawl both file-based and table-based data stores. Crawlers can crawl the following data stores:\n", "\n", "* Amazon Simple Storage Service (Amazon S3)\n", " * [Built-in Classifiers](https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html#classifier-built-in)\n", " * [Custom Classifiers](https://docs.aws.amazon.com/glue/latest/dg/custom-classifier.html)\n", "* Amazon Redshift\n", "* Amazon Relational Database Service (Amazon RDS)\n", " * Amazon Aurora\n", " * MariaDB\n", " * Microsoft SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "* Amazon DynamoDB\n", "* Publicly accessible databases [Blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/)\n", " * Aurora\n", " * MariaDB\n", " * SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "\n", "[glue.create_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parq_crawler_name = 'YelpCuratedCrawler'\n", "parq_crawler_path = 's3://{0}/yelp/parquet/'.format(bucket)\n", "\n", "response = glue.create_crawler(\n", " Name=parq_crawler_name,\n", " Role=role,\n", " DatabaseName=database_name,\n", " Description='Crawler for the Parquet Yelp Reviews with Sentiment',\n", " Targets={\n", " 'S3Targets': [\n", " {\n", " 'Path': parq_crawler_path\n", " }\n", " ]\n", " },\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'\n", " },\n", " TablePrefix='reviews_'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start the Glue Crawler\n", "\n", "You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. 
The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.start_crawler(\n", "    Name=parq_crawler_name\n", ")\n", "\n", "print(\"Parquet Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}\".format(region, parq_crawler_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue crawler status\n", "\n", "We will now monitor the crawler status, waiting for it to get back to the `READY` state, which means the crawler has completed its crawl. You can also look at the [CloudWatch logs](https://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html#console-crawlers-details) for the crawler for more details." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", "while crawler_status != 'READY':\n", "    crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", "    print(crawler_status)\n", "    time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View transformed results\n", "\n", "We will again use the PyAthena library to run queries against the newly created data set, now containing the sentiment results and stored in Parquet format. In the interest of time, we will use [Bokeh](https://bokeh.pydata.org/en/latest/) within the notebook to visualize the results instead of [Amazon QuickSight](https://aws.amazon.com/quicksight/). QuickSight is able to use the same Athena queries to visualize the results and also offers numerous [built-in connectors](https://docs.aws.amazon.com/quicksight/latest/user/supported-data-sources.html) to many data sources." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor.execute('select rating, review, sentiment from yelp.reviews_parquet')\n", "\n", "df = as_pandas(cursor)\n", "df.head(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Group the data in the DataFrame by Sentiment\n", "\n", "Using Pandas DataFrame functionality we will do the groupby locally. Alternatively, we could have used the built-in [SQL and Aggregate functions](https://docs.aws.amazon.com/athena/latest/ug/functions-operators-reference-section.html) in Athena to achieve the same result; a sketch of that query is shown in the **Visualize results** section below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "group = df.groupby('sentiment')\n", "group.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize results\n", "\n", "The Bokeh framework has a number of built-in visualizations. For more information check out the [Bokeh Notebook](https://hub.mybinder.org/user/bokeh-bokeh-notebooks-6wusjpsu/notebooks/tutorial/00%20-%20Introduction%20and%20Setup.ipynb). In this example, we will be comparing the counts of Sentiment vs. Rating in the Yelp dataset using bar charts to display the results. 
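", "\n", "\n", "As noted above, the per-sentiment counts could also come straight from Athena instead of pandas; a sketch of that aggregate query using the same PyAthena cursor:\n", "\n", "```python\n", "cursor.execute('select sentiment, count(*) as review_count from yelp.reviews_parquet group by sentiment')\n", "as_pandas(cursor)\n", "```\n", "\n", "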
Bokeh has great support for Jupyter Notebooks and provides the ability to display the output in notebook cells.\n", "\n", "#### Visualize by Sentiment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from bokeh.io import show, output_notebook\n", "from bokeh.plotting import figure\n", "from bokeh.palettes import Spectral5\n", "from bokeh.transform import factor_cmap\n", "from bokeh.models import ColumnDataSource\n", "\n", "output_notebook()\n", "\n", "source = ColumnDataSource(group)\n", "\",\".join(source.column_names)\n", "\n", "sent_cmap = factor_cmap('sentiment', palette=Spectral5, factors=sorted(df.sentiment.unique()))\n", "\n", "p = figure(plot_height=350, x_range=group)\n", "p.vbar(x='sentiment', top='rating_count', width=1, line_color=\"white\", \n", "       fill_color=sent_cmap, source=source)\n", "\n", "p.xgrid.grid_line_color = None\n", "p.xaxis.axis_label = \"Sentiment\"\n", "p.yaxis.axis_label = \"Count\"\n", "p.y_range.start = 0\n", "\n", "show(p)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Visualize by Rating\n", "\n", "We will now compare the sentiment the Comprehend API produced with the user rating from the data set. We simply change the group-by column in the DataFrame to build the new view." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "group = df.groupby('rating')\n", "group.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Visualize by User Rating" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "source = ColumnDataSource(group)\n", "\",\".join(source.column_names)\n", "\n", "rating_cmap = factor_cmap('rating', palette=Spectral5, factors=sorted(df.rating.unique()))\n", "\n", "p = figure(plot_height=350, x_range=group)\n", "p.vbar(x='rating', top='review_count', width=1, line_color=\"white\", \n", "       fill_color=rating_cmap, source=source)\n", "\n", "p.xgrid.grid_line_color = None\n", "p.xaxis.axis_label = \"Rating\"\n", "p.yaxis.axis_label = \"Count\"\n", "p.y_range.start = 0\n", "\n", "show(p)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Amazon QuickSight](https://aws.amazon.com/quicksight/) visualization\n", "\n", "**Optional**\n", "We can also visualize the data in [Amazon QuickSight](https://aws.amazon.com/quicksight/). You can follow along with the [Getting Started](https://docs.aws.amazon.com/quicksight/latest/user/getting-started.html) guide for QuickSight to set up your account." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.quicksight.aws.amazon.com/sn/start?#'.format(region))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Manage Data sets\n", "\n", "Once your account is created, you will start by clicking the `Manage Data` button to create a new dataset that uses Amazon Athena to query the data. Click the `New data set` button and select the `Athena` data source, name the data source, and choose the `yelp` Glue database and `reviews_parquet` table. Finish creation by clicking the `Create data source` button. QuickSight supports a number of [Data Connectors](https://docs.aws.amazon.com/quicksight/latest/user/supported-data-sources.html). 
\n", "\n", "![QS Manage Data](../../docs/assets/images/qs_manage_data.png)\n", "\n", "***\n", "\n", "#### [Create New Data Set from S3](https://docs.aws.amazon.com/quicksight/latest/user/create-a-data-set-s3.html)\n", "\n", "To create a data set using one or more text files (.csv, .tsv, .clf, or .elf) from Amazon S3, create a manifest that Amazon QuickSight can use to identify the files that you want to use, and also the upload settings needed to import them. When you create a data set using Amazon S3, the file data is automatically imported into [SPICE](https://docs.aws.amazon.com/quicksight/latest/user/welcome.html#spice).\n", "\n", "![QS Manage Data](../../docs/assets/images/qs_new_dataset.png)\n", "\n", "#### [Creating a Data Set Using Amazon Athena Data](https://docs.aws.amazon.com/quicksight/latest/user/create-a-data-set-athena.html)\n", "\n", "You can connect to Amazon Athena data sources and use Athena data to create Amazon QuickSight data sets.\n", "\n", "Before you try to read files from Amazon S3 buckets, make sure that you grant Amazon QuickSight access to them. For more information, see [Managing Amazon QuickSight Permissions to AWS Resources](https://docs.aws.amazon.com/quicksight/latest/user/managing-permissions.html).\n", "\n", "In the `Data source name` textbox enter the name `yelp_reviews` and click `Create data source`.\n", "\n", "![QS Manage Data](../../docs/assets/images/qs_athena_ds.png)\n", "\n", "#### Chose the Glue table for Athena\n", "\n", "Next, you will be selecting the `yelp` database we created in the Glue Data Catalog and the `reviews_parquet` table.\n", "\n", "![QS Manage Data](../../docs/assets/images/qs_choose_table.png)\n", "\n", "#### Edit the data set\n", "\n", "![QS Edit Data Set](../../docs/assets/images/qs_data_edit.png)\n", "\n", "#### Visualize!\n", "\n", "![QS Edit Data Set](../../docs/assets/images/qs_visual.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "We will finish this workshop by removing the resources created in this notebook. Ensure each cell completes successfully to verify all resources have been removed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_crawler(Name=parq_crawler_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_job(JobName=glue_job_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_database(\n", " CatalogId = account_id,\n", " Name = database_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workshop.delete_bucket_completely(bucket)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }