#!/usr/bin/env python3
# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
Marks an existing Provisioning Request as allowed or denied by the user.
This is done only after a thorough check of the various parameters, because the entry point to this function is a
link in an email and could have been altered.
"""
# Import the helper functions from the layer
# (kept first so that the explicit imports below take precedence over any name the star import exports)
from ggi_lambda_utils import *
# Other imports
import html
import os
import re
import sys
import traceback
from urllib.parse import parse_qs
from uuid import UUID

import boto3
# DynamoDB configuration: the table name must be supplied through the environment
DDB_TABLE = os.getenv("DYNAMO_TABLE_NAME")
if not DDB_TABLE:
    # Unset and empty are both treated as missing; fail at import time
    raise Exception("Environment variable DYNAMO_TABLE_NAME missing")

# boto3 clients, created once when the module is loaded
ddb_client = boto3.client('dynamodb')
def ok_200(msg: str) -> dict:
    """
    Returns a 200 response with an HTML page in the body.

    The message is HTML-escaped before being embedded in the page, because the handler builds it from request
    parameters (transactionId, deviceId).

    :param msg: A plain-text message to the user
    :return: A response dict with 'statusCode', 'headers' and the HTML document in 'body'
    """
    # NOTE(review): only the "Success" heading and the message placeholder of the original page template are
    # left in this file; the surrounding markup below is a minimal reconstruction - confirm against the design.
    page = '<html><body><h1>Success</h1>\n<p>{}</p></body></html>'.format(html.escape(msg))
    return {
        'statusCode': 200,
        'headers': {'Content-Type': "text/html; charset=UTF-8"},
        'body': page
    }
def accepted_202(msg: str) -> dict:
    """
    Returns a 202 response with an HTML page in the body. To be used when the request was properly formatted but
    the information provided does not allow to perform the expected operations.

    The message is HTML-escaped before being embedded in the page: the handler puts the deviceId taken from the
    query string into it, and that value is not validated.

    :param msg: A plain-text message to the user
    :return: A response dict with 'statusCode', 'headers' and the HTML document in 'body'
    """
    # NOTE(review): only the "Error !!!" heading and the message placeholder of the original page template are
    # left in this file; the surrounding markup below is a minimal reconstruction - confirm against the design.
    page = '<html><body><h1>Error !!!</h1>\n<p>{}</p></body></html>'.format(html.escape(msg))
    return {
        'statusCode': 202,
        'headers': {'Content-Type': "text/html; charset=UTF-8"},
        'body': page
    }
def bad_request_400(msg: str) -> dict:
    """
    Returns a 400 response with an HTML page in the body. To be used when the request was improperly formatted
    or parameters are missing.

    The message is HTML-escaped before being embedded in the page.

    :param msg: A plain-text message to the user
    :return: A response dict with 'statusCode', 'headers' and the HTML document in 'body'
    """
    # NOTE(review): only the "Error !!!" heading and the message placeholder of the original page template are
    # left in this file; the surrounding markup below is a minimal reconstruction - confirm against the design.
    page = '<html><body><h1>Error !!!</h1>\n<p>{}</p></body></html>'.format(html.escape(msg))
    return {
        'statusCode': 400,
        'headers': {'Content-Type': "text/html; charset=UTF-8"},
        'body': page
    }
def error_500(msg: str = "Something went wrong on the Backend. Check the logs.") -> dict:
    """
    Returns a 500 response with an HTML page in the body. To be used when something went wrong on the backend.

    The message is HTML-escaped before being embedded in the page.

    :param msg: A plain-text message to the user; a generic message is used when omitted
    :return: A response dict with 'statusCode', 'headers' and the HTML document in 'body'
    """
    # NOTE(review): only the "Error !!!" heading and the message placeholder of the original page template are
    # left in this file; the surrounding markup below is a minimal reconstruction - confirm against the design.
    page = '<html><body><h1>Error !!!</h1>\n<p>{}</p></body></html>'.format(html.escape(msg))
    return {
        'statusCode': 500,
        'headers': {'Content-Type': "text/html; charset=UTF-8"},
        'body': page
    }
def decode_state(event: dict) -> dict:
    """
    Decode the query string parameters embedded into the Cognito 'state' query string parameter.

    :param event: the event object as passed to the handler
    :return: a dictionary with the query string parameters included in the state parameter, keeping only the first
             value of each parameter. Empty when the event carries no query string or no 'state' parameter.
    """
    # 'queryStringParameters' may be None (API Gateway proxy events send null when the request has no query
    # string), and 'state' may be absent: handle both explicitly instead of relying on how parse_qs treats None.
    query_params = event.get('queryStringParameters') or {}
    state = query_params.get('state')
    qs = parse_qs(state) if state else {}
    logger.debug("Decoded State Params: {}".format(qs))
    # parse_qs maps each name to a list of values; only the first one is kept
    return {name: values[0] for name, values in qs.items()}
def get_authorizer_params(event: dict) -> dict:
    """
    Retrieves the parameters passed by the Lambda authorizer.

    :param event: the event object as passed to the handler
    :return: a dictionary with the 'email' and 'username' entries of the authorizer context, or an empty
             dictionary when the request context carries no (or an empty) authorizer entry
    """
    authorizer = event['requestContext'].get('authorizer')
    if not authorizer:
        return {}
    # Both entries are required once an authorizer context is present: a missing one raises KeyError
    return {name: authorizer[name] for name in ('email', 'username')}
def is_valid_state_params(params: dict) -> bool:
    """
    State parameters are passed as Query String Parameters and could have been altered. This is a minimalistic
    validation of those parameters: 'transactionId' must be a version 4 UUID and 'action' must be one of the
    allowed actions. Every failed check is logged; a missing parameter counts as a failed check.

    :param params: dictionary with the parameters
    :return: True if valid or False
    """
    good = True
    # Check transactionId is a UUID version 4.
    # UUID(value, version=4) overwrites the version bits instead of checking them, so it accepts any string of
    # 32 hex digits. Parse without 'version' and inspect the version that was actually supplied.
    try:
        is_uuid4 = UUID(params['transactionId']).version == 4
    except (KeyError, ValueError, TypeError, AttributeError):
        # Missing key, malformed string or non-string value
        is_uuid4 = False
    if not is_uuid4:
        logger.critical("The transactionId is not a valid uuid4. It could mean an attempt of code injection")
        good = False
    # Check if action is part of allowed list
    action = params.get('action')
    if action not in ('allow', 'deny'):
        logger.critical("The action does not a valid value: '{}'".format(action))
        good = False
    return good
def is_same_requester(request: dict, username: str, email: str) -> bool:
    """
    Checks that the requester recorded in the request matches the expected username and email address.

    :param request: dictionary whose 'requester' entry holds the 'username' and 'email' keys
    :param username: the user name
    :param email: the email address of this user
    :return: True if both the email and the username match, otherwise False
    """
    requester = request['requester']
    # The email is compared first; the username is only looked at when the email matches
    if not requester['email'] == email:
        return False
    return requester['username'] == username
def lambda_handler(event, context):
    """
    Updates the provisioning request transaction in the DB with new status, after validation of the various
    parameters and transaction current status.

    The transactionId, deviceId and action come from the (untrusted) 'state' query string parameter. The identity
    of the caller (username and email) is taken from the Lambda authorizer context only.

    :param event: the event object; 'queryStringParameters' and 'requestContext' are read from it
    :param context: the Lambda context object (not used)
    :return: the response dict built by ok_200, accepted_202, bad_request_400 or error_500
    """
    try:
        # logger.debug("Event: {}".format(event))
        logger.debug("Query String Params: {}".format(event['queryStringParameters']))

        # Retrieve operational parameters
        parameters = decode_state(event=event)
        if not is_valid_state_params(parameters):
            return bad_request_400("Process aborted due to invalid parameters. Check the logs for details.")
        # deviceId is needed for the DB lookup below: reject the request instead of failing later with a 500
        if not parameters.get('deviceId'):
            logger.critical("The deviceId is missing from the state parameters")
            return bad_request_400("Process aborted due to invalid parameters. Check the logs for details.")

        # The caller identity must come from the authorizer only. Reading it back from the merged dictionary
        # would let 'username'/'email' values supplied in the state parameter through whenever the authorizer
        # context is absent.
        identity = get_authorizer_params(event=event)
        parameters = parameters | identity
        logger.debug("Used parameters: {}".format(parameters))
        if not identity.get('username') or not identity.get('email'):
            logger.critical("No username/email provided by the authorizer: the requester cannot be verified")
            return bad_request_400("You are not allowed to take this action.")

        # Retrieve the request from the DB
        prov_req = get_ddb_item(pkey='transactionId', pvalue=parameters['transactionId'],
                                skey='deviceId', svalue=parameters['deviceId'],
                                table=DDB_TABLE, ddb_client=ddb_client)
        logger.debug("Record found in the DB: {}".format(prov_req))
        if not prov_req:
            msg = "A provisioning request with TransactionId '{}' " \
                  "and deviceId '{}' could not be found.".format(parameters['transactionId'],
                                                                 parameters['deviceId'])
            logger.warning(msg)
            return accepted_202(msg)

        # Validate the request ownership: same username and email as recorded on the request
        if not is_same_requester(request=prov_req, username=identity['username'], email=identity['email']):
            logger.critical("The user trying to allow/deny the request is not the original requester")
            return bad_request_400("You are not allowed to take this action.")

        # The request must be in status PENDING, ALLOWED or DENIED to be allowed or denied
        if prov_req['currentStatus'] not in [Status.PENDING.name, Status.ALLOWED.name, Status.DENIED.name]:
            logger.warning("Attempt to {} the quest {} while in status {}".format(parameters['action'],
                                                                                  parameters['transactionId'],
                                                                                  prov_req['currentStatus']))
            return bad_request_400("Operation not allowed. See logs for details.")

        # Update the record in the DB
        new_status = Status.ALLOWED if parameters['action'] == "allow" else Status.DENIED
        update_request_status(current_request=prov_req,
                              new_status=new_status.name,
                              action=parameters['action'],
                              table=DDB_TABLE,
                              ddb_client=ddb_client)

        # Finally, send response to the User
        return ok_200("The provisioning request with TransactionId '{}' "
                      "and deviceId '{}' has been {}.".format(parameters['transactionId'],
                                                               parameters['deviceId'], new_status.name))
    except Exception as e:
        # Top-level boundary: log, dump the traceback to stdout and answer with a generic 500 page
        logger.error("Error during runtime: {}".format(e))
        traceback.print_exc(file=sys.stdout)
        return error_500()