AWSTemplateFormatVersion: '2010-09-09' Description: A serverless datalake workshop. Resources: IngestionBucket: Type: AWS::S3::Bucket ApacheLogs: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /${AWS::StackName}/apache RetentionInDays: 1 ApacheLogsKinesis: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: RoleARN: !GetAtt ApacheLogsServiceRole.Arn BucketARN: !GetAtt IngestionBucket.Arn BufferingHints: IntervalInSeconds: 60 SizeInMBs: 3 CloudWatchLoggingOptions: Enabled: False CompressionFormat: UNCOMPRESSED Prefix: weblogs/live/ ProcessingConfiguration: Enabled: true Processors: - Type: Lambda Parameters: - ParameterName: LambdaArn ParameterValue: !Sub ${TransformKinesis.Arn} - ParameterName: BufferSizeInMBs ParameterValue: 3 - ParameterName: BufferIntervalInSeconds ParameterValue: 60 CloudWatchLogsToKinesis: Type: AWS::Logs::SubscriptionFilter Properties: DestinationArn: !Sub ${ApacheLogsKinesis.Arn} FilterPattern: "" LogGroupName: !Sub ${ApacheLogs} RoleArn: !Sub ${LogsToKinesisServiceRole.Arn} LogsToKinesisServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: !Sub logs.${AWS::Region}.amazonaws.com Action: sts:AssumeRole LogsToKinesisRolePolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub ${AWS::StackName}_logs_kineis_policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'firehose:*' Resource: - !Sub '${ApacheLogsKinesis.Arn}' - Effect: Allow Action: - 'iam:PassRole' Resource: - !Sub '${LogsToKinesisServiceRole.Arn}' Roles: - !Ref 'LogsToKinesisServiceRole' ApacheLogsServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: firehose.amazonaws.com Action: sts:AssumeRole ApacheLogsRolePolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub ${AWS::StackName}_weblog_delivery_policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 's3:*' Resource: - !Sub '${IngestionBucket.Arn}/*' - !Sub '${IngestionBucket.Arn}' - Effect: Allow Action: - 'lambda:InvokeFunction' - 'lambda:InvokeAsync' Resource: - !Sub '${TransformKinesis.Arn}' Roles: - !Ref 'ApacheLogsServiceRole' TransformKinesisFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess TransformKinesis: Type: 'AWS::Lambda::Function' Properties: Handler: index.handler Runtime: python2.7 Description: '' MemorySize: 512 Timeout: 60 Role: !GetAtt TransformKinesisFunctionRole.Arn Code: ZipFile: !Sub | import base64 import json import gzip import StringIO import boto3 def transformLogEvent(log_event): return log_event['message'] + '\n' def processRecords(records): for r in records: data = base64.b64decode(r['data']) striodata = StringIO.StringIO(data) with gzip.GzipFile(fileobj=striodata, mode='r') as f: data = json.loads(f.read()) recId = r['recordId'] """ CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable. They do not contain actual data. """ if data['messageType'] == 'CONTROL_MESSAGE': yield { 'result': 'Dropped', 'recordId': recId } elif data['messageType'] == 'DATA_MESSAGE': data = ''.join([transformLogEvent(e) for e in data['logEvents']]) print data data = base64.b64encode(data) yield { 'data': data, 'result': 'Ok', 'recordId': recId } else: yield { 'result': 'ProcessingFailed', 'recordId': recId } def handler(event, context): records = list(processRecords(event['records'])) return {"records": records} RedshiftServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: redshift.amazonaws.com Action: sts:AssumeRole RedshiftRolePolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub ${AWS::StackName}_redshift_policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 's3:*' Resource: - !Sub '${IngestionBucket.Arn}/*' - !Sub '${IngestionBucket.Arn}' - Effect: Allow Action: - 'glue:CreateDatabase' - 'glue:DeleteDatabase' - 'glue:GetDatabase' - 'glue:GetDatabases' - 'glue:UpdateDatabase' - 'glue:CreateTable' - 'glue:DeleteTable' - 'glue:BatchDeleteTable' - 'glue:UpdateTable' - 'glue:GetTable' - 'glue:GetTables' - 'glue:BatchCreatePartition' - 'glue:CreatePartition' - 'glue:DeletePartition' - 'glue:BatchDeletePartition' - 'glue:UpdatePartition' - 'glue:GetPartition' - 'glue:GetPartition' - 'glue:BatchGetPartition' Resource: '*' Roles: - !Ref 'RedshiftServiceRole' WriteLogsFunction: Type: 'AWS::Lambda::Function' Properties: Handler: index.lambda_handler Runtime: python2.7 Description: '' MemorySize: 512 Timeout: 55 Role: !GetAtt TransformKinesisFunctionRole.Arn Environment: Variables: BUCKET_NAME: !Ref IngestionBucket LOG_GROUP_NAME: !Sub /${AWS::StackName}/apache Code: ZipFile: !Sub | import boto3 import random import string import uuid import json import os import datetime import calendar import time import urllib2 profiles = json.loads(urllib2.urlopen("https://s3.${AWS::Region}.amazonaws.com/${IngestionBucket}/data-gen/profiles.json").read()) requestUrls = ["GET /petstore/Fish/Feeders", "GET /petstore/Cats/DryFood", "GET /petstore/Bird/Etc", "GET /petstore/Dogs/DryFood", "GET /petstore/Dogs/CannedFood", "GET /petstore/Fish/PlantCare", "GET /petstore/Fish/Food", "GET /petstore/Cats/WetFood", "GET /petstore/Bird/Treats", "GET /petstore/Bird/Food", "GET /petstore/Dogs/FoodToppers", "GET /petstore/Cats/Treats"] client = boto3.client('logs') def lambda_handler(event, context): timestamp = datetime.datetime.now() nextSeqTokem = "" logGroupName = os.environ['LOG_GROUP_NAME'] logStreamName = timestamp.strftime('%d-%b-%Y-%H-%M-%S') response = client.create_log_stream( logGroupName=logGroupName, logStreamName=logStreamName ) waittime = 100 #write log every 100 ms while context.get_remaining_time_in_millis() > (waittime+1000): time.sleep(float(waittime)/1000.0) profile = profiles[random.randint(0,len(profiles)-1)] requestUrl = requestUrls[random.randint(0,len(requestUrls)-1)] timestamp = datetime.datetime.now() milliseconds = timestamp.microsecond / 1000 ts = calendar.timegm(timestamp.timetuple())*1000 + milliseconds logentry = profile["ip"] + "," + profile["username"] + "," + timestamp.strftime('%d/%b/%Y:%H:%M:%S') + ",\"" + requestUrl +"\",200," + str(random.randint(300,1000)) if nextSeqTokem == "" : response = client.put_log_events( logGroupName=logGroupName, logStreamName=logStreamName, logEvents=[ { 'timestamp': ts, 'message': logentry } ] ) nextSeqTokem = response["nextSequenceToken"] else: response = client.put_log_events( logGroupName=logGroupName, logStreamName=logStreamName, logEvents=[ { 'timestamp': ts, 'message': logentry } ], sequenceToken=nextSeqTokem ) nextSeqTokem = response["nextSequenceToken"] waittime = random.randint(1,100) return context.get_remaining_time_in_millis() SchedulerPolicyLambda: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !GetAtt WriteLogsFunction.Arn Principal: events.amazonaws.com SchedulerRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: events.amazonaws.com Action: sts:AssumeRole SchedulerPolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub ${AWS::StackName}_scheduler_policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'lambda:InvokeFunction' - 'lambda:InvokeAsync' Resource: - !Sub '${WriteLogsFunction.Arn}' Roles: - !Ref 'SchedulerRole' EventScheduler: Type: AWS::Events::Rule Properties: Description: Triggers the data generation RoleArn: !GetAtt SchedulerRole.Arn ScheduleExpression: rate(1 minute) Targets : - Id: !Sub SchedulerRole Arn: !GetAtt WriteLogsFunction.Arn LoadSampleDataFunction: Type: 'AWS::Lambda::Function' Properties: Handler: index.lambda_handler Runtime: python2.7 Description: '' MemorySize: 512 Timeout: 600 Role: !GetAtt TransformKinesisFunctionRole.Arn Environment: Variables: BUCKET_NAME: !Ref IngestionBucket SOURCE_BUCKET_NAME: arc326-instructions GLUE_SVC_ROLE: !Ref GlueDevServiceRole Code: ZipFile: !Sub | import boto3 import random import string import uuid import httplib import urlparse import json import base64 import hashlib import os import cfnresponse s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3 = boto3.resource('s3') srcB = 'arc326-instructions' def lambda_handler(event, context): try: print("Received event: " + json.dumps(event, indent=2)) return process_cfn(event, context) except Exception as e: print("EXCEPTION", e) rData = {} rData['Data'] = "Create Failed" cfnresponse.send(event, { 'StackId': event['StackId'], 'RequestId': event['RequestId'], 'LogicalResourceId': event['LogicalResourceId'] }, cfnresponse.FAILED, rData) def delete_files(event, content): bkt = os.environ['BUCKET_NAME'] marker = '' maxKeys = 100 isTrunc = True s3_bucket = s3_client.list_objects_v2(Bucket=bkt, MaxKeys=maxKeys) while isTrunc: isTrunc = s3_bucket['IsTruncated'] objects = [] for obj in s3_bucket['Contents'] : objects.append({"Key": obj["Key"]}) response = s3_client.delete_objects( Bucket=bkt, Delete={ 'Objects': objects, 'Quiet': True } ) if isTrunc: marker = s3_bucket['NextContinuationToken'] s3_bucket = s3_client.list_objects_v2(Bucket=bkt, ContinuationToken=marker, MaxKeys=maxKeys) return "Success" def process_cfn(event, context): bkt = os.environ['BUCKET_NAME'] response = { 'StackId': event['StackId'], 'RequestId': event['RequestId'], 'LogicalResourceId': event['LogicalResourceId'], 'Status': 'IN_PROCESS', } if event['RequestType'] == 'Delete': delete_files(event, context) rData = {} rData['Data'] = "Delete Succcessful" return cfnresponse.send(event, context, cfnresponse.SUCCESS, rData) copy_files(event, context) glue_response = glue_client.get_dev_endpoint(EndpointName=event['ResourceProperties']['GlueDevEndpoint']) rData = {} rData['Data'] = "Create Succcessful" rData['GlueYarnEndpointAddress'] = glue_response['DevEndpoint']['YarnEndpointAddress'].split('.', 3)[0] rData['GluePrivateAddress'] = glue_response['DevEndpoint']['PrivateAddress'].split('.', 3)[0] return cfnresponse.send(event, context, cfnresponse.SUCCESS, rData) def copy_files(event, context): stackName = event['ResourceProperties']['StackName'] bkt = os.environ['BUCKET_NAME'] gluerole = os.environ['GLUE_SVC_ROLE'] response = s3_client.copy_object( Bucket=bkt, CopySource={'Bucket': srcB, 'Key': 'sample-data/useractivity.csv'}, Key='raw/useractivity/useractivity.csv' ) response = s3_client.copy_object( Bucket=bkt, CopySource={'Bucket': srcB, 'Key': 'sample-data/zipcodedata.csv'}, Key='raw/zipcodes/zipcodedata.csv' ) response = s3_client.copy_object( Bucket=bkt, ACL='public-read', CopySource={'Bucket': srcB, 'Key': 'data-gen/profiles.json'}, Key='data-gen/profiles.json' ) response = s3_client.copy_object( Bucket=bkt, CopySource={'Bucket': srcB, 'Key': 'sample-data/userprofile.csv'}, Key='raw/userprofile/userprofile.csv' ) replaceText(bkt, stackName, gluerole, 'instructions/instructions-template.html', 'instructions/instructions.html') replaceText(bkt, stackName, gluerole, 'instructions/labs-template.ipynb', 'instructions/labs.ipynb') def replaceText(bkt, stackName, gluerole, src, dst): src = s3.Object(srcB, src) html = src.get()['Body'].read().decode('utf-8') html = html.replace('^ingestionbucket^', bkt).replace('^stackname^', stackName).replace('^gluerole^', gluerole) destination = s3.Object(bkt, dst) result = destination.put(Body=html, ACL='public-read', ContentDisposition='inline', ContentType='text/html') return "Success" VPC: Type: AWS::EC2::VPC Properties: CidrBlock: 10.1.0.0/16 EnableDnsHostnames: true Tags: - Key: Application Value: !Ref 'AWS::StackId' - Key: Name Value: !Sub DataLakeVpc-${AWS::StackName} PublicDataLakeVpcSubnet: Type: AWS::EC2::Subnet Properties: VpcId: !Ref 'VPC' CidrBlock: 10.1.0.0/24 MapPublicIpOnLaunch: true AvailabilityZone: Fn::Select: - 0 - Fn::GetAZs: "" Tags: - Key: Application Value: !Ref 'AWS::StackId' - Key: Name Value: !Sub Public-DataLakeVpc-${AWS::StackName} InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Application Value: !Ref 'AWS::StackId' AttachGateway: Type: AWS::EC2::VPCGatewayAttachment Properties: VpcId: !Ref 'VPC' InternetGatewayId: !Ref 'InternetGateway' RouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref 'VPC' Tags: - Key: Application Value: !Ref 'AWS::StackId' - Key: Name Value: !Sub Public-DataLakeVpc-${AWS::StackName} Route: Type: AWS::EC2::Route DependsOn: AttachGateway Properties: RouteTableId: !Ref 'RouteTable' DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref 'InternetGateway' SubnetRouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: SubnetId: !Ref 'PublicDataLakeVpcSubnet' RouteTableId: !Ref 'RouteTable' NetworkAcl: Type: AWS::EC2::NetworkAcl Properties: VpcId: !Ref 'VPC' Tags: - Key: Application Value: !Ref 'AWS::StackId' InboundHTTPNetworkAclEntry: Type: AWS::EC2::NetworkAclEntry Properties: NetworkAclId: !Ref 'NetworkAcl' RuleNumber: '100' Protocol: '-1' RuleAction: allow Egress: 'false' CidrBlock: 0.0.0.0/0 OutboundHTTPNetworkAclEntry: Type: AWS::EC2::NetworkAclEntry Properties: NetworkAclId: !Ref 'NetworkAcl' RuleNumber: '100' Protocol: '-1' RuleAction: allow Egress: 'true' CidrBlock: 0.0.0.0/0 PublicDataLakeVpcSubnetNetworkAclAssociation: Type: AWS::EC2::SubnetNetworkAclAssociation Properties: SubnetId: !Ref 'PublicDataLakeVpcSubnet' NetworkAclId: !Ref 'NetworkAcl' GlueSGIngress: Type: AWS::EC2::SecurityGroupIngress DependsOn: GlueSecurityGroup Properties: GroupId: !Ref GlueSecurityGroup IpProtocol: -1 FromPort: -1 ToPort: -1 SourceSecurityGroupId: !Ref GlueSecurityGroup GlueSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: VpcId: !Ref VPC GroupDescription: Enable SSH access via port 22 SecurityGroupIngress: - IpProtocol: tcp FromPort: '22' ToPort: '22' CidrIp: '0.0.0.0/0' - IpProtocol: tcp FromPort: '80' ToPort: '80' CidrIp: 0.0.0.0/0 - IpProtocol: tcp FromPort: '443' ToPort: '443' CidrIp: 0.0.0.0/0 Tags: - Key: Application Value: !Ref 'AWS::StackId' - Key: Name Value: !Sub DataLakeSecurityGroup-${AWS::StackName} GlueDevServiceRole: Type: AWS::IAM::Role Properties: ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AWSGlueConsoleSageMakerNotebookFullAccess - arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: ec2.amazonaws.com Action: sts:AssumeRole - Effect: Allow Principal: Service: glue.amazonaws.com Action: sts:AssumeRole - Effect: Allow Principal: Service: sagemaker.amazonaws.com Action: sts:AssumeRole GlueDevRolePolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub ${AWS::StackName}_glue_endpoint_policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 's3:*' - 'DescribeSubnets:*' - 'logs:*' Resource: - '*' Roles: - !Ref 'GlueDevServiceRole' S3Endpoint: Type: AWS::EC2::VPCEndpoint Properties: PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: '*' Action: - '*' Resource: - '*' RouteTableIds: - !Ref RouteTable ServiceName: !Sub com.amazonaws.${AWS::Region}.s3 VpcId: !Ref VPC GlueDevEndpoint: Type: AWS::Glue::DevEndpoint Properties: EndpointName: !Sub datalake-${AWS::StackName} PublicKey: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCyQ19lVS4Td9LT66vTlIBxD4youu1fZJKnJP+k0aBMKZ3W2GPpq31VmPQEqJbfYPWPRRRagSiWlzvk/3Y3DSCnAhmU/XGla+JTkITTA6fmn1o9ymwyg0mrmdMHZK16yd9f++dfzJ5fdspd53N4a0KGJooST4YOXk4RanakelL3tjePqHFpg0uShgqeOaP9aaU9LpE39Q77TFWFfqfjve5HH33NEmBMphmnvHsOWcx7eydCPIN17xnClJDCf8GvELVB6ef2qGWipP7YaM0UTjlaiBV7Q5ciCfFnAf1FMykzYUnHK9tE5aTOI8XOst0syPEbhegPSxOhrDf5Y08gC7ut gillemi@9801a79dae39' RoleArn: !GetAtt GlueDevServiceRole.Arn SecurityGroupIds: - !Ref GlueSecurityGroup SubnetId: !Ref PublicDataLakeVpcSubnet NotebookProfile: Type: "AWS::IAM::InstanceProfile" Properties: Roles: - !Ref GlueDevServiceRole LoadSampleData: Type: Custom::LoadSampleData DependsOn: - IngestionBucket Properties: ServiceToken: !GetAtt LoadSampleDataFunction.Arn StackName: !Ref AWS::StackName GlueDevEndpoint: !Ref GlueDevEndpoint Outputs: RedshiftRole: Description: The role that redshift uses to access data Value: !GetAtt RedshiftServiceRole.Arn WorkshopInstructionsUrl: Description: Follow the link for the instructions for the serverless datalake workshop. Value: !Sub https://s3.${AWS::Region}.amazonaws.com/${IngestionBucket}/instructions/instructions.html