# Custom Celebrity Recognition Using Amazon Rekognition

***
This notebook provides a walkthrough of recognizing custom celebrities using Amazon Rekognition. You will first index faces of custom celebrities and then use the face search APIs, [SearchFacesByImage](https://docs.aws.amazon.com/rekognition/latest/APIReference/API_SearchFacesByImage.html) for a sample image and [StartFaceSearch](https://docs.aws.amazon.com/rekognition/latest/APIReference/API_StartFaceSearch.html) for a sample video, to recognize those custom celebrities.

***

# Initialize Stuff
***

In [None]:
# initialise Notebook
import boto3
from IPython.display import HTML, display, Image as IImage
from PIL import Image, ImageDraw, ImageFont
import time
import os
from io import BytesIO

In [None]:
# Get current region to choose correct bucket
mySession = boto3.session.Session()
awsRegion = mySession.region_name

In [None]:
# Initialize clients
rekognition = boto3.client('rekognition')
dynamodb = boto3.client('dynamodb')
s3 = boto3.client('s3')

In [None]:
# S3 bucket that contains sample images and videos

# We are providing sample images and videos in this bucket so
# you do not have to manually download/upload test images and videos.
bucketName = "aws-rek-immersionday-" + awsRegion

In [None]:
# DynamoDB Table and Rekognition Collection names. We will be creating these in this module.
ddbTableName = "my-celebrities"
collectionId = "my-celebrities"

In [None]:
# Create temporary directory
# This directory is not needed to call Rekognition APIs.
# We will only use this directory to download images from S3 bucket and draw bounding boxes

!mkdir -p m2tmp
tempFolder = 'm2tmp/'

# DynamoDB table to store custom celebrity metadata
***
In this step we will create a DynamoDB table to store custom celebrity metadata including id, name and url. You can store additional attributes for each celebrity if needed.

#### List existing DynamoDB tables in your account

In [None]:
# List existing DynamoDB Tables
# Before creating the DynamoDB table, let us first look at the list of existing DynamoDB tables in our account.

listTablesResponse = dynamodb.list_tables()
display(listTablesResponse["TableNames"])

#### Create new DynamoDB Table

In [None]:
# Create new DynamoDB Table
        
createTableResponse = dynamodb.create_table(
    TableName=ddbTableName,
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'  #Partition key
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'S'
        },

    ],
    BillingMode='PAY_PER_REQUEST'
)

display(createTableResponse)
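
`create_table` returns while the table is still being created, so a `put_item` issued right away can fail. Here is a minimal sketch of waiting for the table, assuming the standard boto3 `table_exists` waiter. The helper name `waitForTable` and its default delay and attempt values are illustrative and not part of the original notebook.

```python
def waitForTable(dynamodbClient, tableName, delaySeconds=2, maxAttempts=30):
    """Block until the DynamoDB table is ready, then return its status."""
    # 'table_exists' is a built-in boto3 waiter that polls DescribeTable.
    waiter = dynamodbClient.get_waiter('table_exists')
    waiter.wait(
        TableName=tableName,
        WaiterConfig={'Delay': delaySeconds, 'MaxAttempts': maxAttempts}
    )
    # Read back the status reported by DescribeTable once the waiter returns.
    description = dynamodbClient.describe_table(TableName=tableName)
    return description['Table']['TableStatus']
```

In this notebook you could call `waitForTable(dynamodb, ddbTableName)` before writing any items.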

#### List DynamoDB Tables in your account to see newly created table 

In [None]:
# List DynamoDB Tables
# Let us look at the list of our DynamoDB tables again to make sure that the table we just created appears in it.

listTablesResponse = dynamodb.list_tables()
display(listTablesResponse["TableNames"])

# Rekognition Collection to store faces
***
In this step we will create a Rekognition Collection.

Amazon Rekognition can store information about detected faces in server-side containers known as [collections](https://docs.aws.amazon.com/rekognition/latest/dg/collections.html). You can use the facial information that's stored in a collection to search for known faces in images, stored videos, and streaming videos. In this section you will learn how you can create and manage Rekognition Collections.

#### List Rekognition Collections

In [None]:
# List Rekognition Collections
# Let us first see if we have already created any Rekognition collections in our account.
# If there are no existing Rekognition collections in your account, you will see an empty list;
# otherwise you will see a list of Rekognition collection names and their face model versions.

listCollectionsResponse = rekognition.list_collections()

display(listCollectionsResponse["CollectionIds"])
display(listCollectionsResponse["FaceModelVersions"])

#### Create new Rekognition collection

In [None]:
#cids = listCollectionsResponse["CollectionIds"]
#for cid in cids:
#    rekognition.delete_collection(CollectionId=cid)

In [None]:
# Create Rekognition Collection
# Let us now create a new Rekognition collection that we will use to store faces of custom celebrities.

createCollectionResponse = rekognition.create_collection(
    CollectionId=collectionId
)
display(createCollectionResponse)
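
`create_collection` raises `ResourceAlreadyExistsException` when a collection with the same ID already exists, which can happen if you rerun this notebook. One possible guard, sketched below, checks `list_collections` first and follows `NextToken` in case the account has more collections than fit in one page. The helper name `collectionExists` is hypothetical.

```python
def collectionExists(rekognitionClient, collectionId):
    """Return True if collectionId appears in any page of ListCollections."""
    nextToken = None
    while True:
        # Only pass NextToken when we actually have one from a previous page.
        kwargs = {'NextToken': nextToken} if nextToken else {}
        page = rekognitionClient.list_collections(**kwargs)
        if collectionId in page.get('CollectionIds', []):
            return True
        nextToken = page.get('NextToken')
        if not nextToken:
            return False
```

With the names used in this notebook, you would call `create_collection` only when `collectionExists(rekognition, collectionId)` returns `False`.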


#### List Rekognition collections to see newly created Rekognition collection

In [None]:
# List Rekognition Collections
# Let us make sure that the Rekognition collection we just created now appears in the list of collections in our AWS account.
listCollectionsResponse = rekognition.list_collections()

display(listCollectionsResponse["CollectionIds"])
display(listCollectionsResponse["FaceModelVersions"])

#### View additional information about the collection we just created

In [None]:
# Describe Rekognition Collection
# You can use DescribeCollection to get information, 
# such as the number of faces indexed into a collection 
# and the version of the model used by the collection for face detection etc.
# https://docs.aws.amazon.com/rekognition/latest/APIReference/API_DescribeCollection.html

# Since we have not indexed any faces yet, you should see FaceCount: 0

describeCollectionResponse = rekognition.describe_collection(
    CollectionId=collectionId
)
display(describeCollectionResponse)

# Index Custom Celebrity Faces
***

In this step, you will index faces of custom celebrities in the Rekognition collection and store their additional information in the DynamoDB table created in the earlier steps.

We will index multiple images for each celebrity. By indexing multiple faces we increase the likelihood of detecting celebrities when their face is at different angles, etc. We will use [IndexFaces](https://docs.aws.amazon.com/rekognition/latest/APIReference/API_IndexFaces.html) to detect faces in the input image and [add them](https://docs.aws.amazon.com/rekognition/latest/dg/add-faces-to-collection-procedure.html) to the specified collection.

For best practices on indexing faces, see [this blog post on filtering faces during indexing](https://aws.amazon.com/blogs/machine-learning/save-time-and-money-by-filtering-faces-during-indexing-with-amazon-rekognition/).

#### Define methods to add face to Rekognition collection and add related attributes to DynamoDB

In [None]:
# We will define a method to index a face along with the celebrity id
# https://docs.aws.amazon.com/rekognition/latest/APIReference/API_IndexFaces.html

def indexFace (bucketName, imageName, celebrityId):

    indexFaceResponse = rekognition.index_faces(
        CollectionId=collectionId,
        Image={
            'S3Object': {
                'Bucket': bucketName,
                'Name': imageName,
            }
        },
        ExternalImageId=celebrityId,
        DetectionAttributes=[
            'DEFAULT' #'DEFAULT'|'ALL',
        ],
        MaxFaces=1,
        QualityFilter='AUTO' #NONE | AUTO | LOW | MEDIUM | HIGH
    )
    
    display(indexFaceResponse)

# We will define a method to write metadata (id, name, url) of celebrity to DynamoDB
def addCelebrityToDynamoDB(celebrityId, celebrityName, celebrityUrl):
    ddbPutItemResponse = dynamodb.put_item(
        Item={
            'id': {'S': celebrityId},
            'name': {'S': celebrityName},
            'url': { 'S': celebrityUrl},
        },
        TableName=ddbTableName,
    )
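
Because `indexFace` uses `MaxFaces=1` and `QualityFilter='AUTO'`, some faces in an image may be skipped. IndexFaces reports indexed faces under `FaceRecords` and skipped ones under `UnindexedFaces`, each with a list of `Reasons` such as `EXCEEDS_MAX_FACES` or `LOW_SHARPNESS`. The sketch below condenses a response into those two parts. The helper name `summarizeIndexFaces` is an assumption, and it expects the response dictionary as input, so you would need to return `indexFaceResponse` from `indexFace` to use it.

```python
def summarizeIndexFaces(indexFaceResponse):
    """Return ([(FaceId, ExternalImageId), ...], [reasons for each skipped face])."""
    indexed = [
        (record['Face']['FaceId'], record['Face'].get('ExternalImageId'))
        for record in indexFaceResponse.get('FaceRecords', [])
    ]
    skipped = [
        unindexed.get('Reasons', [])
        for unindexed in indexFaceResponse.get('UnindexedFaces', [])
    ]
    return indexed, skipped
```

An empty `indexed` list means no face was added to the collection for that image, which is worth checking before relying on later search results.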

### Index first celebrity

In [None]:
# Index Celebrity 1
celebrityId = "1"
celebrityName = "Chris Munns"
celebrityUrl = "http://www.amazon.com"

In [None]:
addCelebrityToDynamoDB(celebrityId, celebrityName, celebrityUrl)

#### Index face 1

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/chris01.png"})))

In [None]:
# After you run this cell, the largest face in the image will be indexed.
# You will get a JSON response with a variety of information; note FaceId, ImageId and ExternalImageId.
# Later, when we search for celebrities, we will use this ExternalImageId to look up metadata in DynamoDB.

indexFace(bucketName, "media/custom-celebrity-recognition/chris01.png", celebrityId)

#### Index face 2

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/chris02.png"})))

In [None]:
indexFace(bucketName, "media/custom-celebrity-recognition/chris02.png", celebrityId)

#### Index face 3

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/chris03.png"})))

In [None]:
indexFace(bucketName, "media/custom-celebrity-recognition/chris03.png", celebrityId)

In [None]:
# Describe Rekognition Collection
# With three faces indexed for celebrity 1, you should now see FaceCount: 3

describeCollectionResponse = rekognition.describe_collection(
    CollectionId=collectionId
)
display("FaceCount: {0}".format(describeCollectionResponse["FaceCount"]))

### Index second celebrity

In [None]:
# Index Celebrity 2
celebrityId = "2"
celebrityName = "Kashif Imran"
celebrityUrl = "http://aws.amazon.com"

In [None]:
addCelebrityToDynamoDB(celebrityId, celebrityName, celebrityUrl)

#### Index face 1

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/kashif01.jpg"})))

In [None]:
indexFace(bucketName, "media/custom-celebrity-recognition/kashif01.jpg", celebrityId)

#### Index face 2

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/kashif02.jpg"})))

In [None]:
indexFace(bucketName, "media/custom-celebrity-recognition/kashif02.jpg", celebrityId)

#### Index face 3

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': "media/custom-celebrity-recognition/kashif03.jpg"})))

In [None]:
indexFace(bucketName, "media/custom-celebrity-recognition/kashif03.jpg", celebrityId)

In [None]:
# Describe Rekognition Collection
# You should now have FaceCount: 6 since we have indexed 3 faces for each of the 2 celebrities we indexed.
describeCollectionResponse = rekognition.describe_collection(
    CollectionId=collectionId
)
display("FaceCount: {0}".format(describeCollectionResponse["FaceCount"]))
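
`FaceCount` only gives the total. To confirm that each celebrity has the expected number of faces, you can list the faces in the collection and group them by `ExternalImageId`. This is a rough sketch using the ListFaces API with `NextToken` pagination. The helper name `countFacesPerCelebrity` and the `'(none)'` placeholder key are assumptions for illustration.

```python
from collections import Counter

def countFacesPerCelebrity(rekognitionClient, collectionId):
    """Count indexed faces per ExternalImageId across all ListFaces pages."""
    counts = Counter()
    kwargs = {'CollectionId': collectionId}
    while True:
        page = rekognitionClient.list_faces(**kwargs)
        for face in page.get('Faces', []):
            # Faces indexed without an ExternalImageId are grouped together.
            counts[face.get('ExternalImageId', '(none)')] += 1
        nextToken = page.get('NextToken')
        if not nextToken:
            return dict(counts)
        kwargs['NextToken'] = nextToken
```

For the collection built so far, `countFacesPerCelebrity(rekognition, collectionId)` would be expected to report three faces each for IDs `"1"` and `"2"`, provided every IndexFaces call succeeded.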

## Recognize custom celebrities in image
***
Now let us try the image with custom celebrities and see if we can recognize people in that image.


In [None]:
imageName = "media/custom-celebrity-recognition/serverless-bytes.png"

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': imageName})))

#### Call Rekognition to recognize custom celebrity in image by using face search

In [None]:
searchFacesResponse = rekognition.search_faces_by_image(
    CollectionId=collectionId,
    Image={
        'S3Object': {
            'Bucket': bucketName,
            'Name': imageName,
        }
    },
    MaxFaces=2,
    FaceMatchThreshold=95
)

#### Review raw JSON response of search face by image API call

In [None]:
# You will see Rekognition response with SearchedFaceBoundingBox (which contains information about the biggest face
# in the image). Rekognition also returns FaceMatches, a list of matched faces. Each matched face has additional
# information including FaceId, ImageId and ExternalImageId. We will use ExternalImageId to extract information
# from DynamoDB about this celebrity.

display(searchFacesResponse)
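
Since several indexed faces can share one `ExternalImageId`, `FaceMatches` may contain more than one entry for the same celebrity. A small sketch of picking the strongest match from the response follows. The helper name `bestFaceMatch` and its `minSimilarity` default are assumptions chosen to mirror the `FaceMatchThreshold=95` used above.

```python
def bestFaceMatch(searchResponse, minSimilarity=95.0):
    """Return (ExternalImageId, similarity) for the strongest match, or None."""
    matches = [
        match for match in searchResponse.get('FaceMatches', [])
        if match['Similarity'] >= minSimilarity
    ]
    if not matches:
        return None
    # Pick the highest similarity explicitly rather than relying on list order.
    best = max(matches, key=lambda match: match['Similarity'])
    return best['Face'].get('ExternalImageId'), round(best['Similarity'], 2)
```

Calling `bestFaceMatch(searchFacesResponse)` returns `None` when nothing in the collection matched the searched face.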

#### Display image with bounding box around recognized custom celebrity

In [None]:
# Define functions to show the image with bounding boxes around recognized celebrities
  
def displayWithBoundingBoxes (sourceImage, boxes):
    # RGB tuples: light grey, lighter grey, blue, green
    colors = ((220,220,220),(230,230,230),(76,182,252),(52,194,123))
    
    # Download image locally
    imageLocation = tempFolder+os.path.basename(sourceImage)

    s3.download_file(bucketName, sourceImage, imageLocation)

    # Draws BB on Image
    bbImage = Image.open(imageLocation)
    draw = ImageDraw.Draw(bbImage)
    width, height = bbImage.size
    col = 0
    maxcol = len(colors)
    line= 3
    for box in boxes:
        x1 = int(box[1]['Left'] * width)
        y1 = int(box[1]['Top'] * height)
        x2 = int(box[1]['Left'] * width + box[1]['Width'] * width)
        y2 = int(box[1]['Top'] * height + box[1]['Height']  * height)
        
        draw.text((x1,y1),box[0],colors[col])
        for l in range(line):
            draw.rectangle((x1-l,y1-l,x2+l,y2+l),outline=colors[col])
        col = (col+1)%maxcol
    
    imageFormat = "PNG"
    ext = sourceImage.lower()
    if(ext.endswith('jpg') or ext.endswith('jpeg')):
        imageFormat = 'JPEG'

    bbImage.save(imageLocation,format=imageFormat)

    display(bbImage)
    
def getDynamoDBItem(itemId):
    ddbGetItemResponse = dynamodb.get_item(
        Key={'id': {'S': itemId} },
        TableName=ddbTableName
    )
    
    itemToReturn = ('', '', '')
    
    if('Item' in ddbGetItemResponse):
        itemToReturn = (ddbGetItemResponse['Item']['id']['S'], 
                ddbGetItemResponse['Item']['name']['S'],
                ddbGetItemResponse['Item']['url']['S'])
    
    return itemToReturn
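
The low-level DynamoDB client used in this notebook represents every attribute as a typed value such as `{'S': 'text'}`, which is why `getDynamoDBItem` reads `['id']['S']`. The sketch below shows that mapping in both directions for the three string attributes stored here. The helper names `celebrityToItem` and `celebrityFromItem` are hypothetical.

```python
def celebrityToItem(celebrityId, celebrityName, celebrityUrl):
    """Build the attribute-value dictionary expected by put_item."""
    return {
        'id': {'S': celebrityId},
        'name': {'S': celebrityName},
        'url': {'S': celebrityUrl},
    }

def celebrityFromItem(item):
    """Flatten a get_item 'Item' into a plain dict, keeping string attributes only."""
    return {key: value['S'] for key, value in item.items() if 'S' in value}
```

Returning a plain dictionary lets callers write `celeb['name']` instead of remembering tuple positions such as `celeb[1]`.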



In [None]:
# After you run this cell, you should see one of the faces recognized using Amazon Rekognition.
# You only see one face recognized in this example because
# SearchFacesByImage, for a given input image, first detects the largest face in the image,
# and then searches the specified collection for matching faces.

# In the next section we will use the DetectFaces API call to first detect faces in the image and then
# use SearchFacesByImage for each detected face to recognize it.

def displaySearchedFace(sfr):  

    boxes = []
    
    if(len(sfr['FaceMatches']) > 0):
        bb = sfr['SearchedFaceBoundingBox']
        eid = sfr['FaceMatches'][0]['Face']['ExternalImageId']
        conf = sfr['FaceMatches'][0]['Similarity']

        celeb = getDynamoDBItem(eid)

        boxes.append(("{0}-{1}-{2}%".format(celeb[0], celeb[1], round(conf,2)), bb))

        displayWithBoundingBoxes(imageName, boxes)

displaySearchedFace(searchFacesResponse)

## Recognize all custom celebrities in image
***
Now let us try an image with custom celebrities and see if we can recognize all the people in it. To recognize every face, we will first call DetectFaces and then call the face search API for each detected face.

In [None]:
imageName = "media/custom-celebrity-recognition/serverless-bytes.png"

#### Define helper functions to detect faces, crop faces in the main image, and then recognize each face

In [None]:
def detectFaces():
    detectFacesResponse = rekognition.detect_faces(
        Image={
            'S3Object': {
                'Bucket': bucketName,
                'Name': imageName
                }
            },
        Attributes=['DEFAULT'])
    return detectFacesResponse

In [None]:
def getFaceCrop(imageBinary, box, image_width, image_height):
    
    x1 = int(box['Left'] * image_width)-25
    y1 = int(box['Top'] * image_height)-25
    x2 = int(box['Left'] * image_width + box['Width'] * image_width)+25
    y2 = int(box['Top'] * image_height + box['Height']  * image_height)+25
    # Clamp the padded box to the image bounds
    if x1 < 0 : x1=0
    if y1 < 0 : y1=0
    if x2 > image_width : x2=image_width
    if y2 > image_height : y2=image_height
           
    coordinates = (x1,y1,x2,y2)

    image_crop = imageBinary.crop(coordinates)
    stream2 = BytesIO() 
    
    iformat = "JPEG"
    if(imageName.lower().endswith("png")):
        iformat = "PNG"
    
    image_crop.save(stream2,format=iformat)
    image_region_binary = stream2.getvalue()    
    stream2.close()
    
    return image_region_binary
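
Rekognition returns bounding boxes as ratios of the image size, and a box can extend past the image edge when a face is partially out of frame. Here is a minimal sketch of turning such a box into a padded pixel rectangle that stays inside the image. The helper name `paddedCropBox` is an assumption, and the default padding of 25 pixels mirrors `getFaceCrop`.

```python
def paddedCropBox(box, imageWidth, imageHeight, padding=25):
    """Convert a ratio-based BoundingBox to (left, top, right, bottom) pixels."""
    left = int(box['Left'] * imageWidth) - padding
    top = int(box['Top'] * imageHeight) - padding
    right = int((box['Left'] + box['Width']) * imageWidth) + padding
    bottom = int((box['Top'] + box['Height']) * imageHeight) + padding
    # Clamp every edge so the rectangle never leaves the image.
    return (
        max(left, 0),
        max(top, 0),
        min(right, imageWidth),
        min(bottom, imageHeight),
    )
```

The returned tuple has the order that `Image.crop` in Pillow expects.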

In [None]:
def recognizeFace(faceCrop):
    searchFacesResponse = rekognition.search_faces_by_image(
        CollectionId=collectionId,
        Image={
            'Bytes': faceCrop
            },
        MaxFaces=2,
        FaceMatchThreshold=95
    )

    if(len(searchFacesResponse['FaceMatches']) > 0):
        eid = searchFacesResponse['FaceMatches'][0]['Face']['ExternalImageId']
        conf = searchFacesResponse['FaceMatches'][0]['Similarity']
        celeb = getDynamoDBItem(eid)

        return "{0}-{1}-{2}%".format(celeb[0], celeb[1], round(conf,2))
    else:
        return ""


In [None]:
def recognizeAllCustomCelebrities():
    detectedFaces = detectFaces()
    
    # Download image locally
    imageLocation = tempFolder+os.path.basename(imageName)
    s3.download_file(bucketName, imageName, imageLocation)

    imageBinary = Image.open(imageLocation)
    width, height = imageBinary.size 
    
    boxes = []
    for detectedFace in detectedFaces['FaceDetails']:
        faceCrop = getFaceCrop(imageBinary, detectedFace['BoundingBox'], width, height)
        recognizedFace = recognizeFace(faceCrop)
        if(recognizedFace):
            boxes.append((recognizedFace, detectedFace['BoundingBox']))
        else:
            boxes.append(("Unrecognized Face", detectedFace['BoundingBox']))
    displayWithBoundingBoxes(imageName, boxes)

In [None]:
recognizeAllCustomCelebrities()

## Recognize custom celebrities in video
***

In [None]:
videoName = "media/custom-celebrity-recognition/serverless-bytes.mov"

#### Start face search job to find faces in the video that match faces in our Rekognition collection

In [None]:
startFaceSearchResponse = rekognition.start_face_search(
    Video={
        'S3Object': {
            'Bucket': bucketName,
            'Name': videoName
        }
    },
    FaceMatchThreshold=90,
    CollectionId=collectionId,
)


faceSearchJobId = startFaceSearchResponse['JobId']
display("Job ID: {0}".format(faceSearchJobId))

#### Wait until the face search job is complete

In [None]:
getFaceSearch = rekognition.get_face_search(
    JobId=faceSearchJobId,
    SortBy='TIMESTAMP'
)

while(getFaceSearch['JobStatus'] == 'IN_PROGRESS'):
    time.sleep(5)
    print('.', end='')

    getFaceSearch = rekognition.get_face_search(
        JobId=faceSearchJobId,
        SortBy='TIMESTAMP'
    )
    
display(getFaceSearch['JobStatus'])
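
The loop above stops on any status other than `IN_PROGRESS`, including `FAILED`, and reads only the first page of results. A more defensive variant is sketched below: it adds a timeout, raises if the job did not succeed, and follows `NextToken` to collect every page of `Persons`. The helper name `waitForFaceSearch` and its default polling and timeout values are assumptions.

```python
import time

def waitForFaceSearch(rekognitionClient, jobId, pollSeconds=5, timeoutSeconds=600):
    """Poll GetFaceSearch until the job finishes, then return all Persons."""
    deadline = time.time() + timeoutSeconds
    while True:
        response = rekognitionClient.get_face_search(JobId=jobId, SortBy='TIMESTAMP')
        if response['JobStatus'] != 'IN_PROGRESS':
            break
        if time.time() >= deadline:
            raise TimeoutError("Face search job {0} did not finish in time".format(jobId))
        time.sleep(pollSeconds)

    if response['JobStatus'] != 'SUCCEEDED':
        raise RuntimeError(response.get('StatusMessage', 'Face search job failed'))

    # Collect the first page, then follow NextToken for any remaining pages.
    persons = list(response.get('Persons', []))
    nextToken = response.get('NextToken')
    while nextToken:
        response = rekognitionClient.get_face_search(
            JobId=jobId, SortBy='TIMESTAMP', NextToken=nextToken
        )
        persons.extend(response.get('Persons', []))
        nextToken = response.get('NextToken')
    return persons
```

With the names from this notebook, `waitForFaceSearch(rekognition, faceSearchJobId)` would return a list with the same shape as `getFaceSearch['Persons']`.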

#### Review raw JSON response from Rekognition

In [None]:
display(getFaceSearch)

#### Show recognized custom celebrities in the video

In [None]:
theCelebs = {}

# Display timestamps and celebrities detected at that time
strDetail = "Celebrities detected in video<br>=======================================<br>"
strOverall = "Celebrities in the overall video:<br>=======================================<br>"

# Faces detected in each frame
for person in getFaceSearch['Persons']:
    if('FaceMatches' in person and len(person["FaceMatches"])> 0):
        ts = person["Timestamp"]
        theFaceMatches = {}
        for fm in person["FaceMatches"]:
            conf = fm["Similarity"]
            eid =  fm["Face"]["ExternalImageId"]
            if(eid not in theFaceMatches):
                theFaceMatches[eid] = (eid, ts, round(conf,2))
            if(eid not in theCelebs):
                theCelebs[eid] = (getDynamoDBItem(eid))
        for theFaceMatch in theFaceMatches:
            celeb = theCelebs[theFaceMatch]
            fminfo = theFaceMatches[theFaceMatch]
            strDetail = strDetail + "At {0} ms<br> {2} (ID:{1}) Conf: {4}%<br>".format(fminfo[1],
                       celeb[0], celeb[1], celeb[2], fminfo[2])

# Unique faces detected in video
for theCeleb in theCelebs:
    tc = theCelebs[theCeleb]
    strOverall = strOverall + "{1} (ID: {0})<br>".format(tc[0], tc[1], tc[2])

# Display results
display(HTML(strOverall))
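
Beyond listing who appears, the same `Persons` data can show when each celebrity is on screen. The sketch below records the first and last timestamp (in milliseconds) and the number of sampled timestamps per `ExternalImageId`. The helper name `summarizeAppearances` and the shape of its return value are assumptions.

```python
def summarizeAppearances(persons):
    """Map ExternalImageId -> first/last timestamp (ms) and sample count."""
    appearances = {}
    for person in persons:
        timestamp = person['Timestamp']
        # Several indexed faces can match at one timestamp; count each ID once.
        matchedIds = {
            match['Face']['ExternalImageId']
            for match in person.get('FaceMatches') or []
            if 'ExternalImageId' in match['Face']
        }
        for eid in matchedIds:
            entry = appearances.setdefault(
                eid, {'first': timestamp, 'last': timestamp, 'samples': 0}
            )
            entry['first'] = min(entry['first'], timestamp)
            entry['last'] = max(entry['last'], timestamp)
            entry['samples'] += 1
    return appearances
```

You could pass `getFaceSearch['Persons']` to this function and look up each ID with `getDynamoDBItem` to show names instead of IDs.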
    

#### Display video in player

In [None]:
# Display video in player

s3VideoUrl = s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': videoName})

videoTag = "<video controls='controls' autoplay width='640' height='360' name='Video' src='{0}'></video>".format(s3VideoUrl)

videoui = "<table><tr><td style='vertical-align: top'>{}</td><td>{}</td></tr></table>".format(videoTag, strDetail)

display(HTML(videoui))

### Index additional faces of known celebrities to improve recognition of these celebrities
You can further improve the performance of your solution by indexing faces of celebrities that the Rekognition celebrity API already recognizes in most of your media but might not perform as well on in certain situations. Below we index a few images of Jeremy Clarkson and Richard Hammond even though Rekognition's celebrity API recognizes them well. We use the same IDs for them that the Rekognition celebrity API returns, so we can detect when both the celebrity API and the face search API recognize the same celebrity in a frame.

In [None]:
# Index Celebrity 3
celebrityId = "2mW0ey5n"
celebrityName = "Jeremy Clarkson"
celebrityUrl = "https://www.imdb.com/name/nm0165087/"

addCelebrityToDynamoDB(celebrityId, celebrityName, celebrityUrl)

indexFace(bucketName, "media/celebrity-recognition/jc04.png", celebrityId)
indexFace(bucketName, "media/celebrity-recognition/jc05.png", celebrityId)

In [None]:
# Index Celebrity 4
celebrityId = "4TK3NJ"
celebrityName = "Richard Hammond"
celebrityUrl = "https://www.imdb.com/name/nm1414369/"

addCelebrityToDynamoDB(celebrityId, celebrityName, celebrityUrl)

indexFace(bucketName, "media/celebrity-recognition/rh01.png", celebrityId)
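
Because these faces are indexed under the IDs that the celebrity API returns, results from the two APIs can be compared directly. A rough sketch follows, assuming a RecognizeCelebrities response (with its `CelebrityFaces` list) and a SearchFacesByImage response for the same frame. The helper name `recognizedByBoth` is hypothetical.

```python
def recognizedByBoth(celebrityResponse, faceSearchResponse):
    """Return the sorted IDs reported by both the celebrity API and face search."""
    celebrityIds = {
        celebrity['Id']
        for celebrity in celebrityResponse.get('CelebrityFaces', [])
    }
    customIds = {
        match['Face']['ExternalImageId']
        for match in faceSearchResponse.get('FaceMatches', [])
        if 'ExternalImageId' in match['Face']
    }
    return sorted(celebrityIds & customIds)
```

IDs that appear in only one of the two sets point to frames where one API recognized a celebrity that the other missed.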