from unittest import TestCase

from parameterized import parameterized
from samtranslator.model.eventsources.pull import DocumentDB
from samtranslator.model.exceptions import InvalidEventException


class DocumentDBEventSource(TestCase):
    """Unit tests for the policy helpers of the ``DocumentDB`` pull event source."""

    def setUp(self):
        """Create a fresh ``DocumentDB`` event source for every test."""
        self.logical_id = "DocumentDBEvent"
        self.ddb_event_source = DocumentDB(self.logical_id)
        self.ddb_event_source.relative_id = "EventId"

    def _configure_valid_source(self):
        """Give the event source one BASIC_AUTH secret and a cluster ARN."""
        self.ddb_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}]
        self.ddb_event_source.Cluster = "CLUSTER_ARN"

    @staticmethod
    def _expected_policy(extra_statements=()):
        """Return the policy list expected for the source set up by ``_configure_valid_source``.

        :param extra_statements: statements expected after the baseline ones, in order.
        :returns: a one-element list holding the expected policy.
        """
        statements = [
            {
                "Action": [
                    "secretsmanager:GetSecretValue",
                ],
                "Effect": "Allow",
                "Resource": "SECRET_URI",
            },
            {
                "Action": [
                    "rds:DescribeDBClusterParameters",
                ],
                "Effect": "Allow",
                "Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster-pg:*"},
            },
            {
                "Action": [
                    "rds:DescribeDBSubnetGroups",
                ],
                "Effect": "Allow",
                "Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:subgrp:*"},
            },
            {
                "Action": [
                    "rds:DescribeDBClusters",
                ],
                "Effect": "Allow",
                "Resource": "CLUSTER_ARN",
            },
            {
                "Action": [
                    "ec2:CreateNetworkInterface",
                    "ec2:DescribeNetworkInterfaces",
                    "ec2:DeleteNetworkInterface",
                    "ec2:DescribeVpcs",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                ],
                "Effect": "Allow",
                "Resource": "*",
            },
        ]
        statements.extend(extra_statements)
        return [
            {
                "PolicyName": "SamAutoGeneratedDocumentDBPolicy",
                "PolicyDocument": {"Statement": statements},
            }
        ]

    def test_get_policy_arn(self):
        """``get_policy_arn`` is expected to return ``None``."""
        source_arn = self.ddb_event_source.get_policy_arn()
        expected_source_arn = None
        self.assertEqual(source_arn, expected_source_arn)

    def test_get_policy_statements(self):
        """A valid source yields exactly the baseline policy statements."""
        self._configure_valid_source()

        policy_statements = self.ddb_event_source.get_policy_statements()

        self.assertEqual(policy_statements, self._expected_policy())

    @parameterized.expand(
        [
            (1,),
            (True,),
            (["1abc23d4-567f-8ab9-cde0-1fab234c5d67"],),
            ({"KmsKeyId": "1abc23d4-567f-8ab9-cde0-1fab234c5d67"},),
        ]
    )
    def test_must_validate_secrets_manager_kms_key_id(self, kms_key_id_value):
        """A non-string ``SecretsManagerKmsKeyId`` is rejected with a descriptive error."""
        self._configure_valid_source()
        self.ddb_event_source.SecretsManagerKmsKeyId = kms_key_id_value
        error_message = "('EventId', \"Property 'SecretsManagerKmsKeyId' should be a string.\")"

        with self.assertRaises(InvalidEventException) as error:
            self.ddb_event_source.get_policy_statements()

        # Must run after the context manager exits; inside it, the raising
        # call above would prevent this assertion from ever executing.
        self.assertEqual(error_message, str(error.exception))

    def test_get_policy_statements_with_secrets_manager_kms_key_id(self):
        """A string ``SecretsManagerKmsKeyId`` appends a ``kms:Decrypt`` statement."""
        self._configure_valid_source()
        self.ddb_event_source.SecretsManagerKmsKeyId = "1abc23d4-567f-8ab9-cde0-1fab234c5d67"

        policy_statements = self.ddb_event_source.get_policy_statements()

        kms_statement = {
            "Action": [
                "kms:Decrypt",
            ],
            "Effect": "Allow",
            "Resource": {
                "Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/1abc23d4-567f-8ab9-cde0-1fab234c5d67"
            },
        }
        self.assertEqual(policy_statements, self._expected_policy([kms_statement]))

    def test_must_raise_error_for_missing_source_access_configurations(self):
        """Omitting ``SourceAccessConfigurations`` entirely is an error."""
        self.ddb_event_source.Cluster = "CLUSTER_ARN"

        with self.assertRaises(InvalidEventException):
            self.ddb_event_source.get_policy_statements()

    def test_must_raise_error_for_unknown_source_access_configurations_type(self):
        """Each configuration below is expected to be rejected."""
        test_credentials = [
            [{"Type": "BASIC_AUT", "URI": "SECRET_URI"}],
            [{"Type": "SASL_SCRAM_256_AUT", "URI": "SECRET_URI"}],
            [{"Type": None, "URI": "SECRET_URI"}],
            [{"Type": "VPC_SUB", "URI": "SECRET_URI"}, {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}],
            [{"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, {"Type": None, "URI": None}],
        ]
        self.ddb_event_source.Cluster = "CLUSTER_ARN"

        for config in test_credentials:
            # subTest reports which configuration failed and lets the
            # remaining configurations run instead of stopping at the first.
            with self.subTest(config=config):
                self.ddb_event_source.SourceAccessConfigurations = config
                with self.assertRaises(InvalidEventException):
                    self.ddb_event_source.get_policy_statements()

    def test_must_raise_error_for_multiple_basic_auth(self):
        """Two ``BASIC_AUTH`` entries in one configuration list are rejected."""
        # Both entries use the correctly spelled type so that the failure
        # comes from the duplicate, not from an unknown type (which has its
        # own test above).
        self.ddb_event_source.SourceAccessConfigurations = [
            {"Type": "BASIC_AUTH", "URI": "SECRET_URI"},
            {"Type": "BASIC_AUTH", "URI": "SECRET_URI2"},
        ]
        self.ddb_event_source.Cluster = "CLUSTER_ARN"

        with self.assertRaises(InvalidEventException):
            self.ddb_event_source.get_policy_statements()

    def test_must_raise_error_for_no_source_access_configurations_uri(self):
        """A ``BASIC_AUTH`` entry without a ``URI`` is rejected."""
        self.ddb_event_source.SourceAccessConfigurations = [
            {"Type": "BASIC_AUTH"},
        ]
        self.ddb_event_source.Cluster = "CLUSTER_ARN"

        with self.assertRaises(InvalidEventException):
            self.ddb_event_source.get_policy_statements()