{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multi-Region IoT - Persist Data\n",
    "\n",
    "Persist data across regions by using [Amazon Simple Queue Service](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html) (SQS) or DynamoDB global tables."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import datetime\n",
    "import json\n",
    "import logging\n",
    "import time\n",
    "\n",
    "from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient\n",
    "from os.path import join"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Note: If you get an error that the AWSIoTPythonSDK is not installed, install the SDK with the command below and import the libraries again!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install AWSIoTPythonSDK -t ."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Restore the variables that were defined in the notebook used to create the ACM PCA setup."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r config\n",
    "print(\"config: {}\".format(json.dumps(config, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Global Vars\n",
    "\n",
    "For several actions in AWS IoT topic rules, a service role is required. This role allows AWS IoT Core to access other services. The CloudFormation stack for the master region has already created such a role.\n",
    "\n",
    "The ARN of this role can be found in the outputs section of your CloudFormation stack next to the key **IoTAccessServicesRoleArn**.\n",
    "\n",
    "Set the variable **topic_rule_role_arn** to this ARN."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "topic_rule_role_arn = 'YOUR_ROLE_ARN_HERE'\n",
    "\n",
    "topic_rule_name_sqs = 'IoTMRCrossRegionSQSRule'\n",
    "topic_rule_name_sns = 'IoTMRCrossRegionSNSRule'\n",
    "\n",
    "queue_name = 'IoTCrossRegion'\n",
    "topic_name = 'IoTCrossRegion'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## IoT Endpoint\n",
    "To connect a device to AWS IoT in the master region, we need the IoT endpoint of that region."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c_iot = boto3.client('iot', region_name = config['aws_region_master'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the IoT endpoint."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.describe_endpoint(endpointType='iot:Data-ATS')\n",
    "iot_endpoint = response['endpointAddress']\n",
    "print(\"iot_endpoint: {}\".format(iot_endpoint))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transfer data with Amazon Simple Queue Service (SQS)\n",
    "\n",
    "By using SQS in a topic rule, data can be sent across regions.\n",
    "\n",
    "An SQS queue is created in the slave region, along with a topic rule in the master region that forwards incoming messages to that queue."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the SQS client in the slave region."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c_sqs = boto3.client('sqs', region_name = config['aws_region_slave'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Queue operations\n",
    "Create the queue and get the queue URL. The queue URL is required to create the IoT topic rule.\n",
    "\n",
    "#### Create the queue"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sqs.create_queue(QueueName=queue_name)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Get the queue URL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sqs.get_queue_url(QueueName=queue_name)\n",
    "print(\"response: {}\\n\".format(json.dumps(response, indent=4, default=str)))\n",
    "queue_url = response['QueueUrl']\n",
    "print(\"queue_url: {}\".format(queue_url))"
   ]
  },
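  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Topic rule\n",
    "\n",
    "Create a topic rule in the master region that forwards every message published to a topic matching `cmd/+/cross/region` to the SQS queue in the slave region."
   ]
  },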
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.create_topic_rule(\n",
    "    ruleName=topic_rule_name_sqs,\n",
    "    topicRulePayload={\n",
    "        'awsIotSqlVersion': '2016-03-23',\n",
    "        'sql': 'SELECT * FROM \\'cmd/+/cross/region\\'',\n",
    "        'actions': [{\n",
    "                'sqs': {\n",
    "                        'roleArn': topic_rule_role_arn,\n",
    "                        'queueUrl': queue_url,\n",
    "                        'useBase64': False\n",
    "                }\n",
    "            }],\n",
    "        'ruleDisabled': False\n",
    "    }\n",
    ")\n",
    "\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
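  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `+` in the topic filter `cmd/+/cross/region` is the MQTT single-level wildcard: it matches exactly one topic level, so the rule applies to `cmd/<thing_name>/cross/region` for any thing name. The following cell is a minimal sketch of that matching logic in plain Python, for illustration only. The helper name `topic_matches` is hypothetical and not part of AWS IoT or any SDK; the actual matching is done by AWS IoT Core, and the sketch ignores special cases such as reserved topics that start with `$`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hypothetical helper, for illustration only: AWS IoT Core does the real matching\n",
    "def topic_matches(topic_filter, topic):\n",
    "    filter_levels = topic_filter.split('/')\n",
    "    topic_levels = topic.split('/')\n",
    "    for i, level in enumerate(filter_levels):\n",
    "        if level == '#':\n",
    "            # Multi-level wildcard: matches everything from this level on\n",
    "            return True\n",
    "        if i >= len(topic_levels):\n",
    "            # The topic has fewer levels than the filter\n",
    "            return False\n",
    "        if level != '+' and level != topic_levels[i]:\n",
    "            # A literal level must match exactly; '+' matches any single level\n",
    "            return False\n",
    "    # Without '#', filter and topic must have the same number of levels\n",
    "    return len(filter_levels) == len(topic_levels)\n",
    "\n",
    "# One level between 'cmd' and 'cross': expected to match\n",
    "print(topic_matches('cmd/+/cross/region', 'cmd/thing-mr04/cross/region'))\n",
    "# Two levels between 'cmd' and 'cross': expected not to match\n",
    "print(topic_matches('cmd/+/cross/region', 'cmd/a/b/cross/region'))"
   ]
  },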
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Verify\n",
    "Get the topic rule to verify that it has been created successfully."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.get_topic_rule(\n",
    "    ruleName=topic_rule_name_sqs\n",
    ")\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transfer data with Amazon Simple Notification Service (SNS) and AWS Lambda\n",
    "\n",
    "Another option for transferring data across regions is to combine SNS with a Lambda function. In this example, an SNS topic is created in the master region. A Lambda function in the slave region has already been created by CloudFormation. This Lambda function is then subscribed to the SNS topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c_sns = boto3.client('sns', region_name = config['aws_region_master'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sns.create_topic(Name=topic_name)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the topic ARN."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "topic_arn = response['TopicArn']\n",
    "print(\"topic_arn: {}\".format(topic_arn))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Subscribe to SNS topic\n",
    "\n",
    "The Lambda function in the slave region has been created by the slave CloudFormation stack.\n",
    "\n",
    "The ARN of the Lambda function can be found in the outputs section of that CloudFormation stack next to the key **CrossRegionLambdaFunctionArn**.\n",
    "\n",
    "Set the variable **lambda_arn** to this ARN."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lambda_arn = 'YOUR_LAMBDA_ARN_HERE'\n",
    "lambda_name = lambda_arn.split(':')[-1]\n",
    "statement_id = str(int(time.time()))\n",
    "print(\"lambda_name: {} statement_id: {}\".format(lambda_name, statement_id))"
   ]
  },
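  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The expression `lambda_arn.split(':')[-1]` assumes an unqualified function ARN of the form `arn:aws:lambda:<region>:<account-id>:function:<name>`. If the placeholder is left unchanged, or if the ARN ends with a version or alias qualifier, the derived name is not the function name. An optional sanity check could look like the following sketch. The helper name `parse_lambda_arn` and the sample ARN are made up for illustration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hypothetical helper, for illustration only\n",
    "def parse_lambda_arn(arn):\n",
    "    # Expected layout:\n",
    "    # arn:<partition>:lambda:<region>:<account-id>:function:<name>[:<qualifier>]\n",
    "    parts = arn.split(':')\n",
    "    if len(parts) < 7 or parts[0] != 'arn' or parts[2] != 'lambda' or parts[5] != 'function':\n",
    "        raise ValueError('not a Lambda function ARN: {}'.format(arn))\n",
    "    return {'region': parts[3], 'function_name': parts[6]}\n",
    "\n",
    "# Made-up ARN; replace it with lambda_arn to check your own value\n",
    "print(parse_lambda_arn('arn:aws:lambda:eu-west-1:123456789012:function:CrossRegionFn'))"
   ]
  },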
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Add permission to the Lambda function\n",
    "\n",
    "To allow SNS to invoke the Lambda function, a permission must be added to the function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c_lambda = boto3.client('lambda', region_name = config['aws_region_slave'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_lambda.add_permission(\n",
    "    FunctionName=lambda_name,\n",
    "    StatementId=statement_id,\n",
    "    Action='lambda:invokeFunction',\n",
    "    Principal='sns.amazonaws.com',\n",
    "    SourceArn=topic_arn\n",
    ")\n",
    "\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sns.subscribe(\n",
    "    TopicArn=topic_arn,\n",
    "    Protocol='lambda',\n",
    "    Endpoint=lambda_arn,\n",
    "    ReturnSubscriptionArn=True\n",
    ")\n",
    "\n",
    "print(\"response: {}\\n\".format(json.dumps(response, indent=4, default=str)))\n",
    "subscription_arn = response['SubscriptionArn']\n",
    "print(\"subscription_arn: {}\".format(subscription_arn))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create topic rule\n",
    "\n",
    "Create a topic rule to forward messages to the SNS topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.create_topic_rule(\n",
    "    ruleName=topic_rule_name_sns,\n",
    "    topicRulePayload={\n",
    "        'awsIotSqlVersion': '2016-03-23',\n",
    "        'sql': 'SELECT * FROM \\'cmd/+/cross/region\\'',\n",
    "        'actions': [{\n",
    "                'sns': {\n",
    "                    'targetArn': topic_arn,\n",
    "                    'roleArn': topic_rule_role_arn,\n",
    "                    'messageFormat': 'RAW'\n",
    "                }\n",
    "            }],\n",
    "        'ruleDisabled': False\n",
    "    }\n",
    ")\n",
    "\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Verify\n",
    "Get the topic rule to verify that it has been created successfully."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.get_topic_rule(ruleName=topic_rule_name_sns)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Connect a Device\n",
    "Connect a device that you created earlier to the AWS IoT message broker and send some messages. The topic rules that you created forward these messages to both SQS and SNS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "thing_name = 'thing-mr04'\n",
    "root_ca = 'AmazonRootCA1.pem'\n",
    "\n",
    "device_key_file = '{}.device.key.pem'.format(thing_name)\n",
    "device_cert_file = '{}.device.cert.pem'.format(thing_name)\n",
    "\n",
    "# AWS IoT Python SDK needs logging\n",
    "logger = logging.getLogger(\"AWSIoTPythonSDK.core\")\n",
    "#logger.setLevel(logging.DEBUG)\n",
    "logger.setLevel(logging.INFO)\n",
    "streamHandler = logging.StreamHandler()\n",
    "formatter = logging.Formatter(\"%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(funcName)s - %(message)s\")\n",
    "streamHandler.setFormatter(formatter)\n",
    "logger.addHandler(streamHandler)\n",
    "\n",
    "myAWSIoTMQTTClient = None\n",
    "myAWSIoTMQTTClient = AWSIoTMQTTClient(thing_name)\n",
    "myAWSIoTMQTTClient.configureEndpoint(iot_endpoint, 8883)\n",
    "myAWSIoTMQTTClient.configureCredentials(root_ca,\n",
    "                                        join(config['PCA_directory'], device_key_file),\n",
    "                                        join(config['PCA_directory'], device_cert_file))\n",
    "\n",
    "# AWSIoTMQTTClient connection configuration\n",
    "myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)\n",
    "myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing\n",
    "myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz\n",
    "myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec\n",
    "myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec\n",
    "\n",
    "# Connect to AWS IoT; on failure, log the error, wait 5 seconds, and retry once\n",
    "try:\n",
    "    myAWSIoTMQTTClient.connect()\n",
    "except Exception as e:\n",
    "    logger.error('{}'.format(e))\n",
    "    time.sleep(5)\n",
    "    myAWSIoTMQTTClient.connect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Publish messages\n",
    "Publish some messages that should be transferred to the SQS queue in the slave region.\n",
    "\n",
    "**Hint:** Before publishing messages, open the test client in the AWS IoT Core console in the master region and subscribe to the topic `cmd/+/cross/region`. This lets you verify that your messages reach AWS IoT Core."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "topic = 'cmd/{}/cross/region'.format(thing_name)\n",
    "print(\"topic: {}\".format(topic))\n",
    "\n",
    "for i in range(5):\n",
    "    date_time = datetime.datetime.now().isoformat()\n",
    "    message = {\"thing_name\": \"{}\".format(thing_name), \"date_time\": date_time, \"i\": i}\n",
    "    print(\"publish: message: {}\".format(message))\n",
    "    myAWSIoTMQTTClient.publish(topic, json.dumps(message), 0)\n",
    "    time.sleep(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Poll the SQS queue\n",
    "\n",
    "To verify that the messages have been sent to SQS in the slave region, poll the queue for messages. You should receive the messages you published. A single poll may return only some of them, so feel free to run the polling cell multiple times."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Long poll the SQS queue for up to 20 seconds\n",
    "response = c_sqs.receive_message(\n",
    "    QueueUrl=queue_url,\n",
    "    AttributeNames=[\n",
    "        'All'\n",
    "    ],\n",
    "    MaxNumberOfMessages=10,\n",
    "    MessageAttributeNames=[\n",
    "        'All'\n",
    "    ],\n",
    "    WaitTimeSeconds=20\n",
    ")\n",
    "\n",
    "print(\"queue_url: {}\\n\".format(queue_url))\n",
    "\n",
    "# The 'Messages' key is missing from the response when no message was received\n",
    "for message in response.get('Messages', []):\n",
    "    body = message['Body']\n",
    "    message_id = message['MessageId']\n",
    "    #print(message)\n",
    "    print(\"message_id: {}\\nbody: {}\\n\".format(message_id, body))"
   ]
  },
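  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Receiving a message does not remove it from the queue. It becomes visible again after the queue's visibility timeout, so repeated polling can return the same message more than once. If you want to see each message only once, delete it after processing. The following cell is a minimal, optional sketch. It assumes that `response` still holds the result of the `receive_message` call above and that the receipt handles have not yet expired."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: delete the messages returned by the last poll\n",
    "for message in response.get('Messages', []):\n",
    "    c_sqs.delete_message(\n",
    "        QueueUrl=queue_url,\n",
    "        ReceiptHandle=message['ReceiptHandle']\n",
    "    )\n",
    "    print('deleted message_id: {}'.format(message['MessageId']))"
   ]
  },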
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## SNS/Lambda\n",
    "\n",
    "To verify that the messages have also been sent to the slave region through SNS and Lambda, check the logs of your Lambda function in CloudWatch in the slave region.\n",
    "\n",
    "Logs can be found in `CloudWatch -> Logs -> /aws/lambda/<LAMBDA_FUNCTION_NAME>`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Disconnect\n",
    "Disconnect the device from AWS IoT Core."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "myAWSIoTMQTTClient.disconnect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean Up\n",
    "Clean up your environment:\n",
    "\n",
    "* Remove permissions from the Lambda function\n",
    "* Unsubscribe the Lambda from the SNS topic\n",
    "* Delete SNS topic\n",
    "* Delete IoT topic rules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_lambda.remove_permission(\n",
    "    FunctionName=lambda_name,\n",
    "    StatementId=statement_id\n",
    ")\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sns.unsubscribe(SubscriptionArn=subscription_arn)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_sns.delete_topic(TopicArn=topic_arn)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.delete_topic_rule(ruleName=topic_rule_name_sqs)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },
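  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The clean-up list does not cover the SQS queue that was created in the slave region. If you no longer need the queue, it can be deleted as well. The following cell is an optional addition to the listed steps and assumes that `c_sqs` and `queue_url` are still defined from the cells above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: delete the SQS queue in the slave region\n",
    "response = c_sqs.delete_queue(QueueUrl=queue_url)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  },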
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = c_iot.delete_topic_rule(ruleName=topic_rule_name_sns)\n",
    "print(\"response: {}\".format(json.dumps(response, indent=4, default=str)))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}