# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # # This code expects that you have AWS credentials setup per: # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html from logging import basicConfig, getLogger, INFO from amazon.ion.simpleion import loads from boto3 import client from pyqldbsamples.constants import Constants from pyqldbsamples.get_digest import get_digest_result from pyqldbsamples.model.sample_data import SampleData, convert_object_to_ion from pyqldbsamples.qldb.block_address import block_address_to_dictionary from pyqldbsamples.verifier import verify_document, flip_random_bit, to_base_64 from pyqldbsamples.connect_to_ledger import create_qldb_driver from pyqldbsamples.qldb.qldb_string_utils import value_holder_to_string logger = getLogger(__name__) basicConfig(level=INFO) qldb_client = client('qldb') def get_revision(ledger_name, document_id, block_address, digest_tip_address): """ Get the revision data object for a specified document ID and block address. Also returns a proof of the specified revision for verification. :type ledger_name: str :param ledger_name: Name of the ledger containing the document to query. :type document_id: str :param document_id: Unique ID for the document to be verified, contained in the committed view of the document. :type block_address: dict :param block_address: The location of the block to request. :type digest_tip_address: dict :param digest_tip_address: The latest block location covered by the digest. :rtype: dict :return: The response of the request. """ result = qldb_client.get_revision(Name=ledger_name, BlockAddress=block_address, DocumentId=document_id, DigestTipAddress=digest_tip_address) return result def lookup_registration_for_vin(driver, vin): """ Query revision history for a particular vehicle for verification. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: VIN to query the revision history of a specific registration with. :rtype: :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor` :return: Cursor on the result set of the statement query. """ logger.info("Querying the 'VehicleRegistration' table for VIN: {}...".format(vin)) query = 'SELECT * FROM _ql_committed_VehicleRegistration WHERE data.VIN = ?' return driver.execute_lambda(lambda txn: txn.execute_statement(query, convert_object_to_ion(vin))) def verify_registration(driver, ledger_name, vin): """ Verify each version of the registration for the given VIN. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type ledger_name: str :param ledger_name: The ledger to get digest from. :type vin: str :param vin: VIN to query the revision history of a specific registration with. :raises AssertionError: When verification failed. """ logger.info("Let's verify the registration with VIN = {}, in ledger = {}.".format(vin, ledger_name)) digest = get_digest_result(ledger_name) digest_bytes = digest.get('Digest') digest_tip_address = digest.get('DigestTipAddress') logger.info('Got a ledger digest: digest tip address = {}, digest = {}.'.format( value_holder_to_string(digest_tip_address.get('IonText')), to_base_64(digest_bytes))) logger.info('Querying the registration with VIN = {} to verify each version of the registration...'.format(vin)) cursor = lookup_registration_for_vin(driver, vin) logger.info('Getting a proof for the document.') for row in cursor: block_address = row.get('blockAddress') document_id = row.get('metadata').get('id') result = get_revision(ledger_name, document_id, block_address_to_dictionary(block_address), digest_tip_address) revision = result.get('Revision').get('IonText') document_hash = loads(revision).get('hash') proof = result.get('Proof') logger.info('Got back a proof: {}.'.format(proof)) verified = verify_document(document_hash, digest_bytes, proof) if not verified: raise AssertionError('Document revision is not verified.') else: logger.info('Success! The document is verified.') altered_document_hash = flip_random_bit(document_hash) logger.info("Flipping one bit in the document's hash and assert that the document is NOT verified. " "The altered document hash is: {}.".format(to_base_64(altered_document_hash))) verified = verify_document(altered_document_hash, digest_bytes, proof) if verified: raise AssertionError('Expected altered document hash to not be verified against digest.') else: logger.info('Success! As expected flipping a bit in the document hash causes verification to fail.') logger.info('Finished verifying the registration with VIN = {} in ledger = {}.'.format(vin, ledger_name)) def main(ledger_name=Constants.LEDGER_NAME): """ Verify the integrity of a document revision in a QLDB ledger. """ registration = SampleData.VEHICLE_REGISTRATION[0] vin = registration['VIN'] try: with create_qldb_driver(ledger_name) as driver: verify_registration(driver, ledger_name, vin) except Exception as e: logger.exception('Unable to verify revision.') raise e if __name__ == '__main__': main()