# CloudFormation template that provisions a CodePipeline/CodeBuild pipeline
# for the amazon-kinesis-analytics-taxi-consumer Flink application.
# The format version is quoted so generic YAML loaders keep it a string
# instead of parsing it as a date.
AWSTemplateFormatVersion: "2010-09-09"

Description: Build pipeline to build the amazon-kinesis-analytics-taxi-consumer Flink application

# Stack inputs. Version-like defaults are quoted so YAML keeps them as strings.
Parameters:
  ArtifactBucket:
    Description: Bucket containing the generated jar file
    Type: String

  FlinkVersion:
    Description: Flink version to build
    Type: String
    Default: "1.11.1"
    AllowedPattern: '\d\.\d{1,3}\.\d{1,3}'

  FlinkScalaVersion:
    Description: Scala version for Flink
    Type: String
    # Quoted: a bare 2.11 is a YAML float, so a value such as 2.10 would load as 2.1.
    Default: "2.11"
    AllowedPattern: '\d\.\d\d'

  Release:
    Description: Github branch or release to be used for the consumer application
    Type: String
    Default: master


# Values exported by the stack for locating the built application jar.
Outputs:
  FlinkApplicationCopyCommand:
    Description: AWS cli command to copy the taxi consumer jar
    Value: !Sub aws s3 cp --recursive --exclude '*' --include 'amazon-kinesis-analytics-taxi-consumer-*.jar' 's3://${ArtifactBucket}/' .

  FlinkApplicationS3Path:
    # Hard-coded jar name; presumably matches the version in the project's pom — verify when the release changes.
    Description: Expected S3 object key of the built taxi consumer jar in the artifact bucket
    Value: amazon-kinesis-analytics-taxi-consumer-1.0-SNAPSHOT.jar


Resources:
  # Pipeline with three stages: Source pulls two zip bundles from the artifact
  # bucket, BuildTaxiConsumer runs the CodeBuild project on them, and Copy
  # extracts the build output into the bucket before invoking the notifier Lambda.
  BuildPipeline:
    Type: AWS::CodePipeline::Pipeline
    Properties:
      RoleArn: !GetAtt CodePipelineServiceRole.Arn
      ArtifactStore:
        Location: !Ref ArtifactBucket
        Type: S3
      Stages:
        - Name: Source
          Actions:
            # Zip with the Flink Kinesis connector for the selected Flink version.
            - Name: FlinkKinesisConnectorSourceAction
              RunOrder: 1
              ActionTypeId:
                Owner: AWS
                Category: Source
                Provider: S3
                Version: "1"
              Configuration:
                S3Bucket: !Ref ArtifactBucket
                S3ObjectKey: !Sub flink-connector-kinesis-${FlinkVersion}.zip
              OutputArtifacts:
                - Name: FlinkKinesisConnector
            # Zip with the consumer sources, written by the DownloadSources function.
            - Name: TaxiConsumerSourceAction
              RunOrder: 1
              ActionTypeId:
                Owner: AWS
                Category: Source
                Provider: S3
                Version: "1"
              Configuration:
                S3Bucket: !Ref ArtifactBucket
                S3ObjectKey: sources/amazon-kinesis-analytics-taxi-consumer.zip
              OutputArtifacts:
                - Name: TaxiConsumerSource
        - Name: BuildTaxiConsumer
          Actions:
            - Name: BuildTaxiConsumer
              RunOrder: 1
              ActionTypeId:
                Owner: AWS
                Category: Build
                Provider: CodeBuild
                Version: "1"
              Configuration:
                # The consumer sources are the primary input; the connector is secondary.
                PrimarySource: TaxiConsumerSource
                ProjectName: !Ref TaxiConsumerBuildProject
              InputArtifacts:
                - Name: FlinkKinesisConnector
                - Name: TaxiConsumerSource
              OutputArtifacts:
                - Name: BuildTaxiConsumerOutput
        - Name: Copy
          Actions:
            # Unpacks the build output into the artifact bucket.
            - Name: CopyTaxiConsumer
              RunOrder: 1
              ActionTypeId:
                Owner: AWS
                Category: Deploy
                Provider: S3
                Version: "1"
              Configuration:
                BucketName: !Ref ArtifactBucket
                Extract: true
              InputArtifacts:
                - Name: BuildTaxiConsumerOutput
            # RunOrder 2: runs only after the copy action above has finished.
            - Name: NotifyCloudformation
              RunOrder: 2
              ActionTypeId:
                Owner: AWS
                Category: Invoke
                Provider: Lambda
                Version: "1"
              Configuration:
                FunctionName: !Ref NotifyWaitConditionLambdaFunction


  # Waits for one signal on BuildCompleteWaitHandle, giving up after 900 seconds.
  # The signal is sent by NotifyWaitConditionLambdaFunction at the end of the pipeline.
  BuildCompleteWaitCondition:
    Type: AWS::CloudFormation::WaitCondition
    Properties:
      Handle: !Ref BuildCompleteWaitHandle
      Count: 1
      Timeout: "900"

  # Handle whose reference is embedded as the signal URL in NotifyWaitConditionLambdaFunction.
  BuildCompleteWaitHandle:
    Type: AWS::CloudFormation::WaitConditionHandle


  # CodeBuild project driven by the pipeline: installs the Flink Kinesis
  # connector jar into the local Maven repository, then packages the consumer.
  TaxiConsumerBuildProject:
    Type: AWS::CodeBuild::Project
    Properties:
      ServiceRole: !GetAtt CodeBuildServiceRole.Arn
      TimeoutInMinutes: 5
      Environment:
        Image: aws/codebuild/java:openjdk-11
        ComputeType: BUILD_GENERAL1_LARGE
        Type: LINUX_CONTAINER
      # Inputs and outputs are both exchanged through CodePipeline artifacts.
      Artifacts:
        Type: CODEPIPELINE
      Source:
        Type: CODEPIPELINE
        # Only the braced Flink/Scala version variables are substituted by !Sub;
        # the $CODEBUILD_SRC_DIR_* references have no braces and reach the build unchanged.
        BuildSpec: !Sub |
          version: 0.2

          phases:
            pre_build:
              commands:
                - mvn install:install-file -B -Dfile=$CODEBUILD_SRC_DIR_FlinkKinesisConnector/flink-connector-kinesis_${FlinkScalaVersion}-${FlinkVersion}.jar -DpomFile=$CODEBUILD_SRC_DIR_FlinkKinesisConnector/flink-connector-kinesis_${FlinkScalaVersion}-${FlinkVersion}.pom.xml
            build:
              commands:
                - 'cd amazon-kinesis-analytics-taxi-consumer-* || :'
                - mvn clean package -B -Dflink.version=${FlinkVersion}

          artifacts:
            files:
              - target/amazon-kinesis-analytics-taxi-consumer-*.jar
              - amazon-kinesis-analytics-taxi-consumer-*/target/amazon-kinesis-analytics-taxi-consumer-*.jar
            discard-paths: yes


  # Lambda invoked by the pipeline's final action. It PUTs a SUCCESS signal to
  # the wait condition handle URL and reports the outcome back to CodePipeline.
  NotifyWaitConditionLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      # python3.7 is a deprecated Lambda runtime; the handler only needs json,
      # urllib and boto3, which current Python runtimes provide.
      Runtime: python3.12
      Timeout: 10
      Code:
        ZipFile: !Sub |
          import json
          import boto3
          import urllib.request

          code_pipeline = boto3.client('codepipeline')

          def handler(event, context):
            job_id = event['CodePipeline.job']['id']

            # Wait handle URL, substituted by CloudFormation at deploy time.
            url = '${BuildCompleteWaitHandle}'
            # The signal is sent with an empty Content-Type header.
            headers = { "Content-Type": "" }
            data = { "Status": "SUCCESS", "Reason": "Compilation Succeeded", "UniqueId": "TaxiConsumerBuildProject", "Data": "Compilation Succeeded" }

            try:
              req = urllib.request.Request(url, headers=headers, data=bytes(json.dumps(data), encoding="utf-8"), method='PUT')
              # Bounded below the 10 s function timeout so a stalled request is
              # reported as a job failure instead of killing the invocation.
              with urllib.request.urlopen(req, timeout=8):
                pass

              code_pipeline.put_job_success_result(jobId=job_id)
            except Exception as e:
              print("failure: " + str(e))
              code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'})


  # Custom resource that runs DownloadSourcesFunction during stack operations.
  DownloadSources:
    Type: Custom::DownloadSources
    Properties:
      ServiceToken: !GetAtt DownloadSourcesFunction.Arn
      # Passed only so that changing either parameter on a stack update changes
      # this resource's properties and re-runs the download; the function does
      # not read them from the event.
      Release: !Ref Release
      ArtifactBucket: !Ref ArtifactBucket

  # Backs the DownloadSources custom resource: downloads the consumer source
  # archive for the chosen release from GitHub and stores it in the artifact
  # bucket under the key the pipeline's source action reads.
  DownloadSourcesFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      # python3.7 is a deprecated Lambda runtime.
      Runtime: python3.12
      Timeout: 60
      Code:
        ZipFile: !Sub |
          import boto3
          import cfnresponse
          from urllib.request import urlopen

          SOURCE_URL = 'https://github.com/aws-samples/amazon-kinesis-analytics-taxi-consumer/archive/${Release}.zip'

          def handler(event, context):
            try:
              # Nothing to clean up on stack deletion; only fetch on Create/Update.
              if event.get('RequestType') != 'Delete':
                s3 = boto3.client('s3')
                # Bounded below the 60 s function timeout so a stalled download still gets reported.
                with urlopen(SOURCE_URL, timeout=30) as taxi_consumer_source:
                  s3.put_object(Bucket='${ArtifactBucket}',Key='sources/amazon-kinesis-analytics-taxi-consumer.zip',Body=taxi_consumer_source.read())

              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as e:
              # Always answer CloudFormation; otherwise the stack waits for a
              # response that never comes.
              print("failure: " + str(e))
              cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': str(e)})


  # Role assumed by CodePipeline. Grants access to the artifact bucket, lets the
  # pipeline start and poll the CodeBuild project, and lets it invoke the
  # notifier Lambda.
  CodePipelineServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      # Native YAML trust policy with an explicit policy-language version,
      # matching the form used by LambdaExecutionRole.
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codepipeline.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:GetBucketVersioning
                Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
              - Effect: Allow
                Action:
                  - codebuild:StartBuild
                  - codebuild:BatchGetBuilds
                Resource:
                  - !GetAtt TaxiConsumerBuildProject.Arn
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync
                Resource: !GetAtt NotifyWaitConditionLambdaFunction.Arn


  # Role assumed by CodeBuild. Grants write access to /aws/codebuild/ log
  # groups and read/write/list access to the artifact bucket.
  CodeBuildServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      # Native YAML trust policy with an explicit policy-language version,
      # matching the form used by LambdaExecutionRole.
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codebuild.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*:log-stream:*
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:GetObjectVersion
                  - s3:ListBucket
                Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*


  # Execution role shared by both Lambda functions in this template
  # (NotifyWaitConditionLambdaFunction and DownloadSourcesFunction).
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              # Lets the notifier report the job result back to CodePipeline.
              - Effect: Allow
                Action:
                  - codepipeline:PutJobSuccessResult
                  - codepipeline:PutJobFailureResult
                Resource: "*"
              # Log output for functions under /aws/lambda/.
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*:log-stream:*
              # Lets the download function upload the source archive.
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*