AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: >
  This stack creates:
  a. A DynamoDB table named ProductPriority with a few items loaded.
  b. An S3 bucket named demo-bucket-<account ID>.
  c. A Lambda function named demo-glue-script-creator-lambda.
  d. A Lambda function named demo-reference-data-change-handler.
  e. A Kinesis data stream named SourceKinesisStream.
  f. An AWS Glue Data Catalog database called "my-database".
  g. Two Data Catalog tables.
  h. A Glue job named demo-glue-job-<account ID>.
  i. An IAM role for the Lambdas to access Kinesis, S3 and DynamoDB Streams.
  j. An IAM role for the Glue job to access Kinesis, S3 and DynamoDB.

Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: "LogDeliveryWrite"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Join
        - '-'
        - - 'demo-bucket'
          - !Ref AWS::AccountId

  LoggingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.lambda_handler
      FunctionName: 'demo-reference-data-change-handler'
      ReservedConcurrentExecutions: 2
      Policies:
        - KinesisCrudPolicy:
            StreamName: !Ref KinesisStream
        - DynamoDBStreamReadPolicy:
            TableName: !Ref DynamoDBTable
            StreamName: !Select [3, !Split ["/", !GetAtt DynamoDBTable.StreamArn]]
      InlineCode: |
        import json
        import os

        import boto3

        print('Loading function')

        region = os.environ['AWS_REGION']
        my_stream_name = 'SourceKinesisStream'
        kinesis_client = boto3.client('kinesis', region_name=region)

        def put_to_stream():
            # Publish a CHANGE_FLAG record so the streaming Glue job knows the
            # reference data in DynamoDB has changed and must be re-read.
            payload = {"dish": "CHANGE_FLAG", "cost": 0, "customer_id": 0}
            print(payload)
            kinesis_client.put_record(
                StreamName=my_stream_name,
                Data=json.dumps(payload),
                PartitionKey='CHANGE_FLAG'
            )

        def lambda_handler(event, context):
            # print("Received event: " + json.dumps(event, indent=2))
            for record in event['Records']:
                put_to_stream()
            num_records = len(event['Records'])
            return f'Successfully processed {num_records} records.'
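  # Reference table for the streaming join. Item-level changes flow through its
  # stream to LoggingFunction above, which signals the Glue job to reload it.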
  DynamoDBTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      AttributeDefinitions:
        - AttributeName: "item"
          AttributeType: "S"
        - AttributeName: "price"
          AttributeType: "N"
      TableName: "ProductPriority"
      KeySchema:
        - AttributeName: "item"
          KeyType: "HASH"
        - AttributeName: "price"
          KeyType: "RANGE"
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      StreamSpecification:
        StreamViewType: "KEYS_ONLY"
      SSESpecification:
        KMSMasterKeyId: alias/aws/dynamodb
        SSEEnabled: true
        SSEType: KMS
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      BillingMode: PROVISIONED

  CustomResourceDataAndFileCreator:
    Type: Custom::CustomResource
    Properties:
      ServiceToken: !GetAtt 'glueScriptCreationFunction.Arn'
      bucket_nm: !Ref S3Bucket
      ScriptLocation: 'code/glue/demo-glue-job.py'
      DynamoTblNm: !Ref DynamoDBTable

  glueScriptCreationFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.lambda_handler
      FunctionName: 'demo-glue-script-creator-lambda'
      ReservedConcurrentExecutions: 2
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref DynamoDBTable
        - S3CrudPolicy:
            BucketName: !Ref S3Bucket
      InlineCode: |
        import json
        import os
        import urllib.error
        import urllib.request

        import boto3

        def send_response(event, context, response):
            """Send a response to CloudFormation to handle the custom resource lifecycle"""
            response_body = {
                'Status': response,
                'Reason': 'See details in CloudWatch Log Stream: ' + context.log_stream_name,
                'PhysicalResourceId': context.log_stream_name,
                'StackId': event['StackId'],
                'RequestId': event['RequestId'],
                'LogicalResourceId': event['LogicalResourceId'],
            }
            print('RESPONSE BODY: \n' + json.dumps(response_body))
            data = json.dumps(response_body).encode('utf-8')
            req = urllib.request.Request(
                event['ResponseURL'], data,
                headers={'Content-Length': len(data), 'Content-Type': ''})
            req.get_method = lambda: 'PUT'
            try:
                with urllib.request.urlopen(req) as resp:
                    print(f'response.status: {resp.status}, '
                          f'response.reason: {resp.reason}')
                    print('response from cfn: ' + resp.read().decode('utf-8'))
            except urllib.error.URLError:
                raise Exception('Received non-200 response while sending response to AWS CloudFormation')
            return True

        def lambda_handler(event, context):
            # The Glue streaming script that this custom resource uploads to S3
            script = """import sys
        from awsglue.transforms import *
        from awsglue.utils import getResolvedOptions
        from pyspark.context import SparkContext
        from awsglue.context import GlueContext
        from awsglue.job import Job
        from pyspark.sql import DataFrame, Row
        import datetime
        from awsglue import DynamicFrame

        args = getResolvedOptions(sys.argv, ["JOB_NAME", "tgt_s3_bkt", "TempDir"])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        logger = glueContext.get_logger()
        job = Job(glueContext)
        job.init(args["JOB_NAME"], args)

        # The S3 bucket name where the target data is stored
        bucket_nm = args["tgt_s3_bkt"]

        # Read the Glue Catalog table built on the Kinesis data stream
        df_SourceKinesisStream = glueContext.create_data_frame.from_catalog(
            database="my-database",
            table_name="source-kinesis-stream-tbl",
            additional_options={"startingPosition": "latest", "inferSchema": "false"},
            transformation_ctx="df_SourceKinesisStream",
        )

        # Initialize the count of times the DynamoDB reference data has been refreshed
        num_of_dynamo_refreshes = 0

        logger.info("customlog - Finished reading dataframes")

        def processBatch(data_frame, batchId):
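            # processBatch: for each micro-batch read from Kinesis, detect any
            # CHANGE_FLAG records, refresh the DynamoDB reference DynamicFrame
            # when needed, join the remaining records with it, and write to S3.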
            # Declare global variables. Required because the reference data is
            # initialized conditionally within this batch-processing function.
            global num_of_dynamo_refreshes
            global dyf_dynamodb_reference

            logger.info("Starting microbatch...")
            if data_frame.count() > 0:
                # Check whether there are any changes to the reference data
                change_flag = data_frame.where("dish=='CHANGE_FLAG'").count()

                # Keep only messages from the real-time source, excluding change flags
                df_source_messages = data_frame.where("dish<>'CHANGE_FLAG'")

                # If there are changes, or if this is the first run, refresh the reference data
                if change_flag > 0 or num_of_dynamo_refreshes == 0:
                    # For debugging
                    print("#####There has been a change in the reference data!!#####")
                    dyf_dynamodb_reference = glueContext.create_dynamic_frame.from_catalog(
                        database="my-database", table_name="productpriority"
                    )
                    num_of_dynamo_refreshes = num_of_dynamo_refreshes + 1

                # If there are other messages to be processed, perform the join
                if df_source_messages.count() > 0:
                    # Print the reference data (for debugging only)
                    dyf_dynamodb_reference_df = dyf_dynamodb_reference.toDF()
                    dyf_dynamodb_reference_df.show()

                    # Perform the join using a Spark inner join
                    df_join = df_source_messages.join(
                        dyf_dynamodb_reference_df,
                        df_source_messages.dish == dyf_dynamodb_reference_df.item,
                        "inner",
                    )

                    # Convert the output of the join to a Glue DynamicFrame
                    dyf_join = DynamicFrame.fromDF(df_join, glueContext, "from_data_frame1")

                    # Select only the required fields
                    dyf_SelectFields = SelectFields.apply(
                        frame=dyf_join,
                        paths=["cost", "price", "priority", "customer_id", "item"],
                        transformation_ctx="dyf_SelectFields",
                    )

                    # Path where the target data should be stored
                    s3_path = f"s3://{bucket_nm}/demo-final-data/"

                    # Define the sink object
                    s3_sink = glueContext.getSink(
                        path=s3_path,
                        connection_type="s3",
                        updateBehavior="UPDATE_IN_DATABASE",
                        partitionKeys=["item", "priority"],
                        enableUpdateCatalog=True,
                        transformation_ctx="s3_sink",
                    )
                    logger.info("Sink Established")
                    s3_sink.setCatalogInfo(
                        catalogDatabase="my-database", catalogTableName="demo-final-data"
                    )
                    logger.info("Catalog Established")
                    s3_sink.setFormat("csv")
                    logger.info("Format Established")
                    s3_sink.writeFrame(dyf_SelectFields)
                    logger.info("Written Batch to S3")

        # Process micro-batches of real-time data
        glueContext.forEachBatch(
            frame=df_SourceKinesisStream,
            batch_function=processBatch,
            options={
                "windowSize": "10 seconds",
                "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
            },
        )
        job.commit()
        """
            try:
                encoded_string = script.encode("utf-8")
                s3 = boto3.resource("s3")
                print(event)
                s3_bucket = event['ResourceProperties']['bucket_nm']
                script_location = event['ResourceProperties']['ScriptLocation']
                # Upload the generated Glue script to the demo S3 bucket
                s3.Object(s3_bucket, script_location).put(Body=encoded_string)

                # Load a few sample items into the DynamoDB reference table
                dynamodb = boto3.resource('dynamodb')
                table = dynamodb.Table(event['ResourceProperties']['DynamoTblNm'])
                print(table.table_status)
                table.put_item(Item={'item': 'salad', 'price': 100, 'priority': 'high'})
                table.put_item(Item={'item': 'pizza', 'price': 100, 'priority': 'low'})
                table.put_item(Item={'item': 'burger', 'price': 100, 'priority': 'medium'})
                table.put_item(Item={'item': 'pasta', 'price': 100, 'priority': 'very high'})
                table.put_item(Item={'item': 'ice-cream', 'price': 100, 'priority': 'very very high'})
                response = 'SUCCESS'
            except Exception:
                response = 'FAILED'
            send_response(event, context, response)

  KinesisStream:
    Type: "AWS::Kinesis::Stream"
    Properties:
      Name: "SourceKinesisStream"
      RetentionPeriodHours: 24
      ShardCount: 4
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis

  DynamoDBTableStream:
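    # Event source mapping: routes ProductPriority stream records to the
    # change-handler Lambda (LoggingFunction).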
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      # The maximum number of DynamoDB stream records to send to Lambda per invocation
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt DynamoDBTable.StreamArn
      FunctionName: !GetAtt LoggingFunction.Arn
      # Always start at the tail of the stream
      StartingPosition: LATEST

  GlueRole:
    Type: "AWS::IAM::Role"
    Properties:
      Path: "/"
      RoleName: "AWSGlueServiceRoleDefaultdemo"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: "sts:AssumeRole"
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: GlueS3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: ['s3:*', 's3-object-lambda:*']
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                      - '/*'
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
        - "arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess"
        - "arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess"
      Description: "Allows Glue to call AWS services on your behalf."

  GlueDatabase:
    Type: "AWS::Glue::Database"
    Properties:
      DatabaseInput:
        Name: "my-database"
      CatalogId: !Ref "AWS::AccountId"

  GlueCrawler:
    Type: "AWS::Glue::Crawler"
    Properties:
      Name: "demo-crawler"
      Role: !Ref GlueRole
      Targets:
        DynamoDBTargets:
          - Path: "ProductPriority"
      DatabaseName: "my-database"
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "DEPRECATE_IN_DATABASE"

  GlueTable:
    Type: "AWS::Glue::Table"
    Properties:
      DatabaseName: "my-database"
      CatalogId: !Ref "AWS::AccountId"
      TableInput:
        TableType: "EXTERNAL_TABLE"
        Parameters:
          classification: "json"
        StorageDescriptor:
          Columns:
            - Name: "dish"
              Type: "string"
            - Name: "cost"
              Type: "decimal"
            - Name: "customer_id"
              Type: "int"
          Location: !Ref KinesisStream
          InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
          OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
          Compressed: false
          NumberOfBuckets: 0
          SerdeInfo:
            SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe"
            Parameters:
              paths: "cost,customer_id,dish"
          Parameters:
            streamARN: !GetAtt KinesisStream.Arn
            typeOfData: "kinesis"
          StoredAsSubDirectories: false
        Retention: 0
        Name: "source-kinesis-stream-tbl"

  GlueTable2:
    Type: "AWS::Glue::Table"
    Properties:
      DatabaseName: "my-database"
      CatalogId: !Ref "AWS::AccountId"
      TableInput:
        Owner: "owner"
        TableType: "EXTERNAL_TABLE"
        Parameters:
          CrawlerSchemaDeserializerVersion: "1.0"
          CrawlerSchemaSerializerVersion: "1.0"
          UPDATED_BY_CRAWLER: "demo-crawler"
          averageRecordSize: "24"
          classification: "dynamodb"
          compressionType: "none"
          hashKey: "item"
          rangeKey: "price"
          recordCount: "5"
          sizeKey: "121"
          typeOfData: "table"
        StorageDescriptor:
          Columns:
            - Name: "item"
              Type: "string"
            - Name: "price"
              Type: "bigint"
            - Name: "priority"
              Type: "string"
          Location: !GetAtt DynamoDBTable.Arn
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            Parameters: {}
          Parameters:
            CrawlerSchemaDeserializerVersion: "1.0"
            CrawlerSchemaSerializerVersion: "1.0"
            UPDATED_BY_CRAWLER: "demo-crawler"
            averageRecordSize: "24"
            classification: "dynamodb"
            compressionType: "none"
            hashKey: "item"
            rangeKey: "price"
            recordCount: "5"
            sizeKey: "121"
            typeOfData: "table"
          StoredAsSubDirectories: false
        Retention: 0
        Name: "productpriority"

  GlueJob:
    Type: "AWS::Glue::Job"
    Properties:
      Name: !Join
        - '-'
        - - 'demo-glue-job'
          - !Ref AWS::AccountId
      Description: ""
      Role: !Ref GlueRole
      ExecutionProperty:
        MaxConcurrentRuns: 1
      Command:
        Name: "gluestreaming"
        ScriptLocation: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/glue/demo-glue-job.py'
        PythonVersion: "3"
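      # --tgt_s3_bkt is the custom argument the generated script resolves with
      # getResolvedOptions; --TempDir also provides the streaming checkpoint location.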
      DefaultArguments:
        "--TempDir": !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/temporary/'
        "--class": "GlueApp"
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-glue-datacatalog": "true"
        "--enable-metrics": "true"
        "--enable-spark-ui": "true"
        "--job-bookmark-option": "job-bookmark-disable"
        "--job-language": "python"
        "--tgt_s3_bkt": !Ref S3Bucket
        "--spark-event-logs-path": !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/sparkHistoryLogs/'
      MaxRetries: 1
      GlueVersion: "3.0"
      NumberOfWorkers: 2
      WorkerType: "G.1X"

Outputs:
  LoggingFunctionName:
    Value: !Ref LoggingFunction
  DynamoDBTableName:
    Value: !Ref DynamoDBTable
  S3Bucket:
    Value: !Ref S3Bucket
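# Illustrative only (not created by this stack): a minimal boto3 sketch for
# exercising the pipeline after deployment and after starting the Glue job.
# The sample values (dish/cost/customer_id) are made up; it assumes default
# AWS credentials and region.
#
#   import json
#   import boto3
#
#   kinesis = boto3.client('kinesis')
#   dynamodb = boto3.resource('dynamodb')
#
#   # Send an order record matching the source-kinesis-stream-tbl schema
#   kinesis.put_record(
#       StreamName='SourceKinesisStream',
#       Data=json.dumps({"dish": "pizza", "cost": 12.5, "customer_id": 42}),
#       PartitionKey='pizza')
#
#   # Update the reference data; the table's stream triggers
#   # demo-reference-data-change-handler, which tells the Glue job to refresh
#   dynamodb.Table('ProductPriority').put_item(
#       Item={'item': 'pizza', 'price': 100, 'priority': 'high'})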