## Optimizing data for analysis with Amazon Athena and AWS Glue
We will continue our open data analytics workflow, starting in the AWS Console and then moving to the notebook. Using [AWS Glue](https://aws.amazon.com/glue/) we can automate creating a metadata catalog based on flat files stored on Amazon S3. Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console. You simply point AWS Glue to your data stored on AWS, and AWS Glue discovers your data and stores the associated metadata (e.g. table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.
### Glue Data Catalog
We have sourced the open dataset from the [Registry of Open Data on AWS](https://registry.opendata.aws/). We also stored the data on S3. Now we are ready to extract, transform, and load the data for analytics. We will use the AWS Glue service to do this. The first step is to create a logical database entry in the data catalog. Note that we are not creating a physical database, which would require resources. This is just a metadata placeholder for the flat file we copied into S3.
> When naming the data catalog database, choose a short name without hyphens. This will make SQL queries more readable and also avoid certain errors when running these queries.

We can also set up the notebook for accessing the AWS Glue service using the ``Boto3`` Python SDK. The ``pandas`` and ``IPython`` dependencies are imported for output formatting purposes only. We also import ``numpy``, a popular numerical computing library. Charts and visualizations will be supported by the ``seaborn`` and ``matplotlib`` libraries. To access the Glue service API we create a Glue client.
```python
import boto3
import pandas as pd
import numpy as np
from IPython.display import display, Markdown
import seaborn as sns
%matplotlib inline
import matplotlib.pyplot as plt
```
```python
glue = boto3.client('glue')
s3 = boto3.client('s3')
```
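If you prefer to create the catalog database from the notebook rather than the console, here is a minimal sketch using the Glue ``create_database`` API; the description string is our own, and the call assumes the ``taxicatalog`` name used throughout this notebook.
```python
# Sketch: create the logical catalog database from the notebook
# instead of the AWS Console. Safe to re-run if it already exists.
try:
    glue.create_database(
        DatabaseInput={
            'Name': 'taxicatalog',  # same catalog name used later in this notebook
            'Description': 'Metadata catalog for taxi trips flat files on S3'
        }
    )
except glue.exceptions.AlreadyExistsException:
    print('Database taxicatalog already exists')
```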
### List Glue Databases
We will recreate the AWS Console GUI experience using SDK calls by creating the ``list_glue_databases`` function. We simply get the catalog databases in one call and iterate over the results in the next statement.
```python
def list_glue_databases():
    glue_database = glue.get_databases()

    for db in glue_database['DatabaseList']:
        print(db['Name'])
```
```python
list_glue_databases()
```
default
odoc
sampledb
taxicatalog
### Glue Crawler
Next, we create a logical table using a Glue crawler. This is again just a table metadata definition; the actual data is still stored only in the flat file on S3. For this notebook we will define and run the default Glue crawler to extract and load the metadata schema from our flat file. This requires selecting a data store, which is S3 in this case, defining an IAM role for access from Glue to S3, choosing a schedule for the crawler to run repeatedly if required, and specifying the output destination of the crawler results.
> Please ensure that the flat file is stored on S3 within its own folder and that you point at the folder when picking the data source during crawler definition. If you point directly at a flat file when running the crawler, it may return zero results when querying using Amazon Athena.
Glue will pick up the folder name for the logical table name. Keeping our data source files in a folder has the added advantage of incrementally updating the data by adding more files to the folder or updating the original file. Glue will pick up these changes based on the crawler run schedule.
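The crawler for this notebook was defined through the console as described above. As an alternative, the sketch below shows roughly equivalent Glue API calls; the crawler name and IAM role are placeholders you would replace with your own.
```python
# Sketch: define and run the crawler via the API instead of the console.
# The crawler name, IAM role, and S3 path below are placeholders.
glue.create_crawler(
    Name='many-trips-crawler',                      # placeholder crawler name
    Role='AWSGlueServiceRole-OpenDataAnalytics',    # existing IAM role with read access to the S3 folder
    DatabaseName='taxicatalog',
    Targets={'S3Targets': [{'Path': 's3://open-data-analytics-taxi-trips/many-trips/'}]}
)
glue.start_crawler(Name='many-trips-crawler')
```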

### Glue Table Metadata
Running the crawler results in table metadata being extracted and stored within our data catalog. The schema, including data types, is stored in the Glue Data Catalog. Note that the default Glue crawler understands well-formed CSV files with the first row as a comma-separated list of column names and the following rows representing ordered data records. The crawler automatically infers data types based on the contents of the flat file.
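To confirm what the crawler extracted without opening the console, a minimal sketch using the Glue ``get_table`` API is shown below, assuming the crawler registered the table as ``many_trips`` in ``taxicatalog``.
```python
# Sketch: inspect the schema the crawler stored in the Glue Data Catalog.
# Assumes the crawler created a table named 'many_trips' in 'taxicatalog'.
table = glue.get_table(DatabaseName='taxicatalog', Name='many_trips')['Table']
for column in table['StorageDescriptor']['Columns']:
    print(f"{column['Name']}: {column['Type']}")
```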

### Transform Data Using Athena
Transforming big data in the notebook environment is not viable. Instead, we can use Amazon Athena for large data transforms and bring the results back into our notebook.

We will use the following query to create a well-formed table transformed from our original table. Note that we specify the output location explicitly so that the Athena workgroup's default location is not used. We also specify the format as ``TEXTFILE``; otherwise the default ``PARQUET`` format is used, which may generate errors when sampling this data.
```SQL
CREATE TABLE IF NOT EXISTS "taxicatalog"."many_trips_well_formed"
WITH (
    external_location = 's3://open-data-analytics-taxi-trips/many-trips-well-formed/',
    format = 'TEXTFILE',
    field_delimiter = ','
)
AS SELECT vendorid AS vendor,
    passenger_count AS passengers,
    trip_distance AS distance,
    ratecodeid AS rate,
    pulocationid AS pick_location,
    dolocationid AS drop_location,
    payment_type AS payment_type,
    fare_amount AS fare,
    extra AS extra_fare,
    mta_tax AS tax,
    tip_amount AS tip,
    tolls_amount AS toll,
    improvement_surcharge AS surcharge,
    total_amount AS total_fare,
    tpep_pickup_datetime AS pick_when,
    tpep_dropoff_datetime AS drop_when
FROM "taxicatalog"."many_trips";
```
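We ran this CTAS statement in the Athena console, but it could also be submitted from the notebook. The sketch below assumes the statement above is saved to a local file (a hypothetical ``many_trips_well_formed.sql``) and that query metadata can be written to the ``queries`` folder used later in this notebook.
```python
# Sketch: submit the CTAS statement above from the notebook instead of the Athena console.
# The local .sql file and the output location are assumptions; adjust for your account.
import boto3

athena = boto3.client('athena')
ctas_query = open('many_trips_well_formed.sql').read()  # hypothetical file containing the CTAS above

response = athena.start_query_execution(
    QueryString=ctas_query,
    QueryExecutionContext={'Database': 'taxicatalog'},
    ResultConfiguration={'OutputLocation': 's3://open-data-analytics-taxi-trips/queries/'}
)
print(response['QueryExecutionId'])
```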
### List Glue Tables
In the spirit of an AWS open data analytics API, we will recreate the AWS Console feature that lists the tables and displays their metadata within a single reusable function. We get the list of table metadata stored within our data catalog by passing the ``database`` parameter. Next we iterate over each table object and display the name, source data file, number of records (estimate), average record size, data size in MB, and the name of the crawler used to extract the table metadata. We also display the list of column names and data types extracted as the schema from the flat file stored on S3.
```python
def list_glue_tables(database, verbose=True):
    glue_tables = glue.get_tables(DatabaseName=database)

    for table in glue_tables['TableList']:
        display(Markdown('**Table: ' + table['Name'] + '**'))
        display(Markdown('Location: ' + table['StorageDescriptor']['Location']))
        created = table['CreatedBy'].split('/')
        display(Markdown('Created by: ' + created[-1]))
        # Crawler-created tables carry extra statistics in their parameters
        if verbose and created[-1] == 'AWS-Crawler':
            display(Markdown(f'Records: {int(table["Parameters"]["recordCount"]):,}'))
            display(Markdown(f'Average Record Size: {table["Parameters"]["averageRecordSize"]} Bytes'))
            display(Markdown(f'Dataset Size: {float(table["Parameters"]["sizeKey"])/1024/1024:3.0f} MB'))
            display(Markdown(f'Crawler: {table["Parameters"]["UPDATED_BY_CRAWLER"]}'))
        if verbose:
            df_columns = pd.DataFrame.from_dict(table["StorageDescriptor"]["Columns"])
            display(df_columns[['Name', 'Type']])
            display(Markdown('---'))
```
```python
list_glue_tables('taxicatalog', verbose=False)
```
**Table: many_trips**
Location: s3://open-data-analytics-taxi-trips/many-trips/
Created by: AWS-Crawler
**Table: many_trips_well_formed**
Location: s3://open-data-analytics-taxi-trips/many-trips-well-formed
Created by: manav
```python
athena = boto3.client('athena')
```
### Athena Query
Our next action is to bring the data created within Athena into the notebook environment using a ``pandas`` DataFrame. This can be done using the ``athena_query`` function, which calls the Amazon Athena API to execute a query and store the output within a bucket and folder. This output is then read into a DataFrame, which is returned by the function.
```python
import time

def athena_query(query, bucket, folder):
    output = 's3://' + bucket + '/' + folder + '/'
    response = athena.start_query_execution(QueryString=query,
                                            ResultConfiguration={'OutputLocation': output})
    qid = response['QueryExecutionId']

    # Poll until the query leaves the QUEUED/RUNNING states
    state = 'QUEUED'
    while state in ('QUEUED', 'RUNNING'):
        time.sleep(1)
        response = athena.get_query_execution(QueryExecutionId=qid)
        state = response['QueryExecution']['Status']['State']
    if state != 'SUCCEEDED':
        raise RuntimeError(f'Athena query {qid} finished with state {state}')

    # Read the CSV results written by Athena using a pre-signed URL
    key = folder + '/' + qid + '.csv'
    data_source = {'Bucket': bucket, 'Key': key}
    url = s3.generate_presigned_url(ClientMethod='get_object', Params=data_source)
    data = pd.read_csv(url)
    return data
```
To explore the data within Athena we will run a query returning a thousand random samples.
```python
bucket = 'open-data-analytics-taxi-trips'
folder = 'queries'
query = 'SELECT * FROM "taxicatalog"."many_trips_well_formed" TABLESAMPLE BERNOULLI(100) LIMIT 1000;'
df = athena_query(query, bucket, folder)
df.head()
```
| | vendor | passengers | distance | rate | pick_location | drop_location | payment_type | fare | extra_fare | tax | tip | toll | surcharge | total_fare | pick_when | drop_when |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 1.25 | 1 | 237 | 236 | 1 | 9.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 9.80 | 2018-06-06 10:43:34 | 2018-06-06 10:54:58 |
| 1 | 1 | 1 | 1.20 | 1 | 158 | 90 | 2 | 7.5 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 8.30 | 2018-06-06 10:06:22 | 2018-06-06 10:15:21 |
| 2 | 1 | 1 | 3.30 | 1 | 234 | 236 | 1 | 17.0 | 0.0 | 0.5 | 3.55 | 0.0 | 0.3 | 21.35 | 2018-06-06 10:17:20 | 2018-06-06 10:43:07 |
| 3 | 1 | 1 | 0.90 | 1 | 236 | 140 | 1 | 7.0 | 0.0 | 0.5 | 1.55 | 0.0 | 0.3 | 9.35 | 2018-06-06 10:48:28 | 2018-06-06 10:57:08 |
| 4 | 1 | 1 | 1.00 | 1 | 141 | 162 | 1 | 7.0 | 0.0 | 0.5 | 1.95 | 0.0 | 0.3 | 9.75 | 2018-06-06 10:59:28 | 2018-06-06 11:08:05 |
Next we will determine the statistical correlation between various features (columns) within the given set of samples (records).
```python
corr = df.corr(method ='spearman')
corr
```
| | vendor | passengers | distance | rate | pick_location | drop_location | payment_type | fare | extra_fare | tax | tip | toll | surcharge | total_fare |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| vendor | 1.000000 | 0.283619 | 0.015401 | 0.055897 | -0.024097 | -0.005115 | 0.013634 | -0.014083 | NaN | -0.009308 | -0.024436 | 0.025840 | NaN | -0.015078 |
| passengers | 0.283619 | 1.000000 | 0.033053 | 0.051624 | -0.021166 | 0.003783 | 0.020200 | 0.035106 | NaN | -0.022736 | 0.008936 | 0.003765 | NaN | 0.033289 |
| distance | 0.015401 | 0.033053 | 1.000000 | 0.119010 | -0.119491 | -0.148011 | -0.068732 | 0.917127 | NaN | -0.080828 | 0.389773 | 0.401863 | NaN | 0.903529 |
| rate | 0.055897 | 0.051624 | 0.119010 | 1.000000 | -0.042557 | -0.053956 | -0.007774 | 0.185992 | NaN | -0.501256 | 0.083778 | 0.246460 | NaN | 0.184445 |
| pick_location | -0.024097 | -0.021166 | -0.119491 | -0.042557 | 1.000000 | 0.150656 | -0.009998 | -0.129692 | NaN | 0.010869 | -0.028087 | -0.153488 | NaN | -0.127936 |
| drop_location | -0.005115 | 0.003783 | -0.148011 | -0.053956 | 0.150656 | 1.000000 | 0.003079 | -0.162211 | NaN | 0.090225 | -0.042135 | -0.087721 | NaN | -0.154017 |
| payment_type | 0.013634 | 0.020200 | -0.068732 | -0.007774 | -0.009998 | 0.003079 | 1.000000 | -0.073051 | NaN | -0.015087 | -0.776507 | -0.068458 | NaN | -0.212893 |
| fare | -0.014083 | 0.035106 | 0.917127 | 0.185992 | -0.129692 | -0.162211 | -0.073051 | 1.000000 | NaN | -0.091508 | 0.425216 | 0.395950 | NaN | 0.983444 |
| extra_fare | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| tax | -0.009308 | -0.022736 | -0.080828 | -0.501256 | 0.010869 | 0.090225 | -0.015087 | -0.091508 | NaN | 1.000000 | 0.012988 | -0.148891 | NaN | -0.089695 |
| tip | -0.024436 | 0.008936 | 0.389773 | 0.083778 | -0.028087 | -0.042135 | -0.776507 | 0.425216 | NaN | 0.012988 | 1.000000 | 0.267483 | NaN | 0.555170 |
| toll | 0.025840 | 0.003765 | 0.401863 | 0.246460 | -0.153488 | -0.087721 | -0.068458 | 0.395950 | NaN | -0.148891 | 0.267483 | 1.000000 | NaN | 0.403146 |
| surcharge | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| total_fare | -0.015078 | 0.033289 | 0.903529 | 0.184445 | -0.127936 | -0.154017 | -0.212893 | 0.983444 | NaN | -0.089695 | 0.555170 | 0.403146 | NaN | 1.000000 |
We can drop features which show ``NaN`` correlation.
```python
df = df.drop(columns=['surcharge'])
```
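Rather than naming columns by hand, the short sketch below shows one way to find every feature whose correlation row is entirely ``NaN``; in this sample that flags both ``extra_fare`` and ``surcharge``.
```python
# Sketch: programmatically find features whose correlation column is entirely NaN
# (typically constant columns) instead of naming them by hand.
nan_features = corr.columns[corr.isna().all()].tolist()
print(nan_features)  # in this sample: ['extra_fare', 'surcharge']
```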
```python
corr = df.corr(method ='spearman')
```
### Heatmap
This completes the data science workflow: sourcing big data, wrangling it into a well-formed schema using Amazon Athena, bringing an adequate sample from Athena into the notebook environment, conducting exploratory data analysis, and finally visualizing the results.
```python
def heatmap(corr):
    sns.set(style="white")

    # Generate a mask for the upper triangle
    mask = np.zeros_like(corr, dtype=bool)
    mask[np.triu_indices_from(mask)] = True

    # Set up the matplotlib figure
    f, ax = plt.subplots(figsize=(11, 9))

    # Generate a custom diverging colormap
    cmap = sns.diverging_palette(220, 10, as_cmap=True)

    # Draw the heatmap with the mask and correct aspect ratio
    sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0, annot=True, fmt="3.2f",
                square=True, linewidths=.5, cbar_kws={"shrink": .5})
```
```python
heatmap(corr)
```
