## Serverless Data Lake Ingestion with Kinesis

In this notebook we will walk through the steps required to use the Kinesis suite of tools as a swiss army knife to land data into your Data Lake. We will simulate Apache logs from a web server that could be sent from the [Kinesis Logs Agent](https://github.com/awslabs/amazon-kinesis-agent) to an Kinesis Data Stream. Once the logs have been sent to Kinesis we can convert, transform, and persist the processed and raw logs in your Data Lake. Finally, we will also show the real-time aspect of Kinesis by using an enhanced fan-out consumer with Lambda to send 500 errors found in the stream to Slack. The diagram below depicts the solutions we will be creating below.

![Kinesis Ingestion](../../docs/assets/images/kinesis-swiss-army.png)

You will need a Slack account and web hook to complete this workshop. Create a Slack account [here](https://slack.com/get-started). Once you have the acocunt created you will need to create a [Slack Channel](https://get.slack.help/hc/en-us/articles/201402297-Create-a-channel) and add a [WebHook](https://get.slack.help/hc/en-us/articles/115005265063-Incoming-WebHooks-for-Slack) to it.

In [None]:
import boto3
import botocore
import json
import time
import project_path
import getpass
import pandas as pd

from lib import workshop

cfn = boto3.client('cloudformation')
logs = boto3.client('logs')
firehose = boto3.client('firehose')
s3 = boto3.client('s3')

# General variables for the region and account id for the location of the resources being created
session = boto3.session.Session()
region = session.region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

delivery_stream_name = 'dc-demo-firehose'
kdg_stack = 'kinesis-data-generator-cognito'
kdg_username = 'admin'

slack_web_hook = '{{SlackURL}}'

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'demo-')
print(bucket)

### [Launch Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)

*** This is optional if you haven't already launched the KDG in your account ***

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose. Learn how to use the tool and create templates for your records.

Create an Amazon Cognito User
Before you can send data to Kinesis, you must first create an [Amazon Cognito](https://aws.amazon.com/cognito/) user in your AWS account with permissions to access Amazon Kinesis. To simplify this process, an [Amazon Lambda](https://aws.amazon.com/lambda/) function and an [Amazon CloudFormation](https://aws.amazon.com/cloudformation/) template are provided to create the user and assign just enough permissions to use the KDG.

Create the CloudFormation stack by clicking the link below. It will take you to the AWS CloudFormation console and start the stack creation wizard. You only need to provide a Username and Password for the user that you will use to log in to the KDG. Accept the defaults for any other options presented by CloudFormation.

[Create a Cognito user with CloudFormation](https://console.aws.amazon.com/cloudformation/home?region=us-west-2#/stacks/new?stackName=Kinesis-Data-Generator-Cognito-User&templateURL=https://s3-us-west-2.amazonaws.com/kinesis-helpers/cognito-setup.json)

The CloudFormation template can be downloaded from [here](https://s3-us-west-2.amazonaws.com/kinesis-helpers/cognito-setup.json). Because Amazon Cognito is not supported by CloudFormation, much of the setup is done in a Lambda function. The source code for the function can be downloaded from [here](https://s3-us-west-2.amazonaws.com/kinesis-helpers/datagen-cognito-setup.zip).

### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template

In the interest of time we will leverage CloudFormation to launch many of the supporting resources needed for utilizing Kinesis Data Firehose to store the raw data, pre-process the incoming data, and store the curated data in S3.

In [None]:
demo_file = 'cfn/kinesis-swiss-army.yaml'
session.resource('s3').Bucket(bucket).Object(demo_file).upload_file(demo_file)

What does the CloudFormation template look like?

In [None]:
!cat cfn/kinesis-swiss-army.yaml

In [None]:
cfn_template = 'https://s3.amazonaws.com/{0}/{1}'.format(bucket, demo_file)
print(cfn_template)

stack_name = 'kinesis-swiss-army'
response = cfn.create_stack(
 StackName=stack_name,
 TemplateURL=cfn_template,
 Capabilities = ["CAPABILITY_NAMED_IAM"],
 Parameters=[
 {
 'ParameterKey': 'SlackWebHookUrl',
 'ParameterValue': slack_web_hook
 }
 ] 
 
)

print(response)

In [None]:
response = cfn.describe_stacks(
 StackName=stack_name
)

while response['Stacks'][0]['StackStatus'] != 'CREATE_COMPLETE':
 print('Not yet complete.')
 time.sleep(30)
 response = cfn.describe_stacks(
 StackName=stack_name
 )

outputs = response['Stacks'][0]['Outputs']

for output in outputs:
 if (output['OutputKey'] == 'FirehoseExecutionRole'):
 firehose_arn = output['OutputValue']
 if (output['OutputKey'] == 'LambdaPreProcessArn'):
 pre_processing_arn = output['OutputValue']
 if (output['OutputKey'] == 'GlueDatabase'):
 database = output['OutputValue']
 if (output['OutputKey'] == 'RawTable'):
 raw_table = output['OutputValue']
 if (output['OutputKey'] == 'CuratedTable'):
 curated_table = output['OutputValue']
 if (output['OutputKey'] == 'WeblogsBucket'):
 event_bucket = output['OutputValue']
 if (output['OutputKey'] == 'FirehoseLogGroup'):
 cloudwatch_logs_group_name = output['OutputValue']
 if (output['OutputKey'] == 'KinesisEventStream'):
 event_stream_arn = output['OutputValue']
 
pd.set_option('display.max_colwidth', -1)
pd.DataFrame(outputs, columns=["OutputKey", "OutputValue"])

### [Create the Kinesis Firehose we will use to send Apache Logs to our Data Lake](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose is part of the Kinesis streaming data platform, along with Kinesis Data Streams, Kinesis Video Streams, and Amazon Kinesis Data Analytics. With Kinesis Data Firehose, you don't need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

In this example, we will create custom S3 prefixes for when the data lands in S3. This will allow us to precreate the partitions that will be cataloged in the Glue Data Catalog. To find more information follow this [link](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html)

[firehose.create_delivery_stream](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.create_delivery_stream)

In [None]:
response = firehose.create_delivery_stream(
 DeliveryStreamName=delivery_stream_name,
 DeliveryStreamType='KinesisStreamAsSource',
 KinesisStreamSourceConfiguration={
 'KinesisStreamARN': event_stream_arn,
 'RoleARN': firehose_arn
 },
 ExtendedS3DestinationConfiguration={
 'RoleARN': firehose_arn,
 'BucketARN': 'arn:aws:s3:::' + event_bucket,
 'Prefix': 'weblogs/processed/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
 'ErrorOutputPrefix': 'weblogs/failed/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
 'BufferingHints': {
 'SizeInMBs': 128,
 'IntervalInSeconds': 60
 },
 'CompressionFormat': 'UNCOMPRESSED',
 'EncryptionConfiguration': {
 'NoEncryptionConfig': 'NoEncryption'
 },
 'CloudWatchLoggingOptions': {
 'Enabled': True,
 'LogGroupName': cloudwatch_logs_group_name,
 'LogStreamName': 'ingestion_stream'
 },
 'ProcessingConfiguration': {
 'Enabled': True,
 'Processors': [
 {
 'Type': 'Lambda',
 'Parameters': [
 {
 'ParameterName': 'LambdaArn',
 'ParameterValue': '{0}:$LATEST'.format(pre_processing_arn)
 },
 {
 'ParameterName': 'NumberOfRetries',
 'ParameterValue': '1'
 },
 {
 'ParameterName': 'RoleArn',
 'ParameterValue': firehose_arn
 },
 {
 'ParameterName': 'BufferSizeInMBs',
 'ParameterValue': '3'
 },
 {
 'ParameterName': 'BufferIntervalInSeconds',
 'ParameterValue': '60'
 }
 ]
 }
 ]
 },
 'S3BackupMode': 'Enabled',
 'S3BackupConfiguration': {
 'RoleARN': firehose_arn,
 'BucketARN': 'arn:aws:s3:::' + event_bucket,
 'Prefix': 'weblogs/raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
 'ErrorOutputPrefix': 'weblogs/failed/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
 'BufferingHints': {
 'SizeInMBs': 128,
 'IntervalInSeconds': 60
 },
 'CompressionFormat': 'UNCOMPRESSED',
 'EncryptionConfiguration': {
 'NoEncryptionConfig': 'NoEncryption'
 },
 'CloudWatchLoggingOptions': {
 'Enabled': True,
 'LogGroupName': cloudwatch_logs_group_name,
 'LogStreamName': 'raw_stream'
 }
 },
 'DataFormatConversionConfiguration': {
 'SchemaConfiguration': {
 'RoleARN': firehose_arn,
 'DatabaseName': database,
 'TableName': curated_table,
 'Region': region,
 'VersionId': 'LATEST'
 },
 'InputFormatConfiguration': {
 'Deserializer': {
 'OpenXJsonSerDe': {}
 }
 },
 'OutputFormatConfiguration': {
 'Serializer': {
 'ParquetSerDe': {}
 }
 },
 'Enabled': True
 }
 }
)

print(response)

### Wait for the Kinesis Firehose to become 'Active'
The Kinesis Firehose Delivery Stream is in the process of being created.

[firehose.describe_delivery_stream](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.describe_delivery_stream)

In [None]:
response = firehose.describe_delivery_stream(
 DeliveryStreamName=delivery_stream_name
)

status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
print(status)

while status == 'CREATING':
 time.sleep(30)
 response = firehose.describe_delivery_stream(
 DeliveryStreamName=delivery_stream_name
 )
 status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
 print(status)

print('Kinesis Firehose created.')

We will be sending simulated apache logs to the Kinesis Data Stream and land the data in the raw and processed prefixes in the data lake.

``` json
{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss ZZ")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}} HTTP/1.1" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"
```

Once you log in with your Cognito user into KDG you will be filling out the form like below with the appropriate Kinesis Data Stream and paste the json snippet above into the form to simultae Apache logs. Once you have correctly filled out the form you will click 'Send Data' and Apache logs will be sent to your Kinesis Data Stream.

![KDG Setup](../../docs/assets/images/kdg-send.png)

At this point if you have everything setup correctly you will see 500 errors streaming to your Slack channel. Another thing you will see is data persist into the S3 bucket created in the CloudFormation template output param `WeblogsBucket` in prefixes raw for the raw data and processed for Parquet formatted data.

![S3 Setup](../../docs/assets/images/kinesis-s3-bucket.png)

### [Query the Data with Athena](https://aws.amazon.com/athena/)

For the self-serve end users that need the ability to create ad-hoc queries against the data Athena is a great choice the utilizes Presto and ANSI SQL to query a number of file formats on S3.

To query the tables created by the crawler we will be installing a python library for querying the data in the Glue Data Catalog with Athena. For more information jump to [PyAthena](https://pypi.org/project/PyAthena/). You can also use the AWS console by browsing to the Athena service and run queries through the browser. Alternatively, you can also use the [JDBC/ODBC](https://docs.aws.amazon.com/athena/latest/ug/athena-bi-tools-jdbc-odbc.html) drivers available.

In [None]:
!pip install PyAthena

### Select Query from Raw data

In this first query we will create a simple query to show the ability of Athena to query the raw CSV data.

In [None]:
%%time
from pyathena import connect
from pyathena.util import as_pandas

cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()
cursor.execute('select * from weblogs.r_streaming_logs limit 10')

df = as_pandas(cursor)

In [None]:
df.head(5)

### Select Query from Processed data

In this first query we will create a simple query to show the ability of Athena to query the processed Parquet data.

In [None]:
%%time
cursor.execute('select * from weblogs.p_streaming_logs limit 10')

df = as_pandas(cursor)

In [None]:
df.head(5)

## Cleanup

In [None]:
!aws s3 rb s3://$event_bucket --force

In [None]:
!aws s3 rb s3://$bucket --force

In [None]:
response = cfn.delete_stack(StackName=stack_name)

In [None]:
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
 StackName=stack_name
)

print('The wait is over for {0}'.format(stack_name))

In [None]:
response = firehose.delete_delivery_stream(
 DeliveryStreamName=delivery_stream_name
)