#!/usr/bin/env python3 # Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. # This file is licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR # CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. """ This Lambda initiates a new provisioning. Conditions to allow a provisioning are: * The 'userName' maps to a user allowed to perform provisioning and with a validated email address. * There are no existing provisioning for this 'deviceId' except in states 'failed' or 'denied' or 'timeout'. * A thing with this thingName doesn't exist. Actions executed: * Create an entry in the Database: * deviceId * thingName * currentState 'pending' * userName * email * Acknowledgement Expiration: 1 hour ahead * Email the User with 'approve' and 'deny' links * Return to the caller: * 200: {'transactionId': 'string'} * 4xx/5xx: {'details': 'string'} """ # Import the helper functions from the layer from ggi_lambda_utils import * # Other imports import sys import traceback import boto3 import os import json from uuid import uuid4 from datetime import datetime from urllib.parse import urlencode, quote # Cognito Configuration COG_GRP = os.environ.get("COGNITO_PROV_GROUP", "GreengrassProvisioningOperators") COG_USER_POOL_ID = os.environ.get("COGNITO_USER_POOL_ID") if not COG_USER_POOL_ID: raise Exception("Environment variable COGNITO_USER_POOL_ID missing") COG_URL = os.environ.get("COGNITO_URL") if not COG_URL: raise Exception("Environment variable COGNITO_URL missing") COG_C_NAME = os.environ.get("COGNITO_CLIENT_NAME") if not COG_C_NAME: raise Exception("Environment variable COGNITO_CLIENT_NAME missing") # DynamoDB configuration DDB_TABLE = os.environ.get("DYNAMO_TABLE_NAME") if not DDB_TABLE: raise Exception("Environment variable DYNAMO_TABLE_NAME missing") # API Gateway configuration OPS_ENDPOINT = "manage/request/" # SES Configuration SES_SENDER = os.environ.get("SES_SENDER_EMAIL") if not SES_SENDER: raise Exception("Environment variable SES_SENDER_EMAIL missing") # Set some boto3 clients cog_client = boto3.client('cognito-idp') iot_client = boto3.client('iot') ddb_client = boto3.client('dynamodb') ses_client = boto3.client("ses") def find_user_in_group(user_name: str, users: dict) -> dict: """ Find a user in a group :param user_name: user name :param users: the dictionary of users returned by Cognito :return: the user of an empty dict """ # logger.debug("Users received: {}".format(users)) for user in users: if user['Username'] == user_name: return user else: return {} return {} def get_user_from_group(user_name: str, pool_id: str = COG_USER_POOL_ID, group: str = COG_GRP) -> dict: """ Searches for a confirmed User in the specified group :param user_name: Username :param pool_id: Cognito pool ID :param group: Cognito group name :return: the User object or an empty dict if no match or not confirmed """ user = {} if not user_name: logger.warning("user_name is None - can't proceed!") return user resp = cog_client.list_users_in_group( UserPoolId=pool_id, GroupName=group ) user = find_user_in_group(user_name, resp.get('Users')) if not user: nextToken = resp.get('nextToken') while nextToken: resp = cog_client.list_users_in_group( UserPoolId=pool_id, GroupName=group, nextToken=nextToken ) user = find_user_in_group(user_name, resp.get('Users')) if not user: nextToken = resp.get('nextToken') else: break if user.get('UserStatus') == "CONFIRMED": logger.debug("Found User: '{}'".format(user)) return user else: return {} def get_user_email(user: dict) -> str: """ Returns the email from the User object :param user: User dictionary returned by Cognito :return: the email address """ email = "" for attr in user.get('Attributes'): if attr.get('Name') == "email": email = attr.get('Value') break logger.debug("User email: '{}'".format(email)) return email def ok_200(transaction_id: str) -> dict: """ Returns a 200 response with the transaction ID in the JSON body :param transaction_id: provisioning request transaction ID :return: response """ return { 'statusCode': 200, 'headers': {'Content-Type': "application.json"}, 'body': json.dumps({'transactionId': transaction_id}) } def bad_request(msg: str, status_code: int = 403) -> dict: """ Returns a 4xx response with Forbidden as default :param msg: Message to the user :param status_code: status code to use - default = 403 :return: response """ return { 'statusCode': status_code, 'headers': {'Content-Type': "application.json"}, 'body': json.dumps({'reason': msg}) } def internal_error(status_code: int = 500) -> dict: """ Something went wrong :param status_code: status code to use - default = 500 :return: response """ msg = "Something unexpected happened. Try again and contact support if the problem persists." return { 'statusCode': status_code, 'headers': {'Content-Type': "application.json"}, 'body': json.dumps({'reason': msg}) } def get_items_by_device_id(device_id: str, ddb_table_name: str = DDB_TABLE, index: str = "deviceId-transactionId-index"): """ Retrieves all the DynamoDB items that match the device_id :param device_id: the device identifier like the serial number :param ddb_table_name: name of the DynamoDB table :param index: name of the index to use :return: unmarshalled Items returned by DynamoDB """ resp = ddb_client.query( TableName=ddb_table_name, IndexName=index, Select='ALL_ATTRIBUTES', ReturnConsumedCapacity='NONE', ExpressionAttributeValues={ ':v1': { 'S': device_id, }, }, KeyConditionExpression="deviceId = :v1", ) return unmarshall(resp['Items']) def get_items_by_thing_name(thing_name: str, ddb_table_name: str = DDB_TABLE, index: str = "thingName-transactionId-index"): """ Retrieves all the DynamoDB items that match the thing_name :param thing_name: Iot Core Thing Name :param ddb_table_name: name of the DynamoDB table :param index: name of the index to use :return: unmarshalled Items returned by DynamoDB """ resp = ddb_client.query( TableName=ddb_table_name, IndexName=index, Select='ALL_ATTRIBUTES', ReturnConsumedCapacity='NONE', ExpressionAttributeValues={ ':v1': { 'S': thing_name, }, }, KeyConditionExpression="thingName = :v1", ) return unmarshall(resp['Items']) def new_xaction_record(thing_name: str, device_id: str, username: str, email: str) -> dict: """ Poulates a dictionary with the items required to create a new record in DynamoDB :param thing_name: Iot Core Thing Name :param device_id: device identifier like the serial number :param username: username as recorded in Cognito :param email: email address of the user :return: the populated dictionary """ now = datetime.utcnow().isoformat() return { 'transactionId': str(uuid4()), 'deviceId': device_id, 'thingName': thing_name, 'currentStatus': Status.PENDING.name, 'dateCreated': now, 'requester': {'username': username, 'email': email}, 'history': {now: { "action": "create", "previous_status": "NONE" } } } def send_email(transaction_id: str, device_id: str, thing_name: str, recipient: str, cog_cid: str, api_url: str, cog_url: str = COG_URL, source: str = SES_SENDER): """ Sends an email to the user containing links to allow or deny the provisioning request :param cog_url: Cognito domain URL :param cog_cid: Cognito Client ID :param transaction_id: provisioning request transaction ID :param device_id: device identifier like the serial number :param thing_name: Iot Core Thing Name :param recipient: email address 'to' :param api_url: the API Gateway URL where the user response will be sent :param source: the email sender :return: the response from SES (dict) """ auth_url = '{0}/login?client_id={1}&response_type=code'.format(cog_url, cog_cid) params = {'action': "allow", 'transactionId': transaction_id, 'deviceId': device_id} # Need to quote twice on top of urlencode to escape the & present in the state value which is decode by API Gateway state_allow = '&state={}'.format(quote(quote(urlencode(params)))) params['action'] = 'deny' state_deny = '&state={}'.format(quote(quote(urlencode(params)))) redirect = '&redirect_uri={0}/{1}'.format(api_url, OPS_ENDPOINT) data = 'The device {0} is requesting to be provisioned on AWS IoT as a Thing named {1}.
' \ 'Please allow or deny this request by clicking on one of the links below (log-in required):

' \ 'Allow this provisioning request.

' \ 'Deny this provisioning request.
'.format( device_id, thing_name, auth_url, state_allow, state_deny, redirect) body = { 'Html': { 'Charset': "UTF-8", 'Data': data } } subject = { 'Charset': "UTF-8", 'Data': "Device Provisioning Request for {}".format(device_id) } return ses_client.send_email( Destination={'ToAddresses': [recipient]}, Message={'Body': body, 'Subject': subject}, Source=source ) def lambda_handler(event, context): """ Check validity of the request, write a new transaction in DynamoDB and send an email to the user """ try: # Retrieve Query String Parameters user_name = event["queryStringParameters"].get('userName') thing_name = event["queryStringParameters"].get('thingName') device_id = event["queryStringParameters"].get('deviceId') logger.debug("Received query string parameters: userName = '{}', thingName = '{}', " "deviceId = '{}'".format(user_name, thing_name, device_id)) api_url = "https://{}/{}".format(event["requestContext"]["domainName"], event["requestContext"]["stage"]) cog_cid = get_cognito_client_id_from_name(cog_client=boto3.client('cognito-idp'), pool_id=COG_USER_POOL_ID, name=COG_C_NAME) if not cog_cid: logger.critical("Couldn't determine the Cognito Client ID from its name: {}".format(COG_C_NAME)) return internal_error() if is_valid_thing_name(thing_name) is not True: logger.warning("Invalid Thing Name requested: '{}'".format(thing_name)) return bad_request("Invalid Thing Name: {}".format(thing_name)) # Check User group membership and retrieve validated email address user = get_user_from_group(user_name=user_name) if not user: logger.warning("A confirmed User '{}' was not found in " "group '{}' of pool '{}'".format(user_name, COG_GRP, COG_USER_POOL_ID)) return bad_request("Request rejected") email = get_user_email(user) if not email: logger.warning("A email address could not be found for User '{}'".format(user_name)) return bad_request("Request rejected") # Check if the Thing Name or the Device ID already exist. if is_new_iot_thing(thing_name=thing_name, iot_client=iot_client) is False: return bad_request("A thing with name {} already exists".format(thing_name)) xactions = get_items_by_device_id(device_id=device_id) for xaction in xactions: if Status[xaction['currentStatus'].upper()] not in FAILED_XACTIONS: logger.warning( "Received new provisioning request for a Device already in progress: {}".format(device_id)) return bad_request("There is already a Transaction in progress") xactions = get_items_by_thing_name(thing_name=thing_name) for xaction in xactions: if Status[xaction['currentStatus'].upper()] not in FAILED_XACTIONS: logger.warning( "Received new provisioning request for a Thing already in progress: {}".format(thing_name)) return bad_request("There is already a Transaction in progress") # Write a new transaction in the DB xaction = new_xaction_record(thing_name=thing_name, device_id=device_id, username=user_name, email=email) logger.debug("Writing to DynamoDB: {}".format(xaction)) resp = ddb_client.put_item( TableName=DDB_TABLE, Item=marshall(xaction), ReturnConsumedCapacity='NONE', ) logger.debug("DynamoDB response: {}".format(resp)) # Email the Operator with Allow and Deny links. resp = send_email(transaction_id=xaction['transactionId'], device_id=device_id, thing_name=thing_name, recipient=email, cog_cid=cog_cid, api_url=api_url ) logger.debug("Email sending response: {}".format(resp)) return ok_200(xaction['transactionId']) except Exception as e: logger.error("Error during runtime: {}".format(e)) traceback.print_exc(file=sys.stdout) return internal_error()