/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ #include #include #include #include #include #include #include #include #include #include using namespace Aws::Utils; namespace Aws { namespace Monitoring { static const char DEFAULT_MONITORING_ALLOC_TAG[] = "DefaultMonitoringAllocTag"; static const int CLIENT_ID_LENGTH_LIMIT = 256; static const int USER_AGENT_LENGTH_LIMIT = 256; static const int ERROR_MESSAGE_LENGTH_LIMIT = 512; const char DEFAULT_MONITORING_CLIENT_ID[] = ""; // default to empty; const char DEFAULT_MONITORING_HOST[] = "127.0.0.1"; // default to loopback ip address instead of "localhost" based on design specification. unsigned short DEFAULT_MONITORING_PORT = 31000; //default to 31000; bool DEFAULT_MONITORING_ENABLE = false; //default to false; const int DefaultMonitoring::DEFAULT_MONITORING_VERSION = 1; const char DefaultMonitoring::DEFAULT_CSM_CONFIG_ENABLED[] = "csm_enabled"; const char DefaultMonitoring::DEFAULT_CSM_CONFIG_CLIENT_ID[] = "csm_client_id"; const char DefaultMonitoring::DEFAULT_CSM_CONFIG_HOST[] = "csm_host"; const char DefaultMonitoring::DEFAULT_CSM_CONFIG_PORT[] = "csm_port"; const char DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_ENABLED[] = "AWS_CSM_ENABLED"; const char DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_CLIENT_ID[] = "AWS_CSM_CLIENT_ID"; const char DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_HOST[] = "AWS_CSM_HOST"; const char DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_PORT[] = "AWS_CSM_PORT"; struct DefaultContext { Aws::Utils::DateTime apiCallStartTime; Aws::Utils::DateTime attemptStartTime; int retryCount = 0; bool lastAttemptSucceeded = false; bool lastErrorRetryable = false; //doesn't apply if last attempt succeeded. const Aws::Client::HttpResponseOutcome* outcome = nullptr; }; static inline void FillRequiredFieldsToJson(Json::JsonValue& json, const Aws::String& type, const Aws::String& service, const Aws::String& api, const Aws::String& clientId, const DateTime& timestamp, int version, const Aws::String& userAgent) { json.WithString("Type", type) .WithString("Service", service) .WithString("Api", api) .WithString("ClientId", clientId.substr(0, CLIENT_ID_LENGTH_LIMIT)) .WithInt64("Timestamp", timestamp.Millis()) .WithInteger("Version", version) .WithString("UserAgent", userAgent.substr(0, USER_AGENT_LENGTH_LIMIT)); } static inline void FillRequiredApiCallFieldsToJson(Json::JsonValue& json, int attemptCount, int64_t apiCallLatency, bool maxRetriesExceeded) { json.WithInteger("AttemptCount", attemptCount) .WithInt64("Latency", apiCallLatency) .WithInteger("MaxRetriesExceeded", maxRetriesExceeded ? 1 : 0); } static inline void FillRequiredApiAttemptFieldsToJson(Json::JsonValue& json, const Aws::String& domainName, int64_t attemptLatency) { json.WithString("Fqdn", domainName) .WithInt64("AttemptLatency", attemptLatency); } static inline void ExportResponseHeaderToJson(Json::JsonValue& json, const Aws::Http::HeaderValueCollection& headers, const Aws::String& headerName, const Aws::String& targetName) { auto iter = headers.find(headerName); if (iter != headers.end()) { json.WithString(targetName, iter->second); } } static inline void ExportHttpMetricsToJson(Json::JsonValue& json, const Aws::Monitoring::HttpClientMetricsCollection& httpMetrics, Aws::Monitoring::HttpClientMetricsType type) { auto iter = httpMetrics.find(GetHttpClientMetricNameByType(type)); if (iter != httpMetrics.end()) { json.WithInt64(GetHttpClientMetricNameByType(type), iter->second); } } static inline void FillOptionalApiCallFieldsToJson(Json::JsonValue& json, const Aws::Http::HttpRequest* request, const Aws::Client::HttpResponseOutcome& outcome) { if (!request->GetSigningRegion().empty()) { json.WithString("Region", request->GetSigningRegion()); } if (!outcome.IsSuccess()) { if (outcome.GetError().GetExceptionName().empty()) // Not Aws Exception { json.WithString("FinalSdkExceptionMessage", outcome.GetError().GetMessage().substr(0, ERROR_MESSAGE_LENGTH_LIMIT)); } else // Aws Exception { json.WithString("FinalAwsException", outcome.GetError().GetExceptionName()) .WithString("FinalAwsExceptionMessage", outcome.GetError().GetMessage().substr(0, ERROR_MESSAGE_LENGTH_LIMIT)); } json.WithInteger("FinalHttpStatusCode", static_cast(outcome.GetError().GetResponseCode())); } else { json.WithInteger("FinalHttpStatusCode", static_cast(outcome.GetResult()->GetResponseCode())); } } static inline void FillOptionalApiAttemptFieldsToJson(Json::JsonValue& json, const Aws::Http::HttpRequest* request, const Aws::Client::HttpResponseOutcome& outcome, const CoreMetricsCollection& metricsFromCore) { /** *No matter request succeeded or not, these fields should be included as long as their requirements *are met. We should be able to access response (so as to access original request) if the response has error. */ if (request->HasAwsSessionToken() && !request->GetAwsSessionToken().empty()) { json.WithString("SessionToken", request->GetAwsSessionToken()); } if (!request->GetSigningRegion().empty()) { json.WithString("Region", request->GetSigningRegion()); } if (!request->GetSigningAccessKey().empty()) { json.WithString("AccessKey", request->GetSigningAccessKey()); } const auto& headers = outcome.IsSuccess() ? outcome.GetResult()->GetHeaders() : outcome.GetError().GetResponseHeaders(); ExportResponseHeaderToJson(json, headers, StringUtils::ToLower("x-amzn-RequestId"), "XAmznRequestId"); ExportResponseHeaderToJson(json, headers, StringUtils::ToLower("x-amz-request-id"), "XAmzRequestId"); ExportResponseHeaderToJson(json, headers, StringUtils::ToLower("x-amz-id-2"), "XAmzId2"); if (!outcome.IsSuccess()) { if (outcome.GetError().GetExceptionName().empty()) // Not Aws Exception { json.WithString("SdkExceptionMessage", outcome.GetError().GetMessage().substr(0, ERROR_MESSAGE_LENGTH_LIMIT)); } else // Aws Exception { json.WithString("AwsException", outcome.GetError().GetExceptionName()) .WithString("AwsExceptionMessage", outcome.GetError().GetMessage().substr(0, ERROR_MESSAGE_LENGTH_LIMIT)); } json.WithInteger("HttpStatusCode", static_cast(outcome.GetError().GetResponseCode())); } else { json.WithInteger("HttpStatusCode", static_cast(outcome.GetResult()->GetResponseCode())); } // Optional MetricsCollectedFromCore ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::AcquireConnectionLatency); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::ConnectionReused); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::ConnectLatency); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::DestinationIp); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::DnsLatency); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::RequestLatency); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::SslLatency); ExportHttpMetricsToJson(json, metricsFromCore.httpClientMetrics, HttpClientMetricsType::TcpLatency); } DefaultMonitoring::DefaultMonitoring(const Aws::String& clientId, const Aws::String& host, unsigned short port): m_udp(host.c_str(), port), m_clientId(clientId) { } void* DefaultMonitoring::OnRequestStarted(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request) const { AWS_UNREFERENCED_PARAM(request); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "OnRequestStart Service: " << serviceName << "Request: " << requestName); auto context = Aws::New(DEFAULT_MONITORING_ALLOC_TAG); context->apiCallStartTime = Aws::Utils::DateTime::Now(); context->attemptStartTime = context->apiCallStartTime; context->retryCount = 0; return context; } void DefaultMonitoring::OnRequestSucceeded(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request, const Aws::Client::HttpResponseOutcome& outcome, const CoreMetricsCollection& metricsFromCore, void* context) const { AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "OnRequestSucceeded Service: " << serviceName << "Request: " << requestName); CollectAndSendAttemptData(serviceName, requestName, request, outcome, metricsFromCore, context); } void DefaultMonitoring::OnRequestFailed(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request, const Aws::Client::HttpResponseOutcome& outcome, const CoreMetricsCollection& metricsFromCore, void* context) const { AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "OnRequestFailed Service: " << serviceName << "Request: " << requestName); CollectAndSendAttemptData(serviceName, requestName, request, outcome, metricsFromCore, context); } void DefaultMonitoring::OnRequestRetry(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request, void* context) const { AWS_UNREFERENCED_PARAM(request); DefaultContext* defaultContext = static_cast(context); defaultContext->retryCount++; defaultContext->attemptStartTime = Aws::Utils::DateTime::Now(); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "OnRequestRetry Service: " << serviceName << "Request: " << requestName << " RetryCnt:" << defaultContext->retryCount); } void DefaultMonitoring::OnFinish(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request, void* context) const { AWS_UNREFERENCED_PARAM(request); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "OnRequestFinish Service: " << serviceName << "Request: " << requestName); DefaultContext* defaultContext = static_cast(context); Aws::Utils::Json::JsonValue json; FillRequiredFieldsToJson(json, "ApiCall", serviceName, requestName, m_clientId, defaultContext->apiCallStartTime, DEFAULT_MONITORING_VERSION, request->GetUserAgent()); FillRequiredApiCallFieldsToJson(json, defaultContext->retryCount + 1, (DateTime::Now() - defaultContext->apiCallStartTime).count(), (!defaultContext->lastAttemptSucceeded && defaultContext->lastErrorRetryable)); FillOptionalApiCallFieldsToJson(json, request.get(), *(defaultContext->outcome)); Aws::String compactData = json.View().WriteCompact(); m_udp.SendData(reinterpret_cast(compactData.c_str()), static_cast(compactData.size())); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Send API Metrics: \n" << json.View().WriteReadable()); Aws::Delete(defaultContext); } void DefaultMonitoring::CollectAndSendAttemptData(const Aws::String& serviceName, const Aws::String& requestName, const std::shared_ptr& request, const Aws::Client::HttpResponseOutcome& outcome, const CoreMetricsCollection& metricsFromCore, void* context) const { DefaultContext* defaultContext = static_cast(context); defaultContext->outcome = &outcome; defaultContext->lastAttemptSucceeded = outcome.IsSuccess() ? true : false; defaultContext->lastErrorRetryable = (!outcome.IsSuccess() && outcome.GetError().ShouldRetry()) ? true : false; Aws::Utils::Json::JsonValue json; FillRequiredFieldsToJson(json, "ApiCallAttempt", serviceName, requestName, m_clientId, defaultContext->attemptStartTime, DEFAULT_MONITORING_VERSION, request->GetUserAgent()); FillRequiredApiAttemptFieldsToJson(json, request->GetUri().GetAuthority(), (DateTime::Now() - defaultContext->attemptStartTime).count()); FillOptionalApiAttemptFieldsToJson(json, request.get(), outcome, metricsFromCore); Aws::String compactData = json.View().WriteCompact(); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Send Attempt Metrics: \n" << json.View().WriteReadable()); m_udp.SendData(reinterpret_cast(compactData.c_str()), static_cast(compactData.size())); } Aws::UniquePtr DefaultMonitoringFactory::CreateMonitoringInstance() const { Aws::String clientId(DEFAULT_MONITORING_CLIENT_ID); // default to empty Aws::String host(DEFAULT_MONITORING_HOST); // default to 127.0.0.1 unsigned short port = DEFAULT_MONITORING_PORT; // default to 31000 bool enable = DEFAULT_MONITORING_ENABLE; //default to false; //check profile_config Aws::String tmpEnable = Aws::Config::GetCachedConfigValue(DefaultMonitoring::DEFAULT_CSM_CONFIG_ENABLED); Aws::String tmpClientId = Aws::Config::GetCachedConfigValue(DefaultMonitoring::DEFAULT_CSM_CONFIG_CLIENT_ID); Aws::String tmpHost = Aws::Config::GetCachedConfigValue(DefaultMonitoring::DEFAULT_CSM_CONFIG_HOST); Aws::String tmpPort = Aws::Config::GetCachedConfigValue(DefaultMonitoring::DEFAULT_CSM_CONFIG_PORT); if (!tmpEnable.empty()) { enable = StringUtils::CaselessCompare(tmpEnable.c_str(), "true") ? true : false; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved csm_enabled from profile_config to be " << enable); } if (!tmpClientId.empty()) { clientId = tmpClientId; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved csm_client_id from profile_config to be " << clientId); } if (!tmpHost.empty()) { host = tmpHost; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved csm_host from profile_config to be " << host); } if (!tmpPort.empty()) { port = static_cast(StringUtils::ConvertToInt32(tmpPort.c_str())); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved csm_port from profile_config to be " << port); } // check environment variables tmpEnable = Aws::Environment::GetEnv(DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_ENABLED); tmpClientId = Aws::Environment::GetEnv(DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_CLIENT_ID); tmpHost = Aws::Environment::GetEnv(DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_HOST); tmpPort = Aws::Environment::GetEnv(DefaultMonitoring::DEFAULT_CSM_ENVIRONMENT_VAR_PORT); if (!tmpEnable.empty()) { enable = StringUtils::CaselessCompare(tmpEnable.c_str(), "true") ? true : false; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved AWS_CSM_ENABLED from Environment variable to be " << enable); } if (!tmpClientId.empty()) { clientId = tmpClientId; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved AWS_CSM_CLIENT_ID from Environment variable to be " << clientId); } if (!tmpHost.empty()) { host = tmpHost; AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved AWS_CSM_HOST from Environment variable to be " << host); } if (!tmpPort.empty()) { port = static_cast(StringUtils::ConvertToInt32(tmpPort.c_str())); AWS_LOGSTREAM_DEBUG(DEFAULT_MONITORING_ALLOC_TAG, "Resolved AWS_CSM_PORT from Environment variable to be " << port); } if (!enable) { return nullptr; } return Aws::MakeUnique(DEFAULT_MONITORING_ALLOC_TAG, clientId, host, port); } } }