# Streaming Data to your Data Lake

This workshop walks through another common scenario when building a data lake. We will create a Kinesis Data Firehose delivery stream to send data to our data lake in real time. We will manually add the table metadata to the Glue Data Catalog and register the partitions for the incoming data. Finally, we will run an aggregate query that takes advantage of those partitions.

In [None]:
import boto3
import botocore
import json

import project_path # path to helper methods
from lib import workshop

iam = boto3.client('iam')
logs = boto3.client('logs')
firehose = boto3.client('firehose')
glue = boto3.client('glue')
s3 = boto3.client('s3')

# General variables for the region and account id for the location of the resources being created
session = boto3.session.Session()
region = session.region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

workshop_user = 'bdw'
# Kinesis Firehose
delivery_stream_name = 'taxi-streaming_{0}'.format(workshop_user) # Name of the firehose to send simulated taxi events
firehose_role_name = 'taxi-firehose-role_{0}'.format(workshop_user) # Role used for the Kinesis firehose
firehose_policy_name = 'taxi-firehose-policy_{0}'.format(workshop_user) # Inline policy of the Kinesis Firehose Role
cloudwatch_logs_group_name = '/taxi_{0}'.format(workshop_user)
cloudwatch_logs_stream_name = 'ingestion-stream'

# Glue
database_name = 'taxi_{0}'.format(workshop_user)
table_name = 'yellow_streaming_{0}'.format(workshop_user)

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'taxi-')
print(bucket)

### [Create the Role for the Kinesis Firehose](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html)

When you're using an Amazon S3 destination, Kinesis Data Firehose delivers data to your S3 bucket and can optionally use an AWS KMS key that you own for data encryption. If error logging is enabled, Kinesis Data Firehose also sends data delivery errors to your CloudWatch log group and streams. You are required to have an IAM role when creating a delivery stream. Kinesis Data Firehose assumes that IAM role and gains access to the specified bucket, key, and CloudWatch log group and streams.
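
The inline policy in the next cell grants broad wildcard permissions, which keeps the workshop simple. As a rough sketch of a narrower alternative, the helper below builds a policy limited to the S3 and CloudWatch Logs actions listed in the access-control documentation linked above. The function name `least_privilege_policy` and the sample values are hypothetical, and the action list should be checked against the current documentation before use.

```python
def least_privilege_policy(bucket_name, region_name, account, log_group, log_stream):
    """Build a narrower Firehose policy document (hypothetical helper, not part of the workshop lib)."""
    bucket_arn = 'arn:aws:s3:::' + bucket_name
    log_arn = 'arn:aws:logs:{0}:{1}:log-group:{2}:log-stream:{3}'.format(
        region_name, account, log_group, log_stream)
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                # S3 actions Firehose needs to deliver objects to the bucket
                'Effect': 'Allow',
                'Action': [
                    's3:AbortMultipartUpload',
                    's3:GetBucketLocation',
                    's3:GetObject',
                    's3:ListBucket',
                    's3:ListBucketMultipartUploads',
                    's3:PutObject'
                ],
                'Resource': [bucket_arn, bucket_arn + '/*']
            },
            {
                # Error logging only needs to write events to the one log stream
                'Effect': 'Allow',
                'Action': ['logs:PutLogEvents'],
                'Resource': [log_arn]
            }
        ]
    }


# Sample values for illustration only
policy = least_privilege_policy('taxi-demo', 'us-east-1', '123456789012', '/taxi_bdw', 'ingestion-stream')
print(policy['Statement'][1]['Resource'][0])
```

The resulting dictionary could be passed through `json.dumps` in place of `inline_policy` when creating the role.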

[Kinesis Firehose access to S3 Destination](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-iam-s3)

In [None]:
role_doc = {
 "Version": "2012-10-17", 
 "Statement": [
 {"Sid": "", 
 "Effect": "Allow", 
 "Principal": {
 "Service": "firehose.amazonaws.com"
 }, 
 "Action": "sts:AssumeRole", 
 "Condition": {
 "StringEquals": {
 "sts:ExternalId": account_id
 }
 }
 }]
 }

inline_policy = {
 "Version": "2012-10-17",
 "Statement": [
 {
 "Action": [
 "cloudwatch:*",
 "logs:*"
 ],
 "Resource": [
 "*"
 ],
 "Effect": "Allow"
 },
 {
 "Action": [
 "s3:*"
 ],
 "Resource": [
 "arn:aws:s3:::" + bucket + "/*",
 "arn:aws:s3:::" + bucket
 ],
 "Effect": "Allow"
 }
 ]
 }

role_arn = workshop.create_role(iam, firehose_role_name, json.dumps(role_doc), firehose_policy_name, json.dumps(inline_policy))
print(role_arn)

### Create the CloudWatch Log Group and Stream

[Monitoring Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html)

In [None]:
response = logs.create_log_group(
 logGroupName=cloudwatch_logs_group_name,
)

print('CloudWatch Log Group status: ' + str(response['ResponseMetadata']['HTTPStatusCode']))

response = logs.create_log_stream(
 logGroupName=cloudwatch_logs_group_name,
 logStreamName=cloudwatch_logs_stream_name
)

print('CloudWatch Log Stream status: ' + str(response['ResponseMetadata']['HTTPStatusCode']))

### [Copy Sample Data to S3 bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-download-file.html) 

We will download a file from the New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset, available on the [AWS Open Data Registry](https://registry.opendata.aws/nyc-tlc-trip-records-pds/).


In [None]:
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-01.csv .

### [Create the Kinesis Firehose we will use to send taxi events to our Data Lake](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose is part of the Kinesis streaming data platform, along with Kinesis Data Streams, Kinesis Video Streams, and Amazon Kinesis Data Analytics. With Kinesis Data Firehose, you don't need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

In this example, we will define custom S3 prefixes that control where the data lands in S3. Because the prefix layout is known up front, we can predict the partitions that will be cataloged in the Glue Data Catalog. For more information, see the [custom S3 prefixes documentation](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html).
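
To make the prefix layout concrete, here is a minimal sketch of how the `Prefix` template used below would be expected to expand for a given arrival time. The helper name `expand_prefix` is hypothetical, and the sketch assumes Firehose evaluates the `timestamp` namespace in UTC.

```python
from datetime import datetime, timezone


def expand_prefix(arrival_time, base='datalake/taxi/streaming/'):
    """Approximate the S3 prefix for a record that arrives at `arrival_time` (hypothetical helper)."""
    # Assumption: the timestamp namespace is evaluated in UTC
    utc = arrival_time.astimezone(timezone.utc)
    return '{0}year={1:%Y}/month={1:%m}/day={1:%d}/hour={1:%H}/'.format(base, utc)


example = datetime(2019, 3, 7, 14, 5, tzinfo=timezone.utc)
print(expand_prefix(example))
# prints datalake/taxi/streaming/year=2019/month=03/day=07/hour=14/
```

Each `name=value` folder in the result lines up with one of the partition keys (`year`, `month`, `day`, `hour`) declared on the Glue table later in this notebook.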

[firehose.create_delivery_stream](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.create_delivery_stream)

In [None]:
response = firehose.create_delivery_stream(
 DeliveryStreamName=delivery_stream_name,
 DeliveryStreamType='DirectPut',
 S3DestinationConfiguration={
 'RoleARN': role_arn,
 'BucketARN': 'arn:aws:s3:::' + bucket,
 'Prefix': 'datalake/taxi/streaming/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
 'ErrorOutputPrefix': "datalake/taxi-error/streaming/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}",
 'BufferingHints': {
 'SizeInMBs': 50,
 'IntervalInSeconds': 60
 },
 'CompressionFormat': 'GZIP',
 'EncryptionConfiguration': {
 'NoEncryptionConfig': 'NoEncryption'
 },
 'CloudWatchLoggingOptions': {
 'Enabled': True,
 'LogGroupName': cloudwatch_logs_group_name,
 'LogStreamName': cloudwatch_logs_stream_name
 }
 }
)

### Wait for the Kinesis Firehose to become 'Active'
The Kinesis Firehose Delivery Stream is in the process of being created.
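
The polling pattern used in the next cell can be sketched as a small reusable helper with an upper bound on the number of attempts, so the notebook does not wait forever if the stream never leaves `CREATING`. The name `wait_while` and its parameters are illustrative assumptions, not part of boto3.

```python
import time


def wait_while(get_status, pending='CREATING', delay=30, max_attempts=20, sleep=time.sleep):
    """Poll `get_status` until it stops returning `pending` or attempts run out (hypothetical helper)."""
    status = get_status()
    attempts = 0
    while status == pending and attempts < max_attempts:
        sleep(delay)
        status = get_status()
        attempts += 1
    # The caller still has to check the result: it may be ACTIVE, CREATING_FAILED,
    # or still CREATING if the attempts were exhausted
    return status


# Stand-in status source for illustration; a real caller would wrap describe_delivery_stream
fake_statuses = iter(['CREATING', 'CREATING', 'ACTIVE'])
print(wait_while(lambda: next(fake_statuses), delay=0))
```

With the clients defined in this notebook, `get_status` could be a lambda that calls `firehose.describe_delivery_stream(DeliveryStreamName=delivery_stream_name)` and returns `['DeliveryStreamDescription']['DeliveryStreamStatus']`.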

In [None]:
import time

response = firehose.describe_delivery_stream(
 DeliveryStreamName=delivery_stream_name
)

status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
print(status)

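while status == 'CREATING':
    time.sleep(30)
    response = firehose.describe_delivery_stream(
        DeliveryStreamName=delivery_stream_name
    )
    status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
    print(status)

# Only report success if the stream reached ACTIVE (it can also end in CREATING_FAILED)
if status == 'ACTIVE':
    print('Kinesis Firehose created.')
else:
    print('Kinesis Firehose was not created, final status: ' + status)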

### Send simulated Taxi events to Kinesis Firehose

The code below reads taxi events from the file we downloaded and sends them to the Kinesis Data Firehose. We generate new pickup and drop-off times so the records fall into the current time window. To optimize writes to the firehose, we could instead use the [firehose.put_record_batch](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.put_record_batch) call.
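
As a hedged sketch of the batching alternative, the helpers below group rows into lists of at most 500 records, which is the per-call record limit for `put_record_batch`. The names `to_batches` and `send_batches` are hypothetical. The sketch does not enforce the total request size limit and does not retry rejected records, both of which a production producer would need.

```python
import json

MAX_BATCH_RECORDS = 500  # put_record_batch accepts at most 500 records per call


def to_batches(rows, batch_size=MAX_BATCH_RECORDS):
    """Yield lists of Firehose records built from an iterable of dicts (hypothetical helper)."""
    batch = []
    for row in rows:
        # Newline-delimited JSON, matching the single-record cell below
        batch.append({'Data': (json.dumps(row) + '\n').encode('utf-8')})
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def send_batches(firehose_client, stream_name, rows):
    """Send rows in batches and return how many records Firehose rejected."""
    failed = 0
    for batch in to_batches(rows):
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=batch
        )
        failed += response['FailedPutCount']
    return failed


sizes = [len(b) for b in to_batches({'n': i} for i in range(1201))]
print(sizes)
# prints [500, 500, 201]
```

A non-zero return value from `send_batches` means some records were not accepted and would need to be resent.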

[firehose.put_record](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.put_record)

In [None]:
import csv
import datetime
import random

max_records = 1000
cnt = 0
fmt = "%Y-%m-%d %H:%M:%S"

# Use a context manager so the file is closed when the loop ends
with open('yellow_tripdata_2017-01.csv') as f:
    csv_f = csv.reader(f)
    next(csv_f, None)  # skip the header record for the csv file

    for row in csv_f:
        if not row:  # skip blank lines
            continue

        # Generate new times so the record falls into the current time window
        drop_off = datetime.datetime.now()
        pick_up = drop_off - datetime.timedelta(minutes=random.randint(5, 60))

        data_row = {
            'vendorid': row[0],
            'tpep_pickup_datetime': pick_up.strftime(fmt),  # change to updated time
            'tpep_dropoff_datetime': drop_off.strftime(fmt),  # change to updated time
            'passenger_count': row[3],
            'trip_distance': row[4],
            'ratecodeid': row[5],
            'store_and_fwd_flag': row[6],
            'pulocationid': row[7],
            'dolocationid': row[8],
            'payment_type': row[9],
            'fare_amount': row[10],
            'extra': row[11],
            'mta_tax': row[12],
            'tip_amount': row[13],
            'tolls_amount': row[14],
            'improvement_surcharge': row[15],
            'total_amount': row[16]
        }

        # Newline-delimited JSON so each record lands on its own line in S3
        res = firehose.put_record(
            DeliveryStreamName=delivery_stream_name,
            Record={
                'Data': json.dumps(data_row) + '\n'
            }
        )
        time.sleep(0.1)
        cnt = cnt + 1

        if cnt >= max_records:
            break


### Wait for Firehose to persist data to S3

In [None]:
time.sleep(60)
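
A fixed sleep matches the 60 second buffering interval configured on the delivery stream, but delivery can take a little longer. As an alternative sketch, the helper below polls S3 until at least one object appears under the prefix. The name `wait_for_objects` and its defaults are assumptions for illustration.

```python
import time


def wait_for_objects(s3_client, bucket_name, key_prefix, delay=15, max_attempts=12, sleep=time.sleep):
    """Return True once at least one object exists under `key_prefix` (hypothetical helper)."""
    for attempt in range(max_attempts):
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=key_prefix,
            MaxKeys=1  # one key is enough to know data has landed
        )
        if response.get('KeyCount', 0) > 0:
            return True
        if attempt < max_attempts - 1:
            sleep(delay)
    return False
```

With the variables from this notebook, the call would look like `wait_for_objects(s3, bucket, 'datalake/taxi/streaming/')`.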

### Finding out the current execution role of the Notebook
We are using SageMaker Python SDK to retrieve the current role for this Notebook which needs to be enhanced to support the functionality in AWS Glue.

In [None]:
# Import SageMaker Python SDK to get the Session and execution_role
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
role_name = role[role.rfind('/') + 1:]
print(role_name)

### Adding AWS Glue as an additional trusted entity to this role
This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. 

If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step.

On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. 

Click on **Edit trust relationship** and replace the JSON with this JSON.
```
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": [
 "sagemaker.amazonaws.com",
 "glue.amazonaws.com"
 ]
 },
 "Action": "sts:AssumeRole"
 }
 ]
}
```
Once this is complete, click on **Update Trust Policy** and you are done.
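
If you prefer to script this step, the same trust policy could be applied with the `iam.update_assume_role_policy` call, as in the sketch below. The helper names are hypothetical. Note that the call replaces the role's entire trust policy, and it only succeeds if the caller has the `iam:UpdateAssumeRolePolicy` permission, which the notebook's execution role may not have.

```python
import json


def build_trust_policy(services=('sagemaker.amazonaws.com', 'glue.amazonaws.com')):
    """Build the trust policy shown above for the given service principals (hypothetical helper)."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Principal': {'Service': list(services)},
                'Action': 'sts:AssumeRole'
            }
        ]
    }


def update_trust(iam_client, target_role_name):
    """Replace the trust policy of `target_role_name`; this overwrites any existing trust policy."""
    iam_client.update_assume_role_policy(
        RoleName=target_role_name,
        PolicyDocument=json.dumps(build_trust_policy())
    )


print(json.dumps(build_trust_policy(), indent=1))
```

With the variables from this notebook, the call would be `update_trust(iam, role_name)`.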

![IAM Roles](../../docs/assets/images/iam_roles_hl.png)

In [None]:
print("https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}".format(region, role_name))

### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)

When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.

There is a central Glue Catalog for each AWS account. When creating the database, you will use your account id, declared above as `account_id`.

[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)

In [None]:
workshop.create_db(glue, account_id, database_name, 'New York City Taxi and Limousine Commission (TLC) Trip Record Data')

### [Create the Streaming table in Glue](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html)

When you define a table in AWS Glue, you also specify the value of a classification field that indicates the type and format of the data that's stored in that table. If a crawler creates the table, these classifications are determined by either a built-in classifier or a custom classifier. If you create a table manually in the console or by using an API, you specify the classification when you define the table. For more information about creating a table using the AWS Glue console, see [Working with Tables on the AWS Glue Console](https://docs.aws.amazon.com/glue/latest/dg/console-tables.html).

We also define the partition keys that match the S3 prefixes written by the Kinesis Firehose. You can find them in the `PartitionKeys` section of the `create_table` call in Glue.

[glue.create_table](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_table)

In [None]:
prefix = 'datalake/taxi/streaming/'
location = 's3://{0}/{1}'.format(bucket,prefix)

response = glue.create_table(
 CatalogId=account_id,
 DatabaseName=database_name,
 TableInput={
 'Name': table_name,
 'Description': 'Yellow Taxi Streaming dataset',
 'StorageDescriptor': {
 'Columns': [
 {
 'Name': 'vendorid',
 'Type': 'bigint'
 },
 {
 'Name': 'tpep_pickup_datetime',
 'Type': 'string'
 },
 {
 'Name': 'tpep_dropoff_datetime',
 'Type': 'string'
 },
 {
 'Name': 'passenger_count',
 'Type': 'bigint'
 },
 {
 'Name': 'trip_distance',
 'Type': 'double'
 },
 {
 'Name': 'ratecodeid',
 'Type': 'bigint'
 },
 {
 'Name': 'store_and_fwd_flag',
 'Type': 'string'
 },
 {
 'Name': 'pulocationid',
 'Type': 'bigint'
 },
 {
 'Name': 'dolocationid',
 'Type': 'bigint'
 },
 {
 'Name': 'payment_type',
 'Type': 'bigint'
 },
 {
 'Name': 'fare_amount',
 'Type': 'double'
 },
 {
 'Name': 'extra',
 'Type': 'double'
 },
 {
 'Name': 'mta_tax',
 'Type': 'double'
 },
 {
 'Name': 'tip_amount',
 'Type': 'double'
 },
 {
 'Name': 'tolls_amount',
 'Type': 'double'
 },
 {
 'Name': 'improvement_surcharge',
 'Type': 'double'
 },
 {
 'Name': 'total_amount',
 'Type': 'double'
 }
 ],
 'Location': location,
 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
 'SerdeInfo': {
 'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
 'Parameters': {
 'paths': 'vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount'
 }
 },
 },
 'PartitionKeys': [
 {
 'Name': 'year',
 'Type': 'bigint',
 },
 {
 'Name': 'month',
 'Type': 'bigint',
 },
 {
 'Name': 'day',
 'Type': 'bigint',
 },
 {
 'Name': 'hour',
 'Type': 'bigint',
 }
 ],
 'TableType': 'EXTERNAL_TABLE',
 'Parameters': {
 'classification': 'json'
 }
 }
)


In [None]:
print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

### Update partitions for Glue Table

When you manually create a table with partitions, you need to tell the Glue Data Catalog which partitions exist. There are a number of ways to do this. One option is to run `MSCK REPAIR TABLE table_name` in Athena to recover the partitions and the data associated with them. In this workshop, we will instead discover the partitions in the S3 bucket and add them with the Glue Partition API.
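
The discovery step comes down to reading the `name=value` folders out of each object key. A minimal sketch of that parsing, using a hypothetical helper `partition_from_key` and a made-up sample key, might look like this:

```python
def partition_from_key(key, table_prefix='datalake/taxi/streaming/'):
    """Return (partition_path, values) for an S3 object key, or None if it has no partition folders."""
    if not key.startswith(table_prefix):
        return None
    # Drop the table prefix and the trailing file name, keeping only the partition folders
    folders = key[len(table_prefix):].split('/')[:-1]
    if not folders or any('=' not in folder for folder in folders):
        return None
    values = [folder.split('=', 1)[1] for folder in folders]
    return '/'.join(folders), values


# Sample key for illustration only
sample_key = 'datalake/taxi/streaming/year=2019/month=03/day=07/hour=14/part-0001.gz'
print(partition_from_key(sample_key))
# prints ('year=2019/month=03/day=07/hour=14', ['2019', '03', '07', '14'])
```

The returned path would be appended to the table location to form the partition's `Location`, and the values list maps onto the `year`, `month`, `day`, and `hour` partition keys in order.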

[glue.batch_create_partition](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition)

In [None]:
response = s3.list_objects_v2(
 Bucket=bucket,
 Prefix=prefix
)

# Load the table created above to get the StorageDescriptor def for columns, etc.
streaming_table = glue.get_table(
 CatalogId=account_id,
 DatabaseName=database_name,
 Name=table_name
)

storage_descriptor= streaming_table['Table']['StorageDescriptor']

# De-dupe partitions if there are any
partitions = set()
for obj in response['Contents']:
    # remove the first 3 prefixes for the datalake/taxi/streaming/ data lake location above and the last entry for the file
    keys = obj['Key'].split('/')[3:-1]
    # get the values of the prefixes in between. These are the year,month,day,hour values to be used for the partition
    values = [k.split('=')[1] for k in keys]
    storage_descriptor['Location'] = '{0}{1}'.format(location, '/'.join(keys))
    partitions.add(json.dumps({"StorageDescriptor": storage_descriptor ,"Values": list(values)}))

#batch add partitions from the kinesis stream
response = glue.batch_create_partition(
 CatalogId=account_id,
 DatabaseName=database_name,
 TableName=table_name,
 PartitionInputList=list(json.loads(part) for part in partitions)
)

### Get the result of the partition load

If you run the above commands multiple times, or over partitions that have already been registered, you will see an error message `Partition already exists.` along with a list of the partition values that already exist.
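
Since "already exists" errors are harmless on a re-run, it can help to separate them from real failures. The sketch below assumes the response carries an `Errors` list whose entries have `PartitionValues` and an `ErrorDetail` with an `ErrorCode`. The helper name and the sample response are made up for illustration.

```python
def split_partition_errors(response):
    """Split batch_create_partition errors into (already_existing, other_failures) (hypothetical helper)."""
    already_existing, other_failures = [], []
    for error in response.get('Errors', []):
        code = error.get('ErrorDetail', {}).get('ErrorCode')
        values = error.get('PartitionValues', [])
        if code == 'AlreadyExistsException':
            already_existing.append(values)
        else:
            other_failures.append((values, code))
    return already_existing, other_failures


# Made-up response for illustration
sample_response = {
    'Errors': [
        {
            'PartitionValues': ['2019', '03', '07', '14'],
            'ErrorDetail': {'ErrorCode': 'AlreadyExistsException', 'ErrorMessage': 'Partition already exists.'}
        }
    ]
}
print(split_partition_errors(sample_response))
# prints ([['2019', '03', '07', '14']], [])
```

An empty second list means every error was a duplicate registration and can be ignored.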

In [None]:
print(response)

### Query the Data Lake with Athena

To query the table we created, we will install a Python library that runs Athena queries against the data in the Glue Data Catalog. For more information, see [PyAthena](https://pypi.org/project/PyAthena/).

In [None]:
!pip install PyAthena

### Simple Select Query

Notice you have the values for the partitions as part of the table returned. You can use these values to minimize the number of objects scanned in the S3 location helping to improve both performance and cost of the Athena queries.

In [None]:
from pyathena import connect
from pyathena.util import as_pandas

cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()
cursor.execute('select * from ' + database_name + '.' + table_name + ' limit 10')

df = as_pandas(cursor)
df.head(5)

### Query with where clause using partitions

In this example, if we had more partitions, you would benefit from the `year`, `month`, and `day` partitions in S3 when looking for specific data, because the query only needs to scan the matching prefixes. Here we parse the values from a partition we just added to Glue and use them in an aggregate query that counts the records generated for the day.
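
One way to assemble a partition-filtered query is to convert the partition values to integers before formatting them into the SQL text, which also guards against unexpected characters in the values. This is a sketch only: `partition_query` is a hypothetical helper, and the database and table names in the example mirror the defaults defined at the top of this notebook.

```python
def partition_query(database, table, year, month, day):
    """Build a count query restricted to one day's partitions (hypothetical helper)."""
    # int() raises ValueError if a value is not numeric, so only numbers reach the SQL text
    filters = {'year': int(year), 'month': int(month), 'day': int(day)}
    where = ' AND '.join('{0} = {1}'.format(name, value) for name, value in filters.items())
    return 'SELECT count(vendorid) AS cnt FROM {0}.{1} WHERE {2}'.format(database, table, where)


print(partition_query('taxi_bdw', 'yellow_streaming_bdw', '2019', '03', '07'))
# prints SELECT count(vendorid) AS cnt FROM taxi_bdw.yellow_streaming_bdw WHERE year = 2019 AND month = 3 AND day = 7
```

Adding an `hour` filter in the same way would narrow the scan to a single partition.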

In [None]:
path = json.loads(list(partitions)[0])['StorageDescriptor']['Location'].split('/')
year = path[6].split('=')[1]
month = path[7].split('=')[1]
day = path[8].split('=')[1]
hour = path[9].split('=')[1]

print('{0}-{1}-{2} {3}'.format(year, month, day, hour))

In [None]:
%%time
cursor.execute('''SELECT count(vendorid) as cnt FROM ''' + database_name + '''.''' + table_name + '''
WHERE year = ''' + year + '''
AND month = ''' + month + ''' 
AND day = ''' + day)

df = as_pandas(cursor)

In [None]:
df

## Clean Up

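That's it. In this notebook we pulled data from the open data registry, streamed it to S3 through Kinesis Firehose, cataloged it by creating a new table and partitions in the Glue Data Catalog, and finally queried the results with Athena. The cells below remove the resources created along the way.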

In [None]:
response = iam.delete_role_policy(
 RoleName=firehose_role_name,
 PolicyName=firehose_policy_name
)

In [None]:
iam.delete_role(RoleName=firehose_role_name)

In [None]:
response = logs.delete_log_group(
 logGroupName=cloudwatch_logs_group_name
)

In [None]:
firehose.delete_delivery_stream(
 DeliveryStreamName=delivery_stream_name
)

In [None]:
response = glue.delete_database(
 CatalogId = account_id,
 Name = database_name
)

In [None]:
!aws s3 rb s3://$bucket --force 