AWSTemplateFormatVersion: '2010-09-09' Description: 'Healthcare data lake - core resources' Parameters: BucketName: Description: Bucket for ingesting and processing healthcare data Type: String ArtifactBucketName: Description: Bucket for holding software artifacts Type: String Resources: #-------------------------------------------------------------- Data Lake bucket Bucket: Type: AWS::S3::Bucket Properties: BucketName: !Ref BucketName BucketEncryption: # Encryption at rest ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: # Block all public access BlockPublicAcls: true IgnorePublicAcls: true BlockPublicPolicy: true RestrictPublicBuckets: true VersioningConfiguration: Status: Enabled BucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref Bucket PolicyDocument: Version: 2012-10-17 Statement: - Action: 's3:*' # Only allow secure transport Effect: Deny Principal: '*' Resource: - !Sub arn:aws:s3:::${Bucket} - !Sub arn:aws:s3:::${Bucket}/* Condition: Bool: 'aws:SecureTransport': false ArtifactBucket: Type: AWS::S3::Bucket Properties: BucketName: !Ref ArtifactBucketName BucketEncryption: # Encryption at rest ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: # Block all public access BlockPublicAcls: true IgnorePublicAcls: true BlockPublicPolicy: true RestrictPublicBuckets: true ArtifactBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref ArtifactBucket PolicyDocument: Version: 2012-10-17 Statement: - Action: 's3:*' # Only allow secure transport Effect: Deny Principal: '*' Resource: - !Sub arn:aws:s3:::${ArtifactBucket} - !Sub arn:aws:s3:::${ArtifactBucket}/* Condition: Bool: 'aws:SecureTransport': false #-------------------------------------------------------------- Catalog to go with the data lake buckets Catalog: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "message_id" AttributeType: "S" KeySchema: - AttributeName: "message_id" KeyType: "HASH" TableName: !Sub ${AWS::StackName}-Catalog BillingMode: PAY_PER_REQUEST #-------------------------------------------------------------- Communication / hub topic (use attributes to determine filtering) Topic: Type: AWS::SNS::Topic Properties: TopicName: !Sub ${AWS::StackName}-Topic DisplayName: "Pub-Sub-Hub" KmsMasterKeyId: alias/aws/sns # Using default key #-------------------------------------------------------------- Write messages to the data lake and catalog LambdaFunction: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${AWS::StackName}_write_messages" Description: Write messages to data lake and catalog Code: ZipFile: | import boto3, json, os, logging from botocore.exceptions import ClientError from boto3.dynamodb.conditions import Key, Attr logger = logging.getLogger() logger.setLevel(logging.INFO) # Initialize clients outside of the main function to reuse between calls sns = boto3.client('sns') s3 = boto3.client('s3') catalog = boto3.resource('dynamodb').Table(os.environ['catalog']) def lambda_handler(event, context): logger.info(event) # Get data that was passed from SNS data = { 'MessageId': event['Records'][0]['Sns']['MessageId'], 'Message':event['Records'][0]['Sns']['Message'], 'MessageAttributes': event['Records'][0]['Sns']['MessageAttributes'] } logger.info(data) logger.info(data['MessageAttributes']) msg = data['Message'] #---------- Write to the data lake # Determine the attributes protocol = data['MessageAttributes']['protocol']['Value'] format_type = data['MessageAttributes']['format']['Value'] source = data['MessageAttributes']['source']['Value'] event = data['MessageAttributes']['event']['Value'] if event == 'ingested': zone = 'ingestion' elif event == 'staged': zone = 'staging' elif event == 'error': zone = 'error' if format_type == 'json': content_type = "application/json; charset=utf-8" else: content_type = "text/plain; charset=utf-8" # Assemble the key key = zone + '/protocol=' + protocol + '/' + data['MessageId'] +"." +format_type logger.info('Key: {}'.format(key)) # Write to the bucket s3.put_object( Bucket = os.environ['bucket_name'], Key=key, Body=msg, ContentType=content_type ) logger.info("Message written to S3 bucket") # Update the catalog catalog.put_item( Item={ 'message_id': data['MessageId'], 'bucket': os.environ['bucket_name'], 'key': key, 'source': source } ) logger.info("Catalog updated") Handler: index.lambda_handler Role: !GetAtt LambdaRole.Arn Runtime: python3.9 Environment: Variables: catalog: !Ref Catalog bucket_name: !Ref Bucket LambdaLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Join ['/', ['/aws/lambda', !Ref LambdaFunction]] RetentionInDays: 1 # Keep logs for a short duration LambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: [lambda.amazonaws.com] Action: ['sts:AssumeRole'] ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole # Provides access to CloudWatch for logging Policies: - PolicyName: s3 PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: ['s3:PutObject', 's3:PutObjectTagging'] Resource: !Sub arn:aws:s3:::${Bucket}/* # Fn::Join: ["", ["arn:aws:s3:::", !Ref Bucket, "/*"]] - PolicyName: dynamodb PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: ['dynamodb:*'] Resource: !GetAtt Catalog.Arn # Subscription for Lambda function to SNS topic Subscription: Type: AWS::SNS::Subscription Properties: TopicArn: !Ref Topic Endpoint: !GetAtt LambdaFunction.Arn Protocol: lambda # Permission for SNS topic to invoke this Lambda LambdaResourcePolicy: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref LambdaFunction Principal: sns.amazonaws.com Action: "lambda:InvokeFunction" SourceArn: !Ref Topic Outputs: Bucket: Value: !Ref Bucket Export: Name: !Sub ${AWS::StackName}-Bucket ArtifactBucket: Value: !Ref ArtifactBucket Export: Name: !Sub ${AWS::StackName}-ArtifactBucket Topic: Value: !Ref Topic Export: Name: !Sub ${AWS::StackName}-Topic Catalog: Value: !Ref Catalog Export: Name: !Sub ${AWS::StackName}-Catalog CatalogArn: Value: !GetAtt Catalog.Arn Export: Name: !Sub ${AWS::StackName}-CatalogArn