---
# CloudFormation template that provisions a CodePipeline/CodeBuild pipeline
# which builds the amazon-kinesis-analytics-taxi-consumer Flink application and
# copies the resulting jar into the artifact bucket.
#
# Flow:
#   1. The DownloadSources custom resource fetches the application sources
#      from GitHub and stores them in the artifact bucket.
#   2. The pipeline reads the sources and the Flink Kinesis connector archive
#      from the artifact bucket, builds the jar with CodeBuild, and deploys the
#      build output back into the artifact bucket.
#   3. A Lambda function signals BuildCompleteWaitHandle so that
#      BuildCompleteWaitCondition (and therefore the stack) completes only
#      after the build has succeeded.
#
# The version-like scalars are quoted so that YAML parsers do not turn them
# into dates or floats.
AWSTemplateFormatVersion: "2010-09-09"

Description: Build pipeline to build the amazon-kinesis-analytics-taxi-consumer Flink application

Parameters:
  ArtifactBucket:
    Description: Bucket containing the generated jar file
    Type: String

  FlinkVersion:
    Description: Flink version to build
    Type: String
    Default: "1.11.1"
    AllowedPattern: '\d\.\d{1,3}\.\d{1,3}'

  FlinkScalaVersion:
    Description: Scala version for Flink
    Type: String
    # Quoted: an unquoted 2.11 is a float, and a value such as 2.10 would
    # silently become 2.1.
    Default: "2.11"
    AllowedPattern: '\d\.\d\d'

  Release:
    Description: Github branch or release to be used for the consumer application
    Type: String
    Default: master

Outputs:
  FlinkApplicationCopyCommand:
    Description: AWS cli command to copy the taxi consumer jar
    Value: !Sub >-
      aws s3 cp --recursive --exclude '*'
      --include 'amazon-kinesis-analytics-taxi-consumer-*.jar'
      's3://${ArtifactBucket}/' .

  # File name of the jar; the Copy stage extracts the build output into the
  # artifact bucket.
  FlinkApplicationS3Path:
    Value: amazon-kinesis-analytics-taxi-consumer-1.0-SNAPSHOT.jar

Resources:
  BuildPipeline:
    Type: AWS::CodePipeline::Pipeline
    # The TaxiConsumerSourceAction reads the archive that DownloadSources
    # uploads; without this dependency the first pipeline execution can start
    # before the object exists.
    DependsOn: DownloadSources
    Properties:
      RoleArn: !GetAtt CodePipelineServiceRole.Arn
      Stages:
        - Name: Source
          Actions:
            - Name: FlinkKinesisConnectorSourceAction
              ActionTypeId:
                Category: Source
                Owner: AWS
                Version: "1"
                Provider: S3
              OutputArtifacts:
                - Name: FlinkKinesisConnector
              Configuration:
                S3Bucket: !Ref ArtifactBucket
                S3ObjectKey: !Sub flink-connector-kinesis-${FlinkVersion}.zip
              RunOrder: 1
            - Name: TaxiConsumerSourceAction
              ActionTypeId:
                Category: Source
                Owner: AWS
                Version: "1"
                Provider: S3
              OutputArtifacts:
                - Name: TaxiConsumerSource
              Configuration:
                S3Bucket: !Ref ArtifactBucket
                S3ObjectKey: sources/amazon-kinesis-analytics-taxi-consumer.zip
              RunOrder: 1
        - Name: BuildTaxiConsumer
          Actions:
            - Name: BuildTaxiConsumer
              InputArtifacts:
                - Name: FlinkKinesisConnector
                - Name: TaxiConsumerSource
              OutputArtifacts:
                - Name: BuildTaxiConsumerOutput
              ActionTypeId:
                Category: Build
                Owner: AWS
                Version: "1"
                Provider: CodeBuild
              Configuration:
                ProjectName: !Ref TaxiConsumerBuildProject
                PrimarySource: TaxiConsumerSource
              RunOrder: 1
        - Name: Copy
          Actions:
            - Name: CopyTaxiConsumer
              InputArtifacts:
                - Name: BuildTaxiConsumerOutput
              ActionTypeId:
                Category: Deploy
                Owner: AWS
                Version: "1"
                Provider: S3
              Configuration:
                BucketName: !Ref ArtifactBucket
                # Action configuration values are strings.
                Extract: "true"
              RunOrder: 1
            # Runs after the copy (RunOrder 2) and signals the wait handle.
            - Name: NotifyCloudformation
              ActionTypeId:
                Category: Invoke
                Owner: AWS
                Version: "1"
                Provider: Lambda
              Configuration:
                FunctionName: !Ref NotifyWaitConditionLambdaFunction
              RunOrder: 2
      ArtifactStore:
        Type: S3
        Location: !Ref ArtifactBucket

  # Completes once NotifyWaitConditionLambdaFunction has sent one success
  # signal to the handle, or fails after 900 seconds.
  BuildCompleteWaitCondition:
    Type: AWS::CloudFormation::WaitCondition
    Properties:
      Count: 1
      Handle: !Ref BuildCompleteWaitHandle
      Timeout: "900"

  BuildCompleteWaitHandle:
    Type: AWS::CloudFormation::WaitConditionHandle

  TaxiConsumerBuildProject:
    Type: AWS::CodeBuild::Project
    Properties:
      ServiceRole: !GetAtt CodeBuildServiceRole.Arn
      Artifacts:
        Type: CODEPIPELINE
      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_LARGE
        # NOTE(review): this looks like an old curated image name; confirm it
        # is still offered by CodeBuild before relying on it.
        Image: aws/codebuild/java:openjdk-11
      Source:
        Type: CODEPIPELINE
        # pre_build installs the Flink Kinesis connector jar from the
        # secondary source into the local Maven repository; build then
        # packages the consumer. The 'cd ... || :' tolerates sources that are
        # not wrapped in a versioned top-level directory, which is why both
        # artifact paths are listed.
        BuildSpec: !Sub |
          version: 0.2

          phases:
            pre_build:
              commands:
                - mvn install:install-file -B -Dfile=$CODEBUILD_SRC_DIR_FlinkKinesisConnector/flink-connector-kinesis_${FlinkScalaVersion}-${FlinkVersion}.jar -DpomFile=$CODEBUILD_SRC_DIR_FlinkKinesisConnector/flink-connector-kinesis_${FlinkScalaVersion}-${FlinkVersion}.pom.xml
            build:
              commands:
                - 'cd amazon-kinesis-analytics-taxi-consumer-* || :'
                - mvn clean package -B -Dflink.version=${FlinkVersion}

          artifacts:
            files:
              - target/amazon-kinesis-analytics-taxi-consumer-*.jar
              - amazon-kinesis-analytics-taxi-consumer-*/target/amazon-kinesis-analytics-taxi-consumer-*.jar
            discard-paths: yes
      TimeoutInMinutes: 5

  # Invoked by the pipeline's NotifyCloudformation action. PUTs a SUCCESS
  # signal to the wait condition handle URL and reports the job result back
  # to CodePipeline.
  NotifyWaitConditionLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: !Sub |
          import json
          import boto3
          import urllib.request

          code_pipeline = boto3.client('codepipeline')

          def handler(event, context):
            job_id = event['CodePipeline.job']['id']

            url = '${BuildCompleteWaitHandle}'
            headers = { "Content-Type": "" }
            data = { "Status": "SUCCESS", "Reason": "Compilation Succeeded", "UniqueId": "TaxiConsumerBuildProject", "Data": "Compilation Succeeded" }

            try:
              req = urllib.request.Request(url, headers=headers, data=bytes(json.dumps(data), encoding="utf-8"), method='PUT')
              response = urllib.request.urlopen(req)

              code_pipeline.put_job_success_result(jobId=job_id)
            except Exception as e:
              print("failure: " + str(e))
              code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'})
      # python3.7 is a deprecated Lambda runtime; functions can no longer be
      # created with it.
      Runtime: python3.12
      Timeout: 10

  DownloadSources:
    Type: Custom::DownloadSources
    Properties:
      ServiceToken: !GetAtt DownloadSourcesFunction.Arn

  # Backs the DownloadSources custom resource: downloads the requested
  # release archive from GitHub and stores it in the artifact bucket under
  # the key the pipeline's TaxiConsumerSourceAction reads.
  DownloadSourcesFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: !Sub |
          import boto3
          import cfnresponse
          from urllib.request import urlopen

          def handler(event, context):
            # Always answer CloudFormation, even on failure; otherwise the
            # stack operation blocks until the custom resource times out.
            try:
              # Nothing to fetch when the resource is being deleted.
              if event['RequestType'] != 'Delete':
                s3 = boto3.client('s3')

                with urlopen('https://github.com/aws-samples/amazon-kinesis-analytics-taxi-consumer/archive/${Release}.zip') as taxi_consumer_source:
                  s3.put_object(Bucket='${ArtifactBucket}',Key='sources/amazon-kinesis-analytics-taxi-consumer.zip',Body=taxi_consumer_source.read())

              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as e:
              print("failure: " + str(e))
              cfnresponse.send(event, context, cfnresponse.FAILED, {})
      # python3.7 is a deprecated Lambda runtime; functions can no longer be
      # created with it.
      Runtime: python3.12
      Timeout: 60

  CodePipelineServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codepipeline.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Read sources from and write artifacts to the artifact bucket.
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:GetBucketVersioning
              # Start and monitor the build project.
              - Resource:
                  - !GetAtt TaxiConsumerBuildProject.Arn
                Effect: Allow
                Action:
                  - codebuild:StartBuild
                  - codebuild:BatchGetBuilds
              # Invoke the notification function from the Copy stage.
              - Resource: !GetAtt NotifyWaitConditionLambdaFunction.Arn
                Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync

  CodeBuildServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codebuild.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Resource:
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*:log-stream:*
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:GetObjectVersion
                  - s3:ListBucket

  # Shared by NotifyWaitConditionLambdaFunction and DownloadSourcesFunction.
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Report the Invoke action result back to CodePipeline.
              - Resource: "*"
                Effect: Allow
                Action:
                  - codepipeline:PutJobSuccessResult
                  - codepipeline:PutJobFailureResult
              - Resource:
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*:log-stream:*
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
              # Upload the downloaded sources to the artifact bucket.
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:PutObject