# Kinesis Video Streams Sample Application

This notebook shows how to create an application for Panorama to support multiple cameras output via Amazon Kinesis Video Streams.

By completing this notebook, you will learn:

- How to write a Python script for your application that creates multiple real-time video streaming via Amazon Kinesis Video Streams and AWS IoT
- How to programmatically package and deploy applications using the Panorama CLI
- How to use abstract camera node and input node to override the configuration to support multiple cameras and Amazon Kinesis Video Streams programmatically

---
1. Prerequisites
2. Set up
3. Package application
4. Deploy application to device
5. Clean up

# Prerequisites

In a terminal session on this Jupyter notebook server, run `aws configure`. This allows this notebook server to access Panorama resources and deploy applications on your behalf.

# Set up

Import libraries for use with this notebook environment, you do not need these libraries when you write your application code.

In [None]:
import sys
import os
import time
import json
import uuid

import boto3

sys.path.insert( 0, os.path.abspath( '../common/test_utility' ) )
import panorama_test_utility

In [None]:
account_id = boto3.client('sts').get_caller_identity()['Account']
region_name = boto3.session.Session().region_name
app_name = 'kinesis_video_streams_app'
code_package_name = 'kinesis_video_streams_code'

print( 'account_id :', account_id )
print( 'region_name :', region_name )

# Following configurations are required when you use real hardware, 
# thus can be any dummy strings when you use only Test Utility.
device_id = input( "Device Id (format : device-*)" ).strip()

# Enter application role to be deployed in panorama device (Role with permission for accessing AWS IoT is a must.
# You can grant other AWS service permissions to this role if needed.) Format: arn:aws:iam::{account}:role/{roleName}
application_role_arn = input( 'Application ROLE ARN' ).strip()

print(device_id)

## Set up application

Every application uses the creator's AWS Account ID as the prefix to uniquely identifies the application resources. Running `panorama-cli import-application` replaces the generic account Id with your account Id.

In [None]:
!cd ./kinesis_video_streams_app/ && panorama-cli import-application

## Create AWS IoT Thing and credential

To obtain Amazon Kinesis Video Streams service access token, there are two ways to get it: one is to use [Identity and Access Management(IAM)](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-iam.html) and the other is to use [AWS IoT thing's credential](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-iot.html). In order to support multiple cameras without creating too many IAM roles, using AWS IoT Thing credential to represent camera is highly recommended for managing control access policy. Please follow the steps to create the related AWS IoT Thing, credential and policy.

In [None]:
# Please modify prefix if needed.
IAM_ROLE_NAME = 'KinesesVideoStreamsPanramaSampleIAMRole' + str(int(time.time()))
IAM_POLICY_NAME = 'KinesesVideoStreamsPanramaSampleIAMPolicy' + str(int(time.time()))
IOT_ROLE_ALIAS_NAME = 'KinesesVideoStreamsPanramaSampleIotRoleAlias' + str(int(time.time()))
IOT_POLICY_NAME = 'KinesesVideoStreamsPanramaSampleIotPolicy' + str(int(time.time()))

IOT_CERT_ARN = {}
IOT_PUBLIC_KEY = {}
IOT_PRIVATE_KEY = {}
IOT_CERT_PEM = {}
IOT_CERT_ID = {}

iam_client = boto3.client('iam')
iot_client = boto3.client('iot')

# Load sample assume role policy from templates/.
with open('./templates/iam-policy-document.json') as iam_policy_json:
 iam_policy_payload = json.load(iam_policy_json)
 try:
 res = iam_client.create_role(
 RoleName=IAM_ROLE_NAME,
 AssumeRolePolicyDocument=json.dumps(iam_policy_payload)
 )
 IAM_ROLE_ARN = res['Role']['Arn']
 except Exception as e:
 print('Failed on IAM create_role')
 print(e)

# Load sample kinesis video streams policy from templates/.
with open("./templates/iam-permission-document.json") as iam_permission_json:
 iam_permission_payload = json.load(iam_permission_json)
 try:
 res = iam_client.put_role_policy(
 RoleName=IAM_ROLE_NAME,
 PolicyName=IAM_POLICY_NAME,
 PolicyDocument=json.dumps(iam_permission_payload)
 )
 except Exception as e:
 print('Failed on IAM put_role_policy')
 print(e)

# Create IoT Role Alias.
try:
 res = iot_client.create_role_alias(
 roleAlias=IOT_ROLE_ALIAS_NAME,
 roleArn=IAM_ROLE_ARN,
 credentialDurationSeconds=3600
 )
 IOT_ROLE_ALIAS_ARN = res['roleAliasArn']
except Exception as e:
 print('Failed on IoT create_role_alias')
 print(e)
 
# Create IoT Policy and attach role alias.
with open("./templates/iot-policy-document.json") as iot_policy_json:
 iot_policy_payload = json.load(iot_policy_json)
 for statement in iot_policy_payload['Statement']:
 statement['Resource'] = IOT_ROLE_ALIAS_ARN
 
 try:
 res = iot_client.create_policy(
 policyName=IOT_POLICY_NAME,
 policyDocument=json.dumps(iot_policy_payload)
 )
 except Exception as e:
 print('Failed on IoT create_policy')
 print(e)

# Create IoT Thing, here we use camera name as AWS IoT Thing Name.
# These camera names will also be used to crate AWS Panorama Data Sources in the following step.
CAMERAS = [x.strip() for x in input("Please input with the unique camera names (format : cam-1,cam-2,...): ").split(",")]
cert_path = f'./{app_name}/packages/{account_id}-{code_package_name}-1.0/src/certs/'

for CAMERA in CAMERAS:
 # Create IoT Thing.
 try:
 res = iot_client.create_thing(thingName=CAMERA)
 except Exception as e:
 print('Failed on IoT create_thing')
 print(e)

 # Create IoT certificate, private key and public key. Then save them in target folder.
 try:
 data = iot_client.create_keys_and_certificate(setAsActive = True)
 credentials = json.loads(json.dumps(data, sort_keys=False, indent=4))
 
 IOT_CERT_ARN[CAMERA] = credentials['certificateArn']
 IOT_PUBLIC_KEY[CAMERA] = credentials['keyPair']['PublicKey']
 IOT_PRIVATE_KEY[CAMERA] = credentials['keyPair']['PrivateKey']
 IOT_CERT_PEM[CAMERA] = credentials['certificatePem']
 IOT_CERT_ID[CAMERA] = credentials['certificateId']
 
 with open(f'{cert_path}/{CAMERA}-public.key', 'w') as outfile:
 outfile.write(IOT_PUBLIC_KEY[CAMERA])
 
 with open(f'{cert_path}/{CAMERA}-private.key', 'w') as outfile:
 outfile.write(IOT_PRIVATE_KEY[CAMERA])
 
 with open(f'{cert_path}/{CAMERA}-cert.pem', 'w') as outfile:
 outfile.write(IOT_CERT_PEM[CAMERA])
 except Exception as e:
 print('Failed on IoT create_keys_and_certificate')
 print(e)
 
 # Attach IoT policy to certificate and attach certificate to IoT thing.
 try:
 res = iot_client.attach_policy(
 policyName = IOT_POLICY_NAME,
 target = IOT_CERT_ARN[CAMERA]
 )
 except Exception as e:
 print('Failed on IoT attach_policy')
 print(e)
 
 # Attach certificate to IoT thing.
 try:
 res = iot_client.attach_thing_principal(
 thingName = CAMERA,
 principal = IOT_CERT_ARN[CAMERA]
 )
 except Exception as e:
 print('Failed on IoT attach_thing_principal')
 print(e)

# Package app

## Overriding camera node and Amazon Kinesis Video Streams configuration

In this sample, we could support multiple cameras and multiple Amazon KVS streams. Please follow the steps to create override json documentation(in kinesis_video_streams_app/deployment_overrides/override_configuration.json).

In [None]:
# Loading override template and update camera node information
with open(f"./{app_name}/deployment_overrides/override_configuration_template.json") as override_configuration:
 override_configuration_payload = json.load(override_configuration)
 for CAMERA in CAMERAS:
 override_configuration_payload['nodeGraphOverrides']['packages'].append({
 'name': '{acc_id}::{name}'.format(acc_id=account_id, name=CAMERA),
 'version': '1.0'
 })
 override_configuration_payload['nodeGraphOverrides']['nodes'].append({
 'name': '{}'.format(CAMERA),
 'interface': '{acc_id}::{name}.{name}'.format(acc_id=account_id, name=CAMERA),
 'overridable': True,
 'overrideMandatory': False,
 'launch': "onAppStart"
 })
 override_configuration_payload['nodeGraphOverrides']['nodeOverrides'][0]['with'].append({
 'name': '{}'.format(CAMERA)
 })

# Update kvs_stream_name, using camera name as kvs stream name
kvs_stream_names = ','.join(CAMERAS)
override_configuration_payload['nodeGraphOverrides']['nodes'].append({
 'name': 'override_kvs_stream_name',
 "interface": "string",
 "value": '{}'.format(kvs_stream_names),
 "overridable": True,
})

# Update AWS IoT role alias
override_configuration_payload['nodeGraphOverrides']['nodes'].append({
 'name': 'override_iot_role_alias',
 "interface": "string",
 "value": '{}'.format(IOT_ROLE_ALIAS_NAME),
 "overridable": True,
})

# Update Amazon KVS service region. Theoretically we could support running Panorama in one region and servicing Amazon KVS in another region
override_configuration_payload['nodeGraphOverrides']['nodes'].append({
 'name': 'override_kvs_region',
 "interface": "string",
 "value": '{}'.format(region_name),
 "overridable": True,
})

# Create and write content to override documentation
with open(f"./{app_name}/deployment_overrides/override_configuration.json", "w") as override_configuration_final:
 json.dump(override_configuration_payload, override_configuration_final, indent=4)

### Preview override-manifest file("override_configuration.json")

In [None]:
panorama_test_utility.preview_text_file( f"./{app_name}/deployment_overrides/override_configuration.json" )

### Create New Camera

Because we are using an ```abstract_rtsp_media_source```, we have to create a camera or serval cameras before we can use the ```abstract_rtsp_media_source```

**NOTE** : Update your RTSP Info in the next cell, Username, Password and RTSP Stream URL

In [None]:
panorama_client = boto3.client('panorama')

for CAMERA in CAMERAS:
 # Please input your camera credential.
 print('You are going to input credential for {}'.format(CAMERA))
 CAMERA_CREDS = {
 "Username": input("Please input with user name of camera: "),
 "Password": input("Please input with password of camera: "),
 "StreamUrl": input("Please input with rtsp address of camera including port: ")
 }

 try:
 res = panorama_client.create_node_from_template_job(
 NodeName=CAMERA,
 OutputPackageName=CAMERA,
 OutputPackageVersion='1.0',
 TemplateParameters=CAMERA_CREDS,
 TemplateType='RTSP_CAMERA_STREAM'
 )
 print(res)
 except Exception as e:
 print(f'Failed on Panorama create_node_from_template_job :{CAMERA}')
 print(e)

### Build application with container

In [None]:
container_asset_name = 'kinesis_video_streams_code'

In [None]:
%%capture captured_output

# Building container image.This process takes time (5min ~ 10min)
# FIXME : without %%capture, browser tab crashes because of too much output from the command.

!cd ./kinesis_video_streams_app && panorama-cli build \
 --container-asset-name {container_asset_name} \
 --package-path packages/{account_id}-{code_package_name}-1.0

In [None]:
stdout_lines = captured_output.stdout.splitlines()
stderr_lines = captured_output.stderr.splitlines()
print(" :")
print(" :")
for line in stdout_lines[-30:] + stderr_lines[-30:]:
 print(line)

### Upload application to Panorama for deploying to devices

In [None]:
# This step takes some time, depending on your network environment.
!cd ./kinesis_video_streams_app && panorama-cli package-application

### Ready for deploying to a device

Congrats! Your app is now ready to deploy to a device. Next, you can continue in this notebook to deploy the app programmatically or you can go to the Panorama console and deploying using the AWS Console. The console makes it easier to select camera streams and select the devices you want to deploy to. Programmatic deployment is faster to complete and easier to automate.

# Deploy application to device

Let's make sure the device we are deploying to is available.

In [None]:
response = panorama_client.describe_device(
 DeviceId= device_id
)

print('You are deploying to Device: {}'.format(response['Name']))

## Deploy application

You are ready to deploy your app.

In [None]:
with open(f"./{app_name}/graphs/{app_name}/graph.json") as fd:
 manifest_payload = json.load(fd)
 
with open(f"./{app_name}/deployment_overrides/override_configuration.json") as fd:
 override_payload = json.load(fd)


In [None]:
try:
 resp = panorama_client.create_application_instance(
 Name=app_name,
 ManifestPayload={'PayloadData': json.dumps(manifest_payload)},
 ManifestOverridesPayload={'PayloadData': json.dumps(override_payload)},
 DefaultRuntimeContextDevice=device_id,
 RuntimeRoleArn=application_role_arn
 )
except Exception as e:
 # raise e
 print(e)

In [None]:
app_id = resp['ApplicationInstanceId']
print( "Application Instance Id :", app_id )

progress_dots = panorama_test_utility.ProgressDots()
while True:
 response = panorama_client.describe_application_instance( ApplicationInstanceId = app_id )
 status = response['Status']
 progress_dots.update_status( f'{status} ({response["StatusDescription"]})' )
 if status in ('DEPLOYMENT_SUCCEEDED','DEPLOYMENT_FAILED'):
 break
 time.sleep(60)

## Verify from Amazon Kinesis Video Streams console

In [None]:
# Get Amazon Kinesis Video Streams HLS playback url
kvs_client = boto3.client('kinesisvideo')

for CAMERA in CAMERAS:
 # Get data endpoint
 KVS_DATA_ENDPOINT = kvs_client.get_data_endpoint(
 StreamName=CAMERA,
 APIName='GET_HLS_STREAMING_SESSION_URL'
 )['DataEndpoint']
 
 kvs_media_client = boto3.client('kinesis-video-archived-media',endpoint_url=KVS_DATA_ENDPOINT)
 try:
 # Get live HLS stream url for 12 hours(43200 minutes)
 KVS_HLS_URL = kvs_media_client.get_hls_streaming_session_url(
 StreamName=CAMERA,
 PlaybackMode='LIVE',
 Expires=43200
 )['HLSStreamingSessionURL']
 print(KVS_HLS_URL)
 except Exception as e:
 print(f'Failed on get_hls_streaming_session_url :{CAMERA}')
 print(e)
 
print('Paste the URL on Safari, QuickTime Player, or player that supports streaming in HLS format, such as hls.js')
print('If you want to check the video stream on Amazon Kinesis Video Streams console, Chrome browser is highly recommended')

In [None]:
# Store the arn of Amazon Kinesis Video Streams for cleaning up.
KVS_STREAM_ARN = {}

for CAMERA in CAMERAS:
 try:
 res = kvs_client.describe_stream(
 StreamName=CAMERA
 )
 KVS_STREAM_ARN[CAMERA] = res['StreamInfo']['StreamARN']
 except Exception as e:
 print(f'Failed on get arn, video stream name :{CAMERA}')
 print(e)

# Clean up

In [None]:
# Run this session if you want to delete the resources you created including AWS IoT, AWS Panorama and IAM.
clean_flag = input("Clean the created resources (y/n):").strip()

if (clean_flag == 'y'):
 # Clean up Panorama application
 panorama_test_utility.remove_application( device_id, app_id )
 
 # Clean up AWS IoT resources
 for CAMERA in CAMERAS:
 res = iot_client.update_certificate(certificateId=IOT_CERT_ID[CAMERA], newStatus='INACTIVE')
 res = iot_client.detach_policy(policyName=IOT_POLICY_NAME, target=IOT_CERT_ARN[CAMERA])
 res = iot_client.detach_thing_principal(thingName = CAMERA, principal = IOT_CERT_ARN[CAMERA])
 res = iot_client.delete_certificate(certificateId=IOT_CERT_ID[CAMERA])
 res = iot_client.delete_thing(thingName=CAMERA)
 
 # Clean up Panorama data resources
 try:
 res = panorama_client.delete_package(ForceDelete=True, PackageId=f"packageName/{CAMERA}")
 except Exception as e:
 print(f'Failed on Panorama delete_package :{CAMERA}')
 print(e)
 
 # Clean up Amazon Kinesis Video Streams resources
 res = kvs_client.delete_stream(
 StreamARN=KVS_STREAM_ARN[CAMERA]
 )
 
 # Clean up IAM resources
 res = iam_client.delete_role_policy(RoleName=IAM_ROLE_NAME, PolicyName=IAM_POLICY_NAME)
 res = iam_client.delete_role(RoleName=IAM_ROLE_NAME)
 res = iot_client.delete_role_alias(roleAlias=IOT_ROLE_ALIAS_NAME)
 res = iot_client.delete_policy(policyName=IOT_POLICY_NAME)