Parameters: accountid: Type: String Default: "123456789012" AllowedPattern: '^[0-9]{12}$' Description: >- Enter the AWS Account ID of the account that you'd like to delegate as administrator for Identity Center delegate: Type: String Default: "true" AllowedValues: - "true" - "false" Description: >- Set to true if you'd like to delegate another account as a delegated administrator for Identity Center Resources: icDelegateAdminLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: lambda.amazonaws.com Version: "2012-10-17" ManagedPolicyArns: - Fn::Join: - "" - - "arn:" - !Ref AWS::Partition - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole icDelegateAdminPolicy: Type: AWS::IAM::Policy Properties: PolicyDocument: Statement: - Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - organizations:DeregisterDelegatedAdministrator - organizations:ListDelegatedAdministrators - organizations:RegisterDelegatedAdministrator Effect: Allow Resource: "*" Version: "2012-10-17" PolicyName: icDelegateAdminPolicy Roles: - !Ref icDelegateAdminLambdaExecutionRole icDelegateAdminFunction: Type: AWS::Lambda::Function Properties: Code: ZipFile: | from __future__ import print_function import boto3 import os import urllib3 import json import logging from time import sleep SUCCESS = "SUCCESS" FAILED = "FAILED" logger = logging.getLogger() logger.setLevel(logging.INFO) http = urllib3.PoolManager() def delegate_sso_admin(event, context): delegate = event["ResourceProperties"]["delegate"] print(f'Input value for delegate {delegate}') if delegate.lower() != 'true': print(f'Delegate: {delegate.lower()}. Delegation not requested') return account_id = event["ResourceProperties"]["account_id"] print(f'Input value for account ID to delegate AWS IC to: {account_id}') org_client = boto3.client('organizations') # List the delegated administrators for IC admins = org_client.list_delegated_administrators( ServicePrincipal='sso.amazonaws.com' ) sleep(0.1) # Check if any other accounts are delegated admins for IC other_admins = [admin['Id'] for admin in admins['DelegatedAdministrators'] if admin['Id'] != account_id] if other_admins: # Deregister the other delegated admins for IC for admin_id in other_admins: print('Deregistreing other delegated administrators for AWS IC') org_client.deregister_delegated_administrator( AccountId=admin_id, ServicePrincipal='sso.amazonaws.com' ) print(f'Deregistered {admin_id}') sleep(0.1) # Check if the specified account is already a delegated admin for IC found = False for admin in admins['DelegatedAdministrators']: if admin['Id'] == account_id: found = True print(f'{account_id} is already a delegated administrator for AWS IC') break if not found: # Delegate the specified account as an administrator for IC org_client.register_delegated_administrator( AccountId=account_id, ServicePrincipal='sso.amazonaws.com' ) print(f'Delegated {account_id} as administrator for AWS IC') sleep(0.1) def handler(event, context): logger.info(event) logger.debug(context) request_type = event['RequestType'] if (request_type == 'Create' or request_type == 'Update'): try: delegate_sso_admin(event, context) except Exception as e: send(event, context, "FAILED", {'Error': str(e)}) raise else: send(event, context, "SUCCESS", {}) elif request_type == 'Delete': send(event, context, "SUCCESS", {}) else: send(event, context, "SUCCESS", {}) def send(event, context, responseStatus, responseData, physicalResourceId=None, noEcho=False, reason=None): responseUrl = event['ResponseURL'] print(responseUrl) responseBody = { 'Status': responseStatus, 'Reason': reason or "See the details in CloudWatch Log Stream: {}".format(context.log_stream_name), 'PhysicalResourceId': physicalResourceId or context.log_stream_name, 'StackId': event['StackId'], 'RequestId': event['RequestId'], 'LogicalResourceId': event['LogicalResourceId'], 'NoEcho': noEcho, 'Data': responseData } json_responseBody = json.dumps(responseBody) print("Response body:") print(json_responseBody) headers = { 'content-type': '', 'content-length': str(len(json_responseBody)) } try: response = http.request( 'PUT', responseUrl, headers=headers, body=json_responseBody) print("Status code:", response.status) except Exception as e: print("send(..) failed executing http.request(..):", e) Role: Fn::GetAtt: - icDelegateAdminLambdaExecutionRole - Arn FunctionName: delegate-IC-Admin Handler: index.handler MemorySize: 1024 Runtime: python3.8 Timeout: 30 customResourceLambdaInvoke: Type: Custom::AWS Properties: ServiceToken: Fn::GetAtt: - icDelegateAdminFunction - Arn delegate: !Sub ${delegate} account_id: !Sub ${accountid} physicalResourceId: JobSenderTriggerPhysicalId InstallLatestAwsSdk: true DependsOn: - customLambdaInvokePolicy UpdateReplacePolicy: Delete DeletionPolicy: Delete customLambdaInvokePolicy: Type: AWS::IAM::Policy Properties: PolicyDocument: Statement: - Action: lambda:InvokeFunction Effect: Allow Resource: Fn::GetAtt: - icDelegateAdminFunction - Arn Version: "2012-10-17" PolicyName: customLambdaInvokePolicy Roles: - !Ref icCustomResourceLambdaExecutionRole icCustomResourceLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: lambda.amazonaws.com Version: "2012-10-17" ManagedPolicyArns: - Fn::Join: - "" - - "arn:" - !Ref AWS::Partition - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole