# Multi-Region IoT - Persist Data

Persist data accross regions by using [Amazon Simple Queue Service](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html) (SQS) or DynamoDB global tables.

## Libraries

In [None]:
import boto3
import datetime
import json
import logging
import time

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from os.path import join

#### Note: If you get an error that the AWSIoTPythonSDK is not installed, install the SDK with the command below and import the libraries again!

In [None]:
!pip install AWSIoTPythonSDK -t .

Restore variable that have been defined in the notebook which has been used to create the ACM PCA setup.

In [None]:
%store -r config
print("config: {}".format(json.dumps(config, indent=4, default=str)))

## Global Vars

For several actions in AWS IoT topic rules a service role is required. This roles allows the IoT Core to access other services. A role has already been created by the CloudFormation stack for the master region. 

The arn for this role can be found in the outputs section of your CloudFormation stack next to the key **IoTAccessServicesRoleArn**.

Set the variable **topic_rule_role_arn** to contain this arn.

In [None]:
topic_rule_role_arn = 'YOUR_ROLE_ARN_HERE'

topic_rule_name_sqs = 'IoTMRCrossRegionSQSRule'
topic_rule_name_sns = 'IoTMRCrossRegionSNSRule'

queue_name = 'IoTCrossRegion'
topic_name = 'IoTCrossRegion'

## IoT Endpoint
To connect a device to the AWS IoT master region we need to get the iot endpoint.

In [None]:
c_iot = boto3.client('iot', region_name = config['aws_region_master'])

Get the iot endpoint.

In [None]:
response = c_iot.describe_endpoint(endpointType='iot:Data-ATS')
iot_endpoint = response['endpointAddress']
print("iot_endpoint: {}".format(iot_endpoint))

## Transfer data with Amazon Simple Queue Service (SQS)

By using SQS in a topic rule data can be send across regions. 

An SQS queue in the slave region will be created and a topic rule in the master region which transfers incoming messages to the SQS queue in the slave region.

SQS client in slave region.

In [None]:
c_sqs = boto3.client('sqs', region_name = config['aws_region_slave'])

### Queue operations
Create the queue and get the queue url. The queue url is required to create the IoT topic rule.

#### Create the queue

In [None]:
response = c_sqs.create_queue(QueueName=queue_name)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

#### Get the queue url

In [None]:
response = c_sqs.get_queue_url(QueueName=queue_name)
print("response: {}\n".format(json.dumps(response, indent=4, default=str)))
queue_url = response['QueueUrl']
print("queue_url: {}".format(queue_url))

## Topic rule

Create the rule the topic rule.

Create the topic rule

In [None]:
response = c_iot.create_topic_rule(
    ruleName=topic_rule_name_sqs,
    topicRulePayload={
        'awsIotSqlVersion': '2016-03-23',
        'sql': 'SELECT * FROM \'cmd/+/cross/region\'',
        'actions': [{
                'sqs': {
                        'roleArn': topic_rule_role_arn,
                        'queueUrl': queue_url,
                        'useBase64': False
                }
            }],
        'ruleDisabled': False
    }
)

print("response: {}".format(json.dumps(response, indent=4, default=str)))

## Verify
Get the topic rule to verify that it has been created successfully.

In [None]:
response = c_iot.get_topic_rule(
    ruleName=topic_rule_name_sqs
)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

## Transfer data with Amazon Simple Notification Service (SNS) and AWS Lambda

Another option to transfer data across regions is the use of SNS in combination with a Lambda function. In this example an SNS topic in the master region will be created. A Lambda function in the slave region was already created by CloudFormation. The Lambda endpoint from the slave region will be subscribed to the SNS topic. 

In [None]:
c_sns = boto3.client('sns', region_name = config['aws_region_master'])

Create the topic.

In [None]:
response = c_sns.create_topic(Name=topic_name)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

Get the topic arn.

In [None]:
topic_arn = response['TopicArn']
print("topic_arn: {}".format(topic_arn))

### Subscribe to SNS topic

The Lambda function in the slave region has been created by the slave CloudFormation stack. 

Set the variable **lambda_arn** to this arn.

The arn for the Lambda can be found in the outputs section of your CloudFormation stack next to the key **CrossRegionLambdaFunctionArn**.

In [None]:
lambda_arn = 'YOUR_LAMBDA_ARN_HERE'
lambda_name = lambda_arn.split(':')[-1]
statement_id = str(int(time.time()))
print("lambda_name: {} statement_id: {}".format(lambda_name, statement_id))

### Add permission to the Lambda function

To allow SNS to invoke the lambda function a permission must be added to the function.

In [None]:
c_lambda = boto3.client('lambda', region_name = config['aws_region_slave'])

In [None]:
response = c_lambda.add_permission(
    FunctionName=lambda_name,
    StatementId=statement_id,
    Action='lambda:invokeFunction',
    Principal='sns.amazonaws.com',
    SourceArn=topic_arn
)

print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_sns.subscribe(
    TopicArn=topic_arn,
    Protocol='lambda',
    Endpoint=lambda_arn,
    ReturnSubscriptionArn=True
)

print("response: {}\n".format(json.dumps(response, indent=4, default=str)))
subscription_arn = response['SubscriptionArn']
print("subscription_arn: {}".format(subscription_arn))

## Create topic rule

Create a topic rule to forward messages to the SNS topic.

In [None]:
response = c_iot.create_topic_rule(
    ruleName=topic_rule_name_sns,
    topicRulePayload={
        'awsIotSqlVersion': '2016-03-23',
        'sql': 'SELECT * FROM \'cmd/+/cross/region\'',
        'actions': [{
                'sns': {
                    'targetArn': topic_arn,
                    'roleArn': topic_rule_role_arn,
                    'messageFormat': 'RAW'
                }
            }],
        'ruleDisabled': False
    }
)

print("response: {}".format(json.dumps(response, indent=4, default=str)))

## Verify
Get the topic rule to verify that it has been created successfully.

In [None]:
response = c_iot.get_topic_rule(ruleName=topic_rule_name_sns)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

## Connect a Device
Connect a device that you created earlier to the message broker from AWS IoT and send some messages. These messages will be forwarded by the topic rules that you created to SQS as well as SNS.

In [None]:
thing_name = 'thing-mr04'
root_ca = 'AmazonRootCA1.pem'

device_key_file = '{}.device.key.pem'.format(thing_name)
device_cert_file = '{}.device.cert.pem'.format(thing_name)

# AWS IoT Python SDK needs logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
#logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(funcName)s - %(message)s")
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

myAWSIoTMQTTClient = None
myAWSIoTMQTTClient = AWSIoTMQTTClient(thing_name)
myAWSIoTMQTTClient.configureEndpoint(iot_endpoint, 8883)
myAWSIoTMQTTClient.configureCredentials(root_ca, 
                                        join(config['PCA_directory'], device_key_file), 
                                        join(config['PCA_directory'], device_cert_file))

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# Connect and reconnect to AWS IoT
try:
    myAWSIoTMQTTClient.connect()
except Exception as e:
    logger.error('{}'.format(e))
    time.sleep(5)
    myAWSIoTMQTTClient.connect()

## Publish messages
Publish some messages that should be transferred to the SQS queue in the other region.

**Hint:** Before publishing message subscribe in the test client in the master region in the AWS IoT Core console to the topic `cmd/+/cross/region`. By doing so you can verify that your messages are reaching the IoT Core.

In [None]:
topic = 'cmd/{}/cross/region'.format(thing_name)
print("topic: {}".format(topic))

for i in range(5):
    date_time = datetime.datetime.now().isoformat()
    message = {"thing_name": "{}".format(thing_name), "date_time": date_time, "i": i}
    print("publish: message: {}".format(message))
    myAWSIoTMQTTClient.publish(topic, json.dumps(message), 0)
    time.sleep(1)

## Poll the SQS queue

To verify that the messages has been sent to SQS in the slave region poll the queue for messages. You should get messages from the queue. Feel free to execute polling multiple times.

In [None]:
# Long poll for message on provided SQS queue
response = c_sqs.receive_message(
    QueueUrl=queue_url,
    AttributeNames=[
        'All'
    ],
    MaxNumberOfMessages=10,
    MessageAttributeNames=[
        'All'
    ],
    WaitTimeSeconds=20
)

print("queue_url: {}\n".format(queue_url))

for message in response['Messages']:
    body = message['Body']
    message_id = message['MessageId']
    #print(message)
    print("message_id: {}\nbody: {}\n".format(message_id, body))

## SNS/Lambda

To verify that message have been sent also to the slave region with SNS and Lambda watch at CloudWatch in the slave region for the logs of your Lambda function.

Logs can be found in `CloudWatch -> Logs -> /aws/lambda/<LAMBDA_FUNCTION_NAME`.

## Disconnet
Disconnect the device from AWS IoT Core.

In [None]:
myAWSIoTMQTTClient.disconnect()

## Clean Up
Clean up your environment:

* Remove permissions from the Lambda function
* Unsubscribe the Lambda from the SNS topic
* Delete SNS topic
* Delete IoT topic rules

In [None]:
response = c_lambda.remove_permission(
    FunctionName=lambda_name,
    StatementId=statement_id
)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_sns.unsubscribe(SubscriptionArn=subscription_arn)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_sns.delete_topic(TopicArn=topic_arn)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_iot.delete_topic_rule(ruleName=topic_rule_name_sqs)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_iot.delete_topic_rule(ruleName=topic_rule_name_sns)
print("response: {}".format(json.dumps(response, indent=4, default=str)))

In [None]:
response = c_sns.delete_topic(TopicArn=topic_arn)
print("response: {}".format(json.dumps(response, indent=4, default=str)))