# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0. ''' Amazon Kinesis Video Stream (KVS) Consumer Library for Python. This library parses streaming bytes (chunks) made available by the StreamingBody returned from calls to the KVS Media Client GetMedia and KVS Archive Media Client GetMediaForFragmentList API. The Amazon Kinesis Video Stream (KVS) Consumer Library for Python reads in streaming bytes as they become available and parses to individual MKV fragments. The library is threaded and non-blocking, once a stream is being read it forwards received MKV fragments to named call-backs in the users application. Fragments are returned as raw bytes and a searchable DOM like structure by parsing with EMBLite by MideTechnology. The consumer library provides the following functions to further process parsed MKV fragments: 1) get_fragment_tags(): Extract MKV tags from the fragment. 2) save_fragment_as_local_mkv(): Saves the fragment as stand-alone MKV file on local disk. 3) get_frames_as_ndarray(): Returns a ratio of frames in the fragment as a list of NDArray objects. 4) save_frames_as_jpeg(): Returns a ratio of frames in the fragment as a JPEGs to local disk. Workflow: 1) Define a on_fragment_arrived and on_read_stream_complete call-backs in user application logic. These to process fragments as they are received and to handle the parser reaching the end of the stream. (When no more fragments are left), 2) Initialize the KVS Media and / or Archive Media clients, 3) Make a call to KVS Media GetMedia and / or KVS Archive Media GetMediaForFragmentList for the given stream, 4) Initialize this KVS Consumer library and call get_streaming_fragements providing the response from the GetMedia or GetMediaForFragmentList call, 5) Fragments will then be parsed and delivered to the call-backs for processing as per the example code provided. Credits: # EMBLite by MideTechnology is an external EBML parser found at https://github.com/MideTechnology/ebmlite # For convenance a slightly modified version of EMBLite is shipped with the KvsConsumerLibrary but adding credit where its due. # EMBLite MIT License: https://github.com/MideTechnology/ebmlite/blob/development/LICENSE ''' __version__ = "0.0.1" __status__ = "Development" __copyright__ = "Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved." __author__ = "Dean Colcott <https://www.linkedin.com/in/deancolcott/>" import timeit import logging from threading import Thread from amazon_kinesis_video_consumer_library.ebmlite import loadSchema # Init the logger. log = logging.getLogger(__name__) class KvsConsumerLibrary(Thread): def __init__(self, stream_name, get_media_response_object, on_fragment_arrived, on_read_stream_complete, on_read_stream_exception): ''' Initialize the KVS media consumer library ''' # Call the Thread class's init function Thread.__init__(self) # Used to trigger graceful exit of this thread self._stop_get_media = False # Init the local vars. log.info('Initilizing KvsConsumerLibrary...') self.stream_name = stream_name self.get_media_response_object = get_media_response_object self.on_fragment_arrived_callback = on_fragment_arrived self.on_read_stream_complete_callback = on_read_stream_complete self.on_read_stream_exception = on_read_stream_exception log.info('Loading EBMLlite MKV Schema....') self.schema = loadSchema('matroska.xml') def _get_ebml_header_elements(self, fragement_dom): ''' Returns the EBML Header elements in the Fragment DOM. EBML Header elements indicate the start of a new fragment and so we use them to set the byte boundaries of individual fragments as they arrive in the raw data stream (chunks). ### Parameters: **fragment_dom**: ebmlite.core.Document <ebmlite.core.MatroskaDocument> The DOM like structure describing the fragment parsed by EBMLite. ''' ebml_header_elements = [] # Iterate through the fragment elements and capture any EBML Fragment headers (indicating the start of a new fragment) for element in fragement_dom: if (element.id == 0x1A45DFA3): # EBML (Master) element ID = 0x1A45DFA3 (440786851 dec) ebml_header_elements.append(element) return ebml_header_elements def _get_simple_block_elements(self, fragement_dom): ''' Returns the DOM SimpleBlock elements found in the fragment. SimpleBlock Elements store the payload of the MKV fragemeny - typically H.264/265 frames but can be any data playload that was ingested by the KVS producer. ### Parameters: **fragment_dom**: ebmlite.core.Document <ebmlite.core.MatroskaDocument> The DOM like structure describing the fragment parsed by EBMLite. ''' simple_block_elements = [] # Iterate through the fragment elements and capture any Simple Block type elements. # These carry the fragments payload bytes (typically image frames as raw bytes.) for element in fragement_dom: if (element.id == 0x18538067): # Segment element ID = 0x18538067 for segement_child in element: if (segement_child.id == 0x1F43B675): # Cluster element ID = 0x1F43B675 for cluster_child in segement_child: if (cluster_child.id == 0xA3): # SimpleBlock element ID = xA3 simple_block_elements.append(cluster_child) return simple_block_elements def stop_thread(self): self._stop_get_media = True #################################################### # Read and parse streaming media from a Kinesis Video Stream def run(self): ''' Reads in chunks (unframed number of raw bytes) from a KVS GetMedia or GetMediaForFragmentList Streaming Body response and parses into bounded MKV fragments. Raw data is buffered until a complete fragment is received which is then forwarded to the on_fragmemt_arrived callback. Fragment is delivered as a raw byte array and also a parsed EBMLite Document that is a DOM like structure of the elements (including Tags) within the given Fragment. Kinesis Video will continually update the streaming buffer with media as soon as its available. For StartSelectorType = NOW, bytes from the media stream will be available as fast as they arrive into Kinesis Video by the producer. In this case the consumer bandwidth and fragment rate will be equal to that of the producer. However, if StartSelector is set to sometime in the past then all fragments from start to end time will be available immediately. The effect is this will read in bytes as fast as the system resources (KVS limits, CPU and bandwidth) will allow until the stream has caught up with the leading edge of media being generated. ''' try: # Get the steam botocore.response.Streamingody object from the provided GetMedia response kvs_streaming_buffer=self.get_media_response_object['Payload'] ######################################### # Iterate through reading and parsing streaming body response of KVS GET Media API call to MKV fragments. ######################################### chunk_buffer = bytearray() fragment_read_start_time = timeit.default_timer() chunk_read_count = 0 # Uses the StreamingBody object iterator to read in (default 1024 byte) chunks from the streaming buffer. for chunk in kvs_streaming_buffer: if self._stop_get_media: break # Append chunk bytes to ByteArray buffer while waiting for the entire MKV fragment to arrive. chunk_buffer.extend(chunk) ############################################# # Parse current byte buffer to MKV EBML DOM like object using EBMLite ############################################# fragement_intrum_dom = self.schema.loads(chunk_buffer) ############################################# # Process a complete fragment if its arrived and send to the on_fragment_arrived callback. ############################################# # EBML header elements indicate the start of a new fragment. Here we check if the start of a second fragment # has arrived and use its start to identify the byte boundary of the first complete fragment to process. ebml_header_elements = self._get_ebml_header_elements(fragement_intrum_dom) # If multiple fragment headers then the first fragment has been received completely and ready to process. if (len(ebml_header_elements) > 1): # Get the offset for the first and second fragments. First fragment offset should be zero or fragment boundary is out of sync! first_ebml_header_offset = ebml_header_elements[0].offset second_ebml_header_offset = ebml_header_elements[1].offset # Isolate the bytes from the first complete MKV fragments in the received chunk data fragment_bytes = chunk_buffer[first_ebml_header_offset : second_ebml_header_offset] # Parse the complete fragment as EBML to a DOM like object fragment_dom = self.schema.loads(fragment_bytes) # Calculate duration taken receiving this fragment - just for telemetry of the steaming data. fragment_receive_duration = timeit.default_timer() - fragment_read_start_time # Forward fragment to the on_fragment_arrived callback. self.on_fragment_arrived_callback(self.stream_name, fragment_bytes, fragment_dom, fragment_receive_duration) # Remove the processed MKV segment from the raw byte chunk_buffer chunk_buffer = chunk_buffer[second_ebml_header_offset: ] # Reset the chunk read count. chunk_read_count = 0 # Reset the start time for the next segment iteration just to time fragment durations fragment_read_start_time = timeit.default_timer() ############################################# # Increment to chunk read count for this fragment chunk_read_count +=1 ############################################# # Exit the thread if the stream has no more chunks. ############################################# #call the on_stream_read_complete() callback and exit the thread. self.on_read_stream_complete_callback(self.stream_name) except Exception as err: # Pass any exceptions to exception callback. self.on_read_stream_exception(self.stream_name, err)