# Personal Protective Equipment Detection using Amazon Rekognition

***
This notebook provides a walkthrough of [PPE Detection API](https://docs.aws.amazon.com/rekognition/latest/dg/ppe-detection.html) in Amazon Rekognition to detect Personal Protective Equipment (PPE) worn by persons.
***

# Initialize stuff

In [None]:
# Install necessary packages
!pip install opencv-python-headless

In [None]:
# Initialise Notebook
import boto3
from IPython.display import HTML, display, Image as IImage
from PIL import Image, ImageDraw, ExifTags, ImageColor
import json
import cv2
import math
import os
import io

In [None]:
# Curent AWS Region. Use this to choose corresponding S3 bucket with sample content

mySession = boto3.session.Session()
awsRegion = mySession.region_name

In [None]:
# Init clients
rekognition = boto3.client('rekognition')
s3 = boto3.client('s3')

In [None]:
# S3 bucket that contains sample images and videos

# We are providing sample images and videos in this bucket so
# you do not have to manually download/upload test images and videos.

bucketName = "aws-rek-immersionday-" + awsRegion

In [None]:
# Create temporary directory
# This directory is not needed to call Rekognition APIs.
# We will only use this directory to download images from S3 bucket and draw bounding boxes

!mkdir m1tmp
tempFolder = 'm1tmp/'

# Detect Persons wearing PPEs in image
***

In [None]:
imageName = "media/ppe-detection/ppe-image-group.jpg"
#imageName = "media/ppe-detection/ppe-image-single.jpg"

In [None]:
display(IImage(url=s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': imageName})))

#### Call Rekognition to detect persons and PPE

In [None]:
# Call Amazon Rekognition to detect PPE in the image
# https://docs.aws.amazon.com/rekognition/latest/dg/ppe-procedure-image.html
detectPPEResponse = rekognition.detect_protective_equipment(
        Image={
            'S3Object': {
                'Bucket': bucketName,
                'Name': imageName
            }
        },
        SummarizationAttributes={
            'MinConfidence': 90,
            'RequiredEquipmentTypes': [
                'FACE_COVER',
                'HEAD_COVER',
                'HAND_COVER', #you can remove required equipment type from the list based on your requirements
            ]
        }
    )

# print(detectPPEResponse)

#### Review the raw JSON reponse from Rekognition

In [None]:
# Show JSON response returned by Rekognition PPE Detection API
# In the JSON response below, you will see detected body parts, confidence score and additional information.

display(detectPPEResponse)

#### Call Rekognition to detect persons and PPE

In [None]:
# Call Amazon Rekognition to detect PPE in the image
# https://docs.aws.amazon.com/rekognition/latest/dg/ppe-procedure-image.html
detectPPEResponse = rekognition.detect_protective_equipment(
        Image={
            'S3Object': {
                'Bucket': bucketName,
                'Name': imageName
            }
        },
        SummarizationAttributes={
            'MinConfidence': 90,
            'RequiredEquipmentTypes': [
                'FACE_COVER',
                'HEAD_COVER',
                #'HAND_COVER', #only detecting face and head cover
            ]
        }
    )

# print(detectPPEResponse)

#### Review the raw JSON reponse from Rekognition

In [None]:
# Show JSON response returned by Rekognition PPE Detection API
# As we changed the required equipment type list above, 
# you will see that Rekognition detected that all persons in the image are wearing required PPEs

display(detectPPEResponse)

#### Drawing bounding boxes around PPE

In [None]:
# it will draw green bounding box if specific PPE is found
# it will draw warning yellow bounding box within face mask bounding box if confidence is lower than supplied value
# if specific PPE (such as hand cover, face cover etc.) is not found, it will draw red bounding box around body/person

def detect_ppe(sourceImage, confidence):

    fill_green='#00d400'
    fill_red='#ff0000'
    fill_yellow='#ffff00'
    line_width=3
    
    imageLocation = tempFolder+os.path.basename(sourceImage)
    s3.download_file(bucketName, sourceImage, imageLocation)

    # open image and get image data from stream.
    image = Image.open(open(imageLocation,'rb'))
    stream = io.BytesIO()
    image.save(stream, format=image.format)    
    image_binary = stream.getvalue()
    imgWidth, imgHeight = image.size  
    draw = ImageDraw.Draw(image)  
    

    for person in detectPPEResponse['Persons']:
        
        found_mask=False

        for body_part in person['BodyParts']:
            ppe_items = body_part['EquipmentDetections']
                 
            for ppe_item in ppe_items:
                # found a mask
                if ppe_item['Type'] == 'FACE_COVER': # you can change it to "HEAD_COVER" or "HAND_COVER"
                    fill_color=fill_green
                    found_mask=True
                    # check if mask covers face
                    if ppe_item['CoversBodyPart']['Value'] == False:
                        fill_color=fill='#ff0000'
                    # draw bounding box around mask
                    box = ppe_item['BoundingBox']
                    left = imgWidth * box['Left']
                    top = imgHeight * box['Top']
                    width = imgWidth * box['Width']
                    height = imgHeight * box['Height']
                    points = (
                            (left,top),
                            (left + width, top),
                            (left + width, top + height),
                            (left , top + height),
                            (left, top)
                        )
                    draw.line(points, fill=fill_color, width=line_width)

                     # Check if confidence is lower than supplied value       
                    if ppe_item['CoversBodyPart']['Confidence'] < confidence:
                        # draw warning yellow bounding box within face mask bounding box
                        offset=line_width+ line_width 
                        points = (
                                    (left+offset,top + offset),
                                    (left + width-offset, top+offset),
                                    ((left) + (width-offset), (top-offset) + (height)),
                                    (left+ offset , (top) + (height -offset)),
                                    (left + offset, top + offset)
                                )
                        draw.line(points, fill=fill_yellow, width=line_width)
                
        if found_mask==False:
            # no face mask found so draw red bounding box around body
            box = person['BoundingBox']
            left = imgWidth * box['Left']
            top = imgHeight * box['Top']
            width = imgWidth * box['Width']
            height = imgHeight * box['Height']
            points = (
                (left,top),
                (left + width, top),
                (left + width, top + height),
                (left , top + height),
                (left, top)
                )
            draw.line(points, fill=fill_red, width=line_width)

    display(image)

In [None]:
confidence=80 # confidence if the bodypart is covered by the particular PPE 
detect_ppe(imageName, confidence)

# Recognize persons wearing PPEs in video

- First we sample our video into individual image frames.
- We can then call `detect_protective_equipment` to detect persons wearing PPEs in the image frame
***

In [None]:
videoName = "media/ppe-detection/ppe-video.mov"
videoFile = tempFolder + "ppe-video.mov"

s3.download_file(bucketName, videoName, videoFile)

#### Call Rekognition to start a job for object detection

In [None]:
# Start video label recognition job
ppeLabels = []    
cap = cv2.VideoCapture(videoFile)
frameRate = cap.get(5) # frame rate
while(cap.isOpened()):
    frameId = cap.get(1) # current frame number
    print("Processing frame id: {}".format(frameId))
    ret, frame = cap.read()
    if (ret != True):
        break
    if (frameId % math.floor(frameRate) == 0):
        hasFrame, imageBytes = cv2.imencode(".jpg", frame)

        if(hasFrame):
            response = rekognition. detect_protective_equipment(
                Image={
                    'Bytes': imageBytes.tobytes(),
                }
            )
        
        for person in response["Persons"]:
            person["Timestamp"] = (frameId/frameRate)*1000
            ppeLabels.append(person)

#### Show video in the player

In [None]:
# Show video in a player

s3VideoUrl = s3.generate_presigned_url('get_object', Params={'Bucket': bucketName, 'Key': videoName})

videoTag = "<video controls='controls' autoplay width='640' height='360' name='Video' src='{0}'></video>".format(s3VideoUrl)

videoui = "<table><tr><td style='vertical-align: top'>{}</td></tr></table>".format(videoTag)

display(HTML(videoui))

#### Review raw JSON reponse from Rekognition

In [None]:
# Show JSON response returned by Rekognition PPE Detection API
# In the JSON response below, you will see list of detected body parts and timestamp.
# For each detected body part, you will see information like name of the body part and confidence
# If PPE is detected, it will provide confidence if the body part is covered and bounding box

display(ppeLabels)

#### Export the results

In [None]:
with open(videoFile + ".json", "w") as f:
    f.write(json.dumps(ppeLabels)) 

cap.release()

# Recognize persons wearing PPEs in a live stream
***

In [None]:
# This is a sample code to detect PPE in a live stream. 
# Before you run this code, you would need to set up an RTSP stream on a publicly accessible IP.


"""
import cv2
import boto3
import time
from datetime import datetime
import json

def processFrame(videoStreamUrl):
    cap = cv2.VideoCapture(videoStreamUrl)
    ret, frame = cap.read()
    if ret:
        hasFrame, imageBytes = cv2.imencode(".jpg", frame)
        if hasFrame:
            session = boto3.session.Session()
            rekognition = session.client('rekognition')
            response = rekognition. detect_protective_equipment(
                    Image={
                        'Bytes': imageBytes.tobytes(),
                    }
                )
            print(response)
    cap.release()

# Video stream
videoStreamUrl = "rtsp://@192.168.10.100" # change this to your RTSP stream location
frameCaptureThreshold = 300

while (True):
    try:
        processFrame(videoStreamUrl)
    except Exception as e:
        print("Error: {}.".format(e))

    time.sleep(frameCaptureThreshold)

"""

***
### References
- https://docs.aws.amazon.com/rekognition/latest/APIReference/API_DetectProtectiveEquipment.html
- https://docs.aws.amazon.com/rekognition/latest/dg/ppe-request-response.html

***

You have successfully used Amazon Rekognition to identify personal protective equipment in images and videos.