Description: >
  This template deploys a serverless application for automating Lake Formation permissions
Parameters:
  Prefix:
    Description: An environment name that will be prefixed to resource names
    Type: String
    Default: ""
  Environment:
    Description: SDLC environment
    Type: String
    Default: ""
  ManifestsBucketName:
    Description: Manifests bucket name
    Type: String
    Default: "manifests"
  LFAdmin:
    Description: Name of an existing IAM role to register as a Lake Formation administrator
    Type: String
    Default: "Admin"
  LFDatabaseName:
    Type: String
    Default: lf_automation
  LFTableName:
    Type: String
    Default: nyc_taxi_data
  OrgId:
    Description: AWS Organizations ID allowed to subscribe to the permissions SNS topic
    Type: String
  TestUserPassword:
    Type: String
    Description: Password for all test users.
    NoEcho: true
Resources:
  LFRoutingLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "lakeformation_automation"
      Description: "Lakeformation Automation Lambda"
      Role: !GetAtt LFAutomationRole.Arn
      Handler: "index.lambda_handler"
      Runtime: "python3.8"
      ReservedConcurrentExecutions: 10
      Timeout: 300
      Environment:
        Variables:
          ACCOUNT_ID: !Ref AWS::AccountId
          REGION: !Ref AWS::Region
          ENV: !Ref Environment
          PREFIX: !Ref Prefix
      Code:
        ZipFile: |
          import json
          import boto3
          import logging
          import re
          import time
          import os
          from datetime import datetime
          from urllib.parse import unquote_plus

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          class Error(Exception):
              """Base class for other exceptions"""
              pass

          class LFAttributeError(Error):
              """Raised when one or more mandatory Lake Formation permission parameters are missing"""
              pass

          def parse_s3_event(s3_event):
              """
              Parses the S3 event
              Arguments:
                  s3_event -- dict
              Returns:
                  dict -- metadata dictionary with bucket name, key
              """
              return {
                  'bucket': s3_event['s3']['bucket']['name'],
                  'key': unquote_plus(s3_event['s3']['object']['key']),
                  'size': s3_event['s3']['object']['size'],
                  'last_modified_date': s3_event['eventTime'].split('.')[0] + '+00:00',
                  'timestamp': int(round(datetime.utcnow().timestamp() * 1000, 0))
              }

          def read_s3_content(bucket, key):
              """
              Reads the contents of an S3 object
              Arguments:
                  bucket {str} -- Name of the bucket
                  key {str} -- object key
              Returns:
                  contents of the S3 object
              """
              try:
                  s3 = boto3.resource('s3')
                  obj = s3.Object(bucket, key)
                  s3_content = json.loads(obj.get()['Body'].read().decode('utf-8'))
                  return s3_content
              except Exception as e:
                  logger.error('Exception while reading data from s3://{}/{}'.format(bucket, key))
                  raise e

          def generate_db_perm(perm_record):
              """
              Creates a db perm json for granting DESCRIBE on the database to a cross-account principal
              Arguments:
                  perm_record {dict} -- a single perm record from the incoming manifest file
              Returns:
                  db_perm record -- {dict}
              Sample db_perm record:
                  {
                      'AccountID': 'centralCatalogAccount #',
                      'Principal': 'consumptionAccount #',
                      'Table': {
                          'DatabaseName': 'dbname',
                          'TableWildcard': {}
                      },
                      'Permissions': ['SELECT', 'DESCRIBE'],
                      'PermissionsWithGrantOption': ['SELECT', 'DESCRIBE'],
                      'AccessType': 'grant'
                  }
              """
              logger.info('Generating DB_Perm record for {}'.format(perm_record))
              arn_pattern = '^arn:(?P<Partition>[^:\n]*):(?P<Service>[^:\n]*):(?P<Region>[^:\n]*):(?P<AccountID>[^:\n]*):(?P<Ignore>(?P<ResourceType>[^:\/\n]*)[:\/])?(?P<Resource>.*)$'
              arn_regex = re.compile(arn_pattern)
              regex_obj = arn_regex.match(perm_record['Principal'])
              if regex_obj:
                  db_perm = {}
                  table_json = {}
                  table_wildcard = {}
                  db_perm['AccountID'] = os.environ['ACCOUNT_ID']
                  db_perm['Principal'] = regex_obj.group(4)
                  if 'Table' in perm_record:
                      if 'DatabaseName' not in perm_record['Table']:
                          raise LFAttributeError
                      table_json['DatabaseName'] = perm_record['Table']['DatabaseName']
                  elif 'TableWithColumns' in perm_record:
                      if 'DatabaseName' not in perm_record['TableWithColumns']:
                          raise LFAttributeError
                      table_json['DatabaseName'] = perm_record['TableWithColumns']['DatabaseName']
                  else:
                      raise LFAttributeError
                  table_json['TableWildcard'] = table_wildcard
                  db_perm['Table'] = table_json
                  db_perm['Permissions'] = ["SELECT", "DESCRIBE"]
                  db_perm['PermissionsWithGrantOption'] = ["SELECT", "DESCRIBE"]
                  db_perm['AccessType'] = "grant"
                  return db_perm
              else:
                  logger.error('Permissions Principal is not valid, raising LFAttributeError')
                  raise LFAttributeError

          def publish_sns(record):
              """
              Publishes the message to the central perm SNS topic
              Arguments:
                  record {dict} -- perm record to publish
              Returns:
                  SNS response {dict}
              """
              sns_client = boto3.client('sns')
              response_to_sns = {
                  "perms_to_set": record
              }
              logger.info('record ---> {} '.format(record))
              logger.info('sending event to sns ---> {} '.format(response_to_sns))
              response = sns_client.publish(
                  TopicArn='arn:aws:sns:{}:{}:lakeformation-automation'.format(os.environ['REGION'], os.environ['ACCOUNT_ID']),
                  Message=json.dumps(response_to_sns),
                  MessageStructure='string',
                  MessageAttributes={
                      'account_id': {
                          'DataType': 'String',
                          'StringValue': str(record['AccountID'])
                      }
                  }
              )
              logger.info('response from sns ---> {} '.format(response))
              return response

          def lambda_handler(event, context):
              app = os.environ['PREFIX']
              env = os.environ['ENV']
              acc_id = os.environ['ACCOUNT_ID']
              region = os.environ['REGION']
              arn_pattern = '^arn:(?P<Partition>[^:\n]*):(?P<Service>[^:\n]*):(?P<Region>[^:\n]*):(?P<AccountID>[^:\n]*):(?P<Ignore>(?P<ResourceType>[^:\/\n]*)[:\/])?(?P<Resource>.*)$'
              arn_regex = re.compile(arn_pattern)
              try:
                  logger.info('Received {} messages'.format(len(event['Records'])))
                  logger.info('messages {}'.format(event))
                  for record in event['Records']:
                      event_body = json.loads(record['body'])['Records'][0]
                      message = parse_s3_event(event_body)
                      s3_content = read_s3_content(message['bucket'], message['key'])
                      for perm_record in s3_content['Records']:
                          regex_obj = arn_regex.match(perm_record['Principal'])
                          if perm_record['AccessType'] == 'grant':
                              if regex_obj.group(4) != acc_id:
                                  response = publish_sns(generate_db_perm(perm_record))
                                  if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                                      logger.info('DB Perm Record Published to sns {}'.format(s3_content))
                                  time.sleep(3)
                          response = publish_sns(perm_record)
                          logger.info('response of actual perm block -- {}'.format(response))
                      logger.info('Finished processing permissions for manifest --> {} '.format(s3_content))
              except Exception as e:
                  raise e
  LFEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    DependsOn: LFAutomationPolicy
    Properties:
      BatchSize: 1
      FunctionName: !Ref LFRoutingLambda
      EventSourceArn: !GetAtt LFRoutingQueue.Arn
  LFRoutingLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LFRoutingLambda
      Principal: sqs.amazonaws.com
      SourceArn: !GetAtt LFRoutingQueue.Arn
  LFAutomationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "lakeformation_automation"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
  LFAutomationPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: lakeformation_automation
      Roles:
        - !Ref LFAutomationRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowSQS
            Effect: Allow
            Action:
              - sqs:ReceiveMessage
              - sqs:List*
              - sqs:Get*
              - sqs:Delete*
            Resource: !GetAtt LFRoutingQueue.Arn
          - Sid: AllowSNS
            Effect: Allow
            Action:
              - sns:Publish*
            Resource: !Ref LFTopic
          - Sid: LakeFormationDataAccessPermissionsForS3
            Effect: Allow
            Action:
              - s3:*Object
            Resource: !Join ['', ['arn:aws:s3:::', !Ref ManifestBucket, '/*']]
          - Sid: LakeFormationDataAccessPermissionsForS3ListBucket
            Effect: Allow
            Action:
              - s3:ListBucket
            Resource: !Join ['', ['arn:aws:s3:::', !Ref ManifestBucket]]
          - Sid: Logging
            Action:
              - logs:CreateLog*
              - logs:PutLogEvents
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/lakeformation_automation:*"
            Effect: Allow
          - Sid: KMS
            Action:
              - kms:GenerateDataKey
              - kms:Decrypt
            Resource: !GetAtt DataCMK.Arn
            Effect: Allow
  DataCMK:
    Type: AWS::KMS::Key
    Properties:
      Description: key to encrypt manifest data
      EnableKeyRotation: true
      KeyPolicy:
        Version: '2012-10-17'
        Id: default
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Join [':', ['arn:aws:iam:', !Ref 'AWS::AccountId', 'root']]
            Action: kms:*
            Resource: '*'
          - Sid: S3Permissions
            Effect: Allow
            Principal:
              Service: "s3.amazonaws.com"
            Action: kms:*
            Resource: '*'
          - Sid: SNSPermissions
            Effect: Allow
            Principal:
              Service: "sns.amazonaws.com"
            Action: kms:*
            Resource: '*'
  ManifestBucket:
    Type: AWS::S3::Bucket
    DependsOn: LFRoutingQueuePolicy
    Properties:
      BucketName: !Join ["-", [!Ref ManifestsBucketName, !Ref "AWS::AccountId"]]
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      NotificationConfiguration:
        QueueConfigurations:
          - Event: 's3:ObjectCreated:*'
            Queue: !GetAtt LFRoutingQueue.Arn
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: .json
  ManifestBucketPolicy:
    Type: 'AWS::S3::BucketPolicy'
    Properties:
      Bucket: !Ref ManifestBucket
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - 's3:GetObject'
              - 's3:PutObject'
              - 's3:PutObjectAcl'
            Effect: Allow
            Resource: !Join
              - ''
              - - 'arn:aws:s3:::'
                - !Ref ManifestBucket
                - /*
            Principal:
              AWS:
                - !Join [':', ['arn:aws:iam:', !Ref 'AWS::AccountId', 'root']]
  LFTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: lakeformation-automation
      TopicName: lakeformation-automation
      KmsMasterKeyId: !Ref DataCMK
  LFTopicPolicy:
    Type: AWS::SNS::TopicPolicy
    Properties:
      Topics:
        - !Ref LFTopic
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowLocalToPublish
            Effect: Allow
            Principal:
              AWS: !Ref AWS::AccountId
            Action: sns:Publish
            Resource: !Ref LFTopic
          - Sid: AllowConsumptionAccountsToSubscribe
            Effect: Allow
            Principal:
              AWS: "*"
            Action:
              - SNS:Subscribe
              - SNS:GetTopicAttributes
            Resource: "*"
            Condition:
              StringEquals:
                aws:PrincipalOrgID: !Ref OrgId
  LFRoutingQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lf-automation
      VisibilityTimeout: 300
      KmsMasterKeyId: !Ref DataCMK
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt LFRoutingDLQ.Arn
        maxReceiveCount: 1
  LFRoutingQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref LFRoutingQueue
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "Allow-s3-notifications"
            Effect: "Allow"
            Principal:
              Service: "s3.amazonaws.com"
            Action: "sqs:SendMessage"
            Resource: !GetAtt LFRoutingQueue.Arn
  LFRoutingDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lf-automation-dlq
      MessageRetentionPeriod: 1209600
      VisibilityTimeout: 60
      KmsMasterKeyId: !Ref DataCMK
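  # Illustrative sketch of the manifest format: based on the fields the routing Lambda above reads
  # (a "Records" list whose entries carry Principal, AccessType, a Table or TableWithColumns block,
  # and Permissions), a manifest object with a .json suffix dropped into the manifest bucket might
  # look like the following. Account IDs, role names, and database/table names are placeholders.
  # {
  #   "Records": [
  #     {
  #       "Principal": "arn:aws:iam::111122223333:role/analyst-role",
  #       "AccessType": "grant",
  #       "Table": { "DatabaseName": "foundation_sales", "TableWildcard": {} },
  #       "Permissions": ["SELECT", "DESCRIBE"],
  #       "PermissionsWithGrantOption": ["SELECT", "DESCRIBE"]
  #     }
  #   ]
  # }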
  LFPermissionsLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "lakeformation_permissions"
      Description: "Lakeformation Permissions Lambda"
      Role: !GetAtt DatalakeAdminRole.Arn
      Handler: "index.lambda_handler"
      Runtime: "python3.8"
      Timeout: 300
      ReservedConcurrentExecutions: 10
      Environment:
        Variables:
          ACCOUNT_ID: !Ref AWS::AccountId
          REGION: !Ref AWS::Region
          ENV: !Ref Environment
          PREFIX: !Ref Prefix
          FOUNDATION_ACCOUNT_ID: !Ref AWS::AccountId
      Code:
        ZipFile: |
          import json
          import boto3
          import logging
          import os
          from botocore.config import Config

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          class Error(Exception):
              """Base class for other exceptions"""
              pass

          class LFAttributeError(Error):
              """Raised when one or more mandatory Lake Formation permission parameters are missing"""
              pass

          def grant_db_describe(principal, database):
              """
              Grants 'DESCRIBE' on the database to the principal
              Arguments:
                  principal {dict} -- Principal to which DB describe is needed
                  database {str} -- Database Name
              Returns:
                  response {dict} -- response from the Lake Formation API call
              """
              permissions = ['DESCRIBE']
              database_json = {
                  'Database': {
                      'Name': database
                  }
              }
              client = boto3.client('lakeformation',
                                    config=Config(connect_timeout=5, read_timeout=60, retries={'max_attempts': 20}))
              logger.info('Granting DB Describe on resource {} for Principal {}'.format(database, principal))
              response = client.grant_permissions(Principal=principal, Resource=database_json, Permissions=permissions)
              logger.info('DB DESCRIBE Grant Response {}'.format(response))
              return response

          def buildjson(event):
              """
              Builds the json event consumed by the Lake Formation API
              Arguments:
                  event {dict} -- event that is pushed to the account-specific queue
              Returns:
                  principal_json {dict} -- (sample event below)
                      Principal={
                          'DataLakePrincipalIdentifier': 'string'
                      }
                  table_json {dict} -- (sample event below)
                      'Table': {
                          'CatalogId': 'string',
                          'DatabaseName': 'string',
                          'Name': 'string',
                          'TableWildcard': {}
                      }
                  tableWithColumns_json {dict} -- (sample event below)
                      'TableWithColumns': {
                          'CatalogId': 'string',
                          'DatabaseName': 'string',
                          'Name': 'string',
                          'ColumnNames': [
                              'string',
                          ],
                          'ColumnWildcard': {
                              'ExcludedColumnNames': [
                                  'string',
                              ]
                          }
                      }
                  perm_json {dict} -- (sample event below)
                      Permissions=[
                          'ALL'|'SELECT'|'ALTER'|'DROP'|'DELETE'|'INSERT'|'DESCRIBE'|
                          'CREATE_DATABASE'|'CREATE_TABLE'|'DATA_LOCATION_ACCESS'|'CREATE_TAG'|
                          'ALTER_TAG'|'DELETE_TAG'|'DESCRIBE_TAG'|'ASSOCIATE_TAG',
                      ]
                  perm_grant_json {dict} --
                      PermissionsWithGrantOption=[
                          'ALL'|'SELECT'|'ALTER'|'DROP'|'DELETE'|'INSERT'|'DESCRIBE'|
                          'CREATE_DATABASE'|'CREATE_TABLE'|'DATA_LOCATION_ACCESS'|'CREATE_TAG'|
                          'ALTER_TAG'|'DELETE_TAG'|'DESCRIBE_TAG'|'ASSOCIATE_TAG',
                      ]
              """
              principal_json = {}
              table_json = {}
              tableWithColumns_json = {}
              perm_json = {}
              perm_grant_json = {}
              if 'Principal' in event:
                  principal_json['DataLakePrincipalIdentifier'] = event['Principal']
              else:
                  raise LFAttributeError
              if 'Table' in event:
                  if 'DatabaseName' not in event['Table']:
                      raise LFAttributeError
                  table_json['DatabaseName'] = event['Table']['DatabaseName']
                  # The shared catalog belongs to the foundation (central) account that owns this stack
                  table_json['CatalogId'] = os.environ['FOUNDATION_ACCOUNT_ID']
                  response = grant_db_describe(principal_json, table_json['DatabaseName'])
                  if 'foundation_' in table_json['DatabaseName']:
                      table_json['DatabaseName'] = table_json['DatabaseName'].split('foundation_')[1]
                  if 'Name' in event['Table']:
                      table_json['Name'] = event['Table']['Name']
                  elif 'TableWildcard' in event['Table']:
                      table_json['TableWildcard'] = event['Table']['TableWildcard']
                  else:
                      raise LFAttributeError
              elif 'TableWithColumns' in event:
                  if 'DatabaseName' not in event['TableWithColumns']:
                      raise LFAttributeError
                  tableWithColumns_json['DatabaseName'] = event['TableWithColumns']['DatabaseName']
                  tableWithColumns_json['CatalogId'] = os.environ['FOUNDATION_ACCOUNT_ID']
                  response = grant_db_describe(principal_json, tableWithColumns_json['DatabaseName'])
                  if 'foundation_' in tableWithColumns_json['DatabaseName']:
                      tableWithColumns_json['DatabaseName'] = tableWithColumns_json['DatabaseName'].split('foundation_')[1]
                  if 'Name' not in event['TableWithColumns']:
                      raise LFAttributeError
                  tableWithColumns_json['Name'] = event['TableWithColumns']['Name']
                  if 'ColumnNames' in event['TableWithColumns']:
                      tableWithColumns_json['ColumnNames'] = event['TableWithColumns']['ColumnNames']
                  elif 'ColumnWildcard' in event['TableWithColumns']:
                      tableWithColumns_json['ColumnWildcard'] = event['TableWithColumns']['ColumnWildcard']
                  else:
                      raise LFAttributeError
              else:
                  raise LFAttributeError
              if 'Permissions' in event:
                  allowed_perms = ["SELECT", "DESCRIBE"]
                  if set(event['Permissions']) - set(allowed_perms):
                      logger.info('Found permissions other than SELECT and DESCRIBE, ignoring them')
                      perm_json['Permissions'] = allowed_perms
                  else:
                      perm_json['Permissions'] = event['Permissions']
              else:
                  raise LFAttributeError
              if 'PermissionsWithGrantOption' in event:
                  perm_grant_json['PermissionsWithGrantOption'] = ["SELECT", "DESCRIBE"]
              return principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json

          def grant_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json):
              """
              Grants the specified permissions to the principal on the respective resources
              Arguments:
                  principal_json {dict} -- Principal which requires the grant
                  table_json {dict} -- Resource to grant permissions on
                  tableWithColumns_json {dict} -- Resource to grant permissions on
                  perm_json {dict} -- permissions that are applied to the resource
                  perm_grant_json {dict} -- grantable permissions on the resource
              Returns:
                  response {dict} -- Response from the Lake Formation API call
              """
              logger.info('Granting Lakeformation Permissions ....')
              try:
                  resource = {}
                  if table_json:
                      resource['Table'] = table_json
                  elif tableWithColumns_json:
                      resource['TableWithColumns'] = tableWithColumns_json
                  if perm_grant_json:
                      perm_with_grant = perm_grant_json['PermissionsWithGrantOption']
                  else:
                      perm_with_grant = []
                  client = boto3.client('lakeformation',
                                        config=Config(connect_timeout=5, read_timeout=60, retries={'max_attempts': 20}))
                  response = client.grant_permissions(Principal=principal_json,
                                                      Resource=resource,
                                                      Permissions=perm_json['Permissions'],
                                                      PermissionsWithGrantOption=perm_with_grant)
                  logger.info('Grant permissions API response: {}'.format(response))
                  return response
              except Exception as e:
                  logger.error("Grant permissions method failed")
                  raise e

          def revoke_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json):
              """
              Revokes the specified permissions from the principal on the respective resources
              Arguments:
                  principal_json {dict} -- Principal which requires the revoke
                  table_json {dict} -- Resource to revoke permissions on
                  tableWithColumns_json {dict} -- Resource to revoke permissions on
                  perm_json {dict} -- permissions that are revoked from the resource
                  perm_grant_json {dict} -- grantable permissions on the resource
              Returns:
                  response {dict} -- Response from the Lake Formation API call
              """
              logger.info('Revoking Lakeformation Permissions ...')
              try:
                  resource = {}
                  if table_json:
                      resource['Table'] = table_json
                  elif tableWithColumns_json:
                      resource['TableWithColumns'] = tableWithColumns_json
                  client = boto3.client('lakeformation',
                                        config=Config(connect_timeout=5, read_timeout=60, retries={'max_attempts': 20}))
                  response = client.revoke_permissions(Principal=principal_json,
                                                       Resource=resource,
                                                       Permissions=perm_json['Permissions'])
                  logger.info('Revoke permissions API response: {}'.format(response))
                  return response
              except Exception as e:
                  logger.error("Revoke permissions method failed with exception {}".format(e))
                  raise e
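
          # Illustrative sketch of the message shape lambda_handler below expects: each SQS record
          # body is an SNS envelope whose Message field carries the JSON published by the routing
          # Lambda, i.e. a "perms_to_set" object. Values shown are placeholders.
          # {
          #   "Type": "Notification",
          #   "Message": "{\"perms_to_set\": {\"Principal\": \"arn:aws:iam::111122223333:role/analyst-role\",
          #               \"AccessType\": \"grant\",
          #               \"Table\": {\"DatabaseName\": \"foundation_sales\", \"TableWildcard\": {}},
          #               \"Permissions\": [\"SELECT\", \"DESCRIBE\"]}}"
          # }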
          def lambda_handler(event, context):
              try:
                  logger.info('Received {} messages'.format(len(event['Records'])))
                  logger.info('messages {}'.format(event))
                  for record in event['Records']:
                      event_body = json.loads(json.loads(record['body'])['Message'])['perms_to_set']
                      logger.info('Processing Permissions for: {}'.format(event_body))
                      principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json = buildjson(event_body)
                      logger.info('created permissions JSONs - principal json : {}, table_json {}, tableWithColumns_json {}, perm_json {} '
                                  .format(principal_json, table_json, tableWithColumns_json, perm_json))
                      if event_body['AccessType'].lower() == 'grant':
                          logger.info('Calling Grant permissions for {} on resource {} or {} permissions {}'.format(principal_json, table_json, tableWithColumns_json, perm_json))
                          response = grant_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json)
                      elif event_body['AccessType'].lower() == 'revoke':
                          logger.info('Calling Revoke permissions for {} on resource {} or {} permissions {}'.format(principal_json, table_json, tableWithColumns_json, perm_json))
                          response = revoke_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json, perm_grant_json)
                      else:
                          raise LFAttributeError
              except Exception as e:
                  logger.error("Fatal error", exc_info=True)
                  raise e
              return
  LFPermissionsLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LFPermissionsLambda
      Principal: sqs.amazonaws.com
      SourceArn: !GetAtt LFPermissionsQueue.Arn
  LFPermissionsEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    DependsOn: DatalakeAdminPolicy
    Properties:
      BatchSize: 1
      FunctionName: !Ref LFPermissionsLambda
      EventSourceArn: !GetAtt LFPermissionsQueue.Arn
  RemoveDataCatalogDefaultSettingsLambda:
    Type: AWS::Lambda::Function
    Properties:
      Description: Custom Resource Lambda that registers data lake admins and removes the default Data Catalog permissions
      Handler: index.handler
      Runtime: python3.7
      Timeout: 300
      Role: !GetAtt RemoveDataCatalogDefaultSettingsRole.Arn
      ReservedConcurrentExecutions: 3
      Code:
        ZipFile: !Sub |
          import boto3
          import cfnresponse

          def handler(event, context):
              try:
                  client = boto3.client('lakeformation')
                  principals = [
                      'arn:aws:iam::${AWS::AccountId}:role/${LFAdmin}',
                      '${DatalakeAdminRole.Arn}'
                  ]
                  dladmins = [{'DataLakePrincipalIdentifier': principal} for principal in principals]
                  response = client.put_data_lake_settings(
                      DataLakeSettings={
                          'DataLakeAdmins': dladmins,
                          'CreateDatabaseDefaultPermissions': [],
                          'CreateTableDefaultPermissions': []
                      }
                  )
                  # signal
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, context.log_stream_name)
              except Exception as err:
                  print("Error in Custom Resource", err)
                  # signal
                  cfnresponse.send(event, context, cfnresponse.FAILED, {}, context.log_stream_name)
  RemoveDataCatalogDefaultSettingsRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "remove-data-catalog-default-settings"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: AllowLakeformationSettings
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - lakeformation:*DataLakeSettings
                Resource: "*"
        - PolicyName: AllowBasicCloudwatchLogs
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:Create*
                  - logs:PutLogEvents
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/RemoveDataCatalogDefault:*"
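  # Custom resource that invokes the Lambda above during stack create/update to register the
  # LFAdmin role and DatalakeAdminRole as data lake administrators and to clear the
  # CreateDatabaseDefaultPermissions/CreateTableDefaultPermissions defaults in the Data Catalog settings.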
  RemoveDataCatalogDefaultSettings:
    Type: Custom::RemoveDataCatalogDefaultSettings
    Properties:
      ServiceToken: !GetAtt RemoveDataCatalogDefaultSettingsLambda.Arn
  DatalakeAdminRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "lakeformation-admin"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLakeFormationDataAdmin
        - arn:aws:iam::aws:policy/AWSLakeFormationCrossAccountManager
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
  DatalakeAdminPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "lakeformation-permissions"
      Roles:
        - !Ref DatalakeAdminRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowSQS
            Effect: Allow
            Action:
              - sqs:ReceiveMessage
              - sqs:List*
              - sqs:Get*
              - sqs:Delete*
            Resource: !GetAtt LFPermissionsQueue.Arn
          - Sid: AllowSNSPublish
            Effect: Allow
            Action:
              - sns:Publish*
            Resource: !Ref LFTopic
          - Sid: LakeFormationDataAccessPermissionsForS3
            Effect: Allow
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:DeleteObject
            Resource:
              - !GetAtt s3BucketPri.Arn
              - !Join ['', [!GetAtt s3BucketPri.Arn, '/*']]
          - Sid: LakeFormationDataAccessPermissionsForS3ListBucket
            Effect: Allow
            Action:
              - s3:ListBucket
            Resource:
              - !GetAtt s3BucketPri.Arn
              - !Join ['', [!GetAtt s3BucketPri.Arn, '*']]
          - Sid: AllowLogs
            Effect: Allow
            Action:
              - logs:Create*
              - logs:PutLogEvents
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/lakeformation_permissions:*"
          - Sid: CMKPerms
            Effect: Allow
            Action:
              - kms:Decrypt
            Resource: !GetAtt DataCMK.Arn
  ConsumptionTestingRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: consumption-testing
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              AWS:
                - !Ref AWS::AccountId
            Action:
              - "sts:AssumeRole"
  ConsumptionTestingPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: consumption-testing
      Roles:
        - !Ref ConsumptionTestingRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - s3:ListAllMyBuckets
              - s3:ListBucket
              - s3:HeadBucket
            Effect: Allow
            Resource:
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}"
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}/*"
  LFPermissionsQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lakeformation-permissions
      VisibilityTimeout: 300
      KmsMasterKeyId: !Ref DataCMK
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt LFPermissionsDLQ.Arn
        maxReceiveCount: 1
  LFPermissionsDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lakeformation-permissions-dlq
      MessageRetentionPeriod: 1209600
      VisibilityTimeout: 60
      KmsMasterKeyId: !Ref DataCMK
  LFPermissionsQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref LFPermissionsQueue
      PolicyDocument:
        Version: "2012-10-17"
        Id: "CrossAccount"
        Statement:
          - Sid: "Allow-xacct-messages-from-SNS"
            Effect: "Allow"
            Principal:
              Service: "sns.amazonaws.com"
            Action: "sqs:SendMessage"
            Resource: !GetAtt LFPermissionsQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref LFTopic
  TopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt LFPermissionsQueue.Arn
      FilterPolicy:
        account_id:
          - !Ref AWS::AccountId
      Protocol: sqs
      TopicArn: !Ref LFTopic
  LoadDataBucketRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - !Ref LoadDataS3Policy
  LoadDataS3Policy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: S3 and KMS Key Access for the Load Data Lambda
      Path: /
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
            Resource:
              - !GetAtt s3BucketPri.Arn
              - !Join ['', [!GetAtt s3BucketPri.Arn, '/*']]
              - "arn:aws:s3:::nyc-tlc/*"
  DataAccessPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: S3 and KMS Key Access
      Path: /
      Roles:
        - !Ref iamRolePriDataLakeAdmin
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
            Resource:
              - !GetAtt s3BucketPri.Arn
              - !Join ['', [!GetAtt s3BucketPri.Arn, '/*']]
  AthenaQueryOutputPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: Policy allowing access to the Athena output S3 bucket
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
            Resource:
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}"
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}/*"
  LoadDataBucketLambda:
    Type: AWS::Lambda::Function
    DependsOn: LoadDataS3Policy
    Properties:
      Description: Custom Resource Lambda that loads the data bucket
      Handler: index.handler
      Runtime: python3.7
      ReservedConcurrentExecutions: 5
      Timeout: 300
      Role: !GetAtt LoadDataBucketRole.Arn
      Code:
        ZipFile: |
          import os
          import boto3
          import cfnresponse

          def handler(event, context):
              try:
                  bucket_name = os.environ.get("DATA_BUCKET")
                  # copy data to s3 bucket
                  s3 = boto3.resource('s3')
                  data_copy_source = {
                      'Bucket': 'nyc-tlc',
                      'Key': 'trip data/yellow_tripdata_2020-06.csv'
                  }
                  bucket = s3.Bucket(bucket_name)
                  bucket.copy(data_copy_source, 'glue/nyctaxi/yellow_tripdata_2020-06.csv')
                  # signal
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, context.log_stream_name)
              except Exception as err:
                  print("Error in Custom Resource", err)
                  # signal
                  cfnresponse.send(event, context, cfnresponse.FAILED, {}, context.log_stream_name)
      Environment:
        Variables:
          DATA_BUCKET: !Ref s3BucketPri
  LoadDataBucket:
    Type: Custom::LoadDataBucket
    Properties:
      ServiceToken: !GetAtt LoadDataBucketLambda.Arn
  s3AccessPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: Policy allowing access to the created S3 bucket GDC and its keys
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
            Resource:
              - !GetAtt s3BucketPri.Arn
              - !Join ['', [!GetAtt s3BucketPri.Arn, '/*']]
  glueDatabasePri:
    DependsOn:
      - "RemoveDataCatalogDefaultSettings"
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Description: Glue Database for Lake Formation Cross Account
        Name: !Ref LFDatabaseName
        Parameters: { "CreateTableDefaultPermissions": "" }
  glueTablePri:
    Type: AWS::Glue::Table
    DependsOn: glueDatabasePri
    Properties:
      DatabaseName: !Ref LFDatabaseName
      CatalogId: !Ref AWS::AccountId
      TableInput:
        Name: !Ref LFTableName
        Description: NYC Taxi Trips
        TableType: EXTERNAL_TABLE
        Parameters: { "classification": "csv" }
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Columns:
            - Name: vendorid
              Type: bigint
            - Name: lpep_pickup_datetime
              Type: string
            - Name: lpep_dropoff_datetime
              Type: string
            - Name: store_and_fwd_flag
              Type: string
            - Name: ratecodeid
              Type: bigint
            - Name: pulocationid
              Type: bigint
            - Name: dolocationid
              Type: bigint
            - Name: passenger_count
              Type: bigint
            - Name: trip_distance
              Type: double
            - Name: fare_amount
              Type: double
            - Name: extra
              Type: double
            - Name: mta_tax
              Type: double
            - Name: tolls_amount
              Type: double
            - Name: ehail_fee
              Type: string
            - Name: improvement_surcharge
              Type: double
            - Name: total_amount
              Type: double
            - Name: payment_type
              Type: bigint
            - Name: trip_type
              Type: bigint
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Location: !Sub "s3://${s3BucketPri}/"
          SerdeInfo:
            Parameters:
              field.delim: ","
            SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
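  # Once the LoadDataBucket custom resource above has copied the sample data, the table can be
  # sanity-checked from the LakeFormationCrossAccount Athena workgroup defined below with a query
  # such as the following (assuming the default LFDatabaseName/LFTableName parameter values):
  #   SELECT vendorid, trip_distance, total_amount
  #   FROM lf_automation.nyc_taxi_data
  #   LIMIT 10;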
  s3BucketPri:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  s3BucketPriPolicy:
    Type: 'AWS::S3::BucketPolicy'
    Properties:
      Bucket: !Ref s3BucketPri
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - 's3:GetObject'
              - 's3:PutObject'
              - 's3:PutObjectAcl'
            Effect: Allow
            Resource: !Join
              - ''
              - - 'arn:aws:s3:::'
                - !Ref s3BucketPri
                - /*
            Principal:
              AWS:
                - !Join [':', ['arn:aws:iam:', !Ref 'AWS::AccountId', 'root']]
  lfresource:
    Type: AWS::LakeFormation::Resource
    Properties:
      ResourceArn: !GetAtt s3BucketPri.Arn
      UseServiceLinkedRole: true
  lfs3athenaoutput:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  lfs3athenaoutputPolicy:
    Type: 'AWS::S3::BucketPolicy'
    Properties:
      Bucket: !Ref lfs3athenaoutput
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - 's3:GetObject'
              - 's3:PutObject'
              - 's3:PutObjectAcl'
            Effect: Allow
            Resource: !Join
              - ''
              - - 'arn:aws:s3:::'
                - !Ref lfs3athenaoutput
                - /*
            Principal:
              AWS:
                - !Join [':', ['arn:aws:iam:', !Ref 'AWS::AccountId', 'root']]
  lfAthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: LakeFormationCrossAccount
      Description: Workgroup to access Lake Formation Cross Account
      State: ENABLED
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub "s3://${lfs3athenaoutput}/"
  iamRolePriDataLakeAdmin:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "LakeFormationPrimaryAdmin"
      Path: "/"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLakeFormationDataAdmin
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
        - arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsReadOnlyAccess
        - arn:aws:iam::aws:policy/AWSLakeFormationCrossAccountManager
        - !Ref lfprirampolicy
        - !Ref LFServiceLinkedPolicy
        - !Ref AthenaQueryOutputPolicy
  LFServiceLinkedPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: Policy allowing Lake Formation to register locations and update the role AWSServiceRoleForLakeFormationDataAccess
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: iam:CreateServiceLinkedRole
            Resource: "*"
            Condition:
              StringEquals:
                iam:AWSServiceName: lakeformation.amazonaws.com
          - Effect: Allow
            Action: iam:PutRolePolicy
            Resource: !Sub "arn:aws:iam::${AWS::AccountId}:role/aws-service-role/lakeformation.amazonaws.com/AWSServiceRoleForLakeFormationDataAccess"
  lfprirampolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: Policy allowing access to RAM, required to share tables cross account with Lake Formation
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - ram:*ResourceShareInvitation
              - organizations:DescribeAccount
              - ec2:DescribeAvailabilityZones
              - ram:EnableSharingWithAwsOrganization
            Resource: "*"
  AdminUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLakeFormationCrossAccountManager
        - arn:aws:iam::aws:policy/AmazonEC2ReadOnlyAccess
      UserName: lf-admin
      Groups:
        - !Ref DefaultIAMUserGroup
  AdminUserPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: LF-DataLake-Admin-Policy
      Groups:
        - !Ref LFAdminGroup
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - lakeformation:*Resource
              - lakeformation:Get*
              - lakeformation:Batch*
              - lakeformation:List*
              - lakeformation:*Permissions
              - lakeformation:PutDataLakeSettings
              - cloudtrail:DescribeTrails
              - cloudtrail:LookupEvents
              - glue:*Database
              - glue:SearchTables
              - glue:*Table
              - glue:Get*
              - glue:List*
              - glue:Batch*
              - glue:DeleteWorkflow
              - glue:StartWorkflowRun
              - s3:List*
              - s3:GetBucket*
              - iam:List*
              - iam:Get*
              - tag:Get*
              - ec2:*SecurityGroupEgress
              - ec2:*SecurityGroupIngress
              - iam:ChangePassword
            Resource: "*"
          - Effect: Allow
            Action: iam:PassRole
            Resource:
              - arn:aws:iam::*:role/LF-GlueServiceRole
              - arn:aws:iam::*:role/LF-EMR-Notebook*
  DeveloperUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-developer
      Groups:
        - !Ref DefaultIAMUserGroup
  AthenaQueryResultPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: LF-Athena-Query-Result-Policy
      Groups:
        - !Ref DefaultIAMUserGroup
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:Put*
              - s3:Get*
              - s3:List*
            Resource: !Sub "arn:aws:s3:::${lfs3athenaoutput}/*"
          - Effect: Allow
            Action:
              - iam:ChangePassword
            Resource: "*"
  TaxiManagerUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-taxi-manager
      Groups:
        - !Ref DefaultIAMUserGroup
  BusinessAnalystUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-business-analyst
      Groups:
        - !Ref DefaultIAMUserGroup
  LFAdminGroup:
    Type: AWS::IAM::Group
    Properties:
      GroupName: lf-admins
  DefaultIAMUserGroup:
    Type: AWS::IAM::Group
    Properties:
      GroupName: lf-default-users
Outputs:
  TopicArn:
    Value: !Ref LFTopic
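# Deployment sketch (assuming the template is saved as template.yaml; the stack name and parameter
# values are placeholders). CAPABILITY_NAMED_IAM is required because the template creates named IAM
# roles, users, and groups:
#   aws cloudformation deploy \
#     --template-file template.yaml \
#     --stack-name lakeformation-automation \
#     --capabilities CAPABILITY_NAMED_IAM \
#     --parameter-overrides OrgId=o-xxxxxxxxxx TestUserPassword=<password>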