--- AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Store DynamoDB stream records to S3 and enable search from Athena Globals: Function: AutoPublishAlias: live DeploymentPreference: Type: AllAtOnce Metadata: AWS::ServerlessRepo::Application: Name: dynamodb-history-storer Description: Store DynamoDB stream records to S3 and enable search from Athena Author: Tomoyuki Morita SpdxLicenseId: Apache-2.0 LicenseUrl: LICENSE ReadmeUrl: README.md Labels: ['DynamoDB','Glue','Athena'] HomePageUrl: https://github.com/aws-samples/dynamodb-history-storer SemanticVersion: 0.5.0 SourceCodeUrl: https://github.com/aws-samples/dynamodb-history-storer Parameters: Stage: Description: > This parameter is used as suffix for some resource names. Multiple stack can be installed to one account by putting different stage names. Type: String Default: prod LambdaLogRetentionInDays: Type: Number Default: 14 HistoryRetentionInDays: Type: Number Default: 90 ExtractedAttributes: Description: > Attributes listed in this parameter will be extracted from the NewImage and output to the Glue table. Note it won't be automatically reflected to the Glue table definition, and you need to modify it manually to search from Athena. Type: CommaDelimitedList Default: '' Resources: DynamoDbHistoryStorer: Type: AWS::Serverless::Function Properties: FunctionName: Fn::Sub: 'DynamoDBHistoryStorer-${Stage}' Handler: index.handler Runtime: nodejs14.x Policies: - DynamoDBReadPolicy: TableName: '*' - DynamoDBStreamReadPolicy: TableName: '*' StreamName: '*' - FirehoseWritePolicy: DeliveryStreamName: {Ref: FirehoseStream} CodeUri: .build/pkg/ Description: Timeout: 60 Environment: Variables: FIREHOSE_STREAM_NAME: {Ref: FirehoseStream} STAGE: {Ref: Stage} EXTRACTED_ATTRIBUTES: Fn::Join: - ',' - {Ref: ExtractedAttributes} LambdaLogGroup: Type: AWS::Logs::LogGroup DependsOn: DynamoDbHistoryStorer Properties: RetentionInDays: {Ref: "LambdaLogRetentionInDays"} LogGroupName: Fn::Join: ['', ['/aws/lambda/', {Ref: DynamoDbHistoryStorer}]] LambdaExecutionErrorAlarm: Type: AWS::CloudWatch::Alarm DependsOn: DynamoDbHistoryStorer Properties: AlarmDescription: Alarm when lambda execution continuously failed AlarmName: Fn::Sub: '${DynamoDbHistoryStorer}-Errors' Namespace: AWS/Lambda MetricName: Errors Dimensions: - Name: FunctionName Value: {Ref: DynamoDbHistoryStorer} Statistic: Sum Period: '60' Threshold: '5' EvaluationPeriods: '3' ComparisonOperator: GreaterThanThreshold KinesisFirehoseDeliveryStreamRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - 'firehose.amazonaws.com' Action: - sts:AssumeRole Policies: - PolicyName: PublishToS3 PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: ['s3:*'] Resource: - Fn::Sub: arn:${AWS::Partition}:s3:::${S3Bucket} - Fn::Sub: arn:${AWS::Partition}:s3:::${S3Bucket}/* GlueTableAccessRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - 'firehose.amazonaws.com' Action: - sts:AssumeRole Policies: - PolicyName: GlueReadOnlyPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - glue:GetTables - glue:GetTable - glue:GetTableVersions Resource: - Fn::Sub: arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:* FirehoseStream: Type: AWS::KinesisFirehose::DeliveryStream DependsOn: [S3Bucket, GlueDatabase, GlueTable] Properties: DeliveryStreamName: Fn::Sub: 'DynamoDBHistoryStream-${Stage}' DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: Fn::GetAtt: [S3Bucket, Arn] RoleARN: Fn::GetAtt: [KinesisFirehoseDeliveryStreamRole, Arn] Prefix: 'data/' BufferingHints: IntervalInSeconds: 60 SizeInMBs: 64 CloudWatchLoggingOptions: Enabled: false CompressionFormat: UNCOMPRESSED DataFormatConversionConfiguration: Enabled: true InputFormatConfiguration: Deserializer: OpenXJsonSerDe: ColumnToJsonKeyMappings: CaseInsensitive: True OutputFormatConfiguration: Serializer: ParquetSerDe: Compression: SNAPPY SchemaConfiguration: CatalogId: {Ref: 'AWS::AccountId'} DatabaseName: {Ref: GlueDatabase} Region: {Ref: 'AWS::Region'} RoleARN: Fn::GetAtt: [GlueTableAccessRole, Arn] TableName: {Ref: GlueTable} VersionId: LATEST GlueDatabase: Type: AWS::Glue::Database Properties: DatabaseInput: Name: Fn::Sub: 'dynamodb_history_storer_${Stage}' Description: "Database with tables to store history of DynamoDB tables" CatalogId: Ref: AWS::AccountId GlueTable: Type: AWS::Glue::Table DependsOn: [GlueDatabase, S3Bucket] Properties: TableInput: Name: dynamodb_history TableType: EXTERNAL_TABLE PartitionKeys: - Name: datehour Type: string StorageDescriptor: StoredAsSubDirectories: false Location: Fn::Sub: 's3://${S3Bucket}/data/' Columns: - Name: TableName Type: string - Name: HashKey Type: string - Name: SortKey Type: string - Name: NewImage Type: string - Name: OldImage Type: string - Name: EventName Type: string - Name: Timestamp Type: bigint - Name: SequenceNumber Type: string InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat Compressed: true SerdeInfo: SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Parameters: serialization.format: 1 Parameters: projection.enabled: true projection.datehour.type: date projection.datehour.range: 2018/01/01/00,NOW projection.datehour.format: yyyy/MM/dd/HH projection.datehour.interval: 1 projection.datehour.interval.unit: HOURS storage.location.template: Fn::Sub: 's3://${S3Bucket}/data/${!datehour}' DatabaseName: {Ref: GlueDatabase} CatalogId: Ref: AWS::AccountId S3Bucket: Type: AWS::S3::Bucket Properties: AccessControl: Private LifecycleConfiguration: Rules: - ExpirationInDays: {Ref: HistoryRetentionInDays} Status: Enabled BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ExampleQuery: Type: AWS::Athena::NamedQuery Properties: Database: {Ref: GlueDatabase} Name: DynamoDBHistory-QueryExample Description: Example query for searching history. 'tablename' and 'datehour' condition need to be modified before run it. QueryString: Fn::Sub: | SELECT tablename, eventname, from_unixtime(timestamp) AS at, hashkey, sortkey, newimage FROM ${GlueDatabase}.${GlueTable} WHERE datehour > '2019/01/10/00' AND datehour < '2019/01/10/24' -- AND tablename = '' -- AND eventname = '' -- AND hashkey = '' -- AND sortkey = '' ORDER BY at LIMIT 100; Outputs: StackArn: Value: Ref: AWS::StackId Description: Use this as the stack_arn in your cloud_formation_deployment_stack override. LambdaFunctionName: Value: Ref: DynamoDbHistoryStorer Description: Lambda function name for registering event source LambdaExecutionErrorAlarm: Value: Ref: LambdaExecutionErrorAlarm