# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
AWSTemplateFormatVersion: "2010-09-09"
Description: "Tennessee Eastman Process Analytics with Apache Flink and AWS RandomCutForest"
Parameters:
  GitRepositoryUrl:
    Description: The blog GitHub repository URL
    Type: String
    Default: 'https://github.com/aws-samples/flink-industrial-anomaly-detector.git'
  TimeStreamIngestBatchSize:
    Type: Number
    Default: 50
    MinValue: 10
    MaxValue: 200
    Description: Timestream ingestion batch size
  RcfShingleSize:
    Type: Number
    Default: 1
    MinValue: 1
    MaxValue: 1000
    Description: The number of points to store in a shingle
  RcfShingleCyclic:
    Type: String
    Default: "false"
    AllowedValues:
      - "true"
      - "false"
    Description: If true, the shingle will use cyclic updates
  RcfNumberOfTrees:
    Type: Number
    Default: 50
    MinValue: 50
    MaxValue: 100
    Description: The number of trees in this forest
  RcfSampleSize:
    Type: Number
    Default: 256
    MinValue: 100
    MaxValue: 10000
    Description: The sample size used by stream samplers in this forest
  RcfLambda:
    Type: Number
    Default: 0.000390625
    MinValue: 0.000001
    MaxValue: 0.1
    Description: The decay factor used by stream samplers in this forest. Defaults to `1 / (10 * sampleSize)` - https://github.com/aws/random-cut-forest-by-aws/tree/main/Java
  RcfRandomSeed:
    Type: Number
    Default: 42
    MinValue: 10
    MaxValue: 100
    Description: A seed value used to initialize the random number generators in this forest.
Mappings:
  SourceCode:
    General:
      RawCSVPrefix: 'tep-raw-csv/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/'
Resources:
  Cloud9Env:
    Type: AWS::Cloud9::EnvironmentEC2
    Properties:
      InstanceType: t3.large
      ImageId: amazonlinux-2-x86_64
      #OwnerArn: 'arn:aws:sts:::assumed-role/'
      Name: FlinkAnomalyTestEnv
      Repositories:
        - PathComponent: /flinkAnomalySteps
          RepositoryUrl: !Sub ${GitRepositoryUrl}
  RawMetricsBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W35
            reason: "S3 access logging not required in this example."
          - id: W41
            reason: "Simulation data in this bucket is for demo purposes only - encryption not needed."
          - id: W51
            reason: "Simulation data in this bucket is for demo purposes only - bucket policy not required."
  KinesisSourceStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: tep-ingest
      RetentionPeriodHours: 24
      ShardCount: 2
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W28
            reason: "DataStream holding test/simulation data only for demo purposes - explicit stream naming chosen because of the hard-coded C++ client program."
          - id: W49
            reason: "DataStream holding test/simulation data only for demo purposes - encryption not needed."
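  # Illustrative only, not part of the template: a minimal boto3 producer sketch
  # for the `tep-ingest` stream above. The CSV payload is a made-up sample; in the
  # walkthrough, records come from the hard-coded C++ tesim simulation client.
  #
  #   import boto3
  #   kinesis = boto3.client("kinesis")                 # region taken from your AWS config
  #   kinesis.put_record(
  #       StreamName="tep-ingest",
  #       PartitionKey="simulationRun-1",               # hypothetical partition key
  #       Data=b"0,1,1,0.25038,3674.0,4529.0",          # faultNumber,simulationRun,sample,xmeas_1,...
  #   )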
  TepDb:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: kdaflink
  TepData:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TepDb
      TableName: kinesisdata1
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: "24"
        MagneticStoreRetentionPeriodInDays: "7"
  RawMetricsCSVDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt RawMetricsBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 10
        CompressionFormat: 'UNCOMPRESSED'
        EncryptionConfiguration:
          NoEncryptionConfig: 'NoEncryption'
        Prefix: !FindInMap
          - SourceCode
          - General
          - RawCSVPrefix
        ErrorOutputPrefix: errors/csv/
        RoleARN: !GetAtt RawMetricsDeliveryStreamRole.Arn
        ProcessingConfiguration:
          Enabled: 'true'
          Processors:
            - Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt FirehoseCsvHeaderLambda.Arn
              Type: Lambda
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W88
            reason: "Raw metrics processed here are simulation and demo data - no server-side encryption (SSE) needed."
  FirehoseCsvHeaderLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.8
      Handler: index.lambda_handler
      MemorySize: 128
      Timeout: 3
      Role: !GetAtt LambdaRole.Arn
      Code:
        ZipFile: |
          import base64

          def lambda_handler(event, context):
              # Pass all records through unchanged, then prepend the TEP CSV
              # column header to the first record of the delivered batch.
              output = []
              for record in event['records']:
                  output_record = {
                      'recordId': record['recordId'],
                      'result': 'Ok',
                      'data': record['data']
                  }
                  output.append(output_record)
              record0 = base64.b64decode(output[0]['data']).decode('utf-8')
              record0 = ('faultNumber,simulationRun,sample,xmeas_1,xmeas_2,xmeas_3,xmeas_4,xmeas_5,xmeas_6,xmeas_7,xmeas_8,'
                         + 'xmeas_9,xmeas_10,xmeas_11,xmeas_12,xmeas_13,xmeas_14,xmeas_15,xmeas_16,xmeas_17,xmeas_18,'
                         + 'xmeas_19,xmeas_20,xmeas_21,xmeas_22,xmeas_23,xmeas_24,xmeas_25,xmeas_26,xmeas_27,xmeas_28,'
                         + 'xmeas_29,xmeas_30,xmeas_31,xmeas_32,xmeas_33,xmeas_34,xmeas_35,xmeas_36,xmeas_37,xmeas_38,'
                         + 'xmeas_39,xmeas_40,xmeas_41,'
                         + 'xmv_1,xmv_2,xmv_3,xmv_4,xmv_5,xmv_6,xmv_7,xmv_8,xmv_9,xmv_10,xmv_11'
                         + '\n' + record0 + '\n')
              # b64encode returns bytes; decode so the response stays JSON-serializable.
              output[0]['data'] = base64.b64encode(record0.encode('utf-8')).decode('utf-8')
              print('Processed {} records.'.format(len(event['records'])))
              return {'records': output}
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W58
            reason: "CloudWatch logging permissions handled in `LambdaRole` IAM role."
          - id: W89
            reason: "Stack for demo purposes - VPC not used."
          - id: W92
            reason: "Lambda data processing for demo purposes - no ReservedConcurrentExecutions setting needed."
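  # Illustrative only, not part of the template: the transformation Lambda above
  # can be exercised locally with a hand-built Firehose test event; the record
  # content is a made-up sample.
  #
  #   import base64
  #   from index import lambda_handler                  # index.py = the ZipFile code above
  #   event = {"records": [{"recordId": "1",
  #                         "data": base64.b64encode(b"0,1,1,0.25038").decode("utf-8")}]}
  #   print(lambda_handler(event, None))                # first record gains the CSV header row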
  FlinkRCFApp:
    Type: AWS::KinesisAnalyticsV2::Application
    Properties:
      RuntimeEnvironment: FLINK-1_13
      ServiceExecutionRole: !Sub ${KinesisAnalyticsServiceRole.Arn}
      ApplicationConfiguration:
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: __S3_FLINK_APP_BUCKET_ARN__
              FileKey: __FLINK_APPLICATION_S3_PATH__
          CodeContentType: ZIPFILE
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: FlinkApplicationProperties
              PropertyMap:
                InputStreamName: !Ref KinesisSourceStream
                SideCSVDeliveryStreamName: !Ref RawMetricsCSVDeliveryStream
                TimeStreamRegion: !Sub ${AWS::Region}
                TimeStreamDbName: !Ref TepDb
                TimeStreamTableName: !Sub ${TepData.Name}
                TimeStreamIngestBatchSize: !Sub ${TimeStreamIngestBatchSize}
                KinesisRegion: !Sub ${AWS::Region}
                RcfShingleSize: !Sub ${RcfShingleSize}
                RcfShingleCyclic: !Sub ${RcfShingleCyclic}
                RcfNumberOfTrees: !Sub ${RcfNumberOfTrees}
                RcfSampleSize: !Sub ${RcfSampleSize}
                RcfLambda: !Sub ${RcfLambda}
                RcfRandomSeed: !Sub ${RcfRandomSeed}
        FlinkApplicationConfiguration:
          MonitoringConfiguration:
            ConfigurationType: CUSTOM
            LogLevel: INFO
            MetricsLevel: TASK
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: true
  KinesisAnalyticsLogging:
    Type: "AWS::KinesisAnalyticsV2::ApplicationCloudWatchLoggingOption"
    Properties:
      ApplicationName: !Sub ${FlinkRCFApp}
      CloudWatchLoggingOption:
        LogStreamARN: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${FlinkLogGroup}:log-stream:${FlinkLogStream}
  FlinkLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W84
            reason: "Flink app handles simulation/demo data - no encryption or KMS key required."
  FlinkLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref FlinkLogGroup
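  # Illustrative only, not part of the template: once the stack is deployed, the
  # Flink application above can be started with boto3 (the equivalent CLI command
  # is emitted as output 06FlinkAppStartCmd).
  #
  #   import boto3
  #   kda = boto3.client("kinesisanalyticsv2")
  #   kda.start_application(
  #       ApplicationName="<04FlinkApp output value>",
  #       RunConfiguration={"FlinkRunConfiguration": {"AllowNonRestoredState": True}},
  #   )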
  Cloud9EnvRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument: |
        {
          "Statement": [{
            "Effect": "Allow",
            "Principal": { "Service": [ "ec2.amazonaws.com" ] },
            "Action": [ "sts:AssumeRole" ]
          }]
        }
      Path: /
      Policies:
        - PolicyName: Cloud9EnvPermissions
          PolicyDocument: !Sub |
            {
              "Version": "2012-10-17",
              "Statement": [
                { "Effect": "Allow", "Action": "s3:*", "Resource": "*" },
                { "Effect": "Allow", "Action": "kinesis:*", "Resource": "*" },
                { "Effect": "Allow", "Action": "timestream:*", "Resource": "*" },
                { "Effect": "Allow", "Action": "firehose:*", "Resource": "${RawMetricsCSVDeliveryStream.Arn}" },
                {
                  "Effect": "Allow",
                  "Action": [ "logs:DescribeLogGroups" ],
                  "Resource": [ "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*" ]
                },
                {
                  "Effect": "Allow",
                  "Action": [ "logs:DescribeLogStreams", "logs:PutLogEvents" ],
                  "Resource": [ "${FlinkLogGroup.Arn}" ]
                }
              ]
            }
  Cloud9InstanceProfile:
    Type: "AWS::IAM::InstanceProfile"
    Properties:
      Path: "/"
      Roles:
        - !Ref Cloud9EnvRole
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument: |
        {
          "Statement": [{
            "Effect": "Allow",
            "Principal": { "Service": [ "lambda.amazonaws.com" ] },
            "Action": [ "sts:AssumeRole" ]
          }]
        }
      Path: /
      Policies:
        - PolicyName: 'LambdaExecutionPolicy'
          PolicyDocument: !Sub |
            {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": "logs:CreateLogGroup",
                  "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
                },
                {
                  "Effect": "Allow",
                  "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ],
                  "Resource": [ "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:*" ]
                }
              ]
            }
  RawMetricsDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument: |
        {
          "Statement": [{
            "Effect": "Allow",
            "Principal": { "Service": [ "firehose.amazonaws.com" ] },
            "Action": [ "sts:AssumeRole" ]
          }]
        }
      Path: /
      Policies:
        # Puts objects in RawMetricsBucket
        - PolicyName: 'RawMetricsS3UploadPolicy'
          PolicyDocument: !Sub |
            {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads"
                  ],
                  "Resource": [
                    "${RawMetricsBucket.Arn}",
                    "${RawMetricsBucket.Arn}/*"
                  ]
                },
                {
                  "Effect": "Allow",
                  "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ],
                  "Resource": [ "${FirehoseCsvHeaderLambda.Arn}*" ]
                }
              ]
            }
  KinesisAnalyticsServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument: |
        {
          "Statement": [{
            "Effect": "Allow",
            "Principal": { "Service": [ "kinesisanalytics.amazonaws.com" ] },
            "Action": [ "sts:AssumeRole" ]
          }]
        }
      Policies:
        - PolicyName: root
          PolicyDocument: !Sub |
            {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [ "s3:GetObject", "s3:GetObjectVersion" ],
                  "Resource": [ "__S3_FLINK_APP_BUCKET_ARN__/*" ]
                },
                { "Effect": "Allow", "Action": "kinesis:*", "Resource": "*" },
                { "Effect": "Allow", "Action": "timestream:*", "Resource": "*" },
                { "Effect": "Allow", "Action": "firehose:*", "Resource": "${RawMetricsCSVDeliveryStream.Arn}" },
                {
                  "Effect": "Allow",
                  "Action": [ "logs:DescribeLogGroups" ],
                  "Resource": [ "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*" ]
                },
                {
                  "Effect": "Allow",
                  "Action": [ "logs:DescribeLogStreams", "logs:PutLogEvents" ],
                  "Resource": [ "${FlinkLogGroup.Arn}" ]
                }
              ]
            }
Outputs:
  01IAMCloud9Profile:
    Description: Created IAM instance profile holding the IAM role attached to the Cloud9 EC2 instance
    Value: !Ref Cloud9InstanceProfile
  02EC2Cloud9:
    Description: Created Cloud9 EC2 instance
    Value:
      Fn::Join:
        - ''
        - - "aws-cloud9-"
          - !Sub ${Cloud9Env.Name}
          - "-"
          - !Ref Cloud9Env
  03Cloud9EnvUrl:
    Description: Cloud9 environment URL
    Value:
      Fn::Join:
        - ''
        - - "https://"
          - !Ref AWS::Region
          - ".console.aws.amazon.com/cloud9/ide/"
          - !Ref Cloud9Env
          - "?region="
          - !Ref AWS::Region
  04FlinkApp:
    Description: Flink app name
    Value: !Ref FlinkRCFApp
  05KinesisStream:
    Description: TEP simulation ingestion stream
    Value: !Ref KinesisSourceStream
  06FlinkAppStartCmd:
    Description: AWS CLI command to start the deployed Flink application
    Value:
      Fn::Join:
        - ''
        - - "aws kinesisanalyticsv2 start-application --application-name"
          - " "
          - !Ref FlinkRCFApp
          - " "
          - "--run-configuration FlinkRunConfiguration={AllowNonRestoredState=true}"
  07TesimDockerImageS3Url:
    Description: tar.gz of the tesim simulation Docker image
    Value: __TESIM_DOCKER_IMAGE_S3_URL__
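# Illustrative only, not part of the template: the stack outputs above (Cloud9
# URL, stream and application names) can be read back with boto3; the stack name
# below is a placeholder.
#
#   import boto3
#   cfn = boto3.client("cloudformation")
#   stack = cfn.describe_stacks(StackName="<your-stack-name>")["Stacks"][0]
#   outputs = {o["OutputKey"]: o["OutputValue"] for o in stack["Outputs"]}
#   print(outputs["06FlinkAppStartCmd"])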