#!/bin/bash
#
# EC2 user-data bootstrap for an FFmpeg-based HLS streamer host.
#
# What this script does, in order:
#   1. Mirrors all of its own output to /var/log/user-data.log, syslog
#      (tag "user-data") and the console.
#   2. Updates installed packages and installs a static FFmpeg build under
#      /usr/local/bin/ffmpeg, symlinking ffmpeg/ffprobe into /usr/bin.
#   3. Installs incron (via EPEL) and writes /home/ec2-user/send_to_s3.sh,
#      which incron invokes to copy newly created files to S3.
#   4. Writes /home/ec2-user/ffmpeg_hls_streamer_poller.py, a Python script
#      that long-polls an SQS queue and runs FFmpeg for each message.
#   5. Registers the incrontab entry, starts incrond, installs the Python
#      dependencies and launches the poller in the background.
#
# Tokens of the form %%NAME%% (AWS_REGION, SQS_QUEUE_URL, SQS_WAIT_TIME_SECS,
# HEADERS, CHUNK_SIZE, DESTINATION_S3_BUCKET) are placeholders.
# NOTE(review): presumably they are substituted by whatever renders this
# template before the instance boots -- confirm against the deploying code.

# Trace every command and abort on the first failure. Set here rather than in
# the shebang so the options also apply when run as `bash <script>`.
set -xe

echo '===START OF USER DATA==='

# From here on, send stdout/stderr to the log file, syslog and the console.
exec > >(tee /var/log/user-data.log | logger -t user-data -s 2>/dev/console) 2>&1

echo 'Updating all the presently installed packages'
sudo yum update -y

echo 'Installing ffmpeg from the latest release build'
cd /usr/local/bin
mkdir -p ffmpeg && cd ffmpeg
wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz
tar -xf ffmpeg-release-amd64-static.tar.xz
rm -f ffmpeg-release-amd64-static.tar.xz

# The archive unpacks into a single versioned "ffmpeg*" directory; flatten its
# contents into /usr/local/bin/ffmpeg and drop the now-empty wrapper directory.
for dir in ffmpeg*/; do
  if [[ -d "$dir" ]]; then
    cp -a "/usr/local/bin/ffmpeg/${dir}/." /usr/local/bin/ffmpeg/
    rm -rf -- "$dir"
    break
  fi
done

echo 'Creating symlinks for ffmpeg and ffprobe'
ln -sfn /usr/local/bin/ffmpeg/ffmpeg /usr/bin/ffmpeg
ln -sfn /usr/local/bin/ffmpeg/ffprobe /usr/bin/ffprobe

echo 'Installing incron via the epel-release package'
# -y keeps the install non-interactive; there is no terminal to answer a prompt.
sudo amazon-linux-extras install epel -y
sudo yum install -y incron
sudo rm -f /etc/incron.allow

sudo -u ec2-user mkdir -p /home/ec2-user/ffmpeg_hls_output

echo 'Populating send_to_s3.sh and ffmpeg_hls_streamer_poller.py files'
# touch as ec2-user first so the file is owned by ec2-user; the redirection
# below only rewrites its content.
sudo -u ec2-user touch /home/ec2-user/send_to_s3.sh
sudo chmod +x /home/ec2-user/send_to_s3.sh

# Quoted delimiter: the helper script is written out literally, so its own
# $1/$2/$3 references need no escaping.
cat <<'EOT' > /home/ec2-user/send_to_s3.sh
#!/bin/bash
# Invoked by incron with: <created name> <watched path> <destination bucket>.
# Copies a newly created regular file to S3; directories are skipped.
PASSED=$2/$1
if [[ -d ${PASSED} ]]; then
    echo "Not sending $PASSED to S3 as it is a directory" >> /home/ec2-user/send_to_s3.log
    exit 0
elif [[ -f ${PASSED} ]]; then
    echo "Sending $PASSED to S3" >> /home/ec2-user/send_to_s3.log
    # Split the watched path on '/'; elements 4, 5 and 6 are used as the
    # three path components under HLS/ in the destination key.
    IFS='/' read -ra arrIN <<< "$2"
    /usr/bin/aws s3 cp "$PASSED" "s3://$3/HLS/${arrIN[4]}/${arrIN[5]}/${arrIN[6]}/$1" >> /home/ec2-user/send_to_s3.log
else
    echo "Not sending $PASSED to S3 as it is neither a directory nor a file" >> /home/ec2-user/send_to_s3.log
fi
EOT

sudo -u ec2-user touch /home/ec2-user/ffmpeg_hls_streamer_poller.py

# Quoted delimiter: nothing in the Python source (including substituted
# placeholder values) is subject to shell expansion.
cat <<'EOT' > /home/ec2-user/ffmpeg_hls_streamer_poller.py
import os
import sys
import json
import time
import re
from fractions import Fraction

import boto3
import ffmpeg
import requests
from requests_aws4auth import AWS4Auth

AWS_REGION = "%%AWS_REGION%%"
SQS_QUEUE_URL = "%%SQS_QUEUE_URL%%"
SQS_WAIT_TIME_SECS = "%%SQS_WAIT_TIME_SECS%%"

headers = {
    "Authorization": "%%HEADERS%%"
}

sqs = boto3.client('sqs', region_name=AWS_REGION)


def get_master_manifest_url(playback_url):
    """Return the "url" field of the JSON returned by GET playback_url."""
    # Get the HLS Master Manifest URL
    print("Getting the HLS Master Manifest URL from the Playback API endpoint")
    response = requests.get(playback_url, headers=headers)
    return response.json()["url"]


def get_endpoint_url_from_ssm():
    """Read the control plane endpoint URL from SSM and sanity-check its shape.

    Raises AssertionError if the parameter is missing from the response or
    the value does not look like an execute-api URL in AWS_REGION.
    """
    ssm_client = boto3.client(
        'ssm',
        region_name=AWS_REGION
    )

    response = ssm_client.get_parameter(
        Name='/MRE/ControlPlane/EndpointURL',
        WithDecryption=True
    )

    assert "Parameter" in response

    endpoint_url = response["Parameter"]["Value"]
    # Dots and the region are escaped so they only match literally.
    endpoint_url_regex = r".*\.execute-api\." + re.escape(AWS_REGION) + r"\.amazonaws\.com/api/.*"

    assert re.match(endpoint_url_regex, endpoint_url)

    return endpoint_url


def put_event_status(event, program, status):
    """PUT the new status of an Event to the control plane API (SigV4-signed).

    Returns the requests.Response on success; raises Exception on any
    requests-level failure, including a non-2xx status code.
    """
    print(f"Updating the status of Event '{event}' in Program '{program}' to '{status}'")

    path = f"/event/{event}/program/{program}/status/{status}"
    method = "PUT"

    session = boto3.Session()
    credentials = session.get_credentials()

    auth = AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        AWS_REGION,
        'execute-api',
        session_token=credentials.token
    )

    try:
        # TLS certificate verification is left at the requests default
        # (enabled); the request carries signed credentials.
        response = requests.request(
            method=method,
            url=get_endpoint_url_from_ssm() + path,
            auth=auth
        )

        response.raise_for_status()

    except requests.exceptions.RequestException as e:
        print(f"Encountered an error while invoking the control plane api: {str(e)}")
        raise Exception(e) from e

    else:
        return response


def _frame_rate_to_gop(frame_rate):
    """Convert an ffprobe rate string such as "30000/1001" to a rounded int.

    Parses the value as a rational number instead of eval()-ing text that
    originates from the probed stream. Raises ZeroDivisionError for "0/0"
    and ValueError for text that is not a number or ratio.
    """
    return round(Fraction(frame_rate))


def process_message(message):
    """Handle one SQS message: probe the stream, run FFmpeg, report completion.

    The message is deleted from the queue whether or not FFmpeg could be
    started. Exceptions raised while starting FFmpeg propagate to the caller.
    """
    message_id = message["MessageId"]
    receipt_handle = message["ReceiptHandle"]
    message_body = json.loads(message["Body"])
    program = message_body["Program"]
    event = message_body["Event"]
    profile = message_body["Profile"]
    hls_path = f"/home/ec2-user/ffmpeg_hls_output/{program}/{event}/{profile}"

    try:
        playback_url = message_body["PlaybackURL"]
        master_manifest_url = get_master_manifest_url(playback_url)

        # Probe the HLS Stream to get the corresponding video track for 1080p resolution
        print("Probing the HLS Stream to get 1080p video track information. If no 1080p video track is found, first video track will be chosen by default.")

        video_index = 0
        gop = 0

        probe = ffmpeg.probe(master_manifest_url, select_streams="v")

        if "streams" in probe:
            for index, stream in enumerate(probe["streams"]):
                width = stream["width"]
                height = stream["height"]
                frame_rate = stream["avg_frame_rate"]

                # Default to the first video track's frame rate.
                if index == 0:
                    gop = _frame_rate_to_gop(frame_rate)

                if width == 1920 and height == 1080:
                    video_index = index
                    gop = _frame_rate_to_gop(frame_rate)
                    print(f"Found a 1080p video track at stream '{video_index}' with gop '{gop}'")
                    break

        else:
            # SystemExit is not an Exception subclass, so this ends the whole
            # poller process (after the finally block deletes the message).
            print("No video streams are available in the input HLS stream. Exiting.")
            sys.exit(0)

        # Create hls_path directory if it doesn't exist
        print(f"Creating the directory hierarchy '{hls_path}' to store HLS manifest and segment files outputted by FFMpeg")
        os.makedirs(hls_path, exist_ok=True)

        # NOTE(review): the manifest URL is requested again here instead of
        # reusing master_manifest_url -- presumably deliberate; confirm.
        inp = ffmpeg.input(get_master_manifest_url(playback_url), re=None)  # -re reads the input at native frame rate
        video = inp[f'v:{video_index}']  # Select the corresponding video track in the HLS stream
        audio = inp['a']  # Select all the audio tracks in the HLS stream

        process = (
            ffmpeg
            .output(
                video,
                audio,
                f'{hls_path}/content.m3u8',  # Name and Location of the content manifest file
                g=gop,  # GOP size
                sc_threshold=0,  # Disable scene change detection
                vcodec='copy',  # Retain the source video codec (no transcoding)
                acodec='copy',  # Retain the source audio codec (no transcoding)
                format='hls',
                hls_time=int("%%CHUNK_SIZE%%"),  # HLS Segment size
                hls_playlist_type='vod',
                hls_flags='independent_segments',  # Add #EXT-X-INDEPENDENT-SEGMENTS to the content manifest
                master_pl_name='master.m3u8',  # Name of the master manifest file
                hls_segment_filename=f'{hls_path}/{program}_{event}_1_%05d.ts',  # Name and Location of the .ts segments
                start_number=1  # Start the playlist sequence number (#EXT-X-MEDIA-SEQUENCE) from the specified number
            )
            .run_async()  # Asynchronously invoke the ffmpeg process (non-blocking)
        )

        print("Started the FFMpeg process")

    except Exception:
        print(f"Encountered an exception while processing message with MessageId={message_id}")
        raise

    finally:
        print(f"Deleting message with MessageId={message_id} from the SQS queue")

        # Delete the SQS message
        sqs.delete_message(
            QueueUrl=SQS_QUEUE_URL,
            ReceiptHandle=receipt_handle
        )

    # Only reached when FFmpeg was started: poll it every 5 seconds until it
    # exits, then mark the Event as Complete.
    while process is not None:
        retcode = process.poll()

        if retcode is not None:  # done
            print(f"FFMpeg process exited with return code {retcode}")

            # Update the Event status to Complete
            put_event_status(event, program, "Complete")

            break

        else:  # still running
            print("FFMpeg process is currently running")
            time.sleep(5)


def main():
    """Long-poll the SQS queue forever, handling one message at a time."""
    print(f"Starting to continuosly poll the SQS queue {SQS_QUEUE_URL} for messages")

    # Constantly perform Long polling on the SQS queue for messages
    while True:
        response = sqs.receive_message(
            QueueUrl=SQS_QUEUE_URL,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=int(SQS_WAIT_TIME_SECS)
        )

        if "Messages" in response:
            message = response["Messages"][0]
            print("Got 1 message from the queue")

            try:
                process_message(message)

            except Exception as e:
                # Keep polling after a failed message.
                print("There was an issue in processing the message. More details below:")
                print(str(e))

        else:
            print("No messages found in the queue. Continuing to poll.")
            continue


if __name__ == "__main__":
    main()
EOT

echo 'Adding an entry in incrontab to capture IN_CREATE events in /home/ec2-user/ffmpeg_hls_output directory'
# Single quotes keep $# and $@ literal in the incrontab entry, so the shell
# does not expand them here.
echo '/home/ec2-user/ffmpeg_hls_output IN_CREATE /home/ec2-user/send_to_s3.sh $# $@ %%DESTINATION_S3_BUCKET%%' | sudo -u ec2-user incrontab -

echo 'Starting the incrond service'
sudo systemctl start incrond

echo 'Installing boto3, ffmpeg-python, and requests libraries required by the ffmpeg_hls_streamer_poller python script via pip'
sudo -u ec2-user pip3 install --upgrade boto3
sudo -u ec2-user pip3 install --upgrade ffmpeg-python
sudo -u ec2-user pip3 install --upgrade requests
sudo -u ec2-user pip3 install --upgrade requests-aws4auth

echo 'Starting the ffmpeg_hls_streamer_poller python script'
# -u: unbuffered output so the log file is written as the poller prints.
sudo -u ec2-user nohup python3 -u /home/ec2-user/ffmpeg_hls_streamer_poller.py &> /home/ec2-user/ffmpeg_hls_streamer_poller.log &

echo '===END OF USER DATA==='