AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: EventBridge Pipe with Amazon MSK Cluster as the source and Lambda function as target Parameters: MSKCluster: Type: String Description: 'MSK Cluster' MSKTopic: Type: String Description: 'MSK Topic' PrivateSubnetMSKOne: Type: String Description: 'Private Subnet One' PrivateSubnetMSKTwo: Type: String Description: 'Private Subnet Two' Resources: # Role + Permissions for Pipes PipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - pipes.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: SourcePolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - kafka:DescribeCluster - kafka:DescribeClusterV2 - kafka:GetBootstrapBrokers - kafka-cluster:Connect - kafka-cluster:DescribeGroup - kafka-cluster:AlterGroup - kafka-cluster:DescribeTopic - kafka-cluster:ReadData - kafka-cluster:DescribeClusterDynamicConfiguration Resource: !Ref MSKCluster - Effect: Allow Action: - ec2:DescribeNetworkInterfaces - ec2:DescribeSubnets - ec2:DescribeSecurityGroups - ec2:DescribeVpcs Resource: "*" - Effect: Allow Action: - ec2:CreateNetworkInterface - ec2:DeleteNetworkInterface - ec2:DescribeNetworkInterfaces Resource: "*" Condition: StringEqualsIfExists: "ec2:SubnetID": - !Ref PrivateSubnetMSKOne - !Ref PrivateSubnetMSKTwo - PolicyName: TargetPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'lambda:InvokeFunction' Resource: !GetAtt TargetFunction.Arn - PolicyName: CWLogging PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:PutLogEvents Resource: "*" TargetFunction: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: InlineCode: | exports.handler = async (event) => { // Iterate through records let msg for (let index in event) { console.log(event[index]) msg = Buffer.from(event[index].value, 'base64').toString() console.log('Message:', msg) } } Handler: index.handler Runtime: nodejs16.x Policies: - AWSLambdaBasicExecutionRole Pipe: Type: AWS::Pipes::Pipe Properties: Name: msk-pipe-lambda Description: "EventBridge Pipe with Amazon MSK Cluster as the source and Lambda function as target" RoleArn: !GetAtt PipeRole.Arn Source: !Ref MSKCluster SourceParameters: ManagedStreamingKafkaParameters: StartingPosition: LATEST BatchSize: 10 TopicName: !Ref MSKTopic Target: !GetAtt TargetFunction.Arn