{ "cells": [ { "cell_type": "markdown", "id": "876b4937", "metadata": {}, "source": [ "# CV/ML Pipeline to extract highlights from videos using ML Models\n", "\n", "With this notebook you can create an end-to-end CV/ML Pipeline using [GStreamer](gstreamer.freedesktop.org/) and run ML models to extract information from the frames. We'll use a Person detection + Pose estimation model based on Yolov7 to identify and track people in video files. With Gstreamer we can combine multiple feeds/cameras and create a mosaic of images. This helps us to accelerate the process.\n", "\n", "First, deploy a pre-trained **Yolov7** to a SageMaker endpoint. Follow the instructions in [this notebook](01_Yolov7SageMakerInferentia.ipynb). Then, you can run this notebook." ] }, { "cell_type": "markdown", "id": "feef436c", "metadata": {}, "source": [ "## 1) Install dependencies" ] }, { "cell_type": "code", "execution_count": null, "id": "95e37a31", "metadata": {}, "outputs": [], "source": [ "# with this library we can build docker images and push them to ECR\n", "%pip install sagemaker-studio-image-build" ] }, { "cell_type": "markdown", "id": "f9b932b2", "metadata": {}, "source": [ "## 2) Initialize some variables" ] }, { "cell_type": "code", "execution_count": null, "id": "fa5cbb95", "metadata": { "scrolled": true }, "outputs": [], "source": [ "import os\n", "import io\n", "import boto3\n", "import tarfile\n", "import sagemaker\n", "\n", "sagemaker_session = sagemaker.Session()\n", "bucket = sagemaker_session.default_bucket()\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "region_name = sagemaker_session.boto_session.region_name\n", "\n", "image_name='gstreamer'\n", "image_tag=\"py3-1.0\"\n", "image_uri=f\"{account_id}.dkr.ecr.{region_name}.amazonaws.com/{image_name}:{image_tag}\"\n", "print(f'Custom docker image: {image_uri}')" ] }, { "cell_type": "markdown", "id": "0279f53b", "metadata": {}, "source": [ "## 3) Build a custom docker container with additional libraries\n", "**YOU DON\"T NEED TO RUN** this section if you already did that before" ] }, { "cell_type": "code", "execution_count": null, "id": "c8fca5e5", "metadata": {}, "outputs": [], "source": [ "!pygmentize container_02/Dockerfile" ] }, { "cell_type": "markdown", "id": "8298ebba", "metadata": { "scrolled": true }, "source": [ "### 3.1) Build and push the container image" ] }, { "cell_type": "code", "execution_count": null, "id": "242b1d6d", "metadata": {}, "outputs": [], "source": [ "!sm-docker build container_02/ --repository $image_name:$image_tag" ] }, { "cell_type": "markdown", "id": "260a05e0", "metadata": {}, "source": [ "## 4) Create an application for processing our videos\n", "This application will run inside a container executed by SageMaker Processing Jobs" ] }, { "cell_type": "markdown", "id": "ddc49c24", "metadata": {}, "source": [ "### 4.1) Tracker object that makes use of ByteTrack\n", "Source: https://github.com/ifzhang/ByteTrack \n", "This class assigns ids to detected objects and keeps track of them across multiple frames" ] }, { "cell_type": "code", "execution_count": null, "id": "44a95c38", "metadata": {}, "outputs": [], "source": [ "!pygmentize libs/tracker.py" ] }, { "cell_type": "markdown", "id": "7e0038a7", "metadata": {}, "source": [ "### 4.2) CV Pipeline that wraps a GStreamer pipeline\n", "Extend this class to create your own GStreamer pipeline solution" ] }, { "cell_type": "code", "execution_count": null, "id": "213faf51", "metadata": {}, "outputs": [], "source": [ "!pygmentize libs/cvpipeline.py" ] }, { "cell_type": "markdown", "id": "9062a172", "metadata": {}, "source": [ "### 4.3) SageMaker CV Pipeline\n", "Extends a CVPipeline and invokes a SageMaker Endpoint for each frame" ] }, { "cell_type": "code", "execution_count": null, "id": "39230ed6", "metadata": {}, "outputs": [], "source": [ "!pygmentize libs/smcvpipeline.py" ] }, { "cell_type": "markdown", "id": "31ee6845", "metadata": {}, "source": [ "### 4.4) Main application\n", "This script will parse all the parameters passed through SageMaker Processing jobs api and invoke the Gstreamer pipeline" ] }, { "cell_type": "code", "execution_count": null, "id": "2156a6e7", "metadata": {}, "outputs": [], "source": [ "!pygmentize code_02/pipeline.py" ] }, { "cell_type": "markdown", "id": "0c2ff4c8", "metadata": {}, "source": [ "### 4.5) Clone the correct version of ByteTrack\n", "This library is required when object tracking is enabled" ] }, { "cell_type": "code", "execution_count": null, "id": "875e59b8", "metadata": {}, "outputs": [], "source": [ "import os\n", "if not os.path.isdir('libs/bytetrack'):\n", " !git clone https://github.com/ifzhang/ByteTrack libs/bytetrack && \\\n", " cd libs/bytetrack && git checkout d1bf019" ] }, { "cell_type": "markdown", "id": "e4249e25", "metadata": {}, "source": [ "## 5) Kick-off a SageMaker Processing job to process all our video files" ] }, { "cell_type": "code", "execution_count": null, "id": "fd0d8a5c", "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker.processing import ScriptProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.network import NetworkConfig\n", "\n", "sagemaker_session = sagemaker.Session()\n", "bucket = sagemaker_session.default_bucket()\n", "print(f\"s3://{bucket}/samples/\")" ] }, { "cell_type": "markdown", "id": "d34408e2", "metadata": {}, "source": [ "### 5.1) Upload your .mp4 files to S3\n", "If you don't have a video now and just want to run some tests, go to https://pixabay.com/videos/ or any other website which has video of people.\n", "\n", "Download the **.mp4** as 720p (1280x720) files and upload them to the S3 path printed in the last cell (above).\n", "\n", "Run the following command, then to make sure you uploaded the files:\n", "```bash\n", "aws s3 ls s3:///samples/ \n", "```" ] }, { "cell_type": "markdown", "id": "bf519d08", "metadata": {}, "source": [ "### 5.2) Finally run the Processing Job" ] }, { "cell_type": "code", "execution_count": null, "id": "9ca2f694", "metadata": {}, "outputs": [], "source": [ "import time\n", "script_processor = ScriptProcessor(\n", " base_job_name=f'cv-pipeline-{int(time.time()*1000)}',\n", " image_uri=image_uri,\n", " role=sagemaker.get_execution_role(),\n", " instance_type='ml.c5.xlarge',\n", " instance_count=1,\n", " max_runtime_in_seconds=60 * 30,\n", " command=[\"/home/ec2-user/entrypoint.sh\", \"python3\"],\n", " # for production it is important to define vpc_config and use a vpc_endpoint\n", " #vpc_config={\n", " # 'Subnets': ['', ''],\n", " # 'SecurityGroupIds': ['', '']\n", " #}\n", ")\n", "\n", "script_processor.run(\n", " code='code_02/pipeline.py',\n", " inputs=[\n", " # always keep this input in the first place to avoid\n", " # issues with the pipe name\n", " ProcessingInput(\n", " source=f's3://{bucket}/samples',\n", " destination='/opt/ml/processing/input/data', \n", " s3_input_mode='Pipe',\n", " s3_data_distribution_type='ShardedByS3Key'\n", " ),\n", " ProcessingInput(\n", " source='libs',\n", " destination='/opt/ml/processing/input/libs',\n", " s3_input_mode='File'\n", " ) \n", " ],\n", " outputs=[ProcessingOutput(\n", " source='/opt/ml/processing/output/predictions',\n", " destination=f's3://{bucket}/predictions/',\n", " s3_upload_mode='Continuous'\n", " )],\n", " arguments=[\n", " '--input-shape', '1280 720',\n", " '--endpoint-name', \"yolov7-pose-inferentia\",\n", " '--region-name', 'us-east-1'\n", " ]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "2836fa31", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 5 }