# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # dpkg; version 1.9.5 -- https://github.com/kbandla/dpkt # Copyright (c) 2004 Dug Song # All rights reserved, all wrongs reversed. # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. The names of the authors and copyright holders may not be used to # endorse or promote products derived from this software without # specific prior written permission. # THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, # INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL # THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; # OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, # WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR # OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # 3. Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from this # software without specific prior written permission. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # deployment with dpkt dependencies # pip install --target ./lambda dpkt # cd lambda # zip -r ../lambda.zip . # cd .. # zip -g lambda.zip index.py import json import logging import boto3 import os import sys import dpkt import io from urllib.parse import unquote_plus RTSTPS_INSTANCE = False try: RTSTPS_INSTANCE = os.environ['RtstpsInstance'] except: pass logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) s3_client = boto3.client('s3') # ----------------------------- # Utility Bin Functions # ----------------------------- def getIntFromBin(data): return int.from_bytes(data, 'big') def convertMaskAndTrimBytesToBitString(data, mask, rightShift, length): try: bytes = int.from_bytes(data, r'big') # Convert to int output = (bytes & mask) >> rightShift # Mask + 3 right-shift # Right-Trim to the required length fmt = '0' + str(length) + 'b' # e.g. 04b / 08b output = format(output, fmt) return output except Exception as e: print(" (ERR) convertMaskAndTrimBytesToBitString error: %s" % e) return None def convertMaskAndTrimBytes(data, mask, rightShift): try: bytes = int.from_bytes(data, r'big') # Convert to int output = (bytes & mask) >> rightShift # Mask + 3 right-shift return output except Exception as e: print(" (ERR) convertMaskAndTrimBytesToBitString error: %s" % e) return None # ----------------------------- # Utility V49 Functions # ----------------------------- def isDataPacket(data): dataPacket = False binString = convertMaskAndTrimBytes(data, 0b11110000, 4) if binString is not None: dataPacket = binString == 0b0000 or binString == 0b0001 or binString == 0b0010 or binString == 0b0011 return dataPacket # ----------------------------- # Pcap / File Functions # ----------------------------- def writePayloadToFile(bufferData, outFile): try: f = open(outFile, "wb+") num_bytes_written = f.write(bufferData) f.flush() os.fsync(f.fileno()) f.close() #print("%d Bytes written to output file" % (num_bytes_written) ) except Exception as e: print("(ERR) File write error: %s" % e ) return 0 def extractPayload(infFile, outFile): numPackets = 0 inMemoryPayload = b'' inMemoryPayload = io.BytesIO(b'') # Cycle through each packet in the pcap for timestamp, buf in dpkt.pcap.Reader(open(infFile, 'rb')): eth = dpkt.ethernet.Ethernet(buf) if eth.type != dpkt.ethernet.ETH_TYPE_IP: print("Non-IP packet found, skipping..") continue # Check there is a data portion if eth.ip.data == '': continue # Get the data from the packet message_view = eth.ip.data.data numPackets += 1 #print() #print(f'[{numPackets}] Found {len(message_view)} Bytes in a packet') j = 0 message_size = len(message_view) # There may be more than one VRT payload in the packet, loop until all VRT payloads are processed while j < message_size: if isDataPacket(message_view[j:1]) is not True: print(f'[{numPackets}] Not a signal data frame, ignoring') continue v49_packet_size_bytes = getIntFromBin(message_view[j+2:j+4]) * 4 # Packet size is given in words. A word is 4 bytes. v49_packet_view = message_view[j:j+v49_packet_size_bytes] #print(f'[{numPackets}] V49 Packet size {v49_packet_size_bytes/4} Bytes') v49_packet_type = convertMaskAndTrimBytes(v49_packet_view[0:1], 0b11110000, 4) #print(f'[{numPackets}] V49 Packet Type {v49_packet_type}') is_stream_id_included = v49_packet_type == 0b0001 or v49_packet_type == 0b0011 or v49_packet_type == 0b0100 or v49_packet_type == 0b0101 is_class_id_included = convertMaskAndTrimBytes(v49_packet_view[0:1], 0b00001000, 3) != 0 is_trailer_included = convertMaskAndTrimBytes(v49_packet_view[0:1], 0b00000100, 2) != 0 is_timestamp_included = convertMaskAndTrimBytes(v49_packet_view[1:2], 0b11000000, 6) != 0 v49_start_of_payload_index = 4 # Mandatory 4 byte (1 word) VITA-49 header. v49_start_of_payload_index += 4 if is_stream_id_included else 0 # Optional 4 byte (1 word) stream ID field in header. v49_start_of_payload_index += 8 if is_class_id_included else 0 # Optional 8 byte (2 word) class ID fields in header v49_start_of_payload_index += 12 if is_timestamp_included else 0 # Optional 12 byte (3 word) timestamp fields in header v49_end_of_payload_index = v49_packet_size_bytes v49_end_of_payload_index -= 4 if is_trailer_included else 0 # Optional 4 byte trailer field as a footer. payloadData = message_view[j+v49_start_of_payload_index:j+v49_end_of_payload_index] #print(f'[{numPackets}] V49 Payload size {len(payloadData)} Bytes') inMemoryPayload.write(payloadData) j += v49_packet_size_bytes # Write payload to output file and clear inMemoryPayload bufferData = inMemoryPayload.getvalue() if numPackets > 1 and len(bufferData) > 0: logger.info(f'Writing {len(bufferData)} Bytes to file: {outFile}') writePayloadToFile(bufferData, outFile) inMemoryPayload = b'' def get_instance_state(ec2_client, instance_id): response = ec2_client.describe_instance_status(InstanceIds=[instance_id], IncludeAllInstances=True) instance_statuses = response.get("InstanceStatuses", {}) instance_state = instance_statuses[0].get("InstanceState", {}).get("Name") logger.info(f"Instance {instance_id} state is {instance_state}") return instance_state def handle_cloudwatch_event(lambda_event, context): logger.info(f"Called with event {json.dumps(lambda_event)}") ec2_client = boto3.client("ec2") bucket = lambda_event['detail']['bucketArn'].split(':::',1)[1] contact_ID = lambda_event['detail']['contactId'] key_format = lambda_event['detail']['keyFormat'] list_prefix = key_format.split(contact_ID,1)[0] + contact_ID key_list_responce = s3_client.list_objects_v2(Bucket=bucket, Prefix=list_prefix) # Remove empty guiding .pcap key_list_responce['Contents'].pop(0) for record in key_list_responce['Contents']: key = unquote_plus(record['Key']) # Download file tmpkey = key.replace('/', '') tmpInfile = f'/tmp/{tmpkey}' logger.info(f"Downloading S3 object s3://{bucket}/{key} to {tmpInfile}") s3_client.download_file(bucket, key, tmpInfile) # Extract the payload tmpOutfile = tmpInfile.replace('.pcap', '.bin') extractPayload(tmpInfile, tmpOutfile) # Upload file to S3 newKey = key.replace('pcap/', 'raw/') newKey = newKey.replace('.pcap', '.bin') logger.info(f"Putting file {tmpOutfile} to S3 s3://{bucket}/{newKey}") s3_client.upload_file(tmpOutfile, bucket, newKey) # Delete tmp files os.remove(tmpInfile) os.remove(tmpOutfile) # Tag the EC2 instance with the contact_ID ec2_client.create_tags(Resources=[RTSTPS_INSTANCE], Tags=[{'Key':'key-prefix', 'Value':list_prefix}]) if RTSTPS_INSTANCE: instance_state = get_instance_state(ec2_client, RTSTPS_INSTANCE) if instance_state == "stopped": logger.info(f"Starting instance {RTSTPS_INSTANCE}") ec2_client.start_instances(InstanceIds=[RTSTPS_INSTANCE])