# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
AWSTemplateFormatVersion: "2010-09-09"
#
# Note:
#   For the Lambda code use the provided kinesis_scaling.zip and place it in the
#   root of your S3CodeBucket. You can also build your own with the following
#   steps (using go 1.15.x) or the build script in the repo:
#     GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o main scale.go
#     zip kinesis_scaling.zip main
#
# Layout of this template:
#   - Parameters: code location, scale up/down thresholds, dry-run switch.
#   - One scaling Lambda (+ role, async invoke config) subscribed to one SNS
#     topic.
#   - Two Kinesis streams, each with a scale-up and a scale-down CloudWatch
#     alarm whose action is the SNS topic.
#

Parameters:
  S3CodeBucket:
    Type: String
    Description: S3 Bucket containing the compiled 'kinesis_scaling.zip' golang Lambda
    Default: kinesis-scaling-code-bucket

  S3CodeKey:
    Type: String
    Description: S3 filename containing the 'kinesis_scaling.zip' golang Lambda
    Default: kinesis_scaling.zip

  KinesisScaleUpThreshold:
    Type: Number
    Description: >-
      At what total usage should streams scale up?
      Handles both bytes/sec and records/sec (1.0 = 100%)
    Default: 0.75

  KinesisScaleDownThreshold:
    Type: Number
    Description: >-
      At what total usage should streams scale down?
      Handles both bytes/sec and records/sec (1.0 = 100%)
    Default: 0.25

  KinesisScaleDownMinIterAgeMins:
    Type: Number
    Description: >-
      Do not scale down the stream if it is this many minutes behind.
      Example 30 minutes (-1 to disable)
    Default: -1

  # String-typed on/off switch; the values are quoted so that YAML does not
  # turn them into booleans before CloudFormation sees them.
  ScalingDryRun:
    Type: String
    Description: >-
      If this is set to true, the lambda function will not scale the stream.
      This flag can be used to monitor how the stack performs without actually
      scaling the stream.
    Default: "false"
    AllowedValues:
      - "true"
      - "false"

Resources:

  ###############################################################################
  #
  # Auto Scaling Lambda
  #
  ###############################################################################

  # The function that is invoked (through the SNS topic below) when one of the
  # stream alarms fires. Creation is ordered after the role by the !GetAtt on
  # Role, so no explicit DependsOn is needed.
  AutoScalingLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${AWS::StackName}-kinesis-scaling'
      # NOTE(review): AWS has announced deprecation of the go1.x runtime.
      # Moving to a provided.* runtime would also require the binary in the zip
      # to be named "bootstrap" - confirm against the Lambda runtime docs
      # before changing; left untouched here because it depends on the artifact.
      Runtime: go1.x
      Role: !GetAtt AutoScalingLambdaRole.Arn
      Handler: main
      # Don't allow multiple copies to run simultaneously
      ReservedConcurrentExecutions: 1
      MemorySize: 512
      Timeout: 900
      Code:
        S3Bucket: !Ref S3CodeBucket
        S3Key: !Ref S3CodeKey
      Environment:
        # Environment variable values are strings; they are quoted explicitly
        # rather than relying on number-to-string coercion.
        Variables:
          THROTTLE_RETRY_MIN_SLEEP: "1"
          THROTTLE_RETRY_MAX_SLEEP: "3"
          THROTTLE_RETRY_COUNT: "30"
          # kinesis_period_mins: cloudwatch datapoint period in minutes
          # (default is 5, changing this is not recommended)
          SCALE_PERIOD_MINS: "5"
          SCALE_UP_THRESHOLD: !Ref KinesisScaleUpThreshold
          # 25 Minutes / kinesis_period_mins. Evaluate scale up for 25 minutes.
          SCALE_UP_EVALUATION_PERIOD: "5"
          # 25 Minutes / kinesis_period_mins. 5 out of 5 datapoints required.
          # Takes 25 minutes above threshold to scale up.
          SCALE_UP_DATAPOINTS_REQUIRED: "5"
          SCALE_DOWN_THRESHOLD: !Ref KinesisScaleDownThreshold
          # 300 Minutes / kinesis_period_mins. Evaluate scale down for 300 minutes.
          SCALE_DOWN_EVALUATION_PERIOD: "60"
          # 285 Minutes / kinesis_period_mins. 57 out of 60 datapoints required.
          # Takes 285 minutes below threshold to scale down.
          SCALE_DOWN_DATAPOINTS_REQUIRED: "57"
          SCALE_DOWN_MIN_ITER_AGE_MINS: !Ref KinesisScaleDownMinIterAgeMins
          # Empty = No Op, or if scaling a single stream, set this to the lambda
          # consumer arn to automatically reserve concurrency as the stream scales
          PROCESSING_LAMBDA_ARN: ""
          # Lambdas per shard used by the above processing lambda
          PROCESSING_LAMBDAS_PER_SHARD: "5"
          # Dry run flag (true or false) indicating whether streams will be
          # scaling or not.
          DRY_RUN: !Ref ScalingDryRun

  # Disables retries for asynchronous invocations of the scaling function.
  AutoScalingLambdaAsyncConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      # !Ref yields the function name and also orders this resource after it.
      FunctionName: !Ref AutoScalingLambda
      # We do not want any retries of the scaling function if it errors out,
      # alarms will re-trigger it
      MaximumRetryAttempts: 0
      Qualifier: '$LATEST'

  #
  # Auto Scaling Lambda IAM Role
  #
  # The ARNs below are built with !Sub from the stack name instead of
  # referencing the function/topic resources: the function depends on this
  # role, so referencing them here would create a circular dependency.
  AutoScalingLambdaRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        # Alarm management, scoped to alarms whose name starts with
        # "<stack name>-AutoScalingStream".
        # NOTE(review): cloudwatch:GetMetricData, cloudwatch:ListMetrics and
        # cloudwatch:PutMetricData are believed not to support resource-level
        # permissions, in which case this alarm-scoped Resource would not grant
        # them - confirm against the IAM Service Authorization Reference and,
        # if the function needs them, move them to a statement with
        # Resource "*".
        - PolicyName: AllowCreateCloudWatchAlarms
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'cloudwatch:DescribeAlarms'
                  - 'cloudwatch:GetMetricData'
                  - 'cloudwatch:ListMetrics'
                  - 'cloudwatch:PutMetricAlarm'
                  - 'cloudwatch:PutMetricData'
                  - 'cloudwatch:ListTagsForResource'
                  - 'cloudwatch:SetAlarmState'
                  - 'cloudwatch:TagResource'
                Resource:
                  - !Sub 'arn:aws:cloudwatch:${AWS::Region}:${AWS::AccountId}:alarm:${AWS::StackName}-AutoScalingStream*'
        # Logging, scoped to the scaling function's own log group.
        - PolicyName: AllowLoggingToCloudWatch
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${AWS::StackName}-kinesis-scaling:*'
        # Despite the policy name this also grants tagging and
        # UpdateShardCount on the stack's streams.
        - PolicyName: AllowReadFromKinesis
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:AddTagsToStream'
                  - 'kinesis:ListTagsForStream'
                  - 'kinesis:UpdateShardCount'
                Resource:
                  - !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${AWS::StackName}-AutoScalingStream*'
        - PolicyName: AllowPublishToSNS
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'sns:Publish'
                Resource:
                  - !Sub 'arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}-kinesis-scaling-topic'
        # The function name in the Resource below is a placeholder; presumably
        # it should name the consumer set in PROCESSING_LAMBDA_ARN - verify
        # before relying on the concurrency feature.
        - PolicyName: AllowChangeFunctionConcurrencyForLambda
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'lambda:PutFunctionConcurrency'
                  - 'lambda:DeleteFunctionConcurrency'
                Resource:
                  - !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-your-kinesis-consumer-function-name-here'

  ###############################################################################
  #
  # Auto Scaling SNS Topic
  #
  ###############################################################################

  # Topic the alarms publish to; the scaling function is its only subscriber.
  AutoScalingSNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      Subscription:
        # !GetAtt resolves to the same ARN the template previously assembled by
        # hand and orders the topic after the function.
        - Endpoint: !GetAtt AutoScalingLambda.Arn
          Protocol: "lambda"
      TopicName: !Sub '${AWS::StackName}-kinesis-scaling-topic'

  # Lets SNS (restricted to the topic above) invoke the scaling function.
  AutoScalingSNSTopicLambdaPerm:
    Type: AWS::Lambda::Permission
    Properties:
      Action: 'lambda:InvokeFunction'
      FunctionName: !Ref AutoScalingLambda
      Principal: "sns.amazonaws.com"
      SourceArn: !Ref AutoScalingSNSTopic

  ###############################################################################
  #
  # Kinesis Data Stream 01
  #
  ###############################################################################

  AutoScalingStream01:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

  #
  # Kinesis Data Stream 01 - Scale Up Alarm
  #
  # Alarms when the larger of the bytes/sec and records/sec usage factors is at
  # or above KinesisScaleUpThreshold. Ordering after the stream and the topic
  # comes from the !Ref calls below.
  AutoScalingStream01ScaleUpAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: !Join ['', [!Ref AutoScalingStream01, '-scale-up']]
      AlarmDescription: 'Stream throughput has gone above the scale up threshold'
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Threshold: !Ref KinesisScaleUpThreshold
      # 25 Minutes / kinesis_period_mins. Evaluate scale up for 25 minutes.
      EvaluationPeriods: 5
      # 25 Minutes / kinesis_period_mins. 5 out of 5 datapoints required.
      # Takes 25 minutes above threshold to scale up.
      DatapointsToAlarm: 5
      AlarmActions:
        - !Ref AutoScalingSNSTopic
      Metrics:
        # s1: shard count used as the divisor in e3/e4 (1 at creation).
        - Id: s1
          ReturnData: false
          Label: ShardCount
          Expression: '1'
        - Id: m1
          ReturnData: false
          Label: IncomingBytes
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingBytes
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream01
        - Id: m2
          ReturnData: false
          Label: IncomingRecords
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingRecords
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream01
        - Id: e1
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingBytes
          Expression: 'FILL(m1,0)'
        - Id: e2
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingRecords
          Expression: 'FILL(m2,0)'
        # e1/(1024*1024*60*${kinesis_period_mins}*s1)
        - Id: e3
          ReturnData: false
          Label: IncomingBytesUsageFactor
          Expression: 'e1/(1024*1024*60*5*s1)'
        # e2/(1000*60*${kinesis_period_mins}*s1)
        - Id: e4
          ReturnData: false
          Label: IncomingRecordsUsageFactor
          Expression: 'e2/(1000*60*5*s1)'
        # Take the highest usage factor between bytes/sec and records/sec.
        # This is the only series with ReturnData true, i.e. the one the alarm
        # evaluates.
        - Id: e5
          ReturnData: true
          Label: MaxIncomingUsageFactor
          Expression: 'MAX([e3,e4])'

  #
  # Kinesis Data Stream 01 - Scale Down Alarm
  #
  # Same usage factors as the scale-up alarm plus an iterator-age term (e5)
  # that can hold the evaluated value up and so block a scale-down.
  AutoScalingStream01ScaleDownAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: !Join ['', [!Ref AutoScalingStream01, '-scale-down']]
      AlarmDescription: 'Stream throughput has gone below the scale down threshold'
      ComparisonOperator: LessThanThreshold
      # Set to -1 for 1 shard stream, as we can't scale down from here.
      # Gets updated by the auto scaling lambda later
      Threshold: -1
      # 300 Minutes / kinesis_period_mins. Evaluate scale down for 300 minutes.
      EvaluationPeriods: 60
      # 285 Minutes / kinesis_period_mins. 57 out of 60 datapoints required.
      # Takes 285 minutes below threshold to scale down.
      DatapointsToAlarm: 57
      AlarmActions:
        - !Ref AutoScalingSNSTopic
      Metrics:
        # s1: shard count used as the divisor in e3/e4 (1 at creation).
        - Id: s1
          ReturnData: false
          Label: ShardCount
          Expression: '1'
        # s2: iterator age (minutes) at which scale-downs are blocked.
        - Id: s2
          ReturnData: false
          Label: IteratorAgeMinutesToBlockScaledowns
          Expression: !Ref KinesisScaleDownMinIterAgeMins
        - Id: m1
          ReturnData: false
          Label: IncomingBytes
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingBytes
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream01
        - Id: m2
          ReturnData: false
          Label: IncomingRecords
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingRecords
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream01
        - Id: m3
          ReturnData: false
          Label: GetRecords.IteratorAgeMilliseconds
          MetricStat:
            Stat: Maximum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: GetRecords.IteratorAgeMilliseconds
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream01
        - Id: e1
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingBytes
          Expression: 'FILL(m1,0)'
        - Id: e2
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingRecords
          Expression: 'FILL(m2,0)'
        # e1/(1024*1024*60*${kinesis_period_mins}*s1)
        - Id: e3
          ReturnData: false
          Label: IncomingBytesUsageFactor
          Expression: 'e1/(1024*1024*60*5*s1)'
        # e2/(1000*60*${kinesis_period_mins}*s1)
        - Id: e4
          ReturnData: false
          Label: IncomingRecordsUsageFactor
          Expression: 'e2/(1000*60*5*s1)'
        # We want to block scaledowns when the iterator age exceeds s2 minutes
        # (e.g. 30): convert m3 from milliseconds to minutes and multiply by
        # (scale-down threshold / s2), so an age of exactly s2 minutes equals
        # the scale-down threshold. With s2 = -1 and a positive threshold the
        # result is <= 0, so it never exceeds the non-negative e3/e4 in the MAX
        # below.
        - Id: e5
          ReturnData: false
          Label: IteratorAgeAdjustedFactor
          Expression: !Sub '(FILL(m3,0)/1000/60)*(${KinesisScaleDownThreshold}/s2)'
        # Take the highest usage factor between bytes/sec, records/sec, and
        # adjusted iterator age
        - Id: e6
          ReturnData: true
          Label: MaxIncomingUsageFactor
          Expression: 'MAX([e3,e4,e5])'

  ###############################################################################
  #
  # Kinesis Data Stream 02
  #
  ###############################################################################

  AutoScalingStream02:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

  #
  # Kinesis Data Stream 02 - Scale Up Alarm
  #
  # Alarms when the larger of the bytes/sec and records/sec usage factors is at
  # or above KinesisScaleUpThreshold. Ordering after the stream and the topic
  # comes from the !Ref calls below.
  AutoScalingStream02ScaleUpAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: !Join ['', [!Ref AutoScalingStream02, '-scale-up']]
      AlarmDescription: 'Stream throughput has gone above the scale up threshold'
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Threshold: !Ref KinesisScaleUpThreshold
      # 25 Minutes / kinesis_period_mins. Evaluate scale up for 25 minutes.
      EvaluationPeriods: 5
      # 25 Minutes / kinesis_period_mins. 5 out of 5 datapoints required.
      # Takes 25 minutes above threshold to scale up.
      DatapointsToAlarm: 5
      AlarmActions:
        - !Ref AutoScalingSNSTopic
      Metrics:
        # s1: shard count used as the divisor in e3/e4 (1 at creation).
        - Id: s1
          ReturnData: false
          Label: ShardCount
          Expression: '1'
        - Id: m1
          ReturnData: false
          Label: IncomingBytes
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingBytes
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream02
        - Id: m2
          ReturnData: false
          Label: IncomingRecords
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingRecords
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream02
        - Id: e1
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingBytes
          Expression: 'FILL(m1,0)'
        - Id: e2
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingRecords
          Expression: 'FILL(m2,0)'
        # e1/(1024*1024*60*${kinesis_period_mins}*s1)
        - Id: e3
          ReturnData: false
          Label: IncomingBytesUsageFactor
          Expression: 'e1/(1024*1024*60*5*s1)'
        # e2/(1000*60*${kinesis_period_mins}*s1)
        - Id: e4
          ReturnData: false
          Label: IncomingRecordsUsageFactor
          Expression: 'e2/(1000*60*5*s1)'
        # Take the highest usage factor between bytes/sec and records/sec.
        # This is the only series with ReturnData true, i.e. the one the alarm
        # evaluates.
        - Id: e5
          ReturnData: true
          Label: MaxIncomingUsageFactor
          Expression: 'MAX([e3,e4])'

  #
  # Kinesis Data Stream 02 - Scale Down Alarm
  #
  # Same usage factors as the scale-up alarm plus an iterator-age term (e5)
  # that can hold the evaluated value up and so block a scale-down.
  AutoScalingStream02ScaleDownAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: !Join ['', [!Ref AutoScalingStream02, '-scale-down']]
      AlarmDescription: 'Stream throughput has gone below the scale down threshold'
      ComparisonOperator: LessThanThreshold
      # Set to -1 for 1 shard stream, as we can't scale down from here.
      # Gets updated by the auto scaling lambda later
      Threshold: -1
      # 300 Minutes / kinesis_period_mins. Evaluate scale down for 300 minutes.
      EvaluationPeriods: 60
      # 285 Minutes / kinesis_period_mins. 57 out of 60 datapoints required.
      # Takes 285 minutes below threshold to scale down.
      DatapointsToAlarm: 57
      AlarmActions:
        - !Ref AutoScalingSNSTopic
      Metrics:
        # s1: shard count used as the divisor in e3/e4 (1 at creation).
        - Id: s1
          ReturnData: false
          Label: ShardCount
          Expression: '1'
        # s2: iterator age (minutes) at which scale-downs are blocked.
        - Id: s2
          ReturnData: false
          Label: IteratorAgeMinutesToBlockScaledowns
          Expression: !Ref KinesisScaleDownMinIterAgeMins
        - Id: m1
          ReturnData: false
          Label: IncomingBytes
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingBytes
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream02
        - Id: m2
          ReturnData: false
          Label: IncomingRecords
          MetricStat:
            Stat: Sum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: IncomingRecords
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream02
        - Id: m3
          ReturnData: false
          Label: GetRecords.IteratorAgeMilliseconds
          MetricStat:
            Stat: Maximum
            # Must match kinesis_period_mins: 5 minutes. Convert to seconds.
            # 300 seconds.
            Period: 300
            Metric:
              MetricName: GetRecords.IteratorAgeMilliseconds
              Namespace: AWS/Kinesis
              Dimensions:
                - Name: StreamName
                  Value: !Ref AutoScalingStream02
        - Id: e1
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingBytes
          Expression: 'FILL(m1,0)'
        - Id: e2
          ReturnData: false
          Label: FillMissingDataPointsWithZeroForIncomingRecords
          Expression: 'FILL(m2,0)'
        # e1/(1024*1024*60*${kinesis_period_mins}*s1)
        - Id: e3
          ReturnData: false
          Label: IncomingBytesUsageFactor
          Expression: 'e1/(1024*1024*60*5*s1)'
        # e2/(1000*60*${kinesis_period_mins}*s1)
        - Id: e4
          ReturnData: false
          Label: IncomingRecordsUsageFactor
          Expression: 'e2/(1000*60*5*s1)'
        # We want to block scaledowns when the iterator age exceeds s2 minutes
        # (e.g. 30): convert m3 from milliseconds to minutes and multiply by
        # (scale-down threshold / s2), so an age of exactly s2 minutes equals
        # the scale-down threshold. With s2 = -1 and a positive threshold the
        # result is <= 0, so it never exceeds the non-negative e3/e4 in the MAX
        # below.
        - Id: e5
          ReturnData: false
          Label: IteratorAgeAdjustedFactor
          Expression: !Sub '(FILL(m3,0)/1000/60)*(${KinesisScaleDownThreshold}/s2)'
        # Take the highest usage factor between bytes/sec, records/sec, and
        # adjusted iterator age
        - Id: e6
          ReturnData: true
          Label: MaxIncomingUsageFactor
          Expression: 'MAX([e3,e4,e5])'