# Keep Calm and Parquet

In this workshop we will use several analytics tools to show the breadth of the AWS platform. We will walk through querying unoptimized CSV files and converting them to Parquet to improve performance. We also want to show how you can access the data in your data lake with Redshift, Athena, and EMR, so you can choose the right tool for the job while keeping a single source of truth for your data in S3.

![Modern Data Lake](../../docs/assets/images/modern-datalake.png)

In [None]:
import boto3
import botocore
import json
import time
import os
import getpass

import project_path # path to helper methods
from lib import workshop
from pandas import read_sql

glue = boto3.client('glue')
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
cfn = boto3.client('cloudformation')
redshift_client = boto3.client('redshift')
ec2_client = boto3.client('ec2')

session = boto3.session.Session()
region = session.region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

database_name = 'taxi' # AWS Glue Data Catalog Database Name
redshift_database_name = 'taxidb'
environment_name = 'taxi-workshop'
table_name = 'yellow'
redshift_node_type = 'ds2.xlarge'
redshift_port=5439

use_existing = True

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'taxi-')
print(bucket)

### [Copy Sample Data to S3 bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-download-file.html) 

We will download some files from New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset available on the [AWS Open Data Registry](https://registry.opendata.aws/nyc-tlc-trip-records-pds/).


In [None]:
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-01.csv s3://$bucket/datalake/yellow/
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-02.csv s3://$bucket/datalake/yellow/


### [Upload to S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)

Next, we will upload the lookup CSV files (payment types, rate codes, and taxi zones) to S3 to be used later in the workshop.

[s3.upload_file](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file) boto3 documentation

In [None]:
file_name = 'paymenttype.csv'
session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'paymenttype', file_name)).upload_file(file_name)

file_name = 'ratecode.csv'
session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'ratecode', file_name)).upload_file(file_name)

file_name = 'taxi_zone_lookup.csv'
session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'taxi_zone_lookup', file_name)).upload_file(file_name)
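
The three uploads follow one pattern: each lookup file is stored under `datalake/<table>/<file>`, so that each table gets its own folder in the data lake. A minimal sketch of that key layout follows. The helper name `lookup_key` is hypothetical (it is not part of the workshop's `lib`), and it uses `posixpath` so that keys are joined with `/` regardless of the operating system running the notebook.

```python
import posixpath

LOOKUP_FILES = ['paymenttype.csv', 'ratecode.csv', 'taxi_zone_lookup.csv']

def lookup_key(file_name, prefix='datalake'):
    """Return the S3 key for a lookup file: <prefix>/<table>/<file_name>."""
    # The table folder is the file name without its extension.
    table = posixpath.splitext(file_name)[0]
    return posixpath.join(prefix, table, file_name)

keys = [lookup_key(name) for name in LOOKUP_FILES]
for key in keys:
    print(key)
```

Each key could then be passed to `Bucket(bucket).Object(key).upload_file(file_name)` in a loop instead of repeating the call three times.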


### [Create VPC](https://docs.aws.amazon.com/vpc/index.html) 

We need a VPC for some of the resources in this workshop. You have the option to create a brand new VPC or use the VPC flagged as the default.

In [None]:
if use_existing:
    vpc_filter = [{'Name':'isDefault', 'Values':['true']}]
    default_vpc = ec2_client.describe_vpcs(Filters=vpc_filter)
    vpc_id = default_vpc['Vpcs'][0]['VpcId']

    subnet_filter = [{'Name':'vpc-id', 'Values':[vpc_id]}]
    subnets = ec2_client.describe_subnets(Filters=subnet_filter)
    subnet1_id = subnets['Subnets'][0]['SubnetId']
    subnet2_id = subnets['Subnets'][1]['SubnetId']
else:
    vpc, subnet1, subnet2 = workshop.create_and_configure_vpc()
    vpc_id = vpc.id
    subnet1_id = subnet1.id
    subnet2_id = subnet2.id

In [None]:
print(vpc_id)
print(subnet1_id)
print(subnet2_id)

### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template

In the interest of time we will use CloudFormation to launch the EMR and Redshift clusters that we will use for analytics after we have cataloged and transformed the data.

In [None]:
redshift_file = 'cfn/redshift.yaml'
session.resource('s3').Bucket(bucket).Object(redshift_file).upload_file(redshift_file)

In [None]:
emr_file = 'cfn/emr.yaml'
session.resource('s3').Bucket(bucket).Object(emr_file).upload_file(emr_file)

### Enter the user name used for the Redshift Cluster

In [None]:
admin_user = getpass.getpass()

### Enter the password used in creating the Redshift Cluster

In [None]:
# Password must be at least 8 characters, alphanumeric only, with at least 1 uppercase letter, 1 lowercase letter, and 1 digit
admin_password = getpass.getpass()

In [None]:
import re

pattern = re.compile(r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[a-zA-Z\d]{8,}$")
result = pattern.match(admin_password)
if result:
    print('Valid')
else:
    print('Invalid: password must be at least 8 characters, alphanumeric only, with at least 1 uppercase letter, 1 lowercase letter, and 1 digit')

### Execute CloudFormation Stack to generate Redshift Data Warehouse

Later in the workshop we will be using this [Redshift](https://aws.amazon.com/redshift/) cluster to run queries over data populated in our data lake with [Redshift Spectrum](https://aws.amazon.com/blogs/big-data/amazon-redshift-spectrum-extends-data-warehousing-out-to-exabytes-no-loading-required/).

In [None]:
cfn_template = 'https://s3.{0}.amazonaws.com/{1}/{2}'.format(region, bucket, redshift_file)
print(cfn_template)

redshift_stack_name = 'RedshiftTaxiStack'
response = cfn.create_stack(
 StackName=redshift_stack_name,
 TemplateURL=cfn_template,
 Capabilities = ["CAPABILITY_NAMED_IAM"],
 Parameters=[
 {
 'ParameterKey': 'EnvironmentName',
 'ParameterValue': environment_name
 },
 {
 'ParameterKey': 'AdministratorUser',
 'ParameterValue': admin_user
 },
 {
 'ParameterKey': 'AdministratorPassword',
 'ParameterValue': admin_password
 },
 {
 'ParameterKey': 'DatabaseName',
 'ParameterValue': redshift_database_name
 },
 {
 'ParameterKey': 'NodeType',
 'ParameterValue': redshift_node_type
 },
 {
 'ParameterKey': 'S3Bucket',
 'ParameterValue': bucket
 }
 ]
)

print(response)

### Execute CloudFormation Stack to generate EMR Cluster

We will also query data in the Data Lake from [EMR](https://aws.amazon.com/emr/) through an [EMR Notebook](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks.html).

In [None]:
cfn_template = 'https://s3.{0}.amazonaws.com/{1}/{2}'.format(region, bucket, emr_file)
print(cfn_template)
emr_stack_name = 'EMRTaxiStack'

response = cfn.create_stack(
 StackName=emr_stack_name,
 TemplateURL=cfn_template,
 Capabilities = ["CAPABILITY_NAMED_IAM"],
 Parameters=[
 {
 'ParameterKey': 'EnvironmentName',
 'ParameterValue': environment_name
 },
 {
 'ParameterKey': 'VPC',
 'ParameterValue': vpc_id
 },
 {
 'ParameterKey': 'PublicSubnet',
 'ParameterValue': subnet1_id
 },
 {
 'ParameterKey': 'OutputS3Bucket',
 'ParameterValue': bucket
 }
 ]
)

print(response)

### Discover the data in your Data Lake

In this next section we will be using [AWS Glue](https://aws.amazon.com/glue/) to discover, catalog, and transform your data. At the time this workshop was written, Glue supported only `Python 2.7`, so we'll write the script in `Python 2.7`.

### Permission setup for invoking AWS Glue from this Notebook
To enable this Notebook to run AWS Glue jobs, we need to add one additional permission to the default execution role of this notebook. We will use the SageMaker Python SDK to retrieve the default execution role; you then go to the [IAM Dashboard](https://console.aws.amazon.com/iam/home) and edit the Role to add the AWS Glue specific permission.

### Finding out the current execution role of the Notebook
We are using SageMaker Python SDK to retrieve the current role for this Notebook which needs to be enhanced to support the functionality in AWS Glue.

In [None]:
# Import SageMaker Python SDK to get the Session and execution_role
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
role_name = role[role.rfind('/') + 1:]
print(role_name)


### Adding AWS Glue as an additional trusted entity to this role
This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. 

If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step.

On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. 

Click on **Edit trust relationship** and replace the JSON with this JSON.
```
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": [
 "sagemaker.amazonaws.com",
 "glue.amazonaws.com"
 ]
 },
 "Action": "sts:AssumeRole"
 }
 ]
}
```
Once this is complete, click on **Update Trust Policy** and you are done.

![IAM Roles](../../docs/assets/images/iam_roles_hl.png)

In [None]:
print("https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}".format(region, role_name))
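
The trust policy shown above can also be built in code rather than pasted by hand. The sketch below only constructs and prints the JSON document; the helper name `build_trust_policy` is hypothetical. The commented-out lines show how the document might be applied with the IAM `update_assume_role_policy` call, assuming the notebook's credentials are allowed to modify the role, which is often not the case.

```python
import json

def build_trust_policy(services):
    """Build an IAM trust policy that lets the given services assume a role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }

trust_policy = build_trust_policy(["sagemaker.amazonaws.com", "glue.amazonaws.com"])
trust_policy_json = json.dumps(trust_policy, indent=2)
print(trust_policy_json)

# Assumption: the caller has permission for iam:UpdateAssumeRolePolicy.
# Uncomment to apply the policy instead of editing it in the console:
# import boto3
# boto3.client('iam').update_assume_role_policy(
#     RoleName=role_name, PolicyDocument=trust_policy_json)
```

Printing the document first makes it easy to compare against the JSON in the console before changing anything.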

### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)

When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.

There is a central Glue Catalog for each AWS account. When creating the database you will use your account id, declared above as `account_id`.

[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)

In [None]:
workshop.create_db(glue, account_id, database_name, 'New York City Taxi and Limousine Commission (TLC) Trip Record Data')

### Use a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) to Discover the raw data

You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.

A crawler can crawl both file-based and table-based data stores. Crawlers can crawl the following data stores:

* Amazon Simple Storage Service (Amazon S3)
 * [Built-in Classifiers](https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html#classifier-built-in)
 * [Custom Classifiers](https://docs.aws.amazon.com/glue/latest/dg/custom-classifier.html)
* Amazon Redshift
* Amazon Relational Database Service (Amazon RDS)
 * Amazon Aurora
 * MariaDB
 * Microsoft SQL Server
 * MySQL
 * Oracle
 * PostgreSQL
* Amazon DynamoDB
* Publicly accessible databases [Blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/)
 * Aurora
 * MariaDB
 * SQL Server
 * MySQL
 * Oracle
 * PostgreSQL

[glue.create_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler)

In [None]:
crawler_name = 'NY-Taxi-Crawler'
crawler_path = 's3://{0}/datalake/'.format(bucket)

response = glue.create_crawler(
 Name=crawler_name,
 Role=role,
 DatabaseName=database_name,
 Description='Crawler for NY Taxi Data',
 Targets={
 'S3Targets': [
 {
 'Path': crawler_path
 }
 ]
 },
 SchemaChangePolicy={
 'UpdateBehavior': 'UPDATE_IN_DATABASE',
 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
 }
)

### Start the Glue Crawler

[glue.start_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_crawler)

In [None]:
response = glue.start_crawler(
 Name=crawler_name
)

print ("Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}".format(region, crawler_name))

### Checking Glue crawler status

We will now monitor the crawler status, waiting for it to return to the `READY` state, which means the crawler has completed its crawl. You can also look at the [CloudWatch logs](https://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html#console-crawlers-details) for the crawler for more details.

In [None]:
crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']
print(crawler_status)

# Poll every 30 seconds until the crawler returns to the READY state
while crawler_status != 'READY':
    time.sleep(30)
    crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']
    print(crawler_status)
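
The same poll-and-sleep pattern recurs for both crawlers and for the Glue job, so it may be worth wrapping in a helper that also gives up after a time limit. The sketch below is one possible shape. `wait_for_state` is a hypothetical helper, not a boto3 or workshop function, and the demonstration feeds it a fake sequence of states so it runs without touching AWS.

```python
import time

def wait_for_state(get_state, done_states, poll_seconds=30, timeout_seconds=1800,
                   sleep=time.sleep):
    """Poll get_state() until it returns a value in done_states or time runs out."""
    waited = 0
    while True:
        state = get_state()
        print(state)
        if state in done_states:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError('Still {0} after {1} seconds'.format(state, waited))
        sleep(poll_seconds)
        waited += poll_seconds

# Demonstration with a fake state sequence instead of a real crawler.
fake_states = iter(['RUNNING', 'STOPPING', 'READY'])
final_state = wait_for_state(lambda: next(fake_states), {'READY'},
                             sleep=lambda seconds: None)
```

With a real crawler the call might look like `wait_for_state(lambda: glue.get_crawler(Name=crawler_name)['Crawler']['State'], {'READY'})`.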

### View Crawler Results

Now that we have crawled the raw data available, we want to look at the results of the crawl to see the tables that were created. You will click on the link `Tables in taxi` to view the tables the crawler found. It will look like the image below:

![Taxi Tables](../../docs/assets/images/glue-taxi-tables.png)

In [None]:
print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

### Create Parquet version of the yellow CSV table

From [Wikipedia](https://en.wikipedia.org/wiki/Apache_Parquet), "Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk."

The key point in this code is how easy it is to access the AWS Glue Data Catalog using the [Glue libraries](https://github.com/awslabs/aws-glue-libs). Some of the key concepts are below:

* [`glueContext.create_dynamic_frame.from_catalog`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog) - Read table metadata from the Glue Data Catalog using Glue libs to load tables into the pyspark job.
* Write back to S3 with [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) using these options:
 * Convert data to different format `format="parquet"`. This format is [columnar](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) and provides [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default.
 
You can find more best practices for Glue and Athena [here](https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html)
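
To see why a columnar layout helps queries that touch only a few columns, consider the toy illustration below. It uses plain Python structures rather than Parquet itself, and the sample values are made up. It only contrasts how many field values are visited when summing one column in a row-oriented layout versus a column-oriented one.

```python
# Row-oriented layout (like CSV): each record keeps all of its fields together.
rows = [
    {'vendorid': 1, 'passenger_count': 2, 'fare_amount': 9.5},
    {'vendorid': 2, 'passenger_count': 1, 'fare_amount': 14.0},
    {'vendorid': 1, 'passenger_count': 3, 'fare_amount': 6.5},
]

# Column-oriented layout (like Parquet): all values of one field are stored together.
columns = {name: [row[name] for row in rows] for name in rows[0]}

# Summing one column from rows means reading every record with all of its fields.
fields_scanned_row_layout = sum(len(row) for row in rows)
total_from_rows = sum(row['fare_amount'] for row in rows)

# The columnar layout reads only the values of the requested column.
fields_scanned_column_layout = len(columns['fare_amount'])
total_from_columns = sum(columns['fare_amount'])

print(total_from_rows, total_from_columns)
print(fields_scanned_row_layout, fields_scanned_column_layout)
```

Real Parquet files add compression and per-column encoding on top of this layout, which this sketch does not model.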

In [None]:
%%writefile yellow_parquet_etl.py

import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'TABLE_NAME', 'REGION'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "taxi", table_name = "yellow", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name=args['TABLE_NAME'], transformation_ctx = "datasource0")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice1"]
## @return: resolvechoice1
## @inputs: [frame = datasource0]
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields2"]
## @return: dropnullfields2
## @inputs: [frame = resolvechoice1]
dropnullfields2 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields2")

parquet_output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])
print(parquet_output_path)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": ""}, format = "parquet", transformation_ctx = "datasink3"]
## @return: datasink3
## @inputs: [frame = dropnullfields2]
datasink3 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields2, connection_type = "s3", connection_options = {"path": parquet_output_path}, format = "parquet", transformation_ctx = "datasink3")
job.commit()

### Upload the ETL script to S3
We will upload the `yellow_parquet_etl` script to S3 so Glue can use it to run the PySpark job. You can replace it with your own script if needed. If your code has multiple files, you need to zip those files and upload the archive to S3 instead of uploading a single file as is done here.

In [None]:
script_location = sess.upload_data(path='yellow_parquet_etl.py', bucket=bucket, key_prefix='codes')

In [None]:
# Output location of the data.
s3_output_key_prefix = 'datalake/yellow_parquet/'

### [Authoring jobs with AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/author-job.html)

Next we'll use the Glue client created earlier via Boto to invoke the `create_job` API of Glue. The `create_job` API creates a job definition which can be used to execute your jobs in Glue. The job definition created here is mutable. When creating the job, we also pass the script location to Glue.

The `AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. It is measured in units of `DPU`. For more information on `DPU`, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).

[glue.create_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job)

In [None]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

job_name = 'ny-yellow-parquet-' + timestamp_prefix
response = glue.create_job(
 Name=job_name,
 Description='PySpark job to convert yellow taxi csv data to parquet',
 Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
 ExecutionProperty={
 'MaxConcurrentRuns': 1
 },
 Command={
 'Name': 'glueetl',
 'ScriptLocation': script_location
 },
 DefaultArguments={
 '--job-language': 'python',
 '--job-bookmark-option': 'job-bookmark-disable'
 },
 AllocatedCapacity=5,
 Timeout=60,
)
glue_job_name = response['Name']
print(glue_job_name)

The job defined above will now be executed by calling the `start_job_run` API. This API creates an immutable run/execution corresponding to the job definition created above. We will need the `job_run_id` of this particular execution to check its status. We'll pass the output location and the catalog database and table names as job execution parameters.

[glue.start_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_job_run)

In [None]:
job_run_id = glue.start_job_run(JobName=job_name,
 Arguments = {
 '--S3_OUTPUT_BUCKET': bucket,
 '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
 '--DATABASE_NAME': database_name,
 '--TABLE_NAME': table_name,
 '--REGION': region
 })['JobRunId']
print(job_run_id)
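
Note how the argument names line up: each key passed to `start_job_run` carries a `--` prefix, while the ETL script reads the same names without the prefix through `getResolvedOptions`. A small sketch of that mapping follows. The helper `to_glue_arguments` is hypothetical and the bucket name is a placeholder.

```python
def to_glue_arguments(params):
    """Turn {'NAME': value} into the {'--NAME': 'value'} form used by start_job_run."""
    return {'--' + name: str(value) for name, value in params.items()}

arguments = to_glue_arguments({
    'S3_OUTPUT_BUCKET': 'example-bucket',  # placeholder bucket name
    'S3_OUTPUT_KEY_PREFIX': 'datalake/yellow_parquet/',
    'DATABASE_NAME': 'taxi',
    'TABLE_NAME': 'yellow',
})
print(arguments)
```

Keeping the plain names in one dictionary makes it easier to check them against the list passed to `getResolvedOptions` in the script.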

### Checking Glue Job Status

Now we will check the job status to see whether it has `SUCCEEDED`, `FAILED`, or `STOPPED`. Once the job has succeeded, the transformed data is in S3 in Parquet format, which we will query with Athena and visualize with QuickSight. If the job fails, go to the AWS Glue console, click the **Jobs** tab on the left, and select this job; the link under **Logs** opens the CloudWatch logs, which can help you see exactly what went wrong during job execution.

[glue.get_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_job_run)

In [None]:
job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
# Poll until the run reaches a terminal state (TIMEOUT included, since the job sets Timeout=60)
while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT'):
    print(job_run_status)
    time.sleep(60)
    job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
print(job_run_status)

### Create Crawler to populate Parquet formatted table in Glue Data Catalog

We will create another crawler for the curated dataset that we created by converting the CSV files into Parquet formatted data.

In [None]:
parq_crawler_name = 'NY-Curated-Crawler'
parq_crawler_path = 's3://'+bucket+'/datalake/yellow_parquet/'

response = glue.create_crawler(
 Name=parq_crawler_name,
 Role=role,
 DatabaseName=database_name,
 Description='Crawler for the Parquet transformed yellow taxi data',
 Targets={
 'S3Targets': [
 {
 'Path': parq_crawler_path
 }
 ]
 },
 SchemaChangePolicy={
 'UpdateBehavior': 'UPDATE_IN_DATABASE',
 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
 }
)

### Start Crawler 

Much like we did with the raw data crawler, we will start the curated crawler, which points to the new dataset created by the Glue job.

In [None]:
response = glue.start_crawler(
 Name=parq_crawler_name
)

print ("Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}".format(region, parq_crawler_name))


### Monitor the status of the Parquet crawler

In [None]:
crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']
print(crawler_status)

# Poll every 30 seconds until the crawler returns to the READY state
while crawler_status != 'READY':
    time.sleep(30)
    crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']
    print(crawler_status)

In [None]:
print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

### [Query the Data Lake with Athena](https://aws.amazon.com/athena/)

For self-serve end users who need to run ad-hoc queries against the data, Athena is a great choice; it uses Presto and ANSI SQL to query a number of file formats on S3.

To query the tables created by the crawler we will install a Python library for querying the data in the Glue Data Catalog with Athena. For more information see [PyAthena](https://pypi.org/project/PyAthena/). You can also run queries in the browser from the Athena console, or use the available [JDBC/ODBC](https://docs.aws.amazon.com/athena/latest/ug/athena-bi-tools-jdbc-odbc.html) drivers.

In [None]:
!pip install PyAthena

### Simple Select Query

In this first query we will create a simple query to show the ability of Athena to query the raw CSV data.

In [None]:
%%time
from pyathena import connect
from pyathena.util import as_pandas

cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()
cursor.execute('select * from ' + database_name + '.yellow limit 10')

df = as_pandas(cursor)

In [None]:
df.head(5)

### Complex Join Query

Now we will get more complex and create a query that utilizes multiple joins using Athena.

In [None]:
%%time
cursor.execute('''SELECT * FROM ''' + database_name + '''.yellow 
JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id 
JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id 
JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid 
JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid 
limit 10;''')

df = as_pandas(cursor)

In [None]:
df.head(5)
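
The queries in this section are assembled by concatenating `database_name` into the SQL text. A slightly more readable alternative is a template filled in with `str.format`, with a basic check that the substituted names are plain identifiers. The sketch below shows that approach. `build_join_query` and `JOIN_QUERY` are hypothetical names, and the check is a simple guard rather than a full defense against SQL injection.

```python
import re

JOIN_QUERY = '''SELECT * FROM {db}.{table} AS yellow
JOIN {db}.paymenttype ON yellow.payment_type = paymenttype.id
JOIN {db}.ratecode ON yellow.ratecodeid = ratecode.id
JOIN {db}.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid
JOIN {db}.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid
LIMIT {limit}'''

def build_join_query(db, table='yellow', limit=10):
    """Fill in the join query after checking that the identifiers are plain names."""
    for identifier in (db, table):
        if not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', identifier):
            raise ValueError('Unexpected identifier: {0!r}'.format(identifier))
    return JOIN_QUERY.format(db=db, table=table, limit=int(limit))

query = build_join_query('taxi')
print(query)
```

The same template can target the Parquet table, for example `cursor.execute(build_join_query(database_name, table='yellow_parquet'))`.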

### Complex Join Query With Where Clause

Taking it a step further, we will now use the query with multiple joins and count the number of entries, looking at just the data from the first 10 days of January 2017.

In our Glue job we could have taken it a step further to optimize queries like this by partitioning the data by date.

#### What is data partitioning?

A partition is a division of a logical database or its constituent elements into distinct independent parts. Database partitioning is normally done for manageability, performance or availability reasons, or for load balancing.

In S3, partitions are typically expressed as `key=value` prefixes in the bucket, for example:

* s3://datalake/taxi/yellow/year=2018/month=1/
* s3://datalake/taxi/yellow/year=2018/month=1/day=1/
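
The `key=value` layout above can be generated from a timestamp. The sketch below builds such prefixes for a given pickup time. `partition_prefix` is a hypothetical helper, shown only to make the naming convention concrete; in the Glue job the writer creates these folders itself once partition keys are specified.

```python
from datetime import datetime

def partition_prefix(base, pickup, keys=('year', 'month', 'day')):
    """Return base plus one key=value folder per requested partition key."""
    # Each key names a datetime attribute, e.g. pickup.year or pickup.month.
    parts = ['{0}={1}'.format(key, getattr(pickup, key)) for key in keys]
    return base.rstrip('/') + '/' + '/'.join(parts) + '/'

pickup = datetime(2018, 1, 1, 8, 30)
monthly = partition_prefix('s3://datalake/taxi/yellow', pickup, keys=('year', 'month'))
daily = partition_prefix('s3://datalake/taxi/yellow', pickup)
print(monthly)
print(daily)
```

A query that filters on `year` and `month` then only needs to read objects under the matching prefix.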
 
**Optional Exercise**
If you would like to try this for yourself, you can change the Glue job above so that, when writing the data to S3, it partitions the data by the columns you select.

#### Glue context writing partitions
* Extract `year`, `month`, and `day` from the `tpep_pickup_datetime`. Look at [Pyspark documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn) for help.
* Write back to S3 with [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) using these options:
 * [Partition](https://docs.aws.amazon.com/athena/latest/ug/partitions.html) the data based on columns `connection_options = {"path": parquet_output_path, "partitionKeys": ["year", "month", "day"]}`
 * Convert data to a [columnar format](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) `format="parquet"`
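
As a starting point for the exercise, the sketch below shows the two pieces in plain Python: deriving the partition values from a pickup timestamp, and building `connection_options` with one list entry per partition column. Both helper names are hypothetical, the bucket is a placeholder, and the timestamp format is an assumption based on the pattern used in the Athena queries in this notebook. Inside the Glue job the extraction would be done with Spark column functions instead.

```python
from datetime import datetime

def partition_columns(tpep_pickup_datetime):
    """Derive year, month, and day values from a pickup timestamp string."""
    # Assumed format: 'YYYY-MM-DD HH:MM:SS'
    pickup = datetime.strptime(tpep_pickup_datetime, '%Y-%m-%d %H:%M:%S')
    return {'year': pickup.year, 'month': pickup.month, 'day': pickup.day}

def partitioned_connection_options(path, keys=('year', 'month', 'day')):
    """Build connection_options with one list entry per partition column."""
    return {'path': path, 'partitionKeys': list(keys)}

derived = partition_columns('2017-01-09 11:13:28')
options = partitioned_connection_options('s3://example-bucket/datalake/yellow_parquet/')
print(derived)
print(options)
```

The resulting `options` dictionary has the shape expected by the `connection_options` argument of `write_dynamic_frame.from_options`.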

In [None]:
%%time
cursor.execute('''WITH yellow AS (SELECT date_parse(yellow.tpep_pickup_datetime,'%Y-%m-%d %H:%i:%s') AS pu_datetime, yellow.* FROM ''' + database_name + '''.yellow ) 
SELECT count(yellow.vendorid) as cnt FROM yellow 
JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id 
JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id 
JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid 
JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid 
WHERE year(pu_datetime) = 2017 
AND month(pu_datetime) = 1 
AND day(pu_datetime) BETWEEN 1 AND 10''')

df = as_pandas(cursor)

In [None]:
df.head(1)

### Optimized queries using the Parquet yellow taxi data

We will run the same queries again but this time we will use the dataset utilizing the parquet format to show the performance gains you get when converting.

In [None]:
%%time
cursor.execute('select * from ' + database_name + '.yellow_parquet limit 10')
df = as_pandas(cursor)

In [None]:
df.head(5)

Same complex queries using the `yellow_parquet` table instead.

In [None]:
%%time
cursor.execute('''
WITH yellow AS (SELECT date_parse(yellow.tpep_pickup_datetime,'%Y-%m-%d %H:%i:%s') AS pu_datetime, yellow.* FROM ''' + database_name + '''.yellow_parquet as yellow ) 
select count( yellow.vendorid)
FROM yellow
Inner JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id 
Inner JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id 
Inner JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid 
Inner JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid 
WHERE year(pu_datetime) = 2017 
AND month(pu_datetime) = 1 
AND day(pu_datetime) BETWEEN 1 AND 10''')

df = as_pandas(cursor)

In [None]:
df.head(5)

### Check the status of the Redshift CloudFormation stack

Let's check in on the status of the EMR and Redshift CloudFormation stacks. Now that we have shown how you can use Athena to query the raw and curated data, we want to dive into other analytics engines to show that you can keep all your data in your data lake and use the right tool for each job.

Separating your storage from your compute allows you to scale each component independently. This gives you flexibility in tool selection as well as agility in adopting new tools and services as they come out, helping future-proof your data lake solution.

In [None]:
response = cfn.describe_stacks(
 StackName=redshift_stack_name
)


if response['Stacks'][0]['StackStatus'] == 'CREATE_COMPLETE':
    for output in response['Stacks'][0]['Outputs']:
        if (output['OutputKey'] == 'RedshiftAddress'):
            redshift_cluster_name = output['OutputValue'].split('.')[0]
            print(redshift_cluster_name)
else:
    print('Not yet complete.')
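
Looking up a stack output by key is needed for both the Redshift and EMR stacks, so it can be convenient to turn the outputs into a dictionary first. The sketch below does that on a fake response with the same shape as the result of `cfn.describe_stacks()`. `stack_outputs` is a hypothetical helper and the endpoint address is made up.

```python
def stack_outputs(describe_stacks_response):
    """Return {OutputKey: OutputValue} for a finished stack, or None if not ready."""
    stack = describe_stacks_response['Stacks'][0]
    if stack['StackStatus'] != 'CREATE_COMPLETE':
        return None
    return {o['OutputKey']: o['OutputValue'] for o in stack.get('Outputs', [])}

# Fake response shaped like cfn.describe_stacks(); the address is made up.
sample = {'Stacks': [{
    'StackStatus': 'CREATE_COMPLETE',
    'Outputs': [{'OutputKey': 'RedshiftAddress',
                 'OutputValue': 'example-cluster.abc123.us-east-1.redshift.amazonaws.com'}],
}]}
outputs = stack_outputs(sample)
cluster_name = outputs['RedshiftAddress'].split('.')[0]
print(cluster_name)
```

Returning `None` for an unfinished stack lets the caller decide whether to wait and retry before using any output value.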

In [None]:
response = redshift_client.describe_clusters(
 ClusterIdentifier=redshift_cluster_name
)

status = response['Clusters'][0]['ClusterStatus']

if status == 'available':
 redshift_address = response['Clusters'][0]['Endpoint']['Address']
 print(redshift_address)
 jdbc_url = 'jdbc:redshift://' + redshift_address + ':' + str(redshift_port) + '/' + redshift_database_name
 print(jdbc_url)
 iam_role = response['Clusters'][0]['IamRoles'][0]['IamRoleArn']
 print(iam_role)
else: 
 print('Not yet available. Current status is {}'.format(status))

### Install the psycopg2 library to connect to Redshift

Psycopg is the most popular PostgreSQL database adapter for the Python programming language. Its main features are the complete implementation of the Python DB API 2.0 specification and the thread safety. 

[psycopg2](http://initd.org/psycopg/)

In [None]:
!pip install psycopg2

### Create connection attributes

In [None]:
conn_string = { 'dbname': redshift_database_name, 
 'user': admin_user,
 'pwd':admin_password,
 'host': redshift_address,
 'port':redshift_port
 }

In [None]:
import psycopg2

def create_conn(config):
    try:
        # get a connection; if a connection cannot be made an exception will be raised here
        con=psycopg2.connect(dbname=config['dbname'], host=config['host'],
                             port=config['port'], user=config['user'],
                             password=config['pwd'])
        return con
    except Exception as err:
        print(err)

In [None]:
con = create_conn(config=conn_string)
# create_conn returns None when the connection attempt fails
if con is not None:
    print("Connected to Redshift!\n")

### Create Redshift Spectrum external schema

In [None]:
statement = '''create external schema spectrum 
from data catalog 
database \'''' + database_name + '''\'
iam_role \'''' + iam_role + '''\'
create external database if not exists;'''

print(statement)
# con.cursor will return a cursor object, you can use this cursor to perform queries
cur = con.cursor()
cur.execute(statement)
con.commit()
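
The statement above is built with escaped quotes and string concatenation, which is easy to get wrong. A sketch of the same statement built from a template is shown below. `external_schema_sql` is a hypothetical helper and the role ARN is a placeholder, not a real role.

```python
def external_schema_sql(schema, glue_database, iam_role_arn):
    """Build the CREATE EXTERNAL SCHEMA statement for a Glue Data Catalog database."""
    return (
        "create external schema {schema}\n"
        "from data catalog\n"
        "database '{database}'\n"
        "iam_role '{role}'\n"
        "create external database if not exists;"
    ).format(schema=schema, database=glue_database, role=iam_role_arn)

# The role ARN below is a placeholder, not a real role.
sql = external_schema_sql('spectrum', 'taxi',
                          'arn:aws:iam::123456789012:role/ExampleSpectrumRole')
print(sql)
```

In the notebook, the arguments would be `'spectrum'`, `database_name`, and the `iam_role` value read from the cluster description.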

Get the trip count for January 1-10, 2017 using the CSV formatted data.

In [None]:
%%time
## Unoptimized

statement = '''select count(yellow.vendorid)
from spectrum.yellow
Inner JOIN spectrum.paymenttype ON yellow.payment_type = paymenttype.id
Inner JOIN spectrum.ratecode ON yellow.ratecodeid = ratecode.id
Inner JOIN spectrum.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid =
pu_taxizone.locationid
Inner JOIN spectrum.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid =
do_taxizone.locationid
where extract(month from cast(tpep_pickup_datetime as date)) = 1 and
extract(year from cast(tpep_pickup_datetime as date)) = 2017 and
extract(day from cast(tpep_pickup_datetime as date)) between 1 and 10;'''

df = read_sql(statement, con=con)

### View results

In [None]:
df

Get the trip count for January 1-10, 2017 using the Parquet formatted data.

In [None]:
%%time
## Optimized

statement = '''select count(yellow.vendorid)
from spectrum.yellow_parquet as yellow
Inner JOIN spectrum.paymenttype ON yellow.payment_type = paymenttype.id
Inner JOIN spectrum.ratecode ON yellow.ratecodeid = ratecode.id
Inner JOIN spectrum.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid =
pu_taxizone.locationid
Inner JOIN spectrum.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid =
do_taxizone.locationid
where extract(month from cast(tpep_pickup_datetime as date)) = 1 and
extract(year from cast(tpep_pickup_datetime as date)) = 2017 and
extract(day from cast(tpep_pickup_datetime as date)) between 1 and 10;'''

df = read_sql(statement, con=con)

In [None]:
df

### Check the status of the EMR CloudFormation stack
Let's check in on the status of the EMR cluster. If it's not yet finished please wait until it's ready.

In [None]:
response = cfn.describe_stacks(
 StackName=emr_stack_name
)

if response['Stacks'][0]['StackStatus'] == 'CREATE_COMPLETE':
    for output in response['Stacks'][0]['Outputs']:
        if (output['OutputKey'] == 'EMRClusterId'):
            cluster_id = output['OutputValue']
            print(cluster_id)
else:
    print('Not yet complete.')

In [None]:
notebook_prefix = 's3://{0}/notebooks/'.format(bucket)
emr_notebooks_file = 'TaxiEMRNotebook.ipynb'

print('Notebook Name: {}'.format(emr_notebooks_file.split('.')[0]))
print('Notebook Location: {}'.format(notebook_prefix))
print('Notebook Cluster: {}'.format(cluster_id))

### Create an EMR Notebook

Create a notebook in EMR, based on the attributes above, to run Spark queries in.

In [None]:
print('https://{0}.console.aws.amazon.com/elasticmapreduce/home?region={0}#create-notebook:'.format(region))

### Find Notebook id and import TaxiEMRNotebook into EMR Notebook

There is a notebook, `TaxiEMRNotebook.ipynb`, that you will want to download and import into the EMR notebook you just created. Then walk through its cells, comparing the optimized and unoptimized formats.

In [None]:
#Get Notebook Id
notebook_id = '{{notebook_id}}'
session.resource('s3').Bucket(bucket).Object(os.path.join('notebooks', notebook_id, emr_notebooks_file)).upload_file(emr_notebooks_file)


### Open EMR Notebook and execute queries

In [None]:
print('https://{0}.console.aws.amazon.com/elasticmapreduce/home?region={0}#notebooks-list:'.format(region))

**Congratulations!!!!** You have completed the workshop showing the capabilities of a Data Lake on AWS and the flexibility of choice when using analytics tools in AWS. Before you run the cleanup, please delete the EMR Notebook you created above by selecting the notebook and clicking `Delete` in the toolbar on the EMR Notebook console.

### Cleanup 

In [None]:
response = cfn.delete_stack(StackName=redshift_stack_name)

In [None]:
response = cfn.delete_stack(StackName=emr_stack_name)

In [None]:
response = glue.delete_crawler(Name=parq_crawler_name)

In [None]:
response = glue.delete_crawler(Name=crawler_name)

In [None]:
response = glue.delete_job(JobName=glue_job_name)

In [None]:
response = glue.delete_database(
 CatalogId = account_id,
 Name = database_name
)

In [None]:
!aws s3 rb s3://$bucket --force 

In [None]:
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
 StackName=emr_stack_name
)

print('The wait is over for {0}'.format(emr_stack_name))

In [None]:
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
 StackName=redshift_stack_name
)

print('The wait is over for {0}'.format(redshift_stack_name))

In [None]:
if not use_existing:
 workshop.vpc_cleanup(vpc_id)