AWSTemplateFormatVersion: '2010-09-09'
Description: >-
  SatCom Firehose, Lambda, S3, Glue analytics pipeline.
  Data sent to the Kinesis Firehose delivery stream lands in S3, with an inline Lambda
  injecting some anomaly events. A Glue Crawler runs across the partitioned data to derive
  the schema. A Glue job then transforms the data and writes the results to a results S3
  bucket for analytics use.
  NOTE - create 2 buckets (1 input, 1 output) in advance. Please specify bucket names with
  NO underscores and NO dashes to avoid downstream Glue issues.

# Parameters to pass to the CloudFormation stack
Parameters:
  InputS3Bucket:
    Type: String
    Description: Input data from the Kinesis Data Generator via Firehose. Create this bucket in advance.
    Default: "satcomkdfbucket"
    ConstraintDescription: Please specify the input bucket name with NO underscores and NO dashes to avoid downstream Glue issues.
  OutputS3Bucket:
    Type: String
    Description: Output data after the Glue transforms. Create this bucket in advance.
    Default: "satcomkdfbucketresults"
    ConstraintDescription: Please specify the output bucket name with NO underscores and NO dashes to avoid downstream Glue issues.
  SatComAssetsS3Bucket:
    Type: String
    Description: Holds helper assets, e.g. the Glue Python transform scripts.
    Default: "satcom-pipeline-assets"
  KdfLambdaZipName:
    Type: String
    Description: S3 key of the KDF Lambda zip file.
    Default: "kdf-scripts/satcom-wshop-kdf-lambda-py.zip"

# Creation of AWS resources
Resources:
  # Role and policies for the Kinesis Firehose delivery stream
  FirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'

  FirehoseDeliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: satcom_firehose_delivery_policy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Join
                - ''
                - - 'arn:'
                  - !Ref "AWS::Partition"
                  - ':s3:::'
                  - !Ref InputS3Bucket
              - !Join
                - ''
                - - 'arn:'
                  - !Ref "AWS::Partition"
                  - ':s3:::'
                  - !Ref InputS3Bucket
                  - '/*'
      Roles:
        - !Ref FirehoseDeliveryRole

  InvokeLambdaPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: satcom_firehose_lambda_policy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 'lambda:InvokeFunction'
            Resource:
              - !GetAtt FirehoseProcessLambdaFunction.Arn
      Roles:
        - !Ref FirehoseDeliveryRole

  # Delivery stream with Lambda transformation
  SatComFirehoseDelivStrm:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: satcom-kdf
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join
          - ''
          - - 'arn:'
            - !Ref "AWS::Partition"
            - ':s3:::'
            - !Ref InputS3Bucket
        BufferingHints:
          SizeInMBs: 1
          IntervalInSeconds: 60
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: "satcom-kdf"
          LogStreamName: "S3Delivery"
        CompressionFormat: UNCOMPRESSED
        ErrorOutputPrefix: year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}
        Prefix: year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt FirehoseProcessLambdaFunction.Arn
                - ParameterName: BufferSizeInMBs
                  ParameterValue: 0.5
                - ParameterName: BufferIntervalInSeconds
                  ParameterValue: 60
              Type: Lambda
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", SatComFirehoseDelivStrm ] ]
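  # The processor Lambda referenced above must follow the Firehose data-transformation
  # contract: it receives a batch of base64-encoded records and must return, per record,
  # the original recordId, a result of Ok/Dropped/ProcessingFailed, and re-encoded data.
  # A minimal, hypothetical sketch of what the packaged handler in KdfLambdaZipName could
  # look like is shown below; the actual anomaly-injection logic lives in the zip file:
  #
  #   import base64, json
  #
  #   def lambda_handler(event, context):
  #       output = []
  #       for record in event['records']:
  #           payload = json.loads(base64.b64decode(record['data']))
  #           # ... optionally inject/flag anomaly fields on payload here ...
  #           data = base64.b64encode((json.dumps(payload) + '\n').encode('utf-8')).decode('utf-8')
  #           output.append({'recordId': record['recordId'], 'result': 'Ok', 'data': data})
  #       return {'records': output}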
  # Create the Lambda function that the KDF stream references
  # First create the Lambda execution role
  FirehoseProcessLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  FirehoseProcessLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to process/transform KDF records as they arrive
      Handler: lambda_function.lambda_handler
      Timeout: 60
      Role: !GetAtt FirehoseProcessLambdaExecutionRole.Arn
      Code:
        S3Bucket: !Ref SatComAssetsS3Bucket
        S3Key: !Ref KdfLambdaZipName
      Runtime: python3.8
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", FirehoseProcessLambdaFunction ] ]

  # Now create the Glue resources - databases, crawler, ETL job, workflow automation
  # Start with the role we will use for all Glue operations
  GlueServiceRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub GlueServiceRole-${AWS::StackName}
      Description: Role for the AWS Glue job.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSGlueServiceRole
        - !Sub arn:${AWS::Partition}:iam::aws:policy/AmazonS3FullAccess
        - !Sub arn:${AWS::Partition}:iam::aws:policy/AmazonKinesisFullAccess

  # Next create the top-level Glue Data Catalog database for the input data
  SatComInputGlueCatalogDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: satcom-glue-cat-db
        Description: Database catalog created by CloudFormation for SatCom input data

  # And the Glue Data Catalog database for the results
  SatComOutputGlueCatalogDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: satcom-glue-cat-db-results
        Description: Database catalog created by CloudFormation for SatCom results data

  # Create the crawler that crawls the source data to derive the schema
  SatComInputGlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: satcom-input-glue-crawl
      Role: !GetAtt GlueServiceRole.Arn
      DatabaseName: !Ref SatComInputGlueCatalogDatabase
      Targets:
        S3Targets:
          - Path: !Sub "s3://${InputS3Bucket}/"
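  # Because the crawler targets the bucket root, the catalog table it creates is named
  # after the bucket (which is why the Glue job below passes !Ref InputS3Bucket as
  # --input_gluetable, and why the bucket names must avoid dashes and underscores).
  # After deployment, the crawler has to run before the ETL job so the input table exists.
  # One way to do that (assuming default AWS CLI credentials and region) is:
  #
  #   aws glue start-crawler --name satcom-input-glue-crawl
  #   aws glue get-crawler --name satcom-input-glue-crawl --query 'Crawler.State'
  #   aws glue start-job-run --job-name <stack-name>_glue_job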
  # Glue job that consumes the crawled input table, applies a Filter transform, and writes
  # the results to a new S3 location and Glue Catalog table
  # NOTE - run the satcom-input-glue-crawl crawler BEFORE invoking the Glue job!!
  SatComGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub '${AWS::StackName}_glue_job'
      Description: Glue job that takes in SatCom input from S3, applies a Filter transform, and writes the results to S3 and the Glue Catalog
      Role: !GetAtt GlueServiceRole.Arn
      GlueVersion: '3.0'
      Command:
        Name: glueetl
        PythonVersion: 3
        ScriptLocation: !Sub 's3://${SatComAssetsS3Bucket}/glue-scripts/glue_filter_transform.py'
      WorkerType: G.2X
      NumberOfWorkers: 10
      ExecutionProperty:
        MaxConcurrentRuns: 1
      Tags:
        Name: !Join
          - "-"
          - - Ref: AWS::StackName
            - SatComGlueJob
      DefaultArguments:
        --job-bookmark-option: job-bookmark-enable
        --job-language: python
        --enable-auto-scaling: true
        --enable-job-insights: true
        --enable-continuous-cloudwatch-log: true
        --enable-metrics: true
        --enable-glue-datacatalog: true
        --input_gluedatabase: !Ref SatComInputGlueCatalogDatabase
        --input_gluetable: !Ref InputS3Bucket
        --output_gluedatabase: !Ref SatComOutputGlueCatalogDatabase
        --output_gluetable: !Ref OutputS3Bucket
        --output_path: !Sub 's3://${OutputS3Bucket}/'
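  # The referenced glue_filter_transform.py script is not part of this template. A rough,
  # hypothetical sketch of how such a script would typically consume the DefaultArguments
  # above (the filter predicate here is a placeholder; the real transform lives in
  # SatComAssetsS3Bucket):
  #
  #   import sys
  #   from awsglue.transforms import Filter
  #   from awsglue.utils import getResolvedOptions
  #   from awsglue.context import GlueContext
  #   from awsglue.job import Job
  #   from pyspark.context import SparkContext
  #
  #   args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_gluedatabase', 'input_gluetable',
  #                                        'output_gluedatabase', 'output_gluetable', 'output_path'])
  #   glueContext = GlueContext(SparkContext())
  #   job = Job(glueContext)
  #   job.init(args['JOB_NAME'], args)
  #
  #   src = glueContext.create_dynamic_frame.from_catalog(
  #       database=args['input_gluedatabase'], table_name=args['input_gluetable'])
  #   filtered = Filter.apply(frame=src, f=lambda row: row is not None)  # placeholder predicate
  #   glueContext.write_dynamic_frame.from_options(
  #       frame=filtered, connection_type='s3',
  #       connection_options={'path': args['output_path']}, format='parquet')
  #   job.commit()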