// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package cloudwatch

import (
	"context"
	"log"
	"reflect"
	"sort"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/cloudwatch"
	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/outputs"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/pdata/pmetric"

	configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
	"github.com/aws/amazon-cloudwatch-agent/handlers"
	"github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo"
	"github.com/aws/amazon-cloudwatch-agent/internal/publisher"
	"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
	"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
	"github.com/aws/amazon-cloudwatch-agent/metric/distribution"
)

const (
	defaultMaxDatumsPerCall               = 1000   // PutMetricData only supports up to 1000 metric datums per call by default.
	defaultMaxValuesPerDatum              = 150    // By default only this many values can be inserted into the value list.
	bottomLinePayloadSizeInBytesToPublish = 999000 // 1 MB payload size. Leave 1 KB for the last datum buffer before applying the compression ratio.
	metricChanBufferSize                  = 10000
	datumBatchChanBufferSize              = 50 // the number of requests we buffer
	maxConcurrentPublisher                = 10 // the number of CloudWatch clients sending requests concurrently
	defaultForceFlushInterval             = time.Minute
	highResolutionTagKey                  = "aws:StorageResolution"
	defaultRetryCount                     = 5 // the retry count; total attempts will be at most retry count + 1.
	backoffRetryBase                      = 200 * time.Millisecond
	MaxDimensions                         = 30
)

const (
	opPutLogEvents  = "PutLogEvents"
	opPutMetricData = "PutMetricData"
)

type CloudWatch struct {
	config *Config
	svc    cloudwatchiface.CloudWatchAPI
	// todo: may want to increase the size of the chan since the type changed.
	// 1 telegraf Metric could have many Fields.
	// Each field corresponds to a MetricDatum.
	metricChan             chan *aggregationDatum
	datumBatchChan         chan []*cloudwatch.MetricDatum
	metricDatumBatch       *MetricDatumBatch
	shutdownChan           chan struct{}
	retries                int
	publisher              *publisher.Publisher
	retryer                *retryer.LogThrottleRetryer
	droppingOriginMetrics  collections.Set[string]
	aggregator             Aggregator
	aggregatorShutdownChan chan struct{}
	aggregatorWaitGroup    sync.WaitGroup
	agentInfo              agentinfo.AgentInfo
	lastRequestBytes       int
}
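// Putting the request limits above together: a batch is flushed once it
// holds MaxDatumsPerCall datums (1000 by default) or once its estimated
// payload reaches bottomLinePayloadSizeInBytesToPublish (~999 KB),
// whichever comes first; see MetricDatumBatch.isFull below.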
// Compile time interface check.
var _ exporter.Metrics = (*CloudWatch)(nil)

func (c *CloudWatch) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}

func (c *CloudWatch) Start(_ context.Context, host component.Host) error {
	c.agentInfo = agentinfo.New("")
	c.publisher, _ = publisher.NewPublisher(
		publisher.NewNonBlockingFifoQueue(metricChanBufferSize),
		maxConcurrentPublisher,
		2*time.Second,
		c.WriteToCloudWatch)
	credentialConfig := &configaws.CredentialConfig{
		Region:    c.config.Region,
		AccessKey: c.config.AccessKey,
		SecretKey: c.config.SecretKey,
		RoleARN:   c.config.RoleARN,
		Profile:   c.config.Profile,
		Filename:  c.config.SharedCredentialFilename,
		Token:     c.config.Token,
	}
	configProvider := credentialConfig.Credentials()
	logger := models.NewLogger("outputs", "cloudwatch", "")
	logThrottleRetryer := retryer.NewLogThrottleRetryer(logger)
	svc := cloudwatch.New(
		configProvider,
		&aws.Config{
			Endpoint: aws.String(c.config.EndpointOverride),
			Retryer:  logThrottleRetryer,
			LogLevel: configaws.SDKLogLevel(),
			Logger:   configaws.SDKLogger{},
		})
	svc.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{opPutLogEvents, opPutMetricData}))
	svc.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", c.agentInfo.UserAgent()))
	svc.Handlers.Build.PushBackNamed(handlers.NewDynamicCustomHeaderHandler("X-Amz-Agent-Stats", c.agentInfo.StatsHeader))
	// Format unique roll up list.
	c.config.RollupDimensions = GetUniqueRollupList(c.config.RollupDimensions)
	c.svc = svc
	c.retryer = logThrottleRetryer
	c.startRoutines()
	return nil
}

func (c *CloudWatch) startRoutines() {
	setNewDistributionFunc(c.config.MaxValuesPerDatum)
	c.metricChan = make(chan *aggregationDatum, metricChanBufferSize)
	c.datumBatchChan = make(chan []*cloudwatch.MetricDatum, datumBatchChanBufferSize)
	c.shutdownChan = make(chan struct{})
	c.aggregatorShutdownChan = make(chan struct{})
	c.aggregator = NewAggregator(c.metricChan, c.aggregatorShutdownChan, &c.aggregatorWaitGroup)
	perRequestConstSize := overallConstPerRequestSize + len(c.config.Namespace) + namespaceOverheads
	c.metricDatumBatch = newMetricDatumBatch(c.config.MaxDatumsPerCall, perRequestConstSize)
	go c.pushMetricDatum()
	go c.publish()
}

func (c *CloudWatch) Shutdown(ctx context.Context) error {
	log.Println("D! Stopping the CloudWatch output plugin")
	for i := 0; i < 5; i++ {
		if len(c.metricChan) == 0 && len(c.datumBatchChan) == 0 {
			break
		} else {
			log.Printf("D! CloudWatch Close, %vth time to sleep since there is still some metric data remaining to publish.", i)
			time.Sleep(time.Second)
		}
	}
	if metricChanLen, datumBatchChanLen := len(c.metricChan), len(c.datumBatchChan); metricChanLen != 0 || datumBatchChanLen != 0 {
		log.Printf("D! CloudWatch Close, metricChan length = %v, datumBatchChan length = %v.", metricChanLen, datumBatchChanLen)
	}
	close(c.shutdownChan)
	c.publisher.Close()
	c.retryer.Stop()
	log.Println("D! Stopped the CloudWatch output plugin")
	return nil
}

// ConsumeMetrics queues metrics to be published to CW.
// The actual publishing will occur in a long running goroutine.
// This method can block when publishing is backed up.
func (c *CloudWatch) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
	datums := ConvertOtelMetrics(metrics)
	for _, d := range datums {
		c.aggregator.AddMetric(d)
	}
	return nil
}
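// A metric flows through this plugin as follows:
//
//	ConsumeMetrics -> aggregator -> metricChan -> pushMetricDatum()
//	    -> datumBatchChan -> publish() -> publisher -> WriteToCloudWatch()
//
// pushMetricDatum batches individual datums; publish drains the batch
// channel on a jittered interval and hands each batch to the publisher pool.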
// pushMetricDatum groups datums into batches for efficient API calls.
// When a batch is full it is queued up for sending.
// Even if the batch is not full it will still get sent after the flush interval.
func (c *CloudWatch) pushMetricDatum() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case metric := <-c.metricChan:
			datums := c.BuildMetricDatum(metric)
			numberOfPartitions := len(datums)
			for i := 0; i < numberOfPartitions; i++ {
				c.metricDatumBatch.Partition = append(c.metricDatumBatch.Partition, datums[i])
				c.metricDatumBatch.Size += payload(datums[i])
				if c.metricDatumBatch.isFull() {
					// if batch is full
					c.datumBatchChan <- c.metricDatumBatch.Partition
					c.metricDatumBatch.clear()
				}
			}
		case <-ticker.C:
			if c.timeToPublish(c.metricDatumBatch) {
				// if the time to publish comes
				c.lastRequestBytes = c.metricDatumBatch.Size
				c.datumBatchChan <- c.metricDatumBatch.Partition
				c.metricDatumBatch.clear()
			}
		case <-c.shutdownChan:
			return
		}
	}
}

type MetricDatumBatch struct {
	MaxDatumsPerCall    int
	Partition           []*cloudwatch.MetricDatum
	BeginTime           time.Time
	Size                int
	perRequestConstSize int
}

func newMetricDatumBatch(maxDatumsPerCall, perRequestConstSize int) *MetricDatumBatch {
	return &MetricDatumBatch{
		MaxDatumsPerCall:    maxDatumsPerCall,
		Partition:           make([]*cloudwatch.MetricDatum, 0, maxDatumsPerCall),
		BeginTime:           time.Now(),
		Size:                perRequestConstSize,
		perRequestConstSize: perRequestConstSize,
	}
}

func (b *MetricDatumBatch) clear() {
	b.Partition = make([]*cloudwatch.MetricDatum, 0, b.MaxDatumsPerCall)
	b.BeginTime = time.Now()
	b.Size = b.perRequestConstSize
}

func (b *MetricDatumBatch) isFull() bool {
	return len(b.Partition) >= b.MaxDatumsPerCall || b.Size >= bottomLinePayloadSizeInBytesToPublish
}

func (c *CloudWatch) timeToPublish(b *MetricDatumBatch) bool {
	return len(b.Partition) > 0 && time.Since(b.BeginTime) >= c.config.ForceFlushInterval
}

// getFirstPushMs returns the time at which the first upload should occur.
// It uses random jitter as an offset from the start of the given interval.
func getFirstPushMs(interval time.Duration) int64 {
	publishJitter := publishJitter(interval)
	log.Printf("I! cloudwatch: publish with ForceFlushInterval: %v, Publish Jitter: %v", interval, publishJitter)
	nowMs := time.Now().UnixMilli()
	// Truncate, i.e. round down, then add jitter.
	// If the rounded down time is in the past, move it forward.
	nextMs := nowMs - (nowMs % interval.Milliseconds()) + publishJitter.Milliseconds()
	if nextMs < nowMs {
		nextMs += interval.Milliseconds()
	}
	return nextMs
}

// publish loops until a shutdown occurs.
// It periodically tries pushing batches of metrics (if there are any).
// If the batch buffer fills up, the interval will be gradually reduced to
// avoid many agents bursting the backend.
func (c *CloudWatch) publish() {
	currentInterval := c.config.ForceFlushInterval
	nextMs := getFirstPushMs(currentInterval)
	bufferFullOccurred := false
	for {
		shouldPublish := false
		select {
		case <-c.shutdownChan:
			log.Printf("D! cloudwatch: publish routine receives the shutdown signal, exiting.")
			return
		default:
		}
		nowMs := time.Now().UnixMilli()
		if c.metricDatumBatchFull() {
			if !bufferFullOccurred {
				// Set to true so this only happens once per push.
				bufferFullOccurred = true
				// Keep interval above 1 second.
				if currentInterval.Seconds() > 1 {
					currentInterval /= 2
					if currentInterval.Seconds() < 1 {
						currentInterval = 1 * time.Second
					}
					// Cut the remaining interval in half.
					nextMs = nowMs + ((nextMs - nowMs) / 2)
				}
			}
		}
		if nowMs >= nextMs {
			shouldPublish = true
			// Restore interval if buffer did not fill up during this interval.
			if !bufferFullOccurred {
				currentInterval = c.config.ForceFlushInterval
			}
			nextMs += currentInterval.Milliseconds()
		}
		if shouldPublish {
			c.pushMetricDatumBatch()
			bufferFullOccurred = false
		}
		// Sleep 1 second, unless nextMs is less than a second away.
		if nextMs-nowMs > time.Second.Milliseconds() {
			time.Sleep(time.Second)
		} else {
			time.Sleep(time.Duration(nextMs-nowMs) * time.Millisecond)
		}
	}
}
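// For example, with ForceFlushInterval = 60s: the first time the batch
// buffer fills, currentInterval halves to 30s and the remaining wait before
// the next push is cut in half. Under sustained backpressure it keeps
// halving on subsequent pushes, down to the 1 second floor, and it snaps
// back to the configured 60s once an interval passes without the buffer
// filling up.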
// metricDatumBatchFull returns true if the channel/buffer of batches is full.
func (c *CloudWatch) metricDatumBatchFull() bool {
	return len(c.datumBatchChan) >= datumBatchChanBufferSize
}

// pushMetricDatumBatch will try receiving on the channel, and if successful,
// then it publishes the received batch.
func (c *CloudWatch) pushMetricDatumBatch() {
	for {
		select {
		case datumBatch := <-c.datumBatchChan:
			c.publisher.Publish(datumBatch)
			continue
		default:
		}
		break
	}
}

// backoffSleep sleeps some amount of time based on the number of retries
// done, e.g. 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, then capped at 1 minute.
func (c *CloudWatch) backoffSleep() {
	d := 1 * time.Minute
	if c.retries <= defaultRetryCount {
		d = backoffRetryBase * time.Duration(1<<c.retries)
	}
	log.Printf("W! cloudwatch: %v retries, going to sleep %v ms before retrying.",
		c.retries, d.Milliseconds())
	c.retries++
	time.Sleep(d)
}

// WriteToCloudWatch sends a batch of datums to the PutMetricData API,
// retrying with backoff on throttling and internal service errors.
func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
	datums := req.([]*cloudwatch.MetricDatum)
	params := &cloudwatch.PutMetricDataInput{
		MetricData: datums,
		Namespace:  aws.String(c.config.Namespace),
	}
	var err error
	for i := 0; i < defaultRetryCount; i++ {
		_, err = c.svc.PutMetricData(params)
		if err != nil {
			awsErr, ok := err.(awserr.Error)
			if !ok {
				log.Printf("E! cloudwatch: Cannot cast PutMetricData error %v into awserr.Error.", err)
				c.backoffSleep()
				continue
			}
			switch awsErr.Code() {
			case cloudwatch.ErrCodeLimitExceededFault, cloudwatch.ErrCodeInternalServiceFault:
				log.Printf("W! cloudwatch: PutMetricData, error: %s, message: %s",
					awsErr.Code(), awsErr.Message())
				c.backoffSleep()
				continue
			default:
				log.Printf("E! cloudwatch: WriteToCloudWatch failure, err: %s", awsErr)
				c.backoffSleep()
			}
		} else {
			c.retries = 0
		}
		break
	}
	if err != nil {
		log.Println("E! cloudwatch: WriteToCloudWatch failure, err: ", err)
	}
}

// BuildMetricDatum may just return the datum as-is.
// Or it might expand it into many datums due to dimension rollup.
// There may also be more datums due to resize() on a distribution.
func (c *CloudWatch) BuildMetricDatum(metric *aggregationDatum) []*cloudwatch.MetricDatum {
	var datums []*cloudwatch.MetricDatum
	var distList []distribution.Distribution
	if metric.distribution != nil {
		if metric.distribution.Size() == 0 {
			log.Printf("E! metric has a distribution with no entries, %s", *metric.MetricName)
			return datums
		}
		if metric.distribution.Unit() != "" {
			metric.SetUnit(metric.distribution.Unit())
		}
		distList = resize(metric.distribution, c.config.MaxValuesPerDatum)
	}
	dimensionsList := c.ProcessRollup(metric.Dimensions)
	for index, dimensions := range dimensionsList {
		// index == 0 is the original (un-rolled-up) metric; skip it if it is
		// configured to be dropped.
		if index == 0 && c.IsDropping(*metric.MetricName) {
			continue
		}
		if len(distList) == 0 {
			// Not a distribution.
			datum := &cloudwatch.MetricDatum{
				MetricName:        metric.MetricName,
				Dimensions:        dimensions,
				Timestamp:         metric.Timestamp,
				Unit:              metric.Unit,
				StorageResolution: metric.StorageResolution,
				Value:             metric.Value,
			}
			datums = append(datums, datum)
		} else {
			for _, dist := range distList {
				values, counts := dist.ValuesAndCounts()
				datum := &cloudwatch.MetricDatum{
					MetricName:        metric.MetricName,
					Dimensions:        dimensions,
					Timestamp:         metric.Timestamp,
					Unit:              metric.Unit,
					StorageResolution: metric.StorageResolution,
					Values:            aws.Float64Slice(values),
					Counts:            aws.Float64Slice(counts),
					StatisticValues: &cloudwatch.StatisticSet{
						Maximum:     aws.Float64(dist.Maximum()),
						Minimum:     aws.Float64(dist.Minimum()),
						SampleCount: aws.Float64(dist.SampleCount()),
						Sum:         aws.Float64(dist.Sum()),
					},
				}
				datums = append(datums, datum)
			}
		}
	}
	return datums
}

// IsDropping returns true if the given metric name is configured to have its
// original (un-rolled-up) datum dropped.
func (c *CloudWatch) IsDropping(metricName string) bool {
	if len(c.droppingOriginMetrics) == 0 {
		return false
	}
	return c.droppingOriginMetrics.Contains(metricName)
}

// sortedTagKeys returns the map's keys in sorted order.
func sortedTagKeys(tagMap map[string]string) []string {
	keys := make([]string, 0, len(tagMap))
	for k := range tagMap {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// BuildDimensions converts the given tag map into a list of dimensions.
// CloudWatch supports at most MaxDimensions dimensions per metric, so only
// the first MaxDimensions keys (sorted alphabetically) are kept. The "host"
// tag is always included if it exists.
func BuildDimensions(tagMap map[string]string) []*cloudwatch.Dimension {
	if len(tagMap) > MaxDimensions {
		log.Printf("D! cloudwatch: dropping dimensions, max %v, count %v", MaxDimensions, len(tagMap))
	}
	dimensions := make([]*cloudwatch.Dimension, 0, MaxDimensions)
	// This is pretty ugly, but we always want to include the "host" tag if it exists.
	if host, ok := tagMap["host"]; ok && host != "" {
		dimensions = append(dimensions, &cloudwatch.Dimension{
			Name:  aws.String("host"),
			Value: aws.String(host),
		})
	}
	sortedKeys := sortedTagKeys(tagMap)
	for _, k := range sortedKeys {
		if len(dimensions) >= MaxDimensions {
			break
		}
		if k == "host" {
			continue
		}
		value := tagMap[k]
		if value == "" {
			continue
		}
		dimensions = append(dimensions, &cloudwatch.Dimension{
			Name:  aws.String(k),
			Value: aws.String(value),
		})
	}
	return dimensions
}

// ProcessRollup returns the original dimension set plus one extra set for
// each configured rollup whose keys are all present on the metric.
func (c *CloudWatch) ProcessRollup(rawDimension []*cloudwatch.Dimension) [][]*cloudwatch.Dimension {
	rawDimensionMap := map[string]string{}
	for _, v := range rawDimension {
		rawDimensionMap[*v.Name] = *v.Value
	}
	targetDimensionsList := c.config.RollupDimensions
	fullDimensionsList := [][]*cloudwatch.Dimension{rawDimension}
	for _, targetDimensions := range targetDimensionsList {
		i := 0
		extraDimensions := make([]*cloudwatch.Dimension, len(targetDimensions))
		for _, targetDimensionKey := range targetDimensions {
			if val, ok := rawDimensionMap[targetDimensionKey]; !ok {
				break
			} else {
				extraDimensions[i] = &cloudwatch.Dimension{
					Name:  aws.String(targetDimensionKey),
					Value: aws.String(val),
				}
			}
			i += 1
		}
		if i == len(targetDimensions) && !reflect.DeepEqual(rawDimension, extraDimensions) {
			fullDimensionsList = append(fullDimensionsList, extraDimensions)
		}
	}
	return fullDimensionsList
}

// GetUniqueRollupList deduplicates the configured rollup dimension lists.
func GetUniqueRollupList(inputLists [][]string) [][]string {
	uniqueLists := [][]string{}
	if len(inputLists) > 0 {
		uniqueLists = append(uniqueLists, inputLists[0])
	}
	for _, inputList := range inputLists {
		count := 0
		for _, u := range uniqueLists {
			if reflect.DeepEqual(inputList, u) {
				break
			}
			count += 1
			if count == len(uniqueLists) {
				uniqueLists = append(uniqueLists, inputList)
			}
		}
	}
	log.Printf("I! cloudwatch: get unique roll up list %v", uniqueLists)
	return uniqueLists
}
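// Rollup example: with RollupDimensions = [["host"], []], a datum tagged
// {host: h1, device: d1} is published three times: once with the original
// dimension pair, once rolled up to just {host: h1}, and once with no
// dimensions at all (an account-wide aggregate).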
// SampleConfig, Description, Connect, Close, and Write are stubs that
// satisfy the telegraf.Output interface required by the registration in
// init(); the OTel exporter methods above do the actual publishing.
func (c *CloudWatch) SampleConfig() string {
	return ""
}

func (c *CloudWatch) Description() string {
	return "Configuration for AWS CloudWatch output."
}

func (c *CloudWatch) Connect() error {
	return nil
}

func (c *CloudWatch) Close() error {
	return nil
}

func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
	return nil
}

func init() {
	outputs.Add("cloudwatch", func() telegraf.Output {
		return &CloudWatch{}
	})
}