AWSTemplateFormatVersion: 2010-09-09 Description: Kinesis Swiss Army Knife Parameters: GlacierTransition: Type: Number Description: Number of days until S3 objects are moved to Glacier Default: 7 GlacierExpiration: Type: Number Description: Number of days (from creation) when objects are deleted from S3 and Glacier Default: 365 SlackWebHookUrl: Type: String NoEcho: true Description: Slack Webhook Url SlackChannel: Type: String Default: "#kinesis" Description: Slack Channel Name Resources: LambdaBasicExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - glue:GetTable - glue:GetTables - glue:CreatePartition Resource: '*' FirehoseExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Condition: StringEquals: sts:ExternalId: !Ref 'AWS::AccountId' Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - glue:GetTableVersions Resource: '*' - Effect: Allow Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Resource: - !Sub arn:aws:s3:::${S3RootBucket} - !Sub arn:aws:s3:::${S3RootBucket}/* - Effect: Allow Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration Resource: !Join - ':' - - !GetAtt PreProcessLambdaFunc.Arn - '$LATEST' - Effect: Allow Action: - logs:* Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - kinesis:DescribeStream - kinesis:GetShardIterator - kinesis:GetRecords Resource: - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME% - !GetAtt EventStream.Arn - Effect: Allow Action: - kms:Decrypt Resource: !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID% Condition: StringEquals: kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com StringLike: kms:EncryptionContext:aws:kinesis:arn : !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::Region}:stream/%FIREHOSE_STREAM_NAME% ApplyNotificationFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Path: / Policies: - PolicyName: S3BucketNotificationPolicy PolicyDocument: Version: '2012-10-17' Statement: - Sid: AllowBucketNotification Effect: Allow Action: s3:PutBucketNotification Resource: - !Sub 'arn:aws:s3:::${S3RootBucket}' - !Sub 'arn:aws:s3:::${S3RootBucket}/*' StreamRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: 'lambda.amazonaws.com' Action: 'sts:AssumeRole' Policies: - PolicyName: lambda PolicyDocument: Statement: - Effect: Allow Action: - 'kinesis:ListShards' Resource: '*' - Effect: Allow Action: - 'kinesis:SubscribeToShard' - 'kinesis:DescribeStreamSummary' - 'kinesis:GetShardIterator' - 'kinesis:GetRecords' Resource: - !GetAtt EventStream.Arn - Effect: Allow Action: - 'kinesis:SubscribeToShard' Resource: - !GetAtt EventStreamConsumer.ConsumerARN - Effect: Allow Action: - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: '*' PreProcessLambdaFunc: Type: AWS::Lambda::Function Properties: Code: ZipFile: | from __future__ import print_function import base64 import json import re import logging from dateutil.parser import parse from datetime import datetime, tzinfo, timedelta import urllib.parse logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): output = [] succeeded_record_cnt = 0 failed_record_cnt = 0 safe_string_to_int = lambda x: int(x) if x.isdigit() else x for record in event['records']: print(record['recordId']) payload = base64.b64decode(record['data']) logger.info('%s', payload) p = re.compile(r"^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] \"(\S+)\s?(\S+)?\s?(\S+)?\" (\d{3}|-) (\d+|-)\s?\"?([^\"]*)\"?\s?\"?([^\"]*)?\"?$") m = p.match(payload.decode('utf-8')) if m: succeeded_record_cnt += 1 ts = m.group(4) try: d = parse(ts.replace(':', ' ', 1)) ts = d.isoformat() except: logger.error('Parsing the timestamp to date failed.') data_field = { 'host_address': m.group(1), 'request_time': ts, 'request_method': m.group(5), 'request_path': m.group(6), 'request_protocol': m.group(7), 'response_code': m.group(8), 'response_size': m.group(9), 'referrer_host': m.group(10), 'user_agent': m.group(11) } rec = json.dumps(data_field) output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(rec.encode()).decode() } else: print('Parsing failed') failed_record_cnt += 1 output_record = { 'recordId': record['recordId'], 'result': 'ProcessingFailed', 'data': record['data'] } output.append(output_record) logger.info('Processing completed. Successful records %s, Failed records %s.'.format(succeeded_record_cnt, failed_record_cnt)) return {'records': output} Description: Pre-Process for Apache log timestamps Handler: index.lambda_handler Role: !GetAtt LambdaBasicExecutionRole.Arn Runtime: python3.6 Timeout: 120 MemorySize: 1280 NotificationLambdaFunction: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Runtime: python3.6 Code: ZipFile: | from __future__ import print_function import boto3 import botocore import base64 import json import re import os import urllib.parse import logging from dateutil.parser import parse from datetime import datetime, tzinfo, timedelta glue = boto3.client('glue') logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): account_id = boto3.client('sts').get_caller_identity().get('Account') database_name = os.getenv("DATABASE_NAME") bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote(event['Records'][0]['s3']['object']['key']) values=key.split("/") prefixes = "/".join(values[0:2]) location = "s3://{0}/{1}/".format(bucket, prefixes) response = glue.get_tables( CatalogId=account_id, DatabaseName=database_name ) for table in response['TableList']: if table['StorageDescriptor']['Location'] == location: table_name = table['Name'] values=values[2:-1] partition_location = "s3://{0}/{1}".format(bucket, key[0: key.rfind("/")+1]) partitions = [i.split('=')[1] for i in values] partitionInput = {} partitionInput['StorageDescriptor'] = table['StorageDescriptor'] partitionInput['StorageDescriptor']['Location'] = partition_location partitionInput['Values'] = partitions try: p=glue.create_partition( CatalogId=account_id, DatabaseName=database_name, TableName=table_name, PartitionInput=partitionInput ) logger.info('Partition Added %s', location) except glue.exceptions.AlreadyExistsException as e: logger.error('Partition already exists %s', location) break return Role: !GetAtt LambdaBasicExecutionRole.Arn Timeout: 60 MemorySize: 512 Environment: Variables: DATABASE_NAME : !Ref DataLakeDatabase StreamFunction: Type: AWS::Lambda::Function Properties: Handler: 'index.lambda_handler' Runtime: 'python3.6' MemorySize: 128 Timeout: 30 Role: !GetAtt 'StreamRole.Arn' Code: ZipFile: | import boto3 import botocore import base64 import json import os import re import logging from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError from dateutil.parser import parse from datetime import datetime, tzinfo, timedelta HOOK_URL = os.getenv("WEB_HOOK_URL") logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): for record in get_records(event): p = re.compile(r"^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] \"(\S+)\s?(\S+)?\s?(\S+)?\" (\d{3}|-) (\d+|-)\s?\"?([^\"]*)\"?\s?\"?([^\"]*)?\"?$") m = p.match(record.decode('utf-8')) if m: ts = m.group(4) try: d = parse(ts.replace(':', ' ', 1)) ts = d.isoformat() except e: logger.error("Request failed: %d %s", e.code, e.reason) if m.group(8) == '500': data_field = { 'host_address': m.group(1), 'request_time': ts, 'request_method': m.group(5), 'request_path': m.group(6), 'request_protocol': m.group(7), 'response_code': m.group(8), 'response_size': m.group(9), 'referrer_host': m.group(10), 'user_agent': m.group(11) } rec = json.dumps(data_field) notify_to_slack(rec) return "Successfully processed {} Kinesis Records(s)" def get_records(update): data = [] if "Records" in update: for record in update["Records"]: if "kinesis" in record and "data" in record['kinesis']: decoded_data = base64.b64decode(record['kinesis']['data']) logger.info('%s', decoded_data) data.append(decoded_data) return data def notify_to_slack(log_entry): payload = { 'text': log_entry, 'username': 'Apache Log 500 Error Notifier', 'icon_emoji': ':rage:' } req = Request(HOOK_URL, json.dumps(payload).encode('utf-8')) try: response = urlopen(req) response.read() logger.info("Message posted to Slack.") except HTTPError as e: logger.error("Request failed: %d %s", e.code, e.reason) except URLError as e: logger.error("Server connection failed: %s", e.reason) Environment: Variables: WEB_HOOK_URL : !Ref SlackWebHookUrl LambdaInvokePermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !Ref NotificationLambdaFunction Principal: s3.amazonaws.com SourceAccount: !Ref AWS::AccountId SourceArn: !GetAtt S3RootBucket.Arn S3RootBucket: Type: AWS::S3::Bucket DependsOn: NotificationLambdaFunction Properties: LifecycleConfiguration: Rules: - Id: GlacierRule Status: Enabled ExpirationInDays: !Ref GlacierExpiration Transitions: - TransitionInDays: !Ref GlacierTransition StorageClass: GLACIER Prefix: weblogs/raw ApplyBucketNotificationFunction: Type: AWS::Lambda::Function Properties: Description: Function to update partitions when new objects land in S3 Handler: index.handler Runtime: python2.7 Role: !GetAtt 'ApplyNotificationFunctionRole.Arn' Timeout: 240 Code: ZipFile: | import boto3 import logging import json import cfnresponse s3Client = boto3.client('s3') logger = logging.getLogger() logger.setLevel(logging.DEBUG) def addBucketNotification(bucketName, notificationId, functionArn): notificationResponse = s3Client.put_bucket_notification_configuration( Bucket=bucketName, NotificationConfiguration={ 'LambdaFunctionConfigurations': [ { 'Id': notificationId, 'LambdaFunctionArn': functionArn, 'Events': [ 's3:ObjectCreated:*' ] }, ] } ) return notificationResponse def create(properties, physical_id): bucketName = properties['S3Bucket'] notificationId = properties['NotificationId'] functionArn = properties['FunctionARN'] response = addBucketNotification(bucketName, notificationId, functionArn) logger.info('AddBucketNotification response: %s' % json.dumps(response)) return cfnresponse.SUCCESS, physical_id def update(properties, physical_id): return cfnresponse.SUCCESS, None def delete(properties, physical_id): return cfnresponse.SUCCESS, None def handler(event, context): logger.info('Received event: %s' % json.dumps(event)) status = cfnresponse.FAILED new_physical_id = None try: properties = event.get('ResourceProperties') physical_id = event.get('PhysicalResourceId') status, new_physical_id = { 'Create': create, 'Update': update, 'Delete': delete }.get(event['RequestType'], lambda x, y: (cfnresponse.FAILED, None))(properties, physical_id) except Exception as e: logger.error('Exception: %s' % e) status = cfnresponse.FAILED finally: cfnresponse.send(event, context, status, {}, new_physical_id) ApplyNotification: Type: Custom::ApplyNotification Properties: ServiceToken: !GetAtt ApplyBucketNotificationFunction.Arn S3Bucket: !Ref S3RootBucket FunctionARN: !GetAtt NotificationLambdaFunction.Arn NotificationId: S3ObjectCreatedEvent EventStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 EventStreamConsumer: Type: AWS::Kinesis::StreamConsumer Properties: ConsumerName: lastUpdateConsumer StreamARN: !GetAtt EventStream.Arn EventSourceMapping: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 16 Enabled: true EventSourceArn: !Ref EventStreamConsumer FunctionName: !GetAtt StreamFunction.Arn StartingPosition: LATEST StreamLogGroup: Type: 'AWS::Logs::LogGroup' Properties: LogGroupName: !Sub '/aws/lambda/${StreamFunction}' RetentionInDays: 14 KinesisFirehoseLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName : '/aws/kinesisfirehose/dc_demo' RetentionInDays: 7 IngestionLogStream: Type: AWS::Logs::LogStream Properties : LogGroupName : !Ref KinesisFirehoseLogGroup LogStreamName : 'ingestion_stream' RawLogStream: Type: AWS::Logs::LogStream Properties : LogGroupName : !Ref KinesisFirehoseLogGroup LogStreamName : 'raw_stream' DataLakeDatabase: Type: AWS::Glue::Database Properties: DatabaseInput: Name: weblogs Description: Kinesis Swiss Army Knife demo database CatalogId: !Ref AWS::AccountId CuratedStreamingTable: Type: AWS::Glue::Table Properties: DatabaseName: !Ref DataLakeDatabase CatalogId: !Ref AWS::AccountId TableInput: Name: p_streaming_logs StorageDescriptor: Compressed: False InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat NumberOfBuckets: -1 OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat StoredAsSubDirectories: False Location: !Join - '/' - - 's3:/' - !Ref S3RootBucket - 'weblogs/processed/' SerdeInfo: SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Parameters: serialization.format: '1' Columns: - Name: host_address Type: string - Name: request_time Type: string - Name: request_method Type: string - Name: request_path Type: string - Name: request_protocol Type: string - Name: response_code Type: string - Name: response_size Type: string - Name: referrer_host Type: string - Name: user_agent Type: string Parameters: classification: 'parquet' compressionType: 'none' PartitionKeys: - Name: year Type: int - Name: month Type: int - Name: day Type: int - Name: hour Type: int Parameters: classification: 'parquet' compressionType: 'none' TableType: 'EXTERNAL_TABLE' RawStreamingTable: Type: AWS::Glue::Table Properties: DatabaseName: !Ref DataLakeDatabase CatalogId: !Ref AWS::AccountId TableInput: Name: r_streaming_logs StorageDescriptor: Compressed: False InputFormat: org.apache.hadoop.mapred.TextInputFormat NumberOfBuckets: -1 OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat StoredAsSubDirectories: False Location: !Join - '/' - - 's3:/' - !Ref S3RootBucket - 'weblogs/raw/' SerdeInfo: SerializationLibrary: com.amazonaws.glue.serde.GrokSerDe Parameters: input.format: "%{COMBINEDAPACHELOG}" Columns: - Name: clientip Type: string - Name: ident Type: string - Name: auth Type: string - Name: timestamp Type: string - Name: verb Type: string - Name: request Type: string - Name: httpversion Type: string - Name: response Type: string - Name: bytes Type: string - Name: referrer Type: string - Name: agent Type: string Parameters: classification: 'combinedapache' compressionType: 'none' grokPattern: "%{COMBINEDAPACHELOG}" PartitionKeys: - Name: year Type: int - Name: month Type: int - Name: day Type: int - Name: hour Type: int Parameters: classification: 'combinedapache' compressionType: 'none' grokPattern: "%{COMBINEDAPACHELOG}" TableType: 'EXTERNAL_TABLE' Outputs: FirehoseExecutionRole: Description: Firehose IAM Role Value: !GetAtt FirehoseExecutionRole.Arn LambdaPreProcessArn: Description: Pre Processing Arn Value: !GetAtt PreProcessLambdaFunc.Arn GlueDatabase: Description: Glue Database Value: !Ref DataLakeDatabase RawTable: Description: Raw Glue Table Value: !Ref RawStreamingTable CuratedTable: Description: Curated Glue Table Value: !Ref CuratedStreamingTable EventStream: Description: Kinesis Event Stream Value: !Ref EventStream WeblogsBucket: Description: S3 Bucket for weblog events Value: !Ref S3RootBucket FirehoseLogGroup: Description: CloudWatch Log Group for Firehose Value: !Ref KinesisFirehoseLogGroup KinesisEventStream: Description: Kinesis Data Stream Value: !GetAtt EventStream.Arn