# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# This code expects that you have AWS credentials setup per:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
from array import array
from base64 import b64encode
from functools import reduce
from hashlib import sha256
from random import randrange

from amazon.ion.simpleion import loads

# Length in bytes of a SHA-256 digest; compare_hash_values rejects any other length.
HASH_LENGTH = 32
# Exclusive upper bound for the random bit index inside one byte (8 bits per byte).
UPPER_BOUND = 8


def parse_proof(value_holder):
    """
    Parse the Proof object returned by QLDB into a list of hash values.

    The Proof object returned by QLDB is a dictionary like the following:
    {'IonText': '[{{}},{{}}]'}

    :type value_holder: dict
    :param value_holder: A structure containing an Ion string value under the 'IonText' key.

    :rtype: :py:class:`amazon.ion.simple_types.IonPyList`
    :return: A list of hash values.
    """
    ion_text = value_holder.get('IonText')
    proof_list = loads(ion_text)
    return proof_list


def parse_block(value_holder):
    """
    Parse the Block object returned by QLDB and retrieve block hash.

    :type value_holder: dict
    :param value_holder: A structure containing an Ion string value under the 'IonText' key.

    :rtype: :py:class:`amazon.ion.simple_types.IonPyBytes`
    :return: The block hash, i.e. the value stored under 'blockHash' in the parsed block.
    """
    ion_text = value_holder.get('IonText')
    block = loads(ion_text)
    block_hash = block.get('blockHash')
    return block_hash


def flip_random_bit(original):
    """
    Flip a single random bit in the given hash value.
    This method is used to demonstrate QLDB's verification features.

    :type original: bytes
    :param original: The hash value to alter. The input itself is never modified.

    :rtype: bytes
    :return: The altered hash with a single random bit changed.

    :raises AssertionError: If `original` is empty.
    """
    if len(original) == 0:
        # Raised explicitly instead of via `assert` so the check is not stripped by
        # `python -O`; the exception type is kept for backward compatibility.
        raise AssertionError('Invalid bytes.')

    altered_position = randrange(len(original))
    bit_shift = randrange(UPPER_BOUND)

    # bytearray(...) already copies its argument, so no extra .copy() is needed.
    altered_hash = bytearray(original)
    altered_hash[altered_position] ^= 1 << bit_shift
    return bytes(altered_hash)


def compare_hash_values(hash1, hash2):
    """
    Compare two hash values by converting them into byte arrays, assuming they are little endian.

    Bytes are read as signed 8-bit integers (array type code 'b') and compared starting
    from the last byte and moving towards the first.
    NOTE(review): the signed interpretation presumably mirrors the ordering QLDB itself
    uses when building its hash tree - confirm before switching to unsigned bytes.

    :type hash1: bytes
    :param hash1: The hash value to compare.

    :type hash2: bytes
    :param hash2: The hash value to compare.

    :rtype: int
    :return: Zero if the hash values are equal, otherwise return the difference of the first pair of non-matching
             bytes.

    :raises AssertionError: If either hash is not exactly HASH_LENGTH bytes long.
    """
    if len(hash1) != HASH_LENGTH or len(hash2) != HASH_LENGTH:
        # Explicit raise so the length check survives `python -O`; without it a length
        # mismatch would either raise IndexError or silently compare a partial hash.
        raise AssertionError('Hash values must be exactly {} bytes long.'.format(HASH_LENGTH))

    signed_bytes1 = array('b', hash1)
    signed_bytes2 = array('b', hash2)

    # Walk from the last byte to the first; the first differing pair decides the order.
    for byte1, byte2 in zip(reversed(signed_bytes1), reversed(signed_bytes2)):
        if byte1 != byte2:
            return byte1 - byte2
    return 0


def join_hash_pairwise(hash1, hash2):
    """
    Take two hash values, sort them, concatenate them, and generate a new hash value from the concatenated values.

    If either hash is empty, the other one is returned unchanged and nothing is hashed.

    :type hash1: bytes
    :param hash1: Hash value to concatenate.

    :type hash2: bytes
    :param hash2: Hash value to concatenate.

    :rtype: bytes
    :return: The new hash value generated from concatenated hash values.
    """
    if len(hash1) == 0:
        return hash2
    if len(hash2) == 0:
        return hash1

    # The hash that compares lower goes first, so the result does not depend on
    # which argument position each hash was passed in (for distinct hashes).
    if compare_hash_values(hash1, hash2) < 0:
        concatenated = hash1 + hash2
    else:
        concatenated = hash2 + hash1

    return sha256(concatenated).digest()


def calculate_root_hash_from_internal_hashes(internal_hashes, leaf_hash):
    """
    Combine the internal hashes and the leaf hash until only one root hash remains.

    :type internal_hashes: iterable
    :param internal_hashes: An iterable over a list of hash values.

    :type leaf_hash: bytes
    :param leaf_hash: The revision hash to pair with the first hash in the Proof hashes list.

    :rtype: bytes
    :return: The root hash constructed by combining internal hashes.
    """
    # Left fold: start from the leaf and join it with each proof hash in order.
    root_hash = reduce(join_hash_pairwise, internal_hashes, leaf_hash)
    return root_hash


def build_candidate_digest(proof, leaf_hash):
    """
    Build the candidate digest representing the entire ledger from the Proof hashes.

    :type proof: dict
    :param proof: The Proof object.

    :type leaf_hash: bytes
    :param leaf_hash: The revision hash to pair with the first hash in the Proof hashes list.

    :rtype: bytes
    :return: The calculated root hash.
    """
    parsed_proof = parse_proof(proof)
    root_hash = calculate_root_hash_from_internal_hashes(parsed_proof, leaf_hash)
    return root_hash


def verify_document(document_hash, digest, proof):
    """
    Verify document revision against the provided digest.

    :type document_hash: bytes
    :param document_hash: The SHA-256 value representing the document revision to be verified.

    :type digest: bytes
    :param digest: The SHA-256 hash value representing the ledger digest.

    :type proof: dict
    :param proof: The Proof object retrieved from :func:`pyqldbsamples.get_revision.get_revision`.

    :rtype: bool
    :return: True if the digest recomputed from the document hash and the proof equals `digest`,
             False otherwise.
    """
    candidate_digest = build_candidate_digest(proof, document_hash)
    return digest == candidate_digest


def to_base_64(input):
    """
    Encode input in base64.

    The parameter name `input` shadows the builtin but is kept so that callers passing
    it by keyword keep working.

    :type input: bytes
    :param input: Input to be encoded.

    :rtype: string
    :return: Return input that has been encoded in base64.
    """
    encoded_value = b64encode(input)
    return str(encoded_value, 'UTF-8')