import hmac
import logging
import os
import re

import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# Name of the SSM Parameter Store entry that holds the expected token.
PARAMETER_NAME = os.getenv("PARAMETER_NAME")


def lambda_handler(event, context):
    """Authorize an API Gateway request by comparing its token to SSM.

    The last space-separated piece of ``event['authorizationToken']`` is
    compared with the value of the Parameter Store entry named by the
    ``PARAMETER_NAME`` environment variable. A match yields a policy that
    allows every method of the API/stage found in ``event['methodArn']``;
    anything else yields a policy that denies every method.

    Args:
        event: Authorizer event; must contain ``authorizationToken`` and
            ``methodArn``.
        context: Lambda context object (unused).

    Returns:
        dict: The authorizer response produced by ``AuthPolicy.build()``.

    Raises:
        KeyError: If ``authorizationToken`` or ``methodArn`` is missing.
        IndexError: If ``methodArn`` does not have the expected ARN layout.
        Exception: Any failure while reading the parameter or building the
            policy is logged and re-raised so the invocation is reported as
            failed instead of silently returning ``None``.
    """
    # Deliberately do not log the raw event or the token: both carry the
    # caller's credential and would end up in the function's logs.
    LOGGER.info("Method ARN: %s", event['methodArn'])

    # Get the auth token; a scheme prefix such as "Bearer " is dropped.
    token = event['authorizationToken'].split(" ")[-1]

    # Demo value: every caller is mapped to the same principal.
    principalId = 'user:test'

    # arn:aws:execute-api:<region>:<account>:<apiId>/<stage>/<verb>/<path>
    arnParts = event['methodArn'].split(':')
    apiGatewayArnParts = arnParts[5].split('/')
    awsAccountId = arnParts[4]

    policy = AuthPolicy(principalId, awsAccountId)
    policy.restApiId = apiGatewayArnParts[0]
    policy.region = arnParts[3]
    policy.stage = apiGatewayArnParts[1]

    # This is where a request would be sent to an external authentication
    # system for token verification. For this demo, the token is valid only
    # if it equals the value stored in Parameter Store.
    LOGGER.info("Read API key/ Secret key from Parameter store")
    try:
        ssm = boto3.client('ssm')
        # NOTE(review): with WithDecryption=False a SecureString parameter
        # comes back encrypted, so the comparison below could never match.
        # Confirm the parameter type before changing this flag.
        parameter = ssm.get_parameter(
            Name=PARAMETER_NAME, WithDecryption=False)
        expectedToken = parameter['Parameter']['Value']

        # Constant-time comparison so response timing does not leak how
        # much of the secret matched. Bytes are used because
        # compare_digest rejects non-ASCII str arguments.
        tokenMatches = hmac.compare_digest(
            token.encode('utf-8'), expectedToken.encode('utf-8'))
        if tokenMatches:
            policy.allowAllMethods()
        else:
            policy.denyAllMethods()

        # Finally, build the policy
        authResponse = policy.build()
        LOGGER.info(authResponse)
        return authResponse
    except Exception as e:
        # Log with traceback and propagate; returning None here would hand
        # API Gateway an invalid authorizer response.
        LOGGER.exception("Error returning response due to %s", e)
        raise


class HttpVerb:
    """HTTP verbs usable in a method ARN; ``ALL`` is the '*' wildcard."""

    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    PATCH = 'PATCH'
    HEAD = 'HEAD'
    DELETE = 'DELETE'
    OPTIONS = 'OPTIONS'
    ALL = '*'


# Verb strings accepted by AuthPolicy. Checking against the values (rather
# than hasattr on the class) rejects attribute names such as 'ALL' or
# '__doc__' that are not real verbs.
_VALID_HTTP_VERBS = frozenset((
    HttpVerb.GET,
    HttpVerb.POST,
    HttpVerb.PUT,
    HttpVerb.PATCH,
    HttpVerb.HEAD,
    HttpVerb.DELETE,
    HttpVerb.OPTIONS,
    HttpVerb.ALL,
))


class AuthPolicy(object):
    """Builder for the policy document returned by the authorizer.

    Methods are registered through the ``allow*``/``deny*`` helpers and
    turned into a response dictionary by ``build()``.
    """

    # The AWS account id the policy will be generated for. This is used to
    # create the method ARNs.
    awsAccountId = ''
    # The principal used for the policy, this should be a unique identifier
    # for the end user.
    principalId = ''
    # The policy version used for the evaluation. This should always be
    # '2012-10-17'.
    version = '2012-10-17'
    # The regular expression used to validate resource paths for the policy.
    # Raw string so that the backslash reaches the regex engine untouched.
    pathRegex = r'^[/.a-zA-Z0-9-\*]+$'

    # Internal lists of allowed and denied methods. These are lists of
    # objects and each object has 2 properties: a resource ARN and a
    # nullable conditions statement. The build method processes these lists
    # and generates the appropriate statements for the final policy.
    # __init__ rebinds both per instance, so these class-level lists are
    # never shared between policies.
    allowMethods = []
    denyMethods = []

    # The API Gateway API id. By default this is set to '*'
    restApiId = '*'
    # The region where the API is deployed. By default this is set to '*'
    region = '*'
    # The name of the stage used in the policy. By default this is set to '*'
    stage = '*'

    def __init__(self, principal, awsAccountId):
        """Create an empty policy for ``principal`` in ``awsAccountId``."""
        self.awsAccountId = awsAccountId
        self.principalId = principal
        self.allowMethods = []
        self.denyMethods = []

    def _addMethod(self, effect, verb, resource, conditions):
        """Record a method in the allow or deny list.

        Args:
            effect: 'Allow' or 'Deny' (case-insensitive). Any other value
                leaves both lists untouched.
            verb: One of the ``HttpVerb`` values, including '*'.
            resource: Resource path; must match ``pathRegex``. A single
                leading '/' is stripped before the ARN is built.
            conditions: Condition statement stored with the ARN; may be
                empty or ``None``.

        Raises:
            NameError: If ``verb`` is not an ``HttpVerb`` value or
                ``resource`` does not match ``pathRegex``.
        """
        if verb not in _VALID_HTTP_VERBS:
            raise NameError(
                'Invalid HTTP verb ' + str(verb)
                + '. Allowed verbs in HttpVerb class')

        if not re.match(self.pathRegex, resource):
            raise NameError(
                'Invalid resource path: ' + resource
                + '. Path should match ' + self.pathRegex)

        if resource[:1] == '/':
            resource = resource[1:]

        resourceArn = 'arn:aws:execute-api:{}:{}:{}/{}/{}/{}'.format(
            self.region, self.awsAccountId, self.restApiId,
            self.stage, verb, resource)
        entry = {'resourceArn': resourceArn, 'conditions': conditions}

        normalizedEffect = effect.lower()
        if normalizedEffect == 'allow':
            self.allowMethods.append(entry)
        elif normalizedEffect == 'deny':
            self.denyMethods.append(entry)

    def _getEmptyStatement(self, effect):
        """Return a statement skeleton with the invoke action and effect.

        ``effect`` is normalised to capitalised form ('Allow' / 'Deny') and
        the ``Resource`` list starts empty.
        """
        return {
            'Action': 'execute-api:Invoke',
            'Effect': effect[:1].upper() + effect[1:].lower(),
            'Resource': [],
        }

    def _getStatementForEffect(self, effect, methods):
        """Turn a list of recorded methods into policy statements.

        Methods with a non-empty condition each get their own statement.
        Methods without conditions share one statement, which is appended
        last and only if it holds at least one resource.

        Args:
            effect: Effect applied to every generated statement.
            methods: Entries with 'resourceArn' and 'conditions' keys.

        Returns:
            list: The generated statements; empty when ``methods`` is empty.
        """
        statements = []
        if not methods:
            return statements

        sharedStatement = self._getEmptyStatement(effect)
        for curMethod in methods:
            conditions = curMethod['conditions']
            if not conditions:
                sharedStatement['Resource'].append(curMethod['resourceArn'])
                continue
            conditionalStatement = self._getEmptyStatement(effect)
            conditionalStatement['Resource'].append(curMethod['resourceArn'])
            conditionalStatement['Condition'] = conditions
            statements.append(conditionalStatement)

        if sharedStatement['Resource']:
            statements.append(sharedStatement)
        return statements

    def allowAllMethods(self):
        """Add a '*' allow to authorize access to all methods of an API."""
        self._addMethod('Allow', HttpVerb.ALL, '*', [])

    def denyAllMethods(self):
        """Add a '*' deny to refuse access to all methods of an API."""
        self._addMethod('Deny', HttpVerb.ALL, '*', [])

    def allowMethod(self, verb, resource):
        """Add an API Gateway method (HTTP verb + resource path) to the
        list of allowed methods for the policy."""
        self._addMethod('Allow', verb, resource, [])

    def denyMethod(self, verb, resource):
        """Add an API Gateway method (HTTP verb + resource path) to the
        list of denied methods for the policy."""
        self._addMethod('Deny', verb, resource, [])

    def allowMethodWithConditions(self, verb, resource, conditions):
        """Add an API Gateway method (HTTP verb + resource path) to the
        list of allowed methods, with a condition for its statement.

        More on AWS policy conditions here:
        http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition
        """
        self._addMethod('Allow', verb, resource, conditions)

    def denyMethodWithConditions(self, verb, resource, conditions):
        """Add an API Gateway method (HTTP verb + resource path) to the
        list of denied methods, with a condition for its statement.

        More on AWS policy conditions here:
        http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition
        """
        self._addMethod('Deny', verb, resource, conditions)

    def build(self):
        """Generate the authorizer response from the recorded methods.

        Allow statements come first, then Deny statements. Methods that
        include conditions get their own statement in the policy.

        Returns:
            dict: ``{'principalId': ..., 'policyDocument': {...}}``.

        Raises:
            NameError: If no method has been allowed or denied.
        """
        if not self.allowMethods and not self.denyMethods:
            raise NameError('No statements defined for the policy')

        statements = []
        statements.extend(
            self._getStatementForEffect('Allow', self.allowMethods))
        statements.extend(
            self._getStatementForEffect('Deny', self.denyMethods))

        return {
            'principalId': self.principalId,
            'policyDocument': {
                'Version': self.version,
                'Statement': statements,
            },
        }