AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: >
  This stack creates:
  a. A DynamoDB table named ProductPriority with a few items loaded.
  b. An S3 bucket named demo-bucket-<account ID>.
  c. A Lambda function named demo-glue-script-creator-lambda.
  d. A Lambda function named demo-reference-data-change-handler.
  e. A Kinesis data stream named SourceKinesisStream.
  f. An AWS Glue Data Catalog database called my-database.
  g. Two Glue Data Catalog tables.
  h. A Glue job named demo-glue-job-<account ID>.
  i. An IAM role for the Lambda functions to access Kinesis, S3, and DynamoDB Streams.
  j. An IAM role for the Glue job to access Kinesis, S3, and DynamoDB.

Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: "LogDeliveryWrite"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Join
        - '-'
        - - 'demo-bucket'
          - !Ref AWS::AccountId

  # Handles DynamoDB Streams events and writes a CHANGE_FLAG object to S3 so the
  # streaming Glue job knows to refresh its reference data.
  LoggingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.lambda_handler
      FunctionName: 'demo-reference-data-change-handler'
      Environment:
        Variables:
          s3BucketNm: !Ref S3Bucket
      Policies:
        - DynamoDBStreamReadPolicy:
            TableName: !Ref DynamoDBTable
            StreamName: !Select [3, !Split ["/", !GetAtt DynamoDBTable.StreamArn]]
        - S3CrudPolicy:
            BucketName: !Ref S3Bucket
      InlineCode: |
        import json
        import boto3
        from datetime import datetime
        import calendar
        import random
        import time
        import os

        print('Loading function')
        region = os.environ['AWS_REGION']
        bucket = os.environ['s3BucketNm']
        my_stream_name = 'SourceKinesisStream'
        kinesis_client = boto3.client('kinesis', region_name=region)

        def put_to_s3():
            # Write a CHANGE_FLAG object; the Glue job deletes it after refreshing its reference data
            string = "changed"
            encoded_string = string.encode("utf-8")
            bucket_name = bucket
            file_name = "CHANGE_FLAG"
            s3_path = "change_flags/" + file_name
            s3 = boto3.resource("s3")
            s3.Bucket(bucket_name).put_object(Key=s3_path, Body=encoded_string)

        def lambda_handler(event, context):
            # print("Received event: " + json.dumps(event, indent=2))
            for record in event['Records']:
                put_to_s3()
            num_records = len(event['Records'])
            return f'Successfully processed {num_records} records.'
      ReservedConcurrentExecutions: 2

  DynamoDBTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      AttributeDefinitions:
        - AttributeName: "item"
          AttributeType: "S"
        - AttributeName: "price"
          AttributeType: "N"
      TableName: "ProductPriority"
      KeySchema:
        - AttributeName: "item"
          KeyType: "HASH"
        - AttributeName: "price"
          KeyType: "RANGE"
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      StreamSpecification:
        StreamViewType: "KEYS_ONLY"
      SSESpecification:
        KMSMasterKeyId: alias/aws/dynamodb
        SSEEnabled: true
        SSEType: KMS
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      BillingMode: PROVISIONED

  # Custom resource that invokes glueScriptCreationFunction to upload the Glue job
  # script to S3 and seed the DynamoDB table with sample items.
  CustomResourceDataAndFileCreator:
    Type: Custom::CustomResource
    Properties:
      ServiceToken: !GetAtt 'glueScriptCreationFunction.Arn'
      bucket_nm: !Ref S3Bucket
      ScriptLocation: 'code/glue/demo-glue-job.py'
      DynamoTblNm: !Ref DynamoDBTable

  glueScriptCreationFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.lambda_handler
      FunctionName: 'demo-glue-script-creator-lambda'
      ReservedConcurrentExecutions: 2
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref DynamoDBTable
        - S3CrudPolicy:
            BucketName: !Ref S3Bucket
      InlineCode: |
        import json
        import boto3
        import urllib.request
        import urllib.error
        import os

        def send_response(event, context, response):
            """Send a response to CloudFormation to handle the custom resource lifecycle"""
            response_body = {
                'Status': response,
                'Reason': 'See details in CloudWatch Log Stream: ' + context.log_stream_name,
                'PhysicalResourceId': context.log_stream_name,
                'StackId': event['StackId'],
                'RequestId': event['RequestId'],
                'LogicalResourceId': event['LogicalResourceId'],
            }
            print('RESPONSE BODY: \n' + json.dumps(response_body))
            data = json.dumps(response_body).encode('utf-8')
            req = urllib.request.Request(
                event['ResponseURL'],
                data,
                headers={'Content-Length': len(data), 'Content-Type': ''})
            req.get_method = lambda: 'PUT'
            try:
                with urllib.request.urlopen(req) as resp:
                    print(f'response.status: {resp.status}, '
                          f'response.reason: {resp.reason}')
                    print('response from cfn: ' + resp.read().decode('utf-8'))
            except urllib.error.URLError:
                raise Exception('Received non-200 response while sending response to AWS CloudFormation')
            return True

        def lambda_handler(event, context):
            # The Glue streaming job script that this custom resource uploads to S3
            script = """import sys
        from awsglue.transforms import *
        from awsglue.utils import getResolvedOptions
        from pyspark.context import SparkContext
        from awsglue.context import GlueContext
        from awsglue.job import Job
        import boto3
        from botocore.exceptions import ClientError
        from pyspark.sql import DataFrame, Row
        import datetime
        from awsglue import DynamicFrame

        args = getResolvedOptions(sys.argv, ["JOB_NAME", "tgt_s3_bkt"])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        logger = glueContext.get_logger()
        job = Job(glueContext)
        job.init(args["JOB_NAME"], args)

        def monitorChangeFlag():
            # Returns True if the change-handler Lambda has written a CHANGE_FLAG object
            change_flag = False  # Default to False
            try:
                client = boto3.client("s3")
                s3 = boto3.resource("s3")
                obj = client.head_object(Bucket=bucket_nm, Key=change_key)
                change_flag = True
                # Delete the change flag to avoid redundant refreshes
                s3.Object(bucket_nm, change_key).delete()
            except ClientError as exc:
                # A 404 simply means no change flag is present
                if exc.response['Error']['Code'] != '404':
                    change_flag = False
            return change_flag

        def refreshReferenceData():
            # Reload the DynamoDB reference table through the Glue Data Catalog
            dyf_dynamodb_reference_df = glueContext.create_dynamic_frame.from_catalog(
                database="my-database", table_name="productpriority"
            )
            df_dynamodb_reference = dyf_dynamodb_reference_df.toDF()
            return df_dynamodb_reference
        # The S3 bucket name where the target data is stored
        bucket_nm = args["tgt_s3_bkt"]
        change_key = 'change_flags/CHANGE_FLAG'

        # Read the Glue Catalog table built on the Kinesis data stream
        df_SourceKinesisStream = glueContext.create_data_frame.from_catalog(
            database="my-database",
            table_name="source-kinesis-stream-tbl",
            additional_options={"startingPosition": "latest", "inferSchema": "false"},
            transformation_ctx="df_SourceKinesisStream",
        )

        df_dynamodb_reference = refreshReferenceData()

        def processBatch(data_frame, batchId):
            # Declare global variables. Required since the reference data is initialized
            # conditionally within the batch process method.
            global df_dynamodb_reference
            logger.info("Starting microbatch...")
            hasChanged = monitorChangeFlag()
            # If there are changes or if this is the first run, refresh the reference data
            if hasChanged or df_dynamodb_reference is None:
                df_dynamodb_reference = refreshReferenceData()
            # Perform the join using a Spark inner join
            df_join = data_frame.join(df_dynamodb_reference, data_frame.dish == df_dynamodb_reference.item, "inner")
            # Convert the output of the join to a Glue DynamicFrame
            dyf_join = DynamicFrame.fromDF(df_join, glueContext, "from_data_frame1")
            # Select only the required fields
            dyf_SelectFields = SelectFields.apply(
                frame=dyf_join,
                paths=["cost", "price", "priority", "customer_id", "item"],
                transformation_ctx="dyf_SelectFields",
            )
            # Path where the target data should be stored
            s3_path = f"s3://{bucket_nm}/demo-final-data/"
            # Define the sink object
            s3_sink = glueContext.getSink(
                path=s3_path,
                connection_type="s3",
                updateBehavior="UPDATE_IN_DATABASE",
                partitionKeys=["item", "priority"],
                enableUpdateCatalog=True,
                transformation_ctx="s3_sink",
            )
            logger.info("Sink established")
            s3_sink.setCatalogInfo(
                catalogDatabase="my-database", catalogTableName="demo-final-data"
            )
            logger.info("Catalog established")
            s3_sink.setFormat("csv")
            logger.info("Format established")
            s3_sink.writeFrame(dyf_SelectFields)
            logger.info("Wrote batch to S3")

        # Process micro-batches of real-time data
        glueContext.forEachBatch(
            frame=df_SourceKinesisStream,
            batch_function=processBatch,
            options={
                "windowSize": "10 seconds",
                "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
            },
        )
        job.commit()
        """
            try:
                encoded_string = script.encode("utf-8")
                s3 = boto3.resource("s3")
                print(event)
                # Upload the Glue job script to the demo bucket
                s3_bucket = event['ResourceProperties']['bucket_nm']
                script_location = event['ResourceProperties']['ScriptLocation']
                s3.Object(s3_bucket, script_location).put(Body=encoded_string)
                # Seed the reference table with a few items
                client = boto3.resource('dynamodb')
                table = client.Table(event['ResourceProperties']['DynamoTblNm'])
                print(table.table_status)
                table.put_item(Item={'item': 'salad', 'price': 100, 'priority': 'high'})
                table.put_item(Item={'item': 'pizza', 'price': 100, 'priority': 'low'})
                table.put_item(Item={'item': 'burger', 'price': 100, 'priority': 'medium'})
                table.put_item(Item={'item': 'pasta', 'price': 100, 'priority': 'very high'})
                table.put_item(Item={'item': 'ice-cream', 'price': 100, 'priority': 'very very high'})
                response = 'SUCCESS'
            except Exception:
                response = 'FAILED'
            send_response(event, context, response)

  KinesisStream:
    Type: "AWS::Kinesis::Stream"
    Properties:
      Name: "SourceKinesisStream"
      RetentionPeriodHours: 24
      ShardCount: 4
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis

  # Wires the DynamoDB stream to the change-handler Lambda
  DynamoDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      # The maximum number of stream records to send to Lambda in one batch
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt DynamoDBTable.StreamArn
      FunctionName: !GetAtt LoggingFunction.Arn
      # Always start at the tail of the stream
      StartingPosition: LATEST

  GlueRole:
    Type: "AWS::IAM::Role"
    Properties:
      Path: "/"
      RoleName: "AWSGlueServiceRoleDefaultdemo"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: "sts:AssumeRole"
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: GlueS3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: ['s3:*', 's3-object-lambda:*']
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                      - '/*'
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
        - "arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess"
        - "arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess"
      Description: "Allows Glue to call AWS services on your behalf."

  GlueDatabase:
    Type: "AWS::Glue::Database"
    Properties:
      DatabaseInput:
        Name: "my-database"
      CatalogId: !Ref "AWS::AccountId"

  GlueCrawler:
    Type: "AWS::Glue::Crawler"
    Properties:
      Name: "demo-crawler"
      Role: !Ref GlueRole
      Targets:
        DynamoDBTargets:
          - Path: "ProductPriority"
      DatabaseName: "my-database"
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "DEPRECATE_IN_DATABASE"

  GlueTable:
    Type: "AWS::Glue::Table"
    Properties:
      DatabaseName: "my-database"
      CatalogId: !Ref "AWS::AccountId"
      TableInput:
        TableType: "EXTERNAL_TABLE"
        Parameters:
          classification: "json"
        StorageDescriptor:
          Columns:
            - Name: "dish"
              Type: "string"
            - Name: "cost"
              Type: "decimal"
            - Name: "customer_id"
              Type: "int"
          Location: !Ref KinesisStream
          InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
          OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
          Compressed: false
          NumberOfBuckets: 0
          SerdeInfo:
            SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe"
            Parameters:
              paths: "cost,customer_id,dish"
          Parameters:
            streamARN: !GetAtt KinesisStream.Arn
            typeOfData: "kinesis"
          StoredAsSubDirectories: false
        Retention: 0
        Name: "source-kinesis-stream-tbl"

  GlueTable2:
    Type: "AWS::Glue::Table"
    Properties:
      DatabaseName: "my-database"
      CatalogId: !Ref "AWS::AccountId"
      TableInput:
        Owner: "owner"
        TableType: "EXTERNAL_TABLE"
        Parameters:
          CrawlerSchemaDeserializerVersion: "1.0"
          CrawlerSchemaSerializerVersion: "1.0"
          UPDATED_BY_CRAWLER: "demo-crawler"
          averageRecordSize: "24"
          classification: "dynamodb"
          compressionType: "none"
          hashKey: "item"
          rangeKey: "price"
          recordCount: "5"
          sizeKey: "121"
          typeOfData: "table"
        StorageDescriptor:
          Columns:
            - Name: "item"
              Type: "string"
            - Name: "price"
              Type: "bigint"
            - Name: "priority"
              Type: "string"
          Location: !GetAtt DynamoDBTable.Arn
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            Parameters: {}
          Parameters:
            CrawlerSchemaDeserializerVersion: "1.0"
            CrawlerSchemaSerializerVersion: "1.0"
            UPDATED_BY_CRAWLER: "demo-crawler"
            averageRecordSize: "24"
            classification: "dynamodb"
            compressionType: "none"
            hashKey: "item"
            rangeKey: "price"
            recordCount: "5"
            sizeKey: "121"
            typeOfData: "table"
          StoredAsSubDirectories: false
        Retention: 0
        Name: "productpriority"

  GlueJob:
    Type: "AWS::Glue::Job"
    Properties:
      Name: !Join
        - '-'
        - - 'demo-glue-job'
          - !Ref AWS::AccountId
      Description: ""
      Role: !Ref GlueRole
      ExecutionProperty:
        MaxConcurrentRuns: 1
      Command:
        Name: "gluestreaming"
        ScriptLocation: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/glue/demo-glue-job.py'
        PythonVersion: "3"
      DefaultArguments:
        --TempDir: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/temporary/'
        --class: "GlueApp"
        --enable-continuous-cloudwatch-log: "true"
        --enable-glue-datacatalog: "true"
        --enable-metrics: "true"
        --enable-spark-ui: "true"
        --job-bookmark-option: "job-bookmark-disable"
        --job-language: "python"
        --tgt_s3_bkt: !Ref S3Bucket
        --spark-event-logs-path: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/sparkHistoryLogs/'
      MaxRetries: 1
      GlueVersion: "3.0"
      NumberOfWorkers: 2
      WorkerType: "G.1X"

Outputs:
  LoggingFunctionName:
    Value: !Ref LoggingFunction
  DynamoDBTableName:
    Value: !Ref DynamoDBTable
  S3Bucket:
    Value: !Ref S3Bucket
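
# ---------------------------------------------------------------------------
# Quick deployment and smoke-test sketch (comments only; CloudFormation ignores
# them). This is an illustrative example, not part of the stack: the stack name
# is arbitrary, the cost/customer_id values are made up, and the flags assume
# AWS CLI v2 plus a recent SAM CLI with credentials already configured.
#
#   sam deploy --guided --stack-name demo-glue-streaming \
#     --capabilities CAPABILITY_NAMED_IAM
#
# After the stack is up, start the Glue streaming job (demo-glue-job-<account
# ID>) from the Glue console or with `aws glue start-job-run`, then put a test
# record on the stream. The JSON fields follow the source-kinesis-stream-tbl
# schema (dish, cost, customer_id), and "pizza" matches one of the items the
# custom resource loads into ProductPriority:
#
#   aws kinesis put-record \
#     --stream-name SourceKinesisStream \
#     --partition-key 1 \
#     --cli-binary-format raw-in-base64-out \
#     --data '{"dish": "pizza", "cost": 12.5, "customer_id": 42}'
#
# Joined CSV output should appear under
# s3://demo-bucket-<account ID>/demo-final-data/ after the next micro-batch.
# ---------------------------------------------------------------------------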