{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\n", "\n", "SPDX-License-Identifier: MIT-0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Video metadata extraction and knowledge graph workshop" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Objectives:\n", "This repository contains a series of 4 jupyter notebooks demonstrating how AWS AI Services like Amazon Rekognition, Amazon Transcribe and Amazon Comprehend can help you extract valuable metadata from your video assets and store that information in a Graph database like Amazon Neptune for maximum query performance and flexibility.\n", "At the end of the workshop you'll typically be able to search for a specific label or entity and return a list of 1min video segments related to your search across your videos.\n", "\n", "To extract metadata from a video, we'll use a the following AWS AI services:\n", "- Amazon Rekognition to cut the video in scenes and detect label from the video itself\n", "- Amazon Transcribe to convert audio into text\n", "- Amazon Comprehend to extract entities and topics from the transcribed text via Topic Modelling and Named Entity recognition.\n", "\n", "The metadata related to the video, segments, scenes, entities, labels will be stored in Amazon Neptune.\n", "Amazon Neptune is a fully managed low latency graph database service that will allow us to store metadata as nodes (aka vertices) and branches (aka edges) to represent relationships between the nodes.\n", "https://aws.amazon.com/neptune/\n", "\n", "The diagram below summarises the workflow:\n", "\n", "![Overall workflow](../static/overview.png \"Overall workflow\")\n", "\n", "Topics addressed within the different notebooks:\n", "\n", "Part 0:
\n", "Create the environment (S3 bucket, IAM roles/polices, SNS topic, etc) and upload your sample video\n", "\n", "Part 1:
\n", "Use Amazon Rekognition to detect scenes and labels from your video\n", "\n", "Part 2:
\n", "Use Amazon Transcribe and Amazon Comprehend to respectively transcibe audio to text and extract metadata (topics, Named Entities) from transcripts.\n", "\n", "Part 3:
\n", "Store all the previously extracted metadata in Amazon Neptune and query the graph.\n", "\n", "Part 4:
\n", "Resources clean-up" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Costs\n", "Please note that you might incur costs by running those notebooks. Most of those AI services have free tier but depending on how much you've already used or depending on the size of the video assets you're using, it might go over the limit.\n", "\n", "Finally, if you're not planning to use those resources anymore at the end of the workshop, don't forget to shutdown/delete your Amazon Neptune instance, your Sagemaker studio notebook instances and run the part4-cleanup notebook to delete all the other resources created throughout the notebooks (S3 buckets, IAM roles, SNS topics, etc).\n", "\n", "Before proceeding, please check the related services pricing pages:\n", "\n", "https://aws.amazon.com/transcribe/pricing/\n", "\n", "https://aws.amazon.com/comprehend/pricing/\n", "\n", "https://aws.amazon.com/rekognition/pricing/\n", "\n", "https://aws.amazon.com/neptune/pricing/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Part 0 - Environment setup - S3 Bucket creation, SNS topic and IAM role" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the steps below we're going to create the S3 bucket where we'll upload our video, the SNS topic that some AWS services will use to publish outcomes of the jobs as well as the required policies/roles for the various AWS services to access those objects.
\n", "\n", "Please note that you will need to provide an valid .mp4 video stored in a S3 bucket as input for this workshop. It is NOT included in the github repo assets. \n", "\n", "This video will be used for the different metadata extraction steps. We suggest you use ~5min editorial video or video trailer for which you have the required copyrights.\n", "\n", "The example we used to run the various jobs and generate the graphs is a video trailer from an Amazon Studios production." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install boto3\n", "!pip install sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "import random\n", "import json\n", "import time\n", "import os\n", "import shutil\n", "import logging\n", "import sys\n", "logging.basicConfig(format='%(asctime)s | %(levelname)s : %(message)s',\n", " level=logging.INFO, stream=sys.stdout)\n", "log = logging.getLogger('knowledge-graph-logger')\n", "\n", "s3 = boto3.client('s3')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "IMPORTANT:
\n", "\n", "Make sure before you start executing this notebook that the execution role you've configured for your notebook or studio instance has the following permissions:\n", "- read/write permission to your S3 buckets\n", "- IAM permission to create the policy/role\n", "- SNS permission to create a SNS topic\n", "- permissions to invoke Amazon Rekognition, Amazon Comprehend, Amazon Transcribe APIs (e.g. AmazonRekognitionFullAccess, ComprehendFullAccess, AmazonTranscribeFullAccess)\n", "\n", "You'll get \"AuthorizationErrorException\" messages otherwise." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "\n", "#get sagemaker execution role Arn\n", "sagemaker_role = sagemaker.get_execution_role()\n", "\n", "#get the role's name\n", "sagemaker_role_name = sagemaker_role.split('/')[-1]\n", "\n", "print(f'sagemaker role name:{sagemaker_role_name} \\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The below cell will list all managed iam policies associated with your sagemaker execution role. Check that it has the required permission before proceeding. Note that this cell will not run if your sagemaker execution role doesn't have the required IAM rights." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#retrieve associated managed iam policies\n", "paginator = iam.list_attached_role_policies(RoleName=sagemaker_role_name)\n", "\n", "#listing\n", "for policy in paginator['AttachedPolicies']:\n", " print(policy)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SNS topic creation\n", "We're creating a simple topic that will later be used by Amazon Rekognition notably to publish the outcome/status of the video analysis jobs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sns = boto3.client('sns')\n", "\n", "def create_sns_topic(name):\n", " try:\n", " topic = sns.create_topic(Name=name)\n", " except:\n", " log.exception(\"Couldn't create topic %s.\", name)\n", " raise\n", " else:\n", " return topic['TopicArn']\n", " \n", "sns_topic_arn = create_sns_topic('knowledge-graph-lab-rek-sns-topic')\n", "\n", "print(sns_topic_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### S3 bucket creation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Amazon S3 bucket names are globally unique. To create a unique bucket name, we're appending your account ID and a random int at the end of the bucket name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "region = 'ap-southeast-2' #specify the region of your choice\n", "\n", "#retrieving your account ID\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "#bucket name\n", "bucket = 'sagemaker-knowledge-graph-' + region + '-' + account_id + '-' + str(random.randint(0,100000))\n", "\n", "log.info(f'bucket name: {bucket}')\n", "\n", "#create the bucket\n", "s3.create_bucket(\n", " Bucket=bucket,\n", " CreateBucketConfiguration={'LocationConstraint': region}\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Creating the bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create IAM policy\n", "Amazon Rekognition, Transcribe and Comprehend will need to be able to read the contents of your S3 bucket. So add a bucket policy which allows that." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_bucket_policy = {\n", " \"Version\": \"2012-10-17\",\n", " \"Id\": \"KnowledgeGraphS3BucketAccessPolicy\",\n", " \"Statement\": [\n", " {\n", " \"Sid\": \"KnowledgeGraphS3BucketAccessPolicy\",\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"rekognition.amazonaws.com\",\n", " \"Service\": \"transcribe.amazonaws.com\",\n", " \"Service\": \"comprehend.amazonaws.com\"\n", " },\n", " \"Action\": [\n", " \"s3:GetObject\",\n", " \"s3:ListBucket\",\n", " \"s3:PutObject\"\n", " ],\n", " \"Resource\": [\n", " \"arn:aws:s3:::{}\".format(bucket),\n", " \"arn:aws:s3:::{}/*\".format(bucket)\n", " ]\n", " }\n", " ]\n", "}\n", "\n", "s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(s3_bucket_policy));" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### IAM Role creation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We create the role that Amazon Rekognition, Comprehend, Transcribe will need to run jobs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role_name = account_id+\"-knowledgeGraphLab\"\n", "\n", "assume_role_policy_document = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"rekognition.amazonaws.com\",\n", " \"Service\": \"transcribe.amazonaws.com\",\n", " \"Service\": \"comprehend.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "\n", "try:\n", " create_role_response = iam.create_role(\n", " RoleName = role_name,\n", " AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", " );\n", " \n", "except iam.exceptions.EntityAlreadyExistsException as e:\n", " print('Warning: role already exists:', e)\n", " create_role_response = iam.get_role(\n", " RoleName = role_name\n", " );\n", "\n", "role_arn = create_role_response[\"Role\"][\"Arn\"]\n", "\n", "# Pause to allow role to be fully consistent\n", "time.sleep(10)\n", "\n", "print('IAM Role: {}'.format(role_arn))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "We create 2 policies, for S3 and SNS, that we attach to the role we created above.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_policy = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"s3:GetObject\",\n", " \"s3:ListBucket\",\n", " \"s3:PutObject\"\n", " ],\n", " \"Resource\": [\n", " \"arn:aws:s3:::{}\".format(bucket),\n", " \"arn:aws:s3:::{}/*\".format(bucket)\n", " ]\n", " }\n", " ]\n", "}\n", "\n", "#creating the s3 policy\n", "s3_policy_response = iam.create_policy(\n", " PolicyName='s3AccessForRekCompTrans',\n", " PolicyDocument=json.dumps(s3_policy),\n", ")\n", "\n", "s3_policy_arn = s3_policy_response['Policy']['Arn']\n", "\n", "print(s3_policy_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#attaching the above policy to the role\n", "attach_s3_policy_response = iam.attach_role_policy(\n", " RoleName = role_name,\n", " PolicyArn = s3_policy_response['Policy']['Arn'])\n", "\n", "print('Response:{}'.format(attach_s3_policy_response['ResponseMetadata']['HTTPStatusCode']))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sns_policy = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Action\": [\n", " \"sns:*\"\n", " ],\n", " \"Effect\": \"Allow\",\n", " \"Resource\": sns_topic_arn\n", " }\n", " ]\n", "}\n", "#creating the sns policy\n", "sns_policy_response = iam.create_policy(\n", " PolicyName='snsAccessForRekognition-' + str(random.randint(0,1000)),\n", " PolicyDocument=json.dumps(sns_policy),\n", ")\n", "\n", "sns_policy_arn = sns_policy_response['Policy']['Arn']\n", "\n", "print(sns_policy_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#attaching the built-in AmazonSNSFullAccess\n", "attach_sns_policy_response = iam.attach_role_policy(\n", " RoleName = role_name,\n", " PolicyArn = sns_policy_arn)\n", "\n", "print('Response:{}'.format(attach_sns_policy_response['ResponseMetadata']['HTTPStatusCode']))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Uploading the video to the newly created S3 bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please specify below the S3 bucket where you've stored the video file you'll use to run the notebooks. Please keep in mind that it needs to be a valid .mp4 and that your sagemaker execution role has access to your S3 bucket. You'll get an access denied exception otherwise." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#S3 URL where you have uploaded your video\n", "your_s3_original_video = 's3://< your s3 bucket>/< path to the .mp4 file>'\n", "\n", "#extracting video names and prefix\n", "your_s3_bucket = your_s3_original_video.split('/')[2]\n", "your_s3_prefix = '/'.join(your_s3_original_video.split('/')[3:])\n", "video_file = your_s3_original_video.split('/')[-1]\n", "video_name = video_file.split('.')[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Downloading the file locally from the public S3 bucket to your notebook instance and uploading it to the target S3 bucket for processing." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#creating a temporary folder on your instance to store the video locally.\n", "tmp_local_folder = './tmp'\n", "if not os.path.exists(tmp_local_folder):\n", " #create folder\n", " os.makedirs(tmp_local_folder)\n", "else:\n", " #remove folder and files\n", " shutil.rmtree(tmp_local_folder)\n", " #wait for deletion to finish\n", " while os.path.exists(tmp_local_folder): # check if it exists\n", " pass\n", " #create folder\n", " os.makedirs(tmp_local_folder)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#download the file locally\n", "s3.download_file(your_s3_bucket, your_s3_prefix, os.path.join(tmp_local_folder, video_file))\n", "\n", "#upload the video file to the target S3 bucket\n", "s3_video_input_path = 'input'\n", "s3.upload_file(os.path.join(tmp_local_folder, video_file), bucket, os.path.join(s3_video_input_path, video_file))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Amazon Neptune\n", "\n", "For part3 of the workshop, you will need to create a Neptune DB cluster. \n", "\n", "IMPORTANT: please make sure you create a brand new Neptune instance for this workshop as we'll be cleaning it of its content\n", "\n", "The easiest is to create your db via the console. \n", "\n", "Make sure you are in the same region where you previously created your jupyter notebook instance.\n", "\n", "Engine options: at the time when I developed this workshop, the 1.0.5.1.R2 version was the latest.\n", "\n", "DB cluster identifier: specify a relevant name\n", "\n", "Templates: \"Development and Testing\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Amazon Neptune DB creation](../static/neptune-creation-part1.png \"Amazon Neptune DB creation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "DB instance size: db.t3.medium\n", "\n", "Multi-AZ deployment: No\n", "\n", "Connectivity: make sure you choose the same VPC as the one you're using for your notebook instance. In my case I am using the default one.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Amazon Neptune DB creation](../static/neptune-creation-part2.png \"Amazon Neptune DB creation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notebook configuration: uncheck the \"Create notebook\". we are going to create a separate notebook in sagemaker.\n", "\n", "leave the rest as default and click \"Create Database\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Amazon Neptune DB creation](../static/neptune-creation-part3.png \"Amazon Neptune DB creation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once your cluster's status is \"Available\", retrieve the endpoint url and port and update the endpoint variable below." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Amazon Neptune endpoint](../static/neptune-ui.png \"Amazon Neptune endpoint\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "your_neptune_endpoint_url = 'wss://:/gremlin'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Defining some variable we'll use later for the different metadata extraction jobs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store tmp_local_folder\n", "%store bucket\n", "%store s3_video_input_path\n", "%store video_file\n", "%store video_name\n", "%store role_arn\n", "%store role_name\n", "%store sns_topic_arn\n", "%store s3_policy_arn\n", "%store sns_policy_arn\n", "%store your_neptune_endpoint_url" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Base Python)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-2:452832661640:image/python-3.6" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }