/* * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ #include #include #include #include #include "aws_iot_config.h" #include "aws_iot_json_utils.h" #include "aws_iot_log.h" #include "aws_iot_version.h" #include "aws_iot_mqtt_client_interface.h" #include "aws_iot_jobs_interface.h" #define HOST_ADDRESS_SIZE 255 /** * @brief Default cert location */ static char certDirectory[PATH_MAX + 1] = "../../../certs"; /** * @brief Default MQTT HOST URL is pulled from the aws_iot_config.h */ static char HostAddress[HOST_ADDRESS_SIZE] = AWS_IOT_MQTT_HOST; /** * @brief Default MQTT port is pulled from the aws_iot_config.h */ static uint32_t port = AWS_IOT_MQTT_PORT; static jsmn_parser jsonParser; static jsmntok_t jsonTokenStruct[MAX_JSON_TOKEN_EXPECTED]; static int32_t tokenCount; static IoT_Error_t executeSystemCmd(const char *cmd) { #if defined (__APPLE__) || (__linux__) || (__unix__) IoT_Error_t rc = FAILURE; if (cmd != NULL) { IOT_INFO("Execute command: %s", cmd); if ( system(cmd) < 0 ) { IOT_ERROR("fail to execute"); rc = FAILURE; } else { rc = SUCCESS; } } return rc; #else #error "Implement your execution code here!" #endif } static void iot_update_job_status(AWS_IoT_Client *pClient, char *jobId, JobExecutionStatus eStatus, const char *pcMsg) { char topicToPublishUpdate[MAX_JOB_TOPIC_LENGTH_BYTES]; char messageBuffer[200]; AwsIotJobExecutionUpdateRequest updateRequest; IoT_Error_t rc; updateRequest.status = eStatus; if (pcMsg) updateRequest.statusDetails = pcMsg; else updateRequest.statusDetails = "{\"reason\":\"UnknowException\"}"; updateRequest.expectedVersion = 0; updateRequest.executionNumber = 0; updateRequest.includeJobExecutionState = false; updateRequest.includeJobDocument = false; updateRequest.clientToken = NULL; rc = aws_iot_jobs_send_update(pClient, QOS0, AWS_IOT_MY_THING_NAME, jobId, &updateRequest, topicToPublishUpdate, sizeof(topicToPublishUpdate), messageBuffer, sizeof(messageBuffer)); if (SUCCESS != rc) { IOT_ERROR("aws_iot_jobs_send_update returned error : %d ", rc); return; } } static void iot_get_pending_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) { IOT_UNUSED(pData); IOT_UNUSED(pClient); IOT_INFO("\nJOB_GET_PENDING_TOPIC callback"); IOT_INFO("topic: %.*s", topicNameLen, topicName); IOT_INFO("payload: %.*s", (int)params->payloadLen, (char *)params->payload); jsmn_init(&jsonParser); tokenCount = jsmn_parse(&jsonParser, params->payload, (int)params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED); if (tokenCount < 0) { IOT_WARN("Failed to parse JSON: %d", tokenCount); return; } /* Assume the top-level element is an object */ if (tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) { IOT_WARN("Top Level is not an object"); return; } jsmntok_t *jobs; jobs = findToken("inProgressJobs", params->payload, jsonTokenStruct); if (jobs) { IOT_INFO("inProgressJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start); } jobs = findToken("queuedJobs", params->payload, jsonTokenStruct); if (jobs) { IOT_INFO("queuedJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start); } } static void iot_next_job_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) { char topicToPublishUpdate[MAX_JOB_TOPIC_LENGTH_BYTES]; char messageBuffer[200]; jsmntok_t *tokExecution; jsmntok_t *tokJobId; jsmntok_t *tokJobDocument; jsmntok_t *tok; char jobId[MAX_SIZE_OF_JOB_ID + 1]; char cmdToBeExecuted[MAX_SIZE_OF_CMD + 1]; IoT_Error_t rc; IOT_UNUSED(pData); IOT_UNUSED(pClient); IOT_INFO("\nJOB_NOTIFY_NEXT_TOPIC / JOB_DESCRIBE_TOPIC($next) callback"); IOT_INFO("topic: %.*s", topicNameLen, topicName); IOT_INFO("payload: %.*s", (int)params->payloadLen, (char *)params->payload); jsmn_init(&jsonParser); tokenCount = jsmn_parse(&jsonParser, params->payload, (int)params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED); if (tokenCount < 0) { IOT_WARN("Failed to parse JSON: %d", tokenCount); return; } /* Assume the top-level element is an object */ if (tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) { IOT_WARN("Top Level is not an object"); return; } if ((tokExecution = findToken("execution", params->payload, jsonTokenStruct)) == NULL) { IOT_INFO("execution property not found, nothing to do"); } else if ((tokJobId = findToken("jobId", params->payload, tokExecution)) == NULL) { IOT_INFO("jobId property not found, nothing to do"); } else if ((rc = parseStringValue(jobId, MAX_SIZE_OF_JOB_ID + 1, params->payload, tokJobId)) != SUCCESS) { IOT_ERROR("parseStringValue returned error : %d ", rc); } else { if ( ( tokJobDocument = findToken("jobDocument", params->payload, tokExecution) ) == NULL ) { IOT_INFO("tokJobDocument property not found, nothing to do"); iot_update_job_status(pClient, jobId, JOB_EXECUTION_SUCCEEDED, "{\"reason\":\"Nop\"}"); } else if ( ( tok = findToken("cmdToBeExecuted", params->payload, tokJobDocument) ) == NULL ) { IOT_INFO("cmdToBeExecuted property not found, nothing to do"); iot_update_job_status(pClient, jobId, JOB_EXECUTION_SUCCEEDED, "{\"reason\":\"Nop\"}"); } else if ( (tok->end - tok->start) > MAX_SIZE_OF_CMD ) { IOT_ERROR("cmdToBeExecuted property execeeds max size, skip this command"); iot_update_job_status(pClient, jobId, JOB_EXECUTION_FAILED, "{\"reason\":\"CmdFormatError\"}"); } else if ( ( rc = parseStringValue(cmdToBeExecuted, MAX_SIZE_OF_CMD + 1, params->payload, tok) ) != SUCCESS ) { IOT_ERROR("parseStringValue returned error : %d ", rc); iot_update_job_status(pClient, jobId, JOB_EXECUTION_FAILED, "{\"reason\":\"CmdFormatError\"}"); } else if ( ( rc = executeSystemCmd(cmdToBeExecuted) ) != SUCCESS ) { IOT_ERROR("executeSystemCmd returned error : %d ", rc); iot_update_job_status(pClient, jobId, JOB_EXECUTION_FAILED, "{\"reason\":\"CmdExeError\"}"); } else { IOT_INFO("executeSystemCmd returned success"); iot_update_job_status(pClient, jobId, JOB_EXECUTION_SUCCEEDED, "{\"reason\":\"CmdExeSuccess\"}"); } } } static void iot_update_accepted_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) { IOT_UNUSED(pData); IOT_UNUSED(pClient); IOT_INFO("\nJOB_UPDATE_TOPIC / accepted callback"); IOT_INFO("topic: %.*s", topicNameLen, topicName); IOT_INFO("payload: %.*s", (int)params->payloadLen, (char *)params->payload); } static void iot_update_rejected_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) { IOT_UNUSED(pData); IOT_UNUSED(pClient); IOT_INFO("\nJOB_UPDATE_TOPIC / rejected callback"); IOT_INFO("topic: %.*s", topicNameLen, topicName); IOT_INFO("payload: %.*s", (int)params->payloadLen, (char *)params->payload); /* Do error handling here for when the update was rejected */ } static void disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data) { IOT_WARN("MQTT Disconnect"); IoT_Error_t rc = FAILURE; if (NULL == pClient) { return; } IOT_UNUSED(data); if (aws_iot_is_autoreconnect_enabled(pClient)) { IOT_INFO("Auto Reconnect is enabled, Reconnecting attempt will start now"); } else { IOT_WARN("Auto Reconnect not enabled. Starting manual reconnect..."); rc = aws_iot_mqtt_attempt_reconnect(pClient); if (NETWORK_RECONNECTED == rc) { IOT_WARN("Manual Reconnect Successful"); } else { IOT_WARN("Manual Reconnect Failed - %d", rc); } } } int main(int argc, char **argv) { char rootCA[PATH_MAX + 1]; char clientCRT[PATH_MAX + 1]; char clientKey[PATH_MAX + 1]; char CurrentWD[PATH_MAX + 1]; IoT_Error_t rc = FAILURE; AWS_IoT_Client client; IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault; IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault; getcwd(CurrentWD, sizeof(CurrentWD)); snprintf(rootCA, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_ROOT_CA_FILENAME); snprintf(clientCRT, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_CERTIFICATE_FILENAME); snprintf(clientKey, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_PRIVATE_KEY_FILENAME); IOT_DEBUG("rootCA %s", rootCA); IOT_DEBUG("clientCRT %s", clientCRT); IOT_DEBUG("clientKey %s", clientKey); mqttInitParams.enableAutoReconnect = false; // We enable this later below mqttInitParams.pHostURL = HostAddress; mqttInitParams.port = port; mqttInitParams.pRootCALocation = rootCA; mqttInitParams.pDeviceCertLocation = clientCRT; mqttInitParams.pDevicePrivateKeyLocation = clientKey; mqttInitParams.mqttCommandTimeout_ms = 20000; mqttInitParams.tlsHandshakeTimeout_ms = 5000; mqttInitParams.isSSLHostnameVerify = true; mqttInitParams.disconnectHandler = disconnectCallbackHandler; mqttInitParams.disconnectHandlerData = NULL; rc = aws_iot_mqtt_init(&client, &mqttInitParams); if (SUCCESS != rc) { IOT_ERROR("aws_iot_mqtt_init returned error : %d ", rc); return rc; } connectParams.keepAliveIntervalInSec = 600; connectParams.isCleanSession = true; connectParams.MQTTVersion = MQTT_3_1_1; connectParams.pClientID = AWS_IOT_MQTT_CLIENT_ID; connectParams.clientIDLen = (uint16_t)strlen(AWS_IOT_MQTT_CLIENT_ID); connectParams.isWillMsgPresent = false; IOT_INFO("Connecting..."); rc = aws_iot_mqtt_connect(&client, &connectParams); if (SUCCESS != rc) { IOT_ERROR("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port); return rc; } /* * Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h * #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL * #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL */ rc = aws_iot_mqtt_autoreconnect_set_status(&client, true); if (SUCCESS != rc) { IOT_ERROR("Unable to set Auto Reconnect to true - %d", rc); return rc; } char topicToSubscribeGetPending[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToSubscribeNotifyNext[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToSubscribeGetNext[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToSubscribeUpdateAccepted[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToSubscribeUpdateRejected[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToPublishGetPending[MAX_JOB_TOPIC_LENGTH_BYTES]; char topicToPublishGetNext[MAX_JOB_TOPIC_LENGTH_BYTES]; rc = aws_iot_jobs_subscribe_to_job_messages( &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_GET_PENDING_TOPIC, JOB_WILDCARD_REPLY_TYPE, iot_get_pending_callback_handler, NULL, topicToSubscribeGetPending, sizeof(topicToSubscribeGetPending)); if (SUCCESS != rc) { IOT_ERROR("Error subscribing JOB_GET_PENDING_TOPIC: %d ", rc); return rc; } rc = aws_iot_jobs_subscribe_to_job_messages( &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_NOTIFY_NEXT_TOPIC, JOB_REQUEST_TYPE, iot_next_job_callback_handler, NULL, topicToSubscribeNotifyNext, sizeof(topicToSubscribeNotifyNext)); if (SUCCESS != rc) { IOT_ERROR("Error subscribing JOB_NOTIFY_NEXT_TOPIC: %d ", rc); return rc; } rc = aws_iot_jobs_subscribe_to_job_messages( &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, JOB_DESCRIBE_TOPIC, JOB_WILDCARD_REPLY_TYPE, iot_next_job_callback_handler, NULL, topicToSubscribeGetNext, sizeof(topicToSubscribeGetNext)); if (SUCCESS != rc) { IOT_ERROR("Error subscribing JOB_DESCRIBE_TOPIC ($next): %d ", rc); return rc; } rc = aws_iot_jobs_subscribe_to_job_messages( &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_ACCEPTED_REPLY_TYPE, iot_update_accepted_callback_handler, NULL, topicToSubscribeUpdateAccepted, sizeof(topicToSubscribeUpdateAccepted)); if (SUCCESS != rc) { IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/accepted: %d ", rc); return rc; } rc = aws_iot_jobs_subscribe_to_job_messages( &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_REJECTED_REPLY_TYPE, iot_update_rejected_callback_handler, NULL, topicToSubscribeUpdateRejected, sizeof(topicToSubscribeUpdateRejected)); if (SUCCESS != rc) { IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/rejected: %d ", rc); return rc; } rc = aws_iot_jobs_send_query(&client, QOS0, AWS_IOT_MY_THING_NAME, NULL, NULL, topicToPublishGetPending, sizeof(topicToPublishGetPending), NULL, 0, JOB_GET_PENDING_TOPIC); AwsIotDescribeJobExecutionRequest describeRequest; describeRequest.executionNumber = 0; describeRequest.includeJobDocument = true; describeRequest.clientToken = NULL; rc = aws_iot_jobs_describe(&client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, &describeRequest, topicToPublishGetNext, sizeof(topicToPublishGetNext), NULL, 0); for (;;) { //Max time the yield function will wait for read messages rc = aws_iot_mqtt_yield(&client, 100); if (NETWORK_ATTEMPTING_RECONNECT == rc) { // If the client is attempting to reconnect we will skip the rest of the loop. continue; } } return rc; }