{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PART 2 - Text and Metadata extraction from the audio of the video file\n",
"\n",
"In the following section of the workshop, we're going to:\n",
"- Transcribe the file's audio into text using Amazon Transcribe\n",
"- Prepare the transcript data in the format expected by Amazon Comprehend\n",
"- Run a topic modelling job using Amazon Comprehend to extract topics\n",
"- Run an NER (Named Entity Recognition) job using Amazon Comprehend to extract names and entities (e.g. countries, places, etc.)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"All of this metadata will then be used alongside the metadata extracted via computer vision with Amazon Rekognition to populate our knowledge graph in part 3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#load stored variable from previous notebooks\n",
"%store -r"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#installing the libraries used in this notebook (jsonlines is used to read JSON Lines files)\n",
"!pip install jsonlines\n",
"!pip install pandas\n",
"!pip install boto3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Transcribe the file's audio into text\n",
"Amazon Transcribe uses machine learning to recognize speech in audio and video files and transcribe that speech into text. Practical use cases for Amazon Transcribe include transcriptions of customer-agent calls and closed captions for videos.\n",
"\n",
"https://docs.aws.amazon.com/transcribe/latest/dg/transcribe-whatis.html"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import os\n",
"import random\n",
"import time\n",
"import urllib\n",
"import json\n",
"import csv\n",
"import tarfile\n",
"import pandas as pd\n",
"import jsonlines"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"transcribe = boto3.client('transcribe')\n",
"\n",
"#creating a unique name for the job\n",
"transcribe_job_name = \"transcribe_job_knowledge_graph\" + str(random.randint(0, 100000))\n",
"\n",
"#s3 path to your video file\n",
"transcribe_job_uri = \"s3://\" + os.path.join(bucket, s3_video_input_path, video_file)\n",
"\n",
"#starting the transcription job\n",
"transcription_job = transcribe.start_transcription_job(\n",
"    TranscriptionJobName=transcribe_job_name,\n",
"    Media={'MediaFileUri': transcribe_job_uri},\n",
"    MediaFormat='mp4',\n",
"    LanguageCode='en-US',\n",
"    OutputBucketName=bucket\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Monitoring the job's completion"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(transcribe_job_name)\n",
"while True:\n",
"    status = transcribe.get_transcription_job(TranscriptionJobName=transcribe_job_name)\n",
"    if status['TranscriptionJob']['TranscriptionJobStatus'] in ['COMPLETED', 'FAILED']:\n",
"        break\n",
"    print(\".\", end='')\n",
"    time.sleep(5)\n",
"print(status['TranscriptionJob']['TranscriptionJobStatus'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Download the transcript file from the S3 bucket"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3 = boto3.Session().resource('s3')\n",
"\n",
"#retrieving the transcript file URI\n",
"s3_transcript_file_url = status['TranscriptionJob']['Transcript']['TranscriptFileUri']\n",
"\n",
"S3_transcript_file_name = s3_transcript_file_url.split('/')[-1]\n",
"\n",
"#local path where to store the transcript file\n",
"local_transcribe_file_path = os.path.join(tmp_local_folder, S3_transcript_file_name)\n",
"\n",
"#downloading locally the transcript file\n",
"s3.Bucket(bucket).Object(S3_transcript_file_name).download_file(local_transcribe_file_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#loading the transcript object in memory (the file is closed once loaded)\n",
"with open(local_transcribe_file_path) as transcribe_file:\n",
"    transcribe_json_data = json.load(transcribe_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's have a look at the output. Below are the first 5 items of the itemised, word-by-word version of the transcript."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"transcript_items = transcribe_json_data['results']['items']\n",
"transcript_items[:5]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Formatting the transcript to be consumed by Amazon Comprehend for the following 2 jobs\n",
"The documentation explains that we can format the input in 2 ways: either we provide one document per file, or a single file containing one document per line. We're going to pick the latter option."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We have different ways of splitting that text into \"blocks\" of words. One logical way would be to split it sentence by sentence.\n",
"\n",
"Here we choose to segment the transcript into 1-minute chunks.\n",
"\n",
"The reason is that later we're going to attach video/audio metadata to 1-minute video segments, which gives us fine-grained information about the video."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#setting the size of our segments to 1 minute\n",
"segment_size_ms = 60000"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following function uses the timestamp of each item to break the whole transcript into 1-minute chunks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def prepare_transcribed_text_for_topic_modelling(transcript_items, segment_size_ms=60000):\n",
"\n",
"    #initialising the end of the current segment with the segment size\n",
"    current_segment_end = segment_size_ms\n",
"    sentence_list_per_segment = []\n",
"    buffer_sentence = []\n",
"    for item in transcript_items:\n",
"\n",
"        #filter on pronunciation, ignoring punctuation for the moment\n",
"        type_ = item['type']\n",
"        if type_ == 'pronunciation':\n",
"            start = float(item['start_time']) * 1000\n",
"            end = float(item['end_time']) * 1000\n",
"            content = item['alternatives'][0]['content']\n",
"\n",
"            #splitting text across the different segments\n",
"            if start <= current_segment_end:\n",
"                buffer_sentence.append(content)\n",
"            else:\n",
"                #the word starts after the current segment: close the segment (one line per segment)\n",
"                if len(buffer_sentence) > 0:\n",
"                    sentence_list_per_segment.append(' '.join(buffer_sentence))\n",
"                #start the next segment with the current word so that it isn't dropped\n",
"                buffer_sentence = [content]\n",
"                current_segment_end += segment_size_ms\n",
"\n",
"    #flush the buffer at the end\n",
"    if len(buffer_sentence) > 0:\n",
"        sentence_list_per_segment.append(' '.join(buffer_sentence))\n",
"\n",
"    return sentence_list_per_segment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#getting the transcript in the right format\n",
"video_transcript = prepare_transcribed_text_for_topic_modelling(transcript_items, segment_size_ms)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We're now writing the transcript in csv format in S3 to be consumed by Comprehend"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#writing the transcript in csv format in S3 to be consumed by Comprehend\n",
"def write_list_to_csv(local_file_path, rows, bucket, path):\n",
"    filename = os.path.basename(local_file_path)\n",
"    #create file locally (newline='' lets the csv module control line endings)\n",
"    with open(local_file_path, 'w', newline='') as f:\n",
"        writer = csv.writer(f)\n",
"        for row in rows:\n",
"            writer.writerow([row])\n",
"    #upload to S3\n",
"    boto3.resource('s3').Bucket(bucket).Object(os.path.join(path, filename)).upload_file(local_file_path)\n",
"    print(f\"{filename} uploaded to s3://{bucket}/{path}/{filename}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"transcript_filename = 'video_transcript.csv'\n",
"s3_comprehend_input_path = 'comprehend-input'\n",
"\n",
"write_list_to_csv(os.path.join(tmp_local_folder, transcript_filename),\n",
"                  video_transcript,\n",
"                  bucket,\n",
"                  s3_comprehend_input_path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We're now checking the number of lines in the file we just created, which should correspond to the duration of our video in minutes (one line per 1-minute segment)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#counting the lines, closing the file once done\n",
"with open(os.path.join(tmp_local_folder, transcript_filename)) as f:\n",
"    num_lines = sum(1 for line in f)\n",
"print(f'Number of lines in our file: {num_lines}')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Comprehend - Topic detection\n",
"We're now ready to launch the first job.\n",
"\n",
"You can use Amazon Comprehend to examine the content of a collection of documents to determine common themes. For example, you can give Amazon Comprehend a collection of news articles, and it will determine the subjects, such as sports, politics, or entertainment. The text in the documents doesn't need to be annotated.\n",
"\n",
"Amazon Comprehend uses a Latent Dirichlet Allocation-based learning model to determine the topics in a set of documents. It examines each document to determine the context and meaning of a word. The set of words that frequently belong to the same context across the entire document set make up a topic.\n",
"\n",
"https://docs.aws.amazon.com/comprehend/latest/dg/topic-modeling.html"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"comprehend = boto3.client('comprehend')\n",
"\n",
"s3_output_data_comprehend = os.path.join(\"s3://\", bucket, 'comprehend-tm-output')\n",
"s3_input_data_comprehend = os.path.join(\"s3://\", bucket, s3_comprehend_input_path)\n",
"\n",
"#note that we're setting the number of topics to 15\n",
"response = comprehend.start_topics_detection_job(\n",
"    InputDataConfig={\n",
"        'S3Uri': s3_input_data_comprehend,\n",
"        'InputFormat': 'ONE_DOC_PER_LINE'\n",
"    },\n",
"    OutputDataConfig={\n",
"        'S3Uri': s3_output_data_comprehend,\n",
"    },\n",
"    DataAccessRoleArn=role_arn,\n",
"    JobName='comprehend_job_knowledge_graph_' + str(random.randint(0,100000)),\n",
"    NumberOfTopics=15\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Monitoring the progress of the job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"while True:\n",
"    status = comprehend.describe_topics_detection_job(JobId=response['JobId'])\n",
"    if status['TopicsDetectionJobProperties']['JobStatus'] in ['COMPLETED', 'FAILED']:\n",
"        break\n",
"    print(\".\", end='')\n",
"    time.sleep(10)\n",
"print(status['TopicsDetectionJobProperties']['JobStatus'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After Amazon Comprehend processes your document collection, it returns a compressed archive containing two files, topic-terms.csv and doc-topics.csv. \n",
"\n",
"The first output file, topic-terms.csv, is a list of topics in the collection. For each topic, the list includes by default the top terms by topic according to their weight. \n",
"\n",
"The second file, doc-topics.csv, lists the documents associated with a topic and the proportion of the document that is concerned with the topic. If you specified ONE_DOC_PER_FILE, the document is identified by the file name. If you specified ONE_DOC_PER_LINE (like in our case), the document is identified by the file name and the 0-indexed line number within the file. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download and extract the comprehend topic detection output"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#function to extract a tar file\n",
"def extract(tar_file, path):\n",
"    #check the file type before opening it, as tarfile.open raises on non-tar files\n",
"    if tarfile.is_tarfile(tar_file):\n",
"        with tarfile.open(tar_file) as opened_tar:\n",
"            opened_tar.extractall(path)\n",
"        return path\n",
"    else:\n",
"        print(f\"{tar_file} is not a tar file\")\n",
"\n",
"#download the job output archive from S3 and extract it locally\n",
"def download_and_extract_comprehend_job_output(output_s3_uri, dl_path):\n",
"    s3_bucket = output_s3_uri.split('/')[2]\n",
"    s3_file_path = '/'.join(output_s3_uri.split('/', 3)[3:])\n",
"    local_file_path = os.path.join(dl_path, output_s3_uri.split('/')[-1])\n",
"\n",
"    boto3.resource('s3').Bucket(s3_bucket).Object(s3_file_path).download_file(local_file_path)\n",
"    return extract(local_file_path, dl_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"topics_output_s3_uri = comprehend.describe_topics_detection_job(JobId=response['JobId'])['TopicsDetectionJobProperties']['OutputDataConfig']['S3Uri']\n",
"\n",
"job_comprehend_output_folder = download_and_extract_comprehend_job_output(topics_output_s3_uri, tmp_local_folder)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Looking into the 2 output files and loading this into dataframes for later use."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"topics_file = 'doc-topics.csv'\n",
"topic_terms_file = 'topic-terms.csv'\n",
"comprehend_topics_df = pd.read_csv(os.path.join(tmp_local_folder, topics_file))\n",
"comprehend_terms_df = pd.read_csv(os.path.join(tmp_local_folder, topic_terms_file))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Displaying the first 5 documents and their topics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"comprehend_topics_df.head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Displaying the top 10 words for topic 1. This will give us an idea of what this topic is about. Remember that topic modelling does not output a specific label but instead an unlabeled topic, i.e. a grouping of documents for which we have a list of prominent words and their weight/importance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"comprehend_terms_df[comprehend_terms_df['topic'] == 1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Comprehend - Named Entity Recognition (NER)\n",
"\n",
"We're now going to extract named entities from the video's transcript, still using Amazon Comprehend.\n",
"\n",
"An entity is a textual reference to the unique name of a real-world object such as people, places, and commercial items, and to precise references to measures such as dates and quantities.\n",
"\n",
"https://docs.aws.amazon.com/comprehend/latest/dg/how-entities.html"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"response_NER = comprehend.start_entities_detection_job(\n",
"    InputDataConfig={\n",
"        'S3Uri': s3_input_data_comprehend,\n",
"        'InputFormat': 'ONE_DOC_PER_LINE'\n",
"    },\n",
"    OutputDataConfig={\n",
"        'S3Uri': s3_output_data_comprehend,\n",
"    },\n",
"    LanguageCode='en',\n",
"    DataAccessRoleArn=role_arn,\n",
"    JobName='comprehend_job_knowledge_graph_NER' + str(random.randint(0,100000)),\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"while True:\n",
"    status_NER = comprehend.describe_entities_detection_job(JobId=response_NER['JobId'])\n",
"    if status_NER['EntitiesDetectionJobProperties']['JobStatus'] in ['COMPLETED', 'FAILED']:\n",
"        break\n",
"    print(\".\", end='')\n",
"    time.sleep(10)\n",
"print(status_NER['EntitiesDetectionJobProperties']['JobStatus'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#retrieving the outputs of our NER job\n",
"ner_output_s3_uri = comprehend.describe_entities_detection_job(JobId=response_NER['JobId'])['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri']\n",
"job_comprehend_output_folder = download_and_extract_comprehend_job_output(ner_output_s3_uri, tmp_local_folder)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's look into the output of the NER job. As you can see, we've got different types of entities, such as PERSON, DATE, QUANTITY, LOCATION, ORGANIZATION and OTHER."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ner_job_data = []\n",
"with jsonlines.open(os.path.join(tmp_local_folder, 'output')) as ner_json_reader:\n",
"    for obj in ner_json_reader:\n",
"        ner_job_data.append(obj['Entities'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ner_job_data[0]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's store these variables so they can be used in part 3 of the workshop"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%store segment_size_ms\n",
"%store comprehend_terms_df\n",
"%store comprehend_topics_df\n",
"%store ner_job_data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Base Python)",
"language": "python",
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-2:452832661640:image/python-3.6"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}