Description: >
  This template deploys a serverless application for automating
  Lake Formation permissions on the consumption account

Parameters:
  Prefix:
    Description: An environment name that will be prefixed to resource names
    Type: String
    Default: ""
  Environment:
    Description: SDLC environment
    Type: String
    Default: ""
  LFAdmin:
    # Used below to build arn:aws:iam::<account>:role/<LFAdmin> for the data lake admin list.
    Description: Name of the IAM role to register as a Lake Formation administrator
    Type: String
    Default: "Admin"
  TopicArn:
    Description: Topic arn in central account
    Type: String
  CentralAccount:
    # Passed to the permissions Lambda as FOUNDATION_ACCOUNT_ID.
    Description: AWS account ID of the central account
    Type: Number
  TestUserPassword:
    Type: String
    Description: Password for all test users.
    NoEcho: true

Resources:
  # Consumes permission requests from LFPermissionsQueue and applies them
  # through the Lake Formation grant/revoke APIs.
  LFPermissionsLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "lakeformation_permissions"
      Description: "Lakeformation Permissions Lambda"
      Role: !GetAtt DatalakeAdminRole.Arn
      Handler: "index.lambda_handler"
      # python3.8 is a deprecated Lambda runtime; the code below is compatible with 3.12.
      Runtime: "python3.12"
      Timeout: 300
      ReservedConcurrentExecutions: 10
      Environment:
        Variables:
          ACCOUNT_ID: !Ref AWS::AccountId
          REGION: !Ref AWS::Region
          FOUNDATION_ACCOUNT_ID: !Ref CentralAccount
      Code:
        ZipFile: |
          import json
          import logging
          import os

          import boto3
          from botocore.config import Config

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          acc_id = os.environ['ACCOUNT_ID']
          region = os.environ['REGION']
          f_acc_id = os.environ['FOUNDATION_ACCOUNT_ID']

          # Resource links in this account are named 'foundation_<central database name>'.
          DB_PREFIX = 'foundation_'
          # The only permissions this function will ever grant or revoke.
          ALLOWED_PERMISSIONS = ('SELECT', 'DESCRIBE')
          LF_CONFIG = Config(connect_timeout=5, read_timeout=60, retries={'max_attempts': 20})


          class Error(Exception):
              """Base class for other exceptions"""


          class LFAttributeError(Error):
              """Raised when one or more mandatory Lake Formation permission parameters are missing"""


          def _central_db_name(database):
              """Return the database name without the leading resource-link prefix, if any."""
              if database.startswith(DB_PREFIX):
                  return database[len(DB_PREFIX):]
              return database


          def _build_resource(table_json, tableWithColumns_json):
              """Return the Lake Formation Resource dict for whichever table description is populated."""
              if table_json:
                  return {'Table': table_json}
              if tableWithColumns_json:
                  return {'TableWithColumns': tableWithColumns_json}
              return {}


          def check_db_exist(glue_client, database):
              """
              Check if a database exists in this account's Glue catalog.

              Arguments:
                  glue_client -- glue client object
                  database {str} -- database name

              Returns:
                  True/False {bool}
              """
              try:
                  glue_client.get_database(Name=database)
                  return True
              except glue_client.exceptions.EntityNotFoundException:
                  return False
              except Exception as e:
                  logger.error("Exception occured while checking for DB existance - {}".format(e))
                  raise


          def grant_db_describe(principal, database):
              """
              Grants 'DESCRIBE' on the local resource-link database to the principal,
              creating the resource link to the central database first when it is missing.

              Arguments:
                  principal {dict} -- {'DataLakePrincipalIdentifier': ...} to grant DESCRIBE to
                  database {str} -- database name, with or without the 'foundation_' prefix

              Returns:
                  response {dict} -- response from the Lake Formation grant_permissions call
              """
              # Only a *leading* prefix marks a resource link; a name that merely
              # contains 'foundation_' elsewhere still needs the prefix added.
              if not database.startswith(DB_PREFIX):
                  database = DB_PREFIX + database
              target_database = _central_db_name(database)

              glue_client = boto3.client('glue')
              if not check_db_exist(glue_client, database):
                  # Resource link creation on Consumption account
                  logger.info("{} database doesn't exist, creating Resource Link(DB)".format(database))
                  response = glue_client.create_database(
                      DatabaseInput={
                          'Name': database,
                          'TargetDatabase': {
                              'CatalogId': f_acc_id,
                              'DatabaseName': target_database
                          }
                      }
                  )
                  if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                      logger.info('Successfully create Resource Link --> {}'.format(target_database))

              resource = {'Database': {'CatalogId': acc_id, 'Name': database}}
              client = boto3.client('lakeformation', config=LF_CONFIG)
              logger.info('Granting DB Describe on resource {} for Principal {} with Consumption ACC ID {}'
                          .format(database, principal, acc_id))
              response = client.grant_permissions(Principal=principal, Resource=resource,
                                                  Permissions=['DESCRIBE'])
              logger.info('DB DESCRIBE Grant Response {}'.format(response))
              return response


          def buildjson(event):
              """
              Builds the arguments consumed by the Lake Formation grant/revoke APIs.

              The whole event is validated before anything is changed; only then is
              DESCRIBE granted on the (resource link) database via grant_db_describe.

              Arguments:
                  event {dict} -- the 'perms_to_set' payload, e.g.
                      {'Principal': 'string',
                       'Permissions': ['SELECT'],
                       'Table': {'DatabaseName': 'string', 'Name': 'string' | 'TableWildcard': {}}}
                      or, instead of 'Table',
                       'TableWithColumns': {'DatabaseName': 'string', 'Name': 'string',
                                            'ColumnNames': ['string'] |
                                            'ColumnWildcard': {'ExcludedColumnNames': ['string']}}

              Returns:
                  principal_json {dict} -- {'DataLakePrincipalIdentifier': 'string'}
                  table_json {dict} -- 'Table' resource (empty when TableWithColumns is used)
                  tableWithColumns_json {dict} -- 'TableWithColumns' resource (empty when Table is used)
                  perm_json {dict} -- {'Permissions': [...]} restricted to SELECT / DESCRIBE

              Raises:
                  LFAttributeError -- a mandatory attribute is missing, or none of the
                                      requested permissions is SELECT / DESCRIBE
              """
              if 'Principal' not in event:
                  raise LFAttributeError("'Principal' is missing")
              if 'Permissions' not in event:
                  raise LFAttributeError("'Permissions' is missing")

              requested = event['Permissions']
              permissions = [perm for perm in requested if perm in ALLOWED_PERMISSIONS]
              if len(permissions) != len(requested):
                  logger.info('Found permissions other than SELECT and DESCRIBE ignoring them')
              if not permissions:
                  raise LFAttributeError('None of the requested permissions is SELECT or DESCRIBE')

              principal_json = {'DataLakePrincipalIdentifier': event['Principal']}
              perm_json = {'Permissions': permissions}
              table_json = {}
              tableWithColumns_json = {}

              if 'Table' in event:
                  table = event['Table']
                  if 'DatabaseName' not in table:
                      raise LFAttributeError("'Table.DatabaseName' is missing")
                  if 'Name' in table:
                      table_json['Name'] = table['Name']
                  elif 'TableWildcard' in table:
                      table_json['TableWildcard'] = table['TableWildcard']
                  else:
                      raise LFAttributeError("'Table' needs 'Name' or 'TableWildcard'")
                  # Table permissions are set against the central account's catalog.
                  table_json['CatalogId'] = f_acc_id
                  table_json['DatabaseName'] = _central_db_name(table['DatabaseName'])
                  grant_db_describe(principal_json, table['DatabaseName'])
              elif 'TableWithColumns' in event:
                  columns = event['TableWithColumns']
                  if 'DatabaseName' not in columns:
                      raise LFAttributeError("'TableWithColumns.DatabaseName' is missing")
                  if 'Name' not in columns:
                      raise LFAttributeError("'TableWithColumns.Name' is missing")
                  if 'ColumnNames' in columns:
                      tableWithColumns_json['ColumnNames'] = columns['ColumnNames']
                  elif 'ColumnWildcard' in columns:
                      tableWithColumns_json['ColumnWildcard'] = columns['ColumnWildcard']
                  else:
                      raise LFAttributeError("'TableWithColumns' needs 'ColumnNames' or 'ColumnWildcard'")
                  tableWithColumns_json['Name'] = columns['Name']
                  tableWithColumns_json['CatalogId'] = f_acc_id
                  tableWithColumns_json['DatabaseName'] = _central_db_name(columns['DatabaseName'])
                  grant_db_describe(principal_json, columns['DatabaseName'])
              else:
                  raise LFAttributeError("'Table' or 'TableWithColumns' is required")

              return principal_json, table_json, tableWithColumns_json, perm_json


          def grant_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json):
              """
              Grants the specified permissions to the principal on the respective resource.

              Arguments:
                  principal_json {dict} -- principal which requires the grant
                  table_json {dict} -- 'Table' resource (used when non-empty)
                  tableWithColumns_json {dict} -- 'TableWithColumns' resource (used otherwise)
                  perm_json {dict} -- {'Permissions': [...]} to apply to the resource

              Returns:
                  response {dict} -- response from the Lake Formation API call
              """
              logger.info('Granting Lakeformation Permissions ....')
              try:
                  client = boto3.client('lakeformation', config=LF_CONFIG)
                  response = client.grant_permissions(
                      Principal=principal_json,
                      Resource=_build_resource(table_json, tableWithColumns_json),
                      Permissions=perm_json['Permissions'])
                  logger.info('Grant permissions API response: {}'.format(response))
                  return response
              except Exception as e:
                  logger.error("Grant permissions Method failed with exception {}".format(e))
                  raise


          def revoke_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json):
              """
              Revokes the specified permissions from the principal on the respective resource.

              Arguments:
                  principal_json {dict} -- principal whose permissions are revoked
                  table_json {dict} -- 'Table' resource (used when non-empty)
                  tableWithColumns_json {dict} -- 'TableWithColumns' resource (used otherwise)
                  perm_json {dict} -- {'Permissions': [...]} to remove from the resource

              Returns:
                  response {dict} -- response from the Lake Formation API call
              """
              logger.info('Revoking Lakeformation Permissions ....')
              try:
                  client = boto3.client('lakeformation', config=LF_CONFIG)
                  response = client.revoke_permissions(
                      Principal=principal_json,
                      Resource=_build_resource(table_json, tableWithColumns_json),
                      Permissions=perm_json['Permissions'])
                  logger.info('Revoke permissions API response: {}'.format(response))
                  return response
              except Exception as e:
                  logger.error("Revoke permissions Method failed with exception {}".format(e))
                  raise


          def lambda_handler(event, context):
              """
              SQS entry point. Each record body is an SNS envelope whose 'Message' is a
              JSON document carrying the request under 'perms_to_set'. Any failure is
              logged and re-raised so the message is not deleted from the queue.
              """
              try:
                  logger.info('Received {} messages'.format(len(event['Records'])))
                  logger.info('messages {}'.format(event))
                  for record in event['Records']:
                      event_body = json.loads(json.loads(record['body'])['Message'])['perms_to_set']
                      logger.info('Processing Permissions for: {}'.format(event_body))

                      # Reject an unknown AccessType before buildjson changes anything.
                      access_type = str(event_body.get('AccessType', '')).lower()
                      if access_type not in ('grant', 'revoke'):
                          raise LFAttributeError("'AccessType' must be 'grant' or 'revoke'")

                      principal_json, table_json, tableWithColumns_json, perm_json = buildjson(event_body)
                      logger.info('created permissions JSONs - principal json : {},table_json {},tableWithColumns_json {}, perm_json {} '
                                  .format(principal_json, table_json, tableWithColumns_json, perm_json))
                      if access_type == 'grant':
                          logger.info('Calling Grant permissions for {} on resource {} or {} permissions {}'
                                      .format(principal_json, table_json, tableWithColumns_json, perm_json))
                          grant_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json)
                      else:
                          logger.info('Calling Revoke permissions for {} on resource {} or {} permissions {}'
                                      .format(principal_json, table_json, tableWithColumns_json, perm_json))
                          revoke_lf_permissions(principal_json, table_json, tableWithColumns_json, perm_json)
              except Exception:
                  logger.error("Fatal error", exc_info=True)
                  raise
              return

  LFPermissionsLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LFPermissionsLambda
      Principal: sqs.amazonaws.com
      SourceArn: !GetAtt LFPermissionsQueue.Arn

  LFEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    # The role needs its SQS/KMS policy attached before the mapping can be created.
    DependsOn: DatalakeAdminPolicy
    Properties:
      BatchSize: 1
      FunctionName: !Ref LFPermissionsLambda
      EventSourceArn: !GetAtt LFPermissionsQueue.Arn

  # Custom resource backing function: registers the data lake administrators and
  # empties the default database/table permissions in the Lake Formation settings.
  RemoveDataCatalogDefaultSettingsLambda:
    Type: AWS::Lambda::Function
    Properties:
      Description: Custom Resource Lambda that sets the data lake admins and clears the default catalog permissions
      Handler: index.handler
      # python3.7 is a deprecated Lambda runtime; the code below is compatible with 3.12.
      Runtime: python3.12
      Timeout: 300
      ReservedConcurrentExecutions: 5
      Role: !GetAtt RemoveDataCatalogDefaultSettingsRole.Arn
      Code:
        ZipFile: !Sub |
          import boto3
          import cfnresponse


          def handler(event, context):
              try:
                  client = boto3.client('lakeformation')
                  principals = [
                      'arn:aws:iam::${AWS::AccountId}:role/${LFAdmin}',
                      '${DatalakeAdminRole.Arn}'
                  ]
                  dladmins = [{'DataLakePrincipalIdentifier': principal} for principal in principals]
                  client.put_data_lake_settings(
                      DataLakeSettings={
                          'DataLakeAdmins': dladmins,
                          'CreateDatabaseDefaultPermissions': [],
                          'CreateTableDefaultPermissions': [],
                      }
                  )
                  # signal
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, context.log_stream_name)
              except Exception as err:
                  print("Error in Custom Resource", err)
                  # signal
                  cfnresponse.send(event, context, cfnresponse.FAILED, {}, context.log_stream_name)

  RemoveDataCatalogDefaultSettingsRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "remove-data-catalog-default-settings"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: AllowLakeformationSettings
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - lakeformation:GetDataLakeSettings
                  - lakeformation:PutDataLakeSettings
                Resource: "*"
        - PolicyName: AllowBasicCloudwatchLogs
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                # The function has no FunctionName, so its log group carries a generated
                # name; the previous '/aws/lambda/RemoveDataCatalogDefault' ARN never
                # matched it. Referencing the function here would create a circular
                # dependency, hence the wildcard.
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*"

  RemoveDataCatalogDefaultSettings:
    Type: Custom::RemoveDataCatalogDefaultSettings
    Properties:
      ServiceToken: !GetAtt RemoveDataCatalogDefaultSettingsLambda.Arn

  DatalakeAdminRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "lakeformation-admin"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLakeFormationDataAdmin
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"

  DatalakeAdminPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "lakeformation-permissions"
      Roles:
        - !Ref DatalakeAdminRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowSQS
            Effect: Allow
            Action:
              - sqs:ReceiveMessage
              - sqs:List*
              - sqs:Get*
              - sqs:Delete*
            Resource: !GetAtt LFPermissionsQueue.Arn
          - Sid: AllowLogs
            Effect: Allow
            Action:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:PutLogEvents
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/lakeformation_permissions:*"
          - Sid: KMS
            Effect: Allow
            Action:
              - kms:Decrypt
            Resource: !GetAtt DataCMK.Arn

  DataCMK:
    Type: AWS::KMS::Key
    Properties:
      Description: key to encrypt manifest data
      EnableKeyRotation: true
      KeyPolicy:
        Version: '2012-10-17'
        Id: default
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: kms:*
            Resource: '*'
          - Sid: SNSPermissions
            Effect: Allow
            Principal:
              Service: "sns.amazonaws.com"
            # SNS only needs to generate/decrypt data keys to deliver into the
            # encrypted queues; kms:* also allowed key administration.
            Action:
              - kms:GenerateDataKey*
              - kms:Decrypt
            Resource: '*'

  LFPermissionsQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lakeformation-permissions
      # NOTE(review): equal to the Lambda timeout, and maxReceiveCount is 1, so a
      # single failed or throttled delivery sends the message to the DLQ — confirm
      # this is intended.
      VisibilityTimeout: 300
      KmsMasterKeyId: !Ref DataCMK
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt LFPermissionsDLQ.Arn
        maxReceiveCount: 1

  LFPermissionsDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: lakeformation-permissions-dlq
      MessageRetentionPeriod: 1209600
      VisibilityTimeout: 60
      KmsMasterKeyId: !Ref DataCMK

  LFPermissionsQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref LFPermissionsQueue
      PolicyDocument:
        Version: "2012-10-17"
        Id: "CrossAccount"
        Statement:
          - Sid: "Allow-xacct-messages-from-SNS"
            Effect: "Allow"
            Principal:
              Service: "sns.amazonaws.com"
            Action: "sqs:SendMessage"
            Resource: !GetAtt LFPermissionsQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref TopicArn

  TopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt LFPermissionsQueue.Arn
      # Only messages addressed to this account are delivered to the queue.
      FilterPolicy:
        account_id:
          - !Ref AWS::AccountId
      Protocol: sqs
      TopicArn: !Ref TopicArn

  # Demo Resources
  # Remove if generalizing
  lfs3athenaoutput:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  lfs3athenaoutputPolicy:
    Type: 'AWS::S3::BucketPolicy'
    Properties:
      Bucket: !Ref lfs3athenaoutput
      PolicyDocument:
        # Quoted so YAML does not parse the version as a date.
        Version: "2012-10-17"
        Statement:
          - Action:
              - 's3:GetObject'
              - 's3:PutObject'
              - 's3:PutObjectAcl'
            Effect: Allow
            Resource: !Sub "arn:aws:s3:::${lfs3athenaoutput}/*"
            Principal:
              AWS:
                - !Sub "arn:aws:iam::${AWS::AccountId}:root"

  lfAthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: LakeFormationCrossAccount
      Description: Workgroup to access Lake Formation Cross Account
      State: ENABLED
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub "s3://${lfs3athenaoutput}/"

  # Demo IAM Users
  AdminUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLakeFormationCrossAccountManager
        - arn:aws:iam::aws:policy/AmazonEC2ReadOnlyAccess
      UserName: lf-admin
      # NOTE(review): LF-DataLake-Admin-Policy is attached to LFAdminGroup, but no
      # user in this template is a member of that group — confirm whether lf-admin
      # should be added to it.
      Groups:
        - !Ref DefaultIAMUserGroup

  AdminUserPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: LF-DataLake-Admin-Policy
      Groups:
        - !Ref LFAdminGroup
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - lakeformation:UpdateResource
              - lakeformation:GetEffectivePermissionsForPath
              - lakeformation:BatchGrantPermissions
              - lakeformation:GetDataAccess
              - lakeformation:RegisterResource
              - lakeformation:ListPermissions
              - lakeformation:BatchRevokePermissions
              - lakeformation:ListResources
              - lakeformation:DescribeResource
              - lakeformation:GrantPermissions
              - lakeformation:GetDataLakeSettings
              - lakeformation:DeregisterResource
              - lakeformation:PutDataLakeSettings
              - lakeformation:RevokePermissions
              - cloudtrail:DescribeTrails
              - cloudtrail:LookupEvents
              - glue:GetDatabase
              - glue:CreateDatabase
              - glue:UpdateDatabase
              - glue:DeleteDatabase
              - glue:SearchTables
              - glue:CreateTable
              - glue:UpdateTable
              - glue:DeleteTable
              - glue:Get*
              - glue:List*
              - glue:BatchGetWorkflows
              - glue:DeleteWorkflow
              - glue:GetWorkflowRuns
              - glue:StartWorkflowRun
              - glue:GetWorkflow
              - s3:ListBucket
              - s3:GetBucketLocation
              - s3:ListAllMyBuckets
              - s3:GetBucketAcl
              - iam:ListUsers
              - iam:ListRoles
              - iam:GetRole
              - iam:GetRolePolicy
              - tag:Get*
              - glue:BatchGetCrawlers
              - ec2:AuthorizeSecurityGroupEgress
              - ec2:AuthorizeSecurityGroupIngress
              - ec2:RevokeSecurityGroupEgress
              - ec2:RevokeSecurityGroupIngress
              - iam:ChangePassword
            Resource: "*"
          - Effect: Allow
            Action: iam:PassRole
            Resource:
              - arn:aws:iam::*:role/LF-GlueServiceRole
              - arn:aws:iam::*:role/LF-EMR-Notebook*

  AthenaQueryResultPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: LF-Athena-Query-Result-Policy
      Groups:
        - !Ref DefaultIAMUserGroup
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:Put*
              - s3:Get*
              - s3:List*
            # Bucket-level actions (e.g. s3:ListBucket, s3:GetBucketLocation) match
            # the bucket ARN, object-level actions match the '/*' ARN.
            Resource:
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}"
              - !Sub "arn:aws:s3:::${lfs3athenaoutput}/*"
          - Effect: Allow
            Action:
              - iam:ChangePassword
            Resource: "*"

  DeveloperUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-developer
      Groups:
        - !Ref DefaultIAMUserGroup

  TaxiManagerUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-taxi-manager
      Groups:
        - !Ref DefaultIAMUserGroup

  BusinessAnalystUser:
    Type: AWS::IAM::User
    Properties:
      Path: "/"
      LoginProfile:
        Password: !Ref TestUserPassword
        PasswordResetRequired: true
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      UserName: lf-business-analyst
      Groups:
        - !Ref DefaultIAMUserGroup

  LFAdminGroup:
    Type: AWS::IAM::Group
    Properties:
      GroupName: lf-admins

  DefaultIAMUserGroup:
    Type: AWS::IAM::Group
    Properties:
      GroupName: lf-default-users