# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 ######################################################################################### ######################################################################################### import unittest import boto3 import logging import json import os from unittest import TestCase, mock from moto import mock_dynamodb, mock_s3 # This is to get around the relative path import issue. # Absolute paths are being used in this file after setting the root directory import sys from pathlib import Path file = Path(__file__).resolve() package_root_directory = file.parents[1] sys.path.append(str(package_root_directory)) sys.path.append(str(package_root_directory) + '/lambda_layers/lambda_layer_policy/python/') # Set log level loglevel = logging.INFO logging.basicConfig(level=loglevel) log = logging.getLogger(__name__) default_http_headers = { 'Access-Control-Allow-Origin': '*', 'Strict-Transport-Security': 'max-age=63072000; includeSubDomains; preload', 'Content-Security-Policy': "base-uri 'self'; upgrade-insecure-requests; default-src 'none'; object-src 'none'; connect-src none; img-src 'self' data:; script-src blob: 'self'; style-src 'self'; font-src 'self' data:; form-action 'self';" } test_script_records = [ { "package_uuid": { "S": "9bd96f83-8510-44a9-be5e-d34f20982143" }, "version": { "N": "0" }, "default": { "N": "1" }, "latest": { "N": "1" }, "script_arguments": { "L": [ { "M": { "description": { "S": "Argument 1" }, "group_order": { "S": "1" }, "long_desc": { "S": "Argument 1 long description." }, "name": { "S": "argument1" }, "required": { "BOOL": True }, "type": { "S": "string" } } } ] }, "script_dependencies": { "NULL": True }, "script_description": { "S": "This is a test script." }, "script_group": { "S": "Test" }, "script_masterfile": { "S": "test-masterfile.py" }, "script_name": { "S": "test" }, "script_update_url": { "NULL": True }, "version_id": { "S": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw" }, "_history": { "M": { "createdBy": { "M": { "email": { "S": "someone@amazon.co.uk" }, "userRef": { "S": "someone" } } }, "createdTimestamp": { "S": "2023-05-26T07:50:06.396959" }, "lastModifiedBy": { "M": { "email": { "S": "someone@amazon.com" }, "userRef": { "S": "someone" } } }, "lastModifiedTimestamp": { "S": "2023-05-26T08:01:35.441261" } } } } , { "package_uuid": { "S": "9bd96f83-8510-44a9-be5e-d34f20982143" }, "version": { "N": "1" }, "script_arguments": { "L": [ { "M": { "description": { "S": "Argument 1" }, "group_order": { "S": "1" }, "long_desc": { "S": "Argument 1 long description." }, "name": { "S": "argument1" }, "required": { "BOOL": True }, "type": { "S": "string" } } } ] }, "script_dependencies": { "NULL": True }, "script_description": { "S": "This is a test script." }, "script_group": { "S": "Cutover" }, "script_masterfile": { "S": "test-masterfile.py" }, "script_name": { "S": "test" }, "script_update_url": { "NULL": True }, "version_id": { "S": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw" }, "_history": { "M": { "createdBy": { "M": { "email": { "S": "someone@amazon.com" }, "userRef": { "S": "someone" } } }, "createdTimestamp": { "S": "2023-05-26T07:50:06.396959" } } } } ] test_script_base64 = "data:application/zip;base64," @mock.patch('lambda_ssm_scripts.item_validation') def mock_item_validation(): return {'action': 'allow'} scripts_table_name = '{}-{}-'.format('cmf', 'unittest') + 'ssm-scripts' scripts_s3_bucket_name = 'scripts_s3_bucket' # Setting the default AWS region environment variable required by the Python SDK boto3 @mock.patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1', 'region': 'us-east-1', 'application': 'cmf', 'environment': 'unittest', 'scripts_bucket_name': scripts_s3_bucket_name, 'scripts_table': scripts_table_name}) @mock_dynamodb @mock_s3 class LambdaSSMScriptsTest(TestCase): def setUp(self): # Setup dynamoDB tables and put items required for test cases boto3.setup_default_session() self.scripts_table_name = scripts_table_name # Creating schema table and creating schema item to test out schema types self.ddb_client = boto3.client("dynamodb", region_name='us-east-1') self.ddb_client.create_table( TableName=self.scripts_table_name, BillingMode='PAY_PER_REQUEST', KeySchema=[ {"AttributeName": "package_uuid", "KeyType": "HASH"}, {"AttributeName": "version", "KeyType": "RANGE"} ], AttributeDefinitions=[ {"AttributeName": "package_uuid", "AttributeType": "S"}, {"AttributeName": "version", "AttributeType": "N"}, ], GlobalSecondaryIndexes=[ {"IndexName": "version-index", "KeySchema": [ {"AttributeName": "version", "KeyType": "HASH"} ], "Projection": { "ProjectionType": "ALL"} } ] ) for item in test_script_records: self.ddb_client.put_item( TableName=self.scripts_table_name, Item=item) self.s3_client = boto3.client("s3", region_name='us-east-1') self.s3_client.create_bucket( Bucket=scripts_s3_bucket_name, ) self.s3_client.put_bucket_versioning( Bucket=scripts_s3_bucket_name, VersioningConfiguration={ 'Status': 'Enabled' }, ) def tearDown(self): """ Delete database resource and mock table """ print("Tearing down") self.ddb_client.delete_table(TableName=self.scripts_table_name) self.dynamodb = None print("Teardown complete") def mock_getUserAttributePolicy(self, event, schema): return {'action': 'allow'} def mock_getUserResourceCreationPolicy(self, event, schema): return {'action': 'allow', 'user': 'testuser@testuser'} def test_lambda_handler_scripts_get_all_default_versions(self): from lambda_functions.lambda_ssm_scripts import lambda_ssm_scripts self.event = {"httpMethod": 'GET', "pathParameters": None} log.info("Testing lambda_ssm_scripts GET default scripts") result = lambda_ssm_scripts.lambda_handler(self.event, '') data = result print("Result data: ", data) expected_result_data = [{"package_uuid": "9bd96f83-8510-44a9-be5e-d34f20982143", "version": "0", "default": "1", "latest": "1", "script_arguments": [{"description": "Argument 1", "group_order": "1", "long_desc": "Argument 1 long description.", "name": "argument1", "required": True, "type": "string"}], "script_dependencies": None, "script_description": "This is a test script.", "script_group": "Test", "script_masterfile": "test-masterfile.py", "script_name": "test", "script_update_url": None, "version_id": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw", "_history": {"createdBy": {"email": "someone@amazon.co.uk", "userRef": "someone"}, "createdTimestamp": "2023-05-26T07:50:06.396959", "lastModifiedBy": {"email": "someone@amazon.com", "userRef": "someone"}, "lastModifiedTimestamp": "2023-05-26T08:01:35.441261"}}] expected_response = {'headers': {**default_http_headers}, 'body': f"{json.dumps(expected_result_data)}"} self.assertEqual(data, expected_response) def test_lambda_handler_get_script_by_id_return_all_versions(self): from lambda_functions.lambda_ssm_scripts import lambda_ssm_scripts self.event = {"httpMethod": 'GET', "pathParameters": {"scriptid": "9bd96f83-8510-44a9-be5e-d34f20982143"}} log.info("Testing lambda_ssm_scripts GET specific script by ID and return all versions of the script.") result = lambda_ssm_scripts.lambda_handler(self.event, '') data = result print("Result data: ", data) expected_result_data = [{"package_uuid": "9bd96f83-8510-44a9-be5e-d34f20982143", "version": "0", "default": "1", "latest": "1", "script_arguments": [{"description": "Argument 1", "group_order": "1", "long_desc": "Argument 1 long description.", "name": "argument1", "required": True, "type": "string"}], "script_dependencies": None, "script_description": "This is a test script.", "script_group": "Test", "script_masterfile": "test-masterfile.py", "script_name": "test", "script_update_url": None, "version_id": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw", "_history": {"createdBy": {"email": "someone@amazon.co.uk", "userRef": "someone"}, "createdTimestamp": "2023-05-26T07:50:06.396959", "lastModifiedBy": {"email": "someone@amazon.com", "userRef": "someone"}, "lastModifiedTimestamp": "2023-05-26T08:01:35.441261"}}, {"package_uuid": "9bd96f83-8510-44a9-be5e-d34f20982143", "version": "1", "script_arguments": [{"description": "Argument 1", "group_order": "1", "long_desc": "Argument 1 long description.", "name": "argument1", "required": True, "type": "string"}], "script_dependencies": None, "script_description": "This is a test script.", "script_group": "Cutover", "script_masterfile": "test-masterfile.py", "script_name": "test", "script_update_url": None, "version_id": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw", "_history": {"createdBy": {"email": "someone@amazon.com", "userRef": "someone"}, "createdTimestamp": "2023-05-26T07:50:06.396959"}}] expected_response = {'headers': {**default_http_headers}, 'body': f"{json.dumps(expected_result_data)}"} self.assertEqual(data, expected_response) def test_lambda_handler_scripts_upload(self): from lambda_functions.lambda_ssm_scripts import lambda_ssm_scripts self.event = {"httpMethod": 'POST', "body": json.dumps({"script_file": test_script_base64, "script_name": "testscriptupload"})} log.info("Testing lambda_ssm_scripts PUT upload new script") result = lambda_ssm_scripts.lambda_handler(self.event, '') data = result print("Result data: ", data) self.assertEqual(data.get('statusCode'), 200) self.assertEqual(data.get('headers'), {**default_http_headers}) @mock.patch('policy.MFAuth.getUserResourceCreationPolicy', new=mock_getUserResourceCreationPolicy) def test_lambda_handler_scripts_upload_updated(self): from lambda_functions.lambda_ssm_scripts import lambda_ssm_scripts self.event = {"httpMethod": 'PUT', "body": json.dumps({"script_file": test_script_base64, "script_name": "testscriptupload", "action": "update_package"}), "pathParameters": {"scriptid": "9bd96f83-8510-44a9-be5e-d34f20982143"} } log.info("Testing lambda_ssm_scripts PUT upload updated script") result = lambda_ssm_scripts.lambda_handler(self.event, '') data = result print("Result data: ", data) expected_result_data = [ {"package_uuid": "9bd96f83-8510-44a9-be5e-d34f20982143", "version": "1", "script_arguments": [{"description": "Argument 1", "group_order": "1", "long_desc": "Argument 1 long descrption.", "name": "argument1", "required": True, "type": "string"}], "script_dependencies": None, "script_description": "This is a test script.", "script_group": "Cutover", "script_masterfile": "test-masterfile.py", "script_name": "test", "script_update_url": None, "version_id": "YWOD5U3Wn4XO4e8ApSSetwmtq0I_4maw", "_history": {"createdBy": {"email": "someone@amazon.com", "userRef": "someone"}, "createdTimestamp": "2023-05-26T07:50:06.396959"}}] self.assertEqual(data.get('statusCode'), 200) self.assertEqual(data.get('headers'), {**default_http_headers})