# # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the "Software"), # to deal in the Software without restriction, including without limitation # the rights to use, copy, modify,merge, publish, distribute, sublicense, # and/or sell copies of the Software, and to permit persons to whom the # Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. # ''' Clear all AWS SSO users/groups/permission sets created by AWS Control Tower. ''' import logging import sys from time import sleep from botocore.exceptions import ClientError import boto3 ORG = boto3.client('organizations') STS = boto3.client('sts') SSS = boto3.client('s3') ID = boto3.client('identitystore') SSO = boto3.client('sso') SSO_ADMIN = boto3.client('sso-admin') REGION = SSS.meta.region_name CT_PS_LIST = ['AWSServiceCatalogAdminFullAccess', 'AWSReadOnlyAccess', 'AWSPowerUserAccess', 'AWSServiceCatalogEndUserAccess', 'AWSOrganizationsFullAccess', 'AWSAdministratorAccess'] CT_GROUPS = ['AWSAccountFactory', 'AWSAuditAccountAdmins', 'AWSControlTowerAdmins', 'AWSLogArchiveAdmins', 'AWSLogArchiveViewers', 'AWSSecurityAuditors', 'AWSSecurityAuditPowerUsers', 'AWSServiceCatalogAdmins'] LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) HANDLER = logging.StreamHandler(sys.stdout) HANDLER.setLevel(logging.DEBUG) LOGGER.addHandler(HANDLER) def get_organization_info(): ''' Return the organization information ''' try: output = ORG.describe_organization() return output except ClientError as exe: LOGGER.error('Unable to retrieve organization details: %s', str(exe)) def get_root_email_id(): ''' Return the email-id associated with the management account ''' output = get_organization_info() if 'Organization' in output: output = output['Organization']['MasterAccountEmail'] return output def get_root_account_id(): ''' Return the AWS account-id of the management account ''' output = get_organization_info() if 'Organization' in output: output = output['Organization']['MasterAccountId'] return output def get_org_id(): ''' Return the organization id ''' output = get_organization_info() if 'Organization' in output: output = output['Organization']['Id'] return output def get_account_id(): ''' Return the AWS account id where the method is executed ''' try: return STS.get_caller_identity()['Account'] except ClientError as exe: LOGGER.error('Unable to get account-id: %s', str(exe)) def list_accounts(value='Email'): '''Return list of all AWS accounts in an organization''' result = list() emails = list() try: paginator = ORG.get_paginator('list_accounts') iterator = paginator.paginate() except ClientError as exe: LOGGER.error('Unable to list instances %s', str(exe)) for page in iterator: result += page['Accounts'] for account in result: emails.append(account[value]) return emails def running_in_root(): ''' Return True if running in the management account ''' if get_account_id() == get_root_account_id(): return True def list_instances(): '''Return list of all instances''' result = list() try: paginator = SSO_ADMIN.get_paginator('list_instances') iterator = paginator.paginate() except ClientError as exe: LOGGER.error('Unable to list instances %s', str(exe)) for page in iterator: result += page['Instances'] return result def get_instance(keyname='IdentityStoreId'): ''' Return the value of KeyName ''' inst_list = list_instances() inst_count = len(inst_list) output = None if inst_count > 0: output = inst_list[-1][keyname] return output def generate_uid_map(): ''' Return user name to user-id mapping ''' result = dict() ids_id = get_instance() ct_users = list_accounts() for user in ct_users: filter_list = [{ "AttributePath": "UserName", "AttributeValue": user }] try: output = ID.list_users(IdentityStoreId=ids_id, Filters=filter_list) if len(output['Users']) > 0: u_id = output['Users'][-1]['UserId'] result[user] = u_id except ClientError as exe: LOGGER.error('Unable to ger user id for %s: %s', user, str(exe)) return result def generate_gid_map(): ''' Return group name to group-id mapping ''' ids_id = get_instance() result = dict() for group in CT_GROUPS: filters = [{ "AttributePath": "DisplayName", "AttributeValue": group }] try: output = ID.list_groups(IdentityStoreId=ids_id, Filters=filters) if len(output['Groups']) > 0: grp_id = output['Groups'][-1]['GroupId'] result[group] = grp_id except ClientError as exe: LOGGER.error('Unable to get group id: %s', str(exe)) return result def get_permission_set_list(): ''' Return list of permission set Arns ''' ps_list = list() inst_arn = get_instance('InstanceArn') try: paginator = SSO_ADMIN.get_paginator('list_permission_sets') iterator = paginator.paginate(InstanceArn=inst_arn) except ClientError as exe: LOGGER.error('Unable to list permission sets %s', str(exe)) for page in iterator: ps_list += page['PermissionSets'] return ps_list def generate_ps_map(): ''' Return permission set name to Arn mapping ''' ps_list = get_permission_set_list() inst_arn = get_instance('InstanceArn') result = dict() for ps_arn in ps_list: output = SSO_ADMIN.describe_permission_set( InstanceArn=inst_arn, PermissionSetArn=ps_arn) ps_name = output['PermissionSet']['Name'] result[ps_name] = ps_arn return result def get_permission_set_arn(ps_name): ''' Return permission set Arn of a permission set ''' ps_map = generate_ps_map() if ps_name in ps_map: return ps_map[ps_name] def get_accounts_for_ps(psname): ''' Return list of accounts assigned to a permission set ''' acct_list = list() inst_arn = get_instance('InstanceArn') ps_arn = get_permission_set_arn(psname) try: paginator = SSO_ADMIN.get_paginator( 'list_accounts_for_provisioned_permission_set') iterator = paginator.paginate(InstanceArn=inst_arn, PermissionSetArn=ps_arn) except ClientError as exe: LOGGER.error('Unable to list permission sets %s', str(exe)) for page in iterator: acct_list += page['AccountIds'] return acct_list def get_name(id_map, arn_name): ''' Return permission set for a given arn ''' return next((name for name, arn in id_map.items() if arn == arn_name), None) def deletion_completed(request_id): ''' Return True if completed ''' result = None inst_arn = get_instance('InstanceArn') try: output = SSO_ADMIN.describe_account_assignment_deletion_status( InstanceArn=inst_arn, AccountAssignmentDeletionRequestId=request_id) del_status = output['AccountAssignmentDeletionStatus'] result = del_status['Status'] if result != 'IN_PROGRESS': if result == 'FAILED': reason = del_status['FailureReason'] if 'Assignment not found' not in reason: LOGGER.info('Status: %s, Reason: %s', result, reason) return True except ClientError as exe: LOGGER.error('Unable to get deletion status: %s', str(exe)) def delete_permissionset_arn(ps_arn): ''' Delete a permission set and return result ''' result = False inst_arn = get_instance('InstanceArn') try: result = SSO_ADMIN.delete_permission_set(InstanceArn=inst_arn, PermissionSetArn=ps_arn) LOGGER.info('Deleted permission set: %s', result) except ClientError as exe: LOGGER.error('Unable to delete permission set: %s', str(exe)) return result def get_account_assignments(account_id, ps_arn): '''Return list of all AWS accounts in an organization''' result = list() inst_arn = get_instance('InstanceArn') try: paginator = SSO_ADMIN.get_paginator('list_account_assignments') iterator = paginator.paginate(InstanceArn=inst_arn, AccountId=account_id, PermissionSetArn=ps_arn) except ClientError as exe: LOGGER.error('Unable to list assignments %s', str(exe)) for page in iterator: result += page['AccountAssignments'] return result def generate_assignments(): ''' Return account assignment details ''' acc_list = list() for account in list_accounts('Id'): for ps_name in CT_PS_LIST: ps_arn = get_permission_set_arn(ps_name) if ps_arn: result = get_account_assignments(account, ps_arn) acc_list.extend(result) return acc_list def delete_assignment(target_id, ps_arn, p_id, p_type): ''' Delete mapping between Account / Group or User / Permission Set ''' result = None inst_arn = get_instance('InstanceArn') retry_count = 0 throttle_retry = True ps_map = generate_ps_map() ps_name = get_name(ps_map, ps_arn) if p_type == 'GROUP': id_map = generate_gid_map() else: id_map = generate_uid_map() pid_name = get_name(id_map, p_id) while throttle_retry and retry_count < 5: try: result = SSO_ADMIN.delete_account_assignment( InstanceArn=inst_arn, TargetId=target_id, TargetType='AWS_ACCOUNT', PermissionSetArn=ps_arn, PrincipalType=p_type, PrincipalId=p_id) LOGGER.info('Deleted Assignment: %s, %s, %s, %s', target_id, ps_name, p_type, pid_name) throttle_retry = False except ClientError as exe: error_msg = str(exe.response['Error']['Code']) if error_msg == 'ThrottlingException': retry_count += 1 else: LOGGER.error('Unable to delete an assignment: %s', str(exe)) return result def clear_assignments(assignment): ''' Clear assignments ''' account = assignment['AccountId'] ps_arn = assignment['PermissionSetArn'] p_id = assignment['PrincipalId'] p_type = assignment['PrincipalType'] output = delete_assignment(account, ps_arn, p_id, p_type) if output: request_id = output['AccountAssignmentDeletionStatus']['RequestId'] while not deletion_completed(request_id): sleep(2) def delete_sso_mappings(): ''' Delete all AWS SSO assginments created by AWS Control Tower ''' assignments = generate_assignments() for item in assignments: clear_assignments(item) def lambda_handler(event, context): ''' Clear Control Tower Mappings ''' LOGGER.info('EVENT: %s', event) LOGGER.info('CONTEXT: %s', context) event_details = event['detail'] event_name = event_details['eventName'] srvevent_details = event_details['serviceEventDetails'] if event_name == 'UpdateLandingZone': update_info = srvevent_details['updateLandingZoneStatus'] update_status = update_info['state'] if update_status == 'SUCCEEDED': delete_sso_mappings() sleep(10) for item in CT_PS_LIST: ps_name = get_permission_set_arn(item) if ps_name: delete_permissionset_arn(ps_name) else: LOGGER.error('Unsucessful Event Recieved. SKIPPING: %s', event)