AWSTemplateFormatVersion: '2010-09-09'
Description: "SatCom Kinesis, Lambda, OpenSearch pipeline
  Given an input Kinesis Data Stream, run an inline Lambda injecting some anomaly events,
  and handling geo_point for lat-long mapping.
  Next we create an OpenSearch domain for visualization"

# parameters to pass to the CFN
Parameters:
  SatComAssetsS3Bucket:
    Type: String
    Description: Holds helper assets eg Glue python transforms
    Default: "satcom-pipeline-assets"
  KdfLambdaZipName:
    Type: String
    Description: Name of the KDS lambda zip file
    Default: "kds-scripts/"
  OpenSearchAllowedIPs:
    Description: "Comma-delimited list of IP addresses accessing OpenSearch domain"
    Type: CommaDelimitedList
    Default: ""

# creation of AWS resources
Resources:

  # Kinesis Data Stream with Lambda transformation
  # Note - due to
  # we are using PROVISIONED default mode (lowest common denominator)
  KdsDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      RetentionPeriodHours: 24
      ShardCount: 2
      #StreamModeDetails:
      #  StreamMode: ON_DEMAND
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", KdsDataStream] ]

  InboundStreamLambdaFunctionEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 200
      MaximumBatchingWindowInSeconds: 10
      Enabled: true
      EventSourceArn: !GetAtt KdsDataStream.Arn
      FunctionName: !GetAtt KdsProcessLambdaFunction.Arn
      StartingPosition: TRIM_HORIZON

  ProcessLambdaFunctionExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                -
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
        - !Sub arn:${AWS::Partition}:iam::aws:policy/AmazonOpenSearchServiceFullAccess

  KdsProcessLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to process/transform KDS records as they come in
      Handler: lambda_function.lambda_handler
      Timeout: 60
      Role: !GetAtt ProcessLambdaFunctionExecutionRole.Arn
      Code:
        S3Bucket: !Ref SatComAssetsS3Bucket
        S3Key: !Ref KdfLambdaZipName
      Runtime: python3.9
      Environment:
        Variables:
          endpoint: !GetAtt SatComOpenSearchServiceDomain.DomainEndpoint
      Layers:
        - !Ref KdsLambdaLayer
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", KdsProcessLambdaFunction] ]

  KdsLambdaLayer:
    Type: AWS::Lambda::LayerVersion
    Properties:
      CompatibleRuntimes:
        - python3.8
        - python3.9
      Content:
        S3Bucket: !Ref SatComAssetsS3Bucket
        S3Key: "kds-scripts/"
      Description: "requests, requests_aws4auth, opensearch-py modules"
      LayerName: requests_opensearchpy_layer

  # now create the OpenSearch domain for indexing
  SatComOpenSearchServiceDomain:
    Type: AWS::OpenSearchService::Domain
    Properties:
      ClusterConfig:
        InstanceCount: 2
        InstanceType: ''
      AdvancedSecurityOptions:
        Enabled: false
      DomainEndpointOptions:
        EnforceHTTPS: true
      EBSOptions:
        EBSEnabled: true
        VolumeSize: 10
        VolumeType: gp3
      AccessPolicies:
        Version: '2012-10-17'
        Statement:
          - Effect: 'Allow'
            Principal:
              AWS: '*'
            Action: 'es:*'
            Resource: !Sub "arn:${AWS::Partition}:es:${AWS::Region}:${AWS::AccountId}:domain/*"
            Condition:
              IpAddress:
                aws:SourceIp: !Ref OpenSearchAllowedIPs
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", SatComOpenSearchServiceDomain] ]

Outputs:
  InboundStreamName:
    Value: !Ref KdsDataStream
  SatComOpenSearchEndpoint:
    Value: !GetAtt SatComOpenSearchServiceDomain.DomainEndpoint
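
# Example deployment: a sketch only, not part of the stack definition.
# The template file name and stack name are hypothetical, and the IP ranges are
# documentation placeholders; substitute your own values. OpenSearchAllowedIPs
# defaults to an empty string, so it most likely needs to be supplied explicitly.
# CAPABILITY_IAM is passed because the template creates an IAM role.
#
#   aws cloudformation deploy \
#     --template-file satcom-pipeline.yaml \
#     --stack-name satcom-pipeline \
#     --capabilities CAPABILITY_IAM \
#     --parameter-overrides \
#       SatComAssetsS3Bucket=satcom-pipeline-assets \
#       OpenSearchAllowedIPs=203.0.113.10/32,198.51.100.0/24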