# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """Example showing creation and use of a RawMasterKeyProvider.""" import filecmp import os import aws_encryption_sdk from aws_encryption_sdk.identifiers import Algorithm, CommitmentPolicy, EncryptionKeyType, WrappingAlgorithm from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider class StaticRandomMasterKeyProvider(RawMasterKeyProvider): """Randomly generates 256-bit keys for each unique key ID.""" provider_id = "static-random" def __init__(self, **kwargs): # pylint: disable=unused-argument """Initialize empty map of keys.""" self._static_keys = {} def _get_raw_key(self, key_id): """Returns a static, randomly-generated symmetric key for the specified key ID. :param str key_id: Key ID :returns: Wrapping key that contains the specified static key :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey` """ try: static_key = self._static_keys[key_id] except KeyError: static_key = os.urandom(32) self._static_keys[key_id] = static_key return WrappingKey( wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=static_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC, ) def cycle_file(source_plaintext_filename): """Encrypts and then decrypts a file under a custom static master key provider. :param str source_plaintext_filename: Filename of file to encrypt """ # Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a # commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default. client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT) # Create a static random master key provider key_id = os.urandom(8) master_key_provider = StaticRandomMasterKeyProvider() master_key_provider.add_master_key(key_id) ciphertext_filename = source_plaintext_filename + ".encrypted" cycled_plaintext_filename = source_plaintext_filename + ".decrypted" # Encrypt the plaintext source data # We can use an unsigning algorithm suite here under the assumption that the contexts that encrypt # and decrypt are equally trusted. with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext: with client.stream( algorithm=Algorithm.AES_256_GCM_HKDF_SHA512_COMMIT_KEY, mode="e", source=plaintext, key_provider=master_key_provider, ) as encryptor: for chunk in encryptor: ciphertext.write(chunk) # Decrypt the ciphertext # We can use the recommended "decrypt-unsigned" streaming mode since we encrypted with an unsigned algorithm suite. with open(ciphertext_filename, "rb") as ciphertext, open(cycled_plaintext_filename, "wb") as plaintext: with client.stream(mode="decrypt-unsigned", source=ciphertext, key_provider=master_key_provider) as decryptor: for chunk in decryptor: plaintext.write(chunk) # Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source # plaintext assert filecmp.cmp(source_plaintext_filename, cycled_plaintext_filename) # Verify that the encryption context used in the decrypt operation includes all key pairs from # the encrypt operation # # In production, always use a meaningful encryption context. In this sample, we omit the # encryption context (no key pairs). assert all( pair in decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items() ) return ciphertext_filename, cycled_plaintext_filename