

# Bringing Media2Cloud Video Analysis into Amazon Neptune Knowledge Graph
This notebook accompanies my blog post to show how to move Media2Cloud (M2C) video analysis into a knowledge graph on Amazon Neptune. 

In this notebook you insert into Neptune two types of data:
- Seed data - an initial graph of organizations, their products, and the main people in each organization
- M2C video analysis results

Then we bring them together! Refer to the blog post for a detailed discussion.

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0


## Seed initial data




### Extract names of S3 buckets from environment

In [None]:
import os
import subprocess

# The bucket names are exported from ~/.bashrc. `source` is a bash builtin, so run the
# snippet under bash explicitly rather than under whatever /bin/sh happens to be.
result = subprocess.run(
    ["bash", "-c", "source ~/.bashrc ; echo $STAGE_BUCKET; echo $M2C_ANALYSIS_BUCKET"],
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
lines = result.stdout.split("\n")

# First echoed line is the staging bucket, second is the M2C analysis bucket.
STAGING_BUCKET = lines[0].strip() if len(lines) > 0 else ""
ANALYSIS_BUCKET = lines[1].strip() if len(lines) > 1 else ""

# Fail here with a clear message instead of later with an obscure S3/Neptune error.
if not STAGING_BUCKET or not ANALYSIS_BUCKET:
    raise RuntimeError(
        "STAGE_BUCKET and M2C_ANALYSIS_BUCKET must both be set in ~/.bashrc "
        "(got STAGE_BUCKET=%r, M2C_ANALYSIS_BUCKET=%r)" % (STAGING_BUCKET, ANALYSIS_BUCKET)
    )

### Create local folder for analysis results

In [None]:
%%bash

mkdir -p m2c/analysis

### Bulk-load to Neptune from S3

In [None]:
%load -s s3://{STAGING_BUCKET}/data/seeddata.ttl -f turtle --store-to loadres --run

### Check status of load

In [None]:
%load_status {loadres['payload']['loadId']} --errors --details

### Query the seed data

#### Persons and role in org

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select * where {
 ?person a :Person .
 ?person :name ?label .
 OPTIONAL {?person skos:altLabel ?altlabel .} .
 OPTIONAL {?person :hasWikidataRef ?ref .} .
 ?person :hasRole ?role .
 ?role :hasRoleType ?roleType .
 ?role :hasRoleOrg ?org .
 
} order by ?person 


#### Federated query of person

In [None]:
%%sparql

prefix : 

SELECT ?wiki ?p ?o
WHERE 
{
 ?person a :Person .
 ?person :name "Steve Jobs" .
 ?person :hasWikidataRef ?wiki .
 
 SERVICE {
 ?wiki ?p ?o . 
 } 
}

#### Orgs and products

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select distinct ?org ?ref ?product ?productRef where {
 ?org a :Organization .
 OPTIONAL {?org :hasWikidataRef ?ref .} .
 OPTIONAL {
 ?product :producedBy ?org .
 OPTIONAL {?product :hasWikidataRef ?productRef . } . 
 }
 
} order by ?org ?product


## Now add in the M2C analysis

### Boto3 helpers to bring in the video files from S3

In [None]:
# S3 Helpers

import boto3
import json

# categories of findings we're interested in from Rekognition and Comprehend
REKOGNITION_CATS = ['celeb', 'person', 'face', 'label', 'text']
COMPREHEND_CATS = ['entity', 'keyphrase', 'sentiment']

# connection to analysis bucket in S3. Make sure you set ANALYSIS_BUCKET above.
s3 = boto3.client('s3')
s3r = boto3.resource('s3')
s3_ab = s3r.Bucket(ANALYSIS_BUCKET)

# Read S3 object at given key and return as JSON object
def open_json(key):
    """Download ``key`` from the analysis bucket and return the parsed JSON.

    The object body is read fully, decoded as UTF-8 and passed to ``json.loads``.
    """
    response = s3r.Object(ANALYSIS_BUCKET, key).get()
    raw_bytes = response['Body'].read()
    return json.loads(raw_bytes.decode('utf-8'))

# return list of top-level folders in analysis bucket
def get_video_folders():
    """Return the top-level "folders" of the analysis bucket.

    Each element is a dict with a 'Prefix' key, as returned in the
    'CommonPrefixes' section of an S3 ListObjectsV2 response. The listing is
    paginated so buckets with more prefixes than fit in one response are fully
    covered, and an empty bucket yields an empty list instead of a KeyError.
    """
    paginator = s3.get_paginator('list_objects_v2')
    folders = []
    for page in paginator.paginate(Bucket=ANALYSIS_BUCKET, Delimiter='/'):
        # 'CommonPrefixes' is omitted from a page that has no prefixes.
        folders.extend(page.get('CommonPrefixes', []))
    return folders
 
# For a given video (in the specified folder), build list of S3 files that have data for
# the Rekognition and Comprehend categories.
def build_video_file_summary(folder_obj):
    """Index the analysis files of one video by service, category and kind.

    Args:
        folder_obj: dict with a 'Prefix' key (an element of get_video_folders()).

    Returns:
        None for the 'neptune/' staging folder. Otherwise a dict with
        'id' (first path segment of the prefix), 'rekognition' and 'comprehend'
        (category -> lists of S3 keys under 'metadata' / 'timeseries' / 'raw';
        Comprehend categories have no 'timeseries' list), and 'filename'
        (second path segment of the keys). 'filename' is only set when at least
        one key with three or more segments exists under the prefix.
    """
    path = folder_obj['Prefix']

    # skip the neptune folder, which we use to stage data for Neptune
    if path == 'neptune/':
        return None

    video = {
        'id': path.split("/")[0],
        'rekognition': {r: {'metadata': [], 'timeseries': [], 'raw': []} for r in REKOGNITION_CATS},
        'comprehend': {c: {'metadata': [], 'raw': []} for c in COMPREHEND_CATS},
    }

    # List the files and organize them by category
    for object_summary in s3_ab.objects.filter(Prefix=path):
        opath = object_summary.key
        splits = opath.split("/")
        if len(splits) < 3:
            # Too shallow to carry a sub-folder; ignore rather than fail the whole video.
            continue
        video['filename'] = splits[1]
        subfolder = splits[2]

        if subfolder in ('metadata', 'timeseries'):
            # Expected layout: <id>/<filename>/<subfolder>/<category>/<item>
            if len(splits) < 5:
                continue
            cat = splits[3]
            for service in ('rekognition', 'comprehend'):
                by_kind = video[service].get(cat)
                if by_kind is None:
                    continue
                # Comprehend categories keep no 'timeseries' list; skip such keys
                # instead of raising KeyError.
                if subfolder in by_kind:
                    by_kind[subfolder].append(opath)
                break
        elif subfolder == 'raw':
            # Expected layout: <id>/<filename>/raw/<ts>/<service>/<category>/<item>
            if len(splits) < 7:
                continue
            raw_service = splits[4]
            raw_cat = splits[5]
            if raw_service in ('rekognition', 'comprehend') and raw_cat in video[raw_service]:
                video[raw_service][raw_cat]['raw'].append(opath)
    return video
 

#test
get_video_folders()


### Install RDFLib

In [None]:
!pip install rdflib

### RDFLib helpers to build the triples for M2C video analysis

In [None]:
# RDF helpers

from rdflib import Graph, URIRef, Literal, BNode
from rdflib.namespace import RDF, RDFS, OWL, SKOS
import urllib.parse

# main URI namespace
NAMESPACE="http://amazon.com/aws/wwso/neptune/demo/m2c/"

# List to create common triples. Common across all videos.
# We will also be creating lists per video in next cell.
common_g = Graph()

# OWL classes
CLASS_VIDEO_ANALYSIS = URIRef(NAMESPACE + "VideoAnalysis")
CLASS_CELEB = URIRef(NAMESPACE + "Celebrity")
CLASS_LABEL = URIRef(NAMESPACE + "Label")
CLASS_APPEARANCE = URIRef(NAMESPACE + "Appearance")
CLASS_EMOTION = URIRef(NAMESPACE + "Emotion")
CLASS_SENTIMENT = URIRef(NAMESPACE + "Sentiment")
CLASS_ENTITY = URIRef(NAMESPACE + "Entity")

# datatype properties
PROP_ID=URIRef(NAMESPACE + "id")
PROP_FILENAME=URIRef(NAMESPACE + "filename")
PROP_NAME=URIRef(NAMESPACE + "name")
PROP_APPEARANCE=URIRef(NAMESPACE + "appearance")
PROP_APPEARANCE_PCT=URIRef(NAMESPACE + "appearancePct")
PROP_PERSON_COUNT=URIRef(NAMESPACE + "personCount")
PROP_COUNT=URIRef(NAMESPACE + "count")
PROP_SUBTYPE=URIRef(NAMESPACE + "subtype")
PROP_OBSERVED_TEXT=URIRef(NAMESPACE + "observedText")
PROP_EXTRACTED_KEYPHRASE=URIRef(NAMESPACE + "extractedKeyphrase")
PROP_SENTIMENT=URIRef(NAMESPACE + "hasSentiment")
PROP_SENTIMENT_POSITIVE=URIRef(NAMESPACE + "sentimentCountPositive")
PROP_SENTIMENT_NEGATIVE=URIRef(NAMESPACE + "sentimentCountNegative")
PROP_SENTIMENT_MIXED=URIRef(NAMESPACE + "sentimentCountMixed")
PROP_SENTIMENT_NEUTRAL=URIRef(NAMESPACE + "sentimentCountNeutral")

# Object properties
PROP_HAS_GENDER=URIRef(NAMESPACE + "hasGender")
PROP_HAS_EXT_URL=URIRef(NAMESPACE + "hasExternalURL")
PROP_HAS_WIKIDATA_REF=URIRef(NAMESPACE + "hasWikidataRef")
PROP_HAS_APPEARANCE=URIRef(NAMESPACE + "hasAppearance")
PROP_HAS_CELEB_APPEARANCE=URIRef(NAMESPACE + "hasCelebAppearance")
PROP_HAS_LABEL_APPEARANCE=URIRef(NAMESPACE + "hasLabelAppearance")
PROP_HAS_APPEARANCE_SUBJECT=URIRef(NAMESPACE + "hasAppearanceSubject")
PROP_HAS_EMOTION=URIRef(NAMESPACE + "hasEmotion")
PROP_HAS_EXTRACTED_ENTITY=URIRef(NAMESPACE + "hasExtractedEntity")

# add triples defining an appearance. Add to the graph g for the video.
def add_appearance(g, prop, uri, j, vuri):
    """Attach an Appearance blank node to the video in graph ``g``.

    Args:
        g: video-specific rdflib Graph to add the triples to.
        prop: predicate linking the video to the appearance node.
        uri: subject of the appearance (e.g. a celebrity or label URI).
        j: timeseries record; its 'appearance' and 'duration' values are read.
        vuri: URI of the video.

    The percentage triple is omitted when 'duration' is zero, because the
    ratio is undefined in that case.
    """
    auri = BNode()
    g.add((auri, RDF.type, CLASS_APPEARANCE))
    g.add((vuri, prop, auri))
    g.add((auri, PROP_APPEARANCE, Literal(j['appearance'])))
    duration = j['duration']
    if duration:
        g.add((auri, PROP_APPEARANCE_PCT, Literal(100.0 * j['appearance'] / duration)))
    g.add((auri, PROP_HAS_APPEARANCE_SUBJECT, uri))

# add a celebrity to the common graph
# note we also need to convert Wikidata web URL to an RDF URI
common_celeb = {}
def get_gender_uri(g):
    """Return the URI under NAMESPACE + 'gender/' for the gender value ``g``."""
    gender_base = NAMESPACE + "gender/"
    return URIRef(gender_base + g)
def add_celeb(c):
    """Register a celebrity record in the common graph and return its URI.

    Args:
        c: celebrity dict; 'Id' and 'Name' are required, 'KnownGender' and
           'Urls' are used when present.

    A celebrity already seen (by 'Id') is returned from the common_celeb cache
    without touching the graph again. Every URL is stored as an external URL;
    URLs without a scheme get an "https://" prefix so they are serialized as
    absolute IRIs. A www.wikidata.org/wiki/... URL additionally yields a
    Wikidata reference of the form http://www.wikidata.org/entity/... .
    """
    celeb_id = c['Id']
    if celeb_id in common_celeb:
        return common_celeb[celeb_id]

    curi = URIRef(NAMESPACE + "celeb/" + celeb_id)
    common_g.add((curi, RDF.type, CLASS_CELEB))
    common_g.add((curi, PROP_ID, Literal(celeb_id)))
    common_g.add((curi, PROP_NAME, Literal(c['Name'])))

    # Gender is optional in the record; only emit the triple when it is known.
    gender = (c.get('KnownGender') or {}).get('Type')
    if gender:
        common_g.add((curi, PROP_HAS_GENDER, get_gender_uri(gender)))

    for u in c.get('Urls') or []:
        # Compare on the scheme-less form so both "www..." and "https://www..." match.
        bare = u.split("://", 1)[-1]
        if bare.startswith("www.wikidata.org/wiki"):
            # Convert the wikidata web page URL to the entity URI
            spl = bare.split("/")
            spl[1] = 'entity'
            w = "http://" + "/".join(spl)
            common_g.add((curi, PROP_HAS_WIKIDATA_REF, URIRef(w)))
        ext_url = u if "://" in u else "https://" + u
        common_g.add((curi, PROP_HAS_EXT_URL, URIRef(ext_url)))

    common_celeb[celeb_id] = curi
    return curi
 
# add a label to the common graph
# and build SKOS broader from label parents.
common_label = {}
def get_label_uri(l):
    """Return the URI under NAMESPACE + 'label/' for label name ``l`` (percent-encoded)."""
    encoded_name = urllib.parse.quote(l)
    return URIRef("".join([NAMESPACE, "label/", encoded_name]))
 
def add_label(c):
    """Register a label record in the common graph and return its URI.

    Args:
        c: label dict; 'Name' is required, 'Parents' (list of dicts with
           'Name') is used when present.

    The label is typed as CLASS_LABEL and skos:Concept and given a name and
    skos:prefLabel. Parents are described the same way and linked with
    skos:broader as a chain: label -> first parent -> second parent -> ...
    NOTE(review): the chain assumes 'Parents' is ordered nearest ancestor
    first - confirm against the analysis output.

    Labels already registered are served from the common_label cache.
    """
    name = c['Name']
    if name in common_label:
        return common_label[name]

    luri = get_label_uri(name)
    common_g.add((luri, RDF.type, CLASS_LABEL))
    common_g.add((luri, RDF.type, SKOS.Concept))
    common_g.add((luri, PROP_NAME, Literal(name)))
    common_g.add((luri, SKOS.prefLabel, Literal(name)))

    kid = luri
    for p in c.get('Parents') or []:
        parent_name = p['Name']
        parent = get_label_uri(parent_name)
        if parent_name not in common_label:
            common_g.add((parent, RDF.type, CLASS_LABEL))
            common_g.add((parent, RDF.type, SKOS.Concept))
            common_g.add((parent, PROP_NAME, Literal(parent_name)))
            common_g.add((parent, SKOS.prefLabel, Literal(parent_name)))
        common_g.add((kid, SKOS.broader, parent))
        kid = parent

    # Remember the label so repeated detections skip the work above. Parents are
    # deliberately not cached: when one later shows up as a label in its own
    # right, its own broader chain must still be added.
    common_label[name] = luri
    return luri

# Add an emotion to video graph g
def add_emotion(g, gender, subtype, count, vuri):
    """Attach an Emotion blank node (subtype, count, gender) to the video in ``g``."""
    emotion_node = BNode()
    emotion_triples = (
        (emotion_node, RDF.type, CLASS_EMOTION),
        (vuri, PROP_HAS_EMOTION, emotion_node),
        (emotion_node, PROP_COUNT, Literal(count)),
        (emotion_node, PROP_SUBTYPE, Literal(subtype)),
        (emotion_node, PROP_HAS_GENDER, get_gender_uri(gender)),
    )
    for triple in emotion_triples:
        g.add(triple)

# Add an entity to video graph g
def add_entity(g, record, vuri):
    """Attach an Entity blank node to the video in ``g``.

    Args:
        g: video-specific rdflib Graph.
        record: entity dict; its 'type' becomes the subtype and its 'text' the name.
        vuri: URI of the video.
    """
    auri = BNode()
    g.add((auri, RDF.type, CLASS_ENTITY))
    g.add((vuri, PROP_HAS_EXTRACTED_ENTITY, auri))
    g.add((auri, PROP_SUBTYPE, Literal(record['type'])))
    # Read the name from the argument, not from a caller's loop variable.
    g.add((auri, PROP_NAME, Literal(record['text'])))
 
# Give URI of the sentimentCount property. We define 4 of these for Positive, Negative, Neutral, Mixed
def get_sentiment_uri(s):
    """Return the sentimentCount<Sentiment> property URI for sentiment name ``s``.

    The name is normalised to an initial capital followed by lower case so that
    "POSITIVE", "positive" and "Positive" all map to sentimentCountPositive,
    matching the PROP_SENTIMENT_* properties declared in this notebook.
    """
    return URIRef(NAMESPACE + "sentimentCount" + s.capitalize())
 
# save common graph to TTL file on the notebook instance
def save_common():
    """Serialize the shared graph ``common_g`` as Turtle to m2c/analysis/common.ttl."""
    target_path = 'm2c/analysis/common.ttl'
    common_g.serialize(destination=target_path, format='ttl')


### For each video in analysis bucket, build triples and write to RDF file on the notebook instance

In [None]:
import traceback

# Rekognition reports emotion confidence on a 0-100 scale, so "75%" is 75.0, not 0.75.
EMOTION_MIN_CONFIDENCE = 75.0

# Get all the video folders. If you have a lot of these, you might want to filter this list
# Loop through the videos and build RDF for each.
videos = get_video_folders()
for v in videos:
    try:
        # determine which S3 objects are needed
        summary = build_video_file_summary(v)
        if summary is None:
            continue

        # Create video-specific RDF graph we will be building on
        g = Graph()
        video_uri = URIRef(NAMESPACE + "video/" + summary['id'])
        g.add((video_uri, RDF.type, CLASS_VIDEO_ANALYSIS))
        g.add((video_uri, PROP_FILENAME, Literal(summary['filename'])))
        g.add((video_uri, PROP_ID, Literal(summary['id'])))

        # Build celebs from Rekognition
        # Add a celebrity appearance to the video graph. Also add celeb to common graph.
        celebs_byname = {}
        for f in summary['rekognition']['celeb']['raw']:
            if f.endswith('mapFile.json'):
                continue  # skip
            j = open_json(f)
            for cr in j['Celebrities']:
                c = cr['Celebrity']
                celebs_byname[c['Name']] = add_celeb(c)
        for f in summary['rekognition']['celeb']['timeseries']:
            j = open_json(f)
            # A timeseries entry without a matching raw detection is skipped
            # rather than aborting the whole video with a KeyError.
            curi = celebs_byname.get(j['label'])
            if curi is not None:
                add_appearance(g, PROP_HAS_CELEB_APPEARANCE, curi, j, video_uri)

        # Build labels from Rekognition
        # Add a label appearance to the video graph. Also add label to common graph.
        for f in summary['rekognition']['label']['raw']:
            if f.endswith('mapFile.json'):
                continue
            j = open_json(f)
            for cr in j['Labels']:
                add_label(cr['Label'])
        for f in summary['rekognition']['label']['timeseries']:
            j = open_json(f)
            luri = get_label_uri(j['label'])
            add_appearance(g, PROP_HAS_LABEL_APPEARANCE, luri, j, video_uri)

        # Build persons from Rekognition.
        # Lots of detail, but just count is OK
        for f in summary['rekognition']['person']['raw']:
            if f.endswith("mapFile.json"):
                j = open_json(f)
                g.add((video_uri, PROP_PERSON_COUNT, Literal(len(j))))

        # Build faces from Rekognition
        # We express this in terms of Emotion rather than face!
        # We will just build count of emotions per gender
        emotions = {}
        for f in summary['rekognition']['face']['raw']:
            if f.endswith("mapFile.json"):
                continue
            j = open_json(f)
            for face_record in j['Faces']:
                face = face_record['Face']
                by_emotion = emotions.setdefault(face['Gender']['Value'], {})
                for em in face['Emotions']:
                    if em['Confidence'] >= EMOTION_MIN_CONFIDENCE:
                        by_emotion[em['Type']] = by_emotion.get(em['Type'], 0) + 1
        for gender in emotions:
            for emo in emotions[gender]:
                add_emotion(g, gender, emo, emotions[gender][emo], video_uri)

        # Build text from Rekognition
        for f in summary['rekognition']['text']['raw']:
            if f.endswith('mapFile.json'):
                j = open_json(f)
                for txt in j:
                    g.add((video_uri, PROP_OBSERVED_TEXT, Literal(txt)))

        # Build entity from Comprehend
        for f in summary['comprehend']['entity']['metadata']:
            j = open_json(f)
            for ent in j:
                add_entity(g, ent, video_uri)

        # Build keyphrases from Comprehend
        for f in summary['comprehend']['keyphrase']['metadata']:
            j = open_json(f)
            for kpr in j:
                g.add((video_uri, PROP_EXTRACTED_KEYPHRASE, Literal(kpr['text'])))

        # Build sentiment from Comprehend - just totals
        # Each record contributes (end - begin) to the total of its sentiment.
        for f in summary['comprehend']['sentiment']['metadata']:
            j = open_json(f)
            sentiment = {}
            for s in j:
                sentiment[s['text']] = sentiment.get(s['text'], 0) + s['end'] - s['begin']
            for s in sentiment:
                g.add((video_uri, get_sentiment_uri(s), Literal(sentiment[s])))

        # save the triples for video-specific graph
        print("serialize " + summary['id'])
        g.serialize(format="ttl", destination="m2c/analysis/" + summary['id'] + ".ttl")
    except Exception:
        # Keep going with the remaining videos, but say which one failed.
        print("failed " + str(v.get('Prefix')))
        traceback.print_exc()

# serialize the common
print("common")
save_common()


### Upload RDF files to S3 

In [None]:
%%bash -s "$STAGING_BUCKET"

cd m2c/analysis
aws s3 sync . s3://$1/analysis

### Bulk-load these to Neptune

In [None]:
%load -s s3://{STAGING_BUCKET}/analysis -f turtle --store-to loadres2 --run

### Check load status

In [None]:
%load_status {loadres2['payload']['loadId']} --errors --details

### Query the combined data


#### Summary of videos

In [None]:
%%sparql

prefix : 

# One row per analysed video; counts are optional because not every video has them.
SELECT * where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 OPTIONAL {?video :personCount ?personCount . } .
 OPTIONAL {?video :sentimentCountPositive ?sentimentPos . } .
 OPTIONAL {?video :sentimentCountNegative ?sentimentNeg . } .
 OPTIONAL {?video :sentimentCountMixed ?sentimentMixed . } .
 OPTIONAL {?video :sentimentCountNeutral ?sentimentNeutral . } .
} 

#### Describe one
Choose a video URI from the previous result and paste it into the DESCRIBE query. Scroll through the Table view and notice the sentiment, observed text, extracted entities and keyphrases, label and celebrity appearances, and emotions. Try the Graph view to see the result visually.

If you are on a release prior to 1.2.1.0, comment out the hint:Query clause

In [None]:
%%sparql

PREFIX hint: 

DESCRIBE 
{
 hint:Query hint:describeMode "CBD"
}


#### Show celeb appearances in the videos

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select ?filename ?celebName ?celebRef ?appPct where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 OPTIONAL {
 ?video :hasCelebAppearance ?appo .
 ?appo :appearancePct ?appPct .
 ?appo :hasAppearanceSubject ?sub .
 OPTIONAL { ?sub :hasWikidataRef ?celebRef } .
 ?sub :name ?celebName
 
 } . 
} 

#### LINK: Tie celebs in videos to persons from seed, matching on wikidata ref

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select (?filename AS ?s) (?ref as ?p) (?person as ?o) where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 ?video :hasCelebAppearance/:hasAppearanceSubject/:hasWikidataRef ?ref.
 ?person :hasWikidataRef ?ref .
 ?person a :Person .
} 

#### LINK: Tie extracted entities to persons, orgs, products.
This uses exact string match on name or alt label.

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select distinct ?filename ?entName ?match ?entType where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 ?video :hasExtractedEntity ?ent .
 ?ent :name ?entName .
 ?ent :subtype ?entType .
 VALUES ?entType { "PERSON" "ORGANIZATION" "OTHER" "COMMERCIAL_ITEM" "EVENT" } .
 VALUES ?seedClass { :Person :Organization :Product } .
 ?match :name|skos:altLabel ?entName .
 ?match a ?seedClass .
} 

#### LINK: Tie any text in video analysis to seed object. 
Uses lower-case match. In practice, run this for a specific video.

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select distinct ?filename ?text ?match ?entName where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 ?video :hasExtractedEntity/:name|:extractedKeyphrase|:observedText|:hasLabelAppearance/:hasAppearanceSubject/:name ?text .
 VALUES ?seedClass { :Person :Organization :Product } .
 ?match :name|skos:altLabel ?entName .
 ?match a ?seedClass .
 FILTER(lcase(?text) = lcase(?entName)) .
} order by ?filename ?match

#### LINK: Match on m2c label and bring in m2c taxonomy for reference

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select distinct ?filename ?term ?entName (GROUP_CONCAT(?parentTerm;SEPARATOR=",") AS ?parentTerms)
where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 ?video :hasLabelAppearance/:hasAppearanceSubject ?term .
 ?term :name ?label .
 OPTIONAL {?term skos:broader+ ?parentTerm .} .
 
 VALUES ?seedClass { :Person :Organization :Product } .
 ?match :name|skos:altLabel ?entName .
 ?match a ?seedClass .
 
 FILTER (lcase(?entName) = lcase(?label)) .
} GROUP BY ?filename ?term ?entName 

#### LINK: Match on extracted entity and show what else is there

In [None]:
%%sparql

prefix : 
prefix rdf: 
prefix rdfs: 
prefix owl: 
prefix xsd: 
prefix skos: 

select distinct ?filename ?entName ?match (GROUP_CONCAT(?anyEntity;SEPARATOR=",") AS ?allEntities) 
where {
 ?video a :VideoAnalysis .
 ?video :filename ?filename .
 ?video :hasExtractedEntity ?ent .
 ?ent :name ?entName .
 ?ent :subtype ?entType .
 VALUES ?entType { "PERSON" "ORGANIZATION" "OTHER" "COMMERCIAL_ITEM" "EVENT" } .
 VALUES ?seedClass { :Person :Organization :Product } .
 ?match :name|skos:altLabel ?entName .
 ?match a ?seedClass .
 ?video :hasExtractedEntity/:name ?anyEntity .
} GROUP BY ?filename ?entName ?match 
LIMIT 10

## Cleanup (if necessary)
Delete all triples in a single transaction. This is only appropriate if this small dataset is the only data you have loaded.

In [None]:
%%sparql

delete {?s ?p ?o} where {?s ?p ?o}