# pylint: disable=R0914
# pylint: disable=C0301
# pylint: disable=W0612
"""User management Lambda to manage connect users."""
import os
import json
import logging

import boto3
import botocore

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# boto3 service call
CONNECT_CLIENT = boto3.client('connect')

# Environment variable
INSTANCE_ID = os.getenv("INSTANCE_ID")
DEFAULT_ROUTING_PROFILE = os.getenv('DEFAULT_ROUTING_PROFILE')


def _split_scim_id(scim_id):
    """Split a composite SCIM id into ``(connect_user_id, external_id)``.

    The id has the form ``<connect id>?<externalId>``; the ``?`` may arrive
    URL-encoded (``%3F``) or already decoded. An id without any separator
    yields an empty external id instead of raising ``IndexError``.
    """
    for separator in ("%3F", "%3f", "?"):
        if separator in scim_id:
            pieces = scim_id.split(separator)
            return pieces[0], pieces[1]
    return scim_id, ""


# The function to get connect user information
def get_connect_user(userid):
    """To get Connect user info.

    Args:
        userid: Composite SCIM id, ``<connect user id>%3F<externalId>``.

    Returns:
        A dict with the keys ``Username``, ``Id``, ``externalId``,
        ``FirstName`` and ``LastName`` when the user exists in the Connect
        instance, otherwise an empty dict.

    Raises:
        botocore.exceptions.ClientError: when a Connect API call fails.
    """
    connect_id, external_id = _split_scim_id(userid)
    try:
        LOGGER.info("Looking for %s in Connect instance %s...", connect_id, INSTANCE_ID)  # noqa: E501
        paginator = CONNECT_CLIENT.get_paginator('list_users')
        for page in paginator.paginate(InstanceId=INSTANCE_ID):
            for summary in page['UserSummaryList']:
                if summary['Id'] != connect_id:
                    continue
                described = CONNECT_CLIENT.describe_user(
                    InstanceId=INSTANCE_ID,
                    UserId=connect_id
                )['User']
                identity = described.get('IdentityInfo', {})
                # Stop at the first match; user ids are unique, so scanning
                # the remaining pages would only cost extra API calls.
                return {
                    "Username": described['Username'],
                    "Id": summary['Id'],
                    "externalId": external_id,
                    "FirstName": identity.get('FirstName', ''),
                    "LastName": identity.get('LastName', '')
                }
        return {}
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while getting Connect user due to %s", error.response['Error']['Code'])  # noqa: E501
        raise


# The function to create connect user based on SCIM payload
def create_connect_user(body):
    """To create connect user based on scim payload.

    Args:
        body: JSON text of the SCIM user resource. It must contain
            ``userName``, ``name.givenName``, ``name.familyName``,
            ``externalId`` and the enterprise extension ``department``,
            which holds a comma-separated list of security profile names.

    Returns:
        The parsed SCIM payload with ``id`` set to
        ``<connect user id>?<externalId>``.

    Raises:
        KeyError: when a required SCIM attribute is missing.
        botocore.exceptions.ClientError: when a Connect API call fails.
    """
    user_info = json.loads(body)
    user_name = user_info['userName']
    first_name = user_info['name']['givenName']
    last_name = user_info['name']['familyName']
    department = user_info["urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"]["department"]  # noqa: E501
    # "Agent, Admin" must resolve the same way as "Agent,Admin".
    security_profile_list = [
        name.strip() for name in department.split(',') if name.strip()
    ]
    try:
        sg_id_list = get_sg_id(security_profile_list)
        routing_id = get_routing_id(DEFAULT_ROUTING_PROFILE)
        LOGGER.info("The Security profile %s attached to user %s", sg_id_list, user_name)  # noqa: E501
        output = CONNECT_CLIENT.create_user(
            Username=user_name,
            IdentityInfo={
                'FirstName': first_name,
                'LastName': last_name
            },
            PhoneConfig={
                'PhoneType': 'SOFT_PHONE',
                'AutoAccept': False,
                'AfterContactWorkTimeLimit': 30
            },
            SecurityProfileIds=sg_id_list,
            RoutingProfileId=routing_id,
            InstanceId=INSTANCE_ID
        )
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while creating Connect user due to %s", error.response['Error']['Code'])  # noqa: E501
        raise
    user_info['id'] = output['UserId'] + "?" + user_info["externalId"]
    return user_info


# The function to get connect security profile.
def get_sg_id(security_profile):
    """To get security profile id.

    Args:
        security_profile: Iterable of security profile names.

    Returns:
        The ids of the named profiles, in the order the names were given.
        Names that do not exist in the instance are skipped.

    Raises:
        botocore.exceptions.ClientError: when the Connect API call fails.
    """
    try:
        # list_security_profiles is paged; reading only the first page would
        # silently drop profiles on instances with many of them.
        paginator = CONNECT_CLIENT.get_paginator('list_security_profiles')
        id_by_name = {}
        for page in paginator.paginate(InstanceId=INSTANCE_ID):
            for profile in page['SecurityProfileSummaryList']:
                id_by_name[profile['Name']] = profile['Id']
        return [
            id_by_name[name] for name in security_profile
            if name in id_by_name
        ]
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while getting SecurityProfile Id due to %s", error.response['Error']['Code'])  # noqa: E501
        raise


# The function to get connect Routing profile Id.
def get_routing_id(routing_profile):
    """To get Routing profile id.

    Args:
        routing_profile: Name of the routing profile to look up.

    Returns:
        The id of the routing profile with that name, or an empty string
        when no profile matches.

    Raises:
        botocore.exceptions.ClientError: when the Connect API call fails.
    """
    try:
        # list_routing_profiles is paged; walk every page so a profile that
        # is not on the first page is still found.
        paginator = CONNECT_CLIENT.get_paginator('list_routing_profiles')
        for page in paginator.paginate(InstanceId=INSTANCE_ID):
            for profile in page['RoutingProfileSummaryList']:
                if routing_profile == profile['Name']:
                    return profile['Id']
        return ''
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while getting Routing Profile Id due to %s", error.response['Error']['Code'])  # noqa: E501
        raise


# SCIM response for the user-not-found case.
def user_not_found_response():
    """To send scim response when user not found.

    Returns:
        An empty SCIM ListResponse (``totalResults`` 0, no resources).
    """
    return {
        "id": "urn:ietf:params:scim:api:messages:2.0:ListResponse",
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
        "totalResults": 0,
        "Resources": [],
        "startIndex": 1,
        "itemsPerPage": 20
    }


def _scim_user_resource(user, department):
    """Build the SCIM User resource for an active Connect user.

    The resource is assembled as a dict rather than a formatted JSON string,
    so names containing quotes or backslashes cannot corrupt the document.
    """
    enterprise = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"
    return {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User", enterprise],
        "id": user["Id"] + "?" + user["externalId"],
        "externalId": user["externalId"],
        "userName": user["Username"],
        "active": True,
        "meta": {"resourceType": "User"},
        "roles": [],
        "name": {
            "familyName": user["LastName"],
            "givenName": user["FirstName"]
        },
        enterprise: {"department": department}
    }


# SCIM response to send when the user exists in Connect.
def build_scim_user(user):
    """To send scim response when user exist.

    Args:
        user: Dict with the keys ``Id``, ``externalId``, ``Username``,
            ``FirstName`` and ``LastName``. ``department`` is set on this
            dict as a side effect.

    Returns:
        The SCIM User resource as a dict. Its enterprise ``department`` is
        the comma-joined names of the user's security profiles; it is empty
        when the user is not listed in the instance.

    Raises:
        botocore.exceptions.ClientError: when a Connect API call fails.
    """
    LOGGER.info("The Existing user to build SCIM response %s", user)
    userid = user["Id"]
    sg_entitlement = []
    try:
        paginator = CONNECT_CLIENT.get_paginator('list_users')
        user_listed = False
        for page in paginator.paginate(InstanceId=INSTANCE_ID):
            if any(entry['Id'] == userid for entry in page['UserSummaryList']):  # noqa: E501
                # Ids are unique; no need to fetch the remaining pages.
                user_listed = True
                break
        if user_listed:
            described = CONNECT_CLIENT.describe_user(
                UserId=userid,
                InstanceId=INSTANCE_ID
            )
            for profile_id in described['User']['SecurityProfileIds']:
                detail = CONNECT_CLIENT.describe_security_profile(
                    SecurityProfileId=profile_id,
                    InstanceId=INSTANCE_ID
                )
                sg_entitlement.append(detail['SecurityProfile']['SecurityProfileName'])  # noqa: E501
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while building scim response due to %s", error.response['Error']['Code'])  # noqa: E501
        raise
    # Kept for backward compatibility: the department was always written
    # back onto the caller's dict.
    user['department'] = ','.join(map(str, sg_entitlement))
    return _scim_user_resource(user, user['department'])


# SCIM dummy response for group request.
def dummy_group_response():
    """To send dummy group response.

    Returns:
        A fixed SCIM ListResponse with no resources; groups are not used by
        this integration.
    """
    return {
        "id": "urn:ietf:params:scim:api:messages:2.0:ListResponse",
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
        "totalResults": 1,
        "Resources": [],
        "startIndex": 1,
        "itemsPerPage": 20
    }


def _is_department_path(path):
    """Return True when a PATCH operation path addresses ``department``."""
    # Accept both the short path and the fully qualified enterprise path
    # ("urn:...:enterprise:2.0:User:department").
    return isinstance(path, str) and (
        path == "department" or path.endswith(":department")
    )


def _split_profile_names(values):
    """Flatten comma-separated department values into profile names."""
    names = []
    for value in values:
        if isinstance(value, str):
            names.extend(part.strip() for part in value.split(',') if part.strip())  # noqa: E501
    return names


# The function to update connect user during PATCH request.
def update_connect_user(userid, body):
    """To update connect user.

    Only ``department`` operations are applied: their values are treated as
    comma-separated security profile names and replace the user's current
    security profiles.

    Args:
        userid: Connect user id (without the external id suffix).
        body: JSON text of the SCIM PatchOp request.

    Returns:
        The parsed request body. When security profiles were updated it
        additionally carries ``department``: the list of profile names now
        assigned. The key is absent when the request had no department
        operation or the user is not listed in the instance.

    Raises:
        botocore.exceptions.ClientError: when a Connect API call fails.
    """
    user_info = json.loads(body)
    department_values = [
        operation.get('value')
        for operation in user_info.get('Operations', [])
        if _is_department_path(operation.get('path'))
    ]
    if not department_values:
        return user_info
    # Only department values are profile names; values of other operations
    # (e.g. "active") must not be looked up as security profiles.
    profile_names = _split_profile_names(department_values)
    try:
        paginator = CONNECT_CLIENT.get_paginator('list_users')
        for page in paginator.paginate(InstanceId=INSTANCE_ID):
            for entry in page['UserSummaryList']:
                if entry['Id'] != userid:
                    continue
                get_updated_sg_info = get_sg_id(profile_names)
                LOGGER.info("The updated list of security profile %s for the user %s", get_updated_sg_info, userid)  # noqa: E501
                CONNECT_CLIENT.update_user_security_profiles(
                    SecurityProfileIds=get_updated_sg_info,
                    UserId=userid,
                    InstanceId=INSTANCE_ID
                )
                sg_entitlement = []
                for profile_id in get_updated_sg_info:
                    detail = CONNECT_CLIENT.describe_security_profile(
                        SecurityProfileId=profile_id,
                        InstanceId=INSTANCE_ID
                    )
                    sg_entitlement.append(detail['SecurityProfile']['SecurityProfileName'])  # noqa: E501
                user_info["department"] = sg_entitlement
                return user_info
    except botocore.exceptions.ClientError as error:
        LOGGER.error("Connect User Management Failure - Boto3 client error in UserManagementScimLambda while updating Connect user due to %s", error.response['Error']['Code'])  # noqa: E501
        raise
    return user_info


def _response(status_code, body_text):
    """Build an API Gateway proxy response carrying ``body_text`` as-is."""
    return {
        "statusCode": status_code,
        "body": body_text,
        "headers": {
            'Content-Type': 'application/json',
        }
    }


def _json_response(status_code, payload):
    """Build an API Gateway proxy response with a JSON-encoded payload."""
    return _response(status_code, json.dumps(payload))


def _scim_error(status_code, detail):
    """Build a SCIM Error response (RFC 7644, section 3.12)."""
    return _json_response(status_code, {
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
        "status": str(status_code),
        "detail": detail
    })


def _scim_user_document(user, active, department=None):
    """Build a SCIM User resource dict for a PATCH response.

    The enterprise extension is included only when ``department`` is given.
    Building a dict (instead of formatting a JSON string) keeps the output
    valid when names contain quotes or backslashes.
    """
    enterprise = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"
    document = {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User", enterprise],
        "id": user["Id"] + "?" + user["externalId"],
        "externalId": user["externalId"],
        "userName": user["Username"],
        "active": active,
        "meta": {"resourceType": "User"},
        "roles": [],
        "name": {
            "familyName": user["LastName"],
            "givenName": user["FirstName"]
        }
    }
    if department is not None:
        document[enterprise] = {"department": department}
    return document


def _is_false(value):
    """Return True for boolean False and for the strings "False"/"false"."""
    if isinstance(value, bool):
        return not value
    return isinstance(value, str) and value.strip().lower() == 'false'


def _requests_deactivation(operations):
    """Return True when a PATCH operation sets ``active`` to false."""
    for operation in operations:
        path = operation.get('path')
        value = operation.get('value')
        if path == 'active' and _is_false(value):
            return True
        # Path-less form: {"op": "replace", "value": {"active": false}}
        if path is None and isinstance(value, dict) and _is_false(value.get('active')):  # noqa: E501
            return True
    return False


def _handle_get(resource_path):
    """Serve GET: dummy groups, empty user list, or a single user."""
    # No Group being used Azure integration
    if 'Groups' in resource_path:
        return _json_response(200, dummy_group_response())
    uid = resource_path.split("/")[-1] if 'Users' in resource_path else ""
    if uid == "":
        LOGGER.error("No user Id provided")
        return _scim_error(400, "No user Id provided")
    LOGGER.info("The user in the request is %s", uid)
    if uid == "Users":
        scim_user = user_not_found_response()
        LOGGER.info("Method:GET - SCIM User Response ==========> %s", json.dumps(scim_user))  # noqa: E501
        return _json_response(200, scim_user)
    user = get_connect_user(uid)
    if user:
        scim_user = build_scim_user(user)
        LOGGER.info("Method:GET for existing user - SCIM User Response ==========> %s", json.dumps(scim_user))  # noqa: E501
    else:
        scim_user = user_not_found_response()
        LOGGER.info("Method:GET - SCIM User Response ==========> %s", json.dumps(scim_user))  # noqa: E501
    return _json_response(200, scim_user)


def _handle_post(resource_path, body):
    """Serve POST: create the Connect user described by the SCIM body."""
    LOGGER.info("Method:POST - %s", resource_path)
    if 'Groups' in resource_path:
        return _json_response(200, dummy_group_response())
    LOGGER.info("Method:POST - Add User %s", body)
    user_to_create = create_connect_user(body)
    if not user_to_create:
        return _response(200, '')
    LOGGER.info("Scim return response for POST ======> %s", json.dumps(user_to_create))  # noqa: E501
    return _json_response(200, user_to_create)


def _handle_patch(resource_path, body):
    """Serve PATCH: deactivate (delete) or update a Connect user."""
    if 'Groups' in resource_path:
        return _json_response(200, dummy_group_response())
    LOGGER.info("Method:PATCH - Update or Delete User %s", body)
    uid = resource_path.split("/")[-1]
    if uid in ('', 'Users'):
        LOGGER.error("No user Id provided")
        return _scim_error(400, "No user Id provided")
    operations = json.loads(body).get('Operations', [])
    user = get_connect_user(uid)
    if not user:
        # Without this guard the lookups below fail with KeyError.
        return _scim_error(404, "User not found")

    # active == false means the identity provider wants the user removed.
    if _requests_deactivation(operations):
        CONNECT_CLIENT.delete_user(
            InstanceId=INSTANCE_ID,
            UserId=user['Id']
        )
        scim_send_response = _scim_user_document(user, False)
        LOGGER.info("The SCIM retur response for PATCH %s", json.dumps(scim_send_response))  # noqa: E501
        return _json_response(200, scim_send_response)

    # Any other operation is an update of the user.
    user_update = update_connect_user(user['Id'], body)
    updated_profiles = (user_update or {}).get('department')
    if updated_profiles is None:
        # Nothing changed the security profiles: report the current state.
        send_response = build_scim_user(user)
    else:
        send_response = _scim_user_document(
            user, True, ','.join(map(str, updated_profiles))
        )
    LOGGER.info("The Scim return response for PATCH update ======> %s", json.dumps(send_response))  # noqa: E501
    return _json_response(200, send_response)


# Main Lambda function
def lambda_handler(event, context):
    """The handler for the user management.

    Args:
        event: API Gateway proxy event. ``httpMethod``, ``body`` and
            ``pathParameters['Users']`` are read.
        context: Lambda context object; it is only logged.

    Returns:
        An API Gateway proxy response dict (``statusCode``, ``body``,
        ``headers``). GET, POST and PATCH are served; any other method gets
        a SCIM error with status 405 instead of an invalid empty response.
    """
    LOGGER.info("Received event is %s", json.dumps(event))
    LOGGER.info("Received context is %s", context)
    body = event['body']
    method = event['httpMethod']
    # pathParameters is None when the request carries no path parameter.
    resource_path = (event.get('pathParameters') or {}).get('Users') or ''

    # GET method user management action
    if method == 'GET':
        return _handle_get(resource_path)
    # POST method user management action
    if method == 'POST':
        return _handle_post(resource_path, body)
    # PATCH method for Delete or update user in management action for Azure AD
    if method == 'PATCH':
        return _handle_patch(resource_path, body)
    LOGGER.error("Unsupported method %s", method)
    return _scim_error(405, "Method not supported")