/* * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace Aws { namespace CloudWatchMetrics { namespace Utils { void MetricsCollector::Initialize(std::string metric_namespace, std::map & default_dimensions, int storage_resolution, const ros::NodeHandle& node_handle, const Aws::Client::ClientConfiguration & config, const Aws::SDKOptions & sdk_options, const Aws::CloudWatchMetrics::CloudWatchOptions & cloudwatch_options, const std::shared_ptr& metric_service_factory) { this->metric_namespace_ = std::move(metric_namespace); this->default_dimensions_ = default_dimensions; this->storage_resolution_.store(storage_resolution); this->node_handle_ = node_handle; this->metric_service_ = metric_service_factory->createMetricService(this->metric_namespace_, config, sdk_options, cloudwatch_options); } void MetricsCollector::SubscribeAllTopics() { ReadTopics(topics_); for (auto & topic : topics_) { ros::Subscriber sub = node_handle_.subscribe( topic, kNodeSubQueueSize, [this](const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg) -> void { this->RecordMetrics(metric_list_msg); }); subscriptions_.push_back(sub); } } int MetricsCollector::RecordMetrics( const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg) { int batched_count = 0; AWS_LOGSTREAM_DEBUG(__func__, "Received " << metric_list_msg->metrics.size() << " metrics"); for (auto metric_msg = metric_list_msg->metrics.begin(); metric_msg != metric_list_msg->metrics.end(); ++metric_msg) { std::map dimensions; for (auto & default_dimension : default_dimensions_) { dimensions.emplace(default_dimension.first, default_dimension.second); // ignore the return, if we get a duplicate we're // going to stick with the first one } for (const auto & dimension : metric_msg->dimensions) { dimensions.emplace(dimension.name, dimension.value); // ignore the return, if we get a duplicate // we're going to stick with the first one } AWS_LOGSTREAM_DEBUG(__func__, "Recording metric with name=[" << metric_msg->metric_name << "]"); // create a MetricObject with message parameters to batch Aws::CloudWatchMetrics::Utils::MetricObject metric_object {metric_msg->metric_name, metric_msg->value, metric_msg->unit, GetMetricDataEpochMillis(*metric_msg), dimensions, this->storage_resolution_.load()}; bool batched = metric_service_->batchData(metric_object); if (!batched) { AWS_LOGSTREAM_ERROR(__func__, "Failed to record metric"); } batched_count++; } return batched_count; } int64_t MetricsCollector::GetMetricDataEpochMillis(const ros_monitoring_msgs::MetricData & metric_msg) { return metric_msg.time_stamp.toNSec() / 1000000; } void MetricsCollector::TriggerPublish(const ros::TimerEvent &) { AWS_LOG_DEBUG(__func__, "Flushing metrics"); this->metric_service_->publishBatchedData(); } bool MetricsCollector::start() { bool is_started = true; this->SubscribeAllTopics(); if (this->metric_service_) { is_started &= this->metric_service_->start(); } is_started &= Service::start(); return is_started; } bool MetricsCollector::shutdown() { bool is_shutdown = Service::shutdown(); if (this->metric_service_) { is_shutdown &= this->metric_service_->shutdown(); } return is_shutdown; } bool MetricsCollector::checkIfOnline(std_srvs::Trigger::Request& request, std_srvs::Trigger::Response& response) { AWS_LOGSTREAM_DEBUG(__func__, "received request " << request); if (!this->metric_service_) { response.success = false; response.message = "The MetricsCollector is not initialized"; return true; } response.success = this->metric_service_->isConnected(); response.message = response.success ? "The MetricsCollector is connected" : "The MetricsCollector is not connected"; return true; } } // namespace Utils } // namespace CloudWatchMetrics } // namespace Aws