// Copyright 2018-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
//
//     http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.

package telemetry

import (
	"os"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/aws/aws-xray-daemon/pkg/conn"
	"github.com/aws/aws-xray-daemon/pkg/util/timer"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/xray"
	log "github.com/cihub/seelog"
)

// dataCutoffIntervalSecs is the number of seconds between two rotations of
// the current telemetry record.
const dataCutoffIntervalSecs = 60

// bufferSize is the capacity of the channel that buffers telemetry records
// waiting to be sent.
const bufferSize = 30

// requestSize is the maximum number of telemetry records placed in a single
// PutTelemetryRecords request.
const requestSize = 10

// T is the package-level instance of Telemetry; it is assigned by Init.
var T *Telemetry

// Telemetry is used to record X-Ray daemon health.
type Telemetry struct {
	// X-Ray client used to send telemetry records.
	client conn.XRay

	// Timer used to schedule the next rotation of the current record.
	timer timer.Timer

	// Amazon Resource Name (ARN) of the AWS resource running the daemon.
	resourceARN string

	// Instance id of the EC2 instance running X-Ray daemon.
	instanceID string

	// Host name of the EC2 instance running X-Ray daemon.
	hostname string

	// Record whose counters are currently being incremented; it is replaced
	// by a fresh empty record each time the data is pushed.
	currentRecord *xray.TelemetryRecord

	// Timer channel; a receive on it triggers a push of the current record.
	timerChan <-chan time.Time

	// Boolean channel, set to true when Quit channel is set to true.
	Done chan bool

	// Boolean channel, set to true when daemon is closed.
	Quit chan bool

	// Channel of TelemetryRecord used to send to X-Ray service.
	recordChan chan *xray.TelemetryRecord

	// When segment is received, postTelemetry is set to true,
	// indicating send telemetry data for the received segment.
	postTelemetry bool
}

// Init instantiates a new instance of Telemetry and stores it in T.
func Init(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) { T = newT(awsConfig, s, resourceARN, noMetadata) log.Debug("Telemetry initiated") } // EvaluateConnectionError processes error with respect to request failure status code. func EvaluateConnectionError(err error) { requestFailure, ok := err.(awserr.RequestFailure) if ok { statusCode := requestFailure.StatusCode() if statusCode >= 500 && statusCode < 600 { T.Connection5xx(1) } else if statusCode >= 400 && statusCode < 500 { T.Connection4xx(1) } else { T.ConnectionOther(1) } } else { if conn.IsTimeoutError(err) { T.ConnectionTimeout(1) } else { awsError, ok := err.(awserr.Error) if ok { if awsError.Code() == "RequestError" { T.ConnectionUnknownHost(1) } } else { T.ConnectionOther(1) } } } } // GetTestTelemetry returns an empty telemetry record. func GetTestTelemetry() *Telemetry { return &Telemetry{ currentRecord: getEmptyTelemetryRecord(), } } // SegmentReceived increments SegmentsReceivedCount for the Telemetry record. func (t *Telemetry) SegmentReceived(count int64) { atomic.AddInt64(t.currentRecord.SegmentsReceivedCount, count) // Only send telemetry data when we receive any segment or else skip any telemetry data t.postTelemetry = true } // SegmentSent increments SegmentsSentCount for the Telemetry record. func (t *Telemetry) SegmentSent(count int64) { atomic.AddInt64(t.currentRecord.SegmentsSentCount, count) } // SegmentSpillover increments SegmentsSpilloverCount for the Telemetry record. func (t *Telemetry) SegmentSpillover(count int64) { atomic.AddInt64(t.currentRecord.SegmentsSpilloverCount, count) } // SegmentRejected increments SegmentsRejectedCount for the Telemetry record. func (t *Telemetry) SegmentRejected(count int64) { atomic.AddInt64(t.currentRecord.SegmentsRejectedCount, count) } // ConnectionTimeout increments TimeoutCount for the Telemetry record. 
func (t *Telemetry) ConnectionTimeout(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.TimeoutCount, count)
}

// ConnectionRefusal adds count to ConnectionRefusedCount of the current
// record's backend connection errors.
func (t *Telemetry) ConnectionRefusal(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.ConnectionRefusedCount, count)
}

// Connection4xx adds count to HTTPCode4XXCount of the current record's
// backend connection errors.
func (t *Telemetry) Connection4xx(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.HTTPCode4XXCount, count)
}

// Connection5xx adds count to HTTPCode5XXCount of the current record's
// backend connection errors.
func (t *Telemetry) Connection5xx(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.HTTPCode5XXCount, count)
}

// ConnectionUnknownHost adds count to UnknownHostCount of the current
// record's backend connection errors.
func (t *Telemetry) ConnectionUnknownHost(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.UnknownHostCount, count)
}

// ConnectionOther increments other BackendConnectionErrors count for the Telemetry record.
func (t *Telemetry) ConnectionOther(count int64) { atomic.AddInt64(t.currentRecord.BackendConnectionErrors.OtherCount, count) } func newT(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) *Telemetry { timer := &timer.Client{} hostname := "" instanceID := "" var metadataClient *ec2metadata.EC2Metadata if !noMetadata { metadataClient = ec2metadata.New(s) } hostnameEnv := os.Getenv("AWS_HOSTNAME") if hostnameEnv != "" { hostname = hostnameEnv log.Debugf("Fetch hostname %v from environment variables", hostnameEnv) } else if metadataClient != nil { hn, err := metadataClient.GetMetadata("hostname") if err != nil { log.Debugf("Get hostname metadata failed: %s", err) } else { hostname = hn log.Debugf("Using %v hostname for telemetry records", hostname) } } else { log.Debug("No hostname set for telemetry records") } instanceIDEnv := os.Getenv("AWS_INSTANCE_ID") if instanceIDEnv != "" { instanceID = instanceIDEnv log.Debugf("Fetch instance ID %v from environment variables", instanceIDEnv) } else if metadataClient != nil { instID, err := metadataClient.GetMetadata("instance-id") if err != nil { log.Errorf("Get instance id metadata failed: %s", err) } else { instanceID = instID log.Debugf("Using %v Instance Id for Telemetry records", instanceID) } } else { log.Debug("No Instance Id set for telemetry records") } record := getEmptyTelemetryRecord() t := &Telemetry{ timer: timer, resourceARN: resourceARN, instanceID: instanceID, hostname: hostname, currentRecord: record, timerChan: getDataCutoffDelay(timer), Done: make(chan bool), Quit: make(chan bool), recordChan: make(chan *xray.TelemetryRecord, bufferSize), postTelemetry: false, } telemetryClient := conn.NewXRay(awsConfig, s) t.client = telemetryClient go t.pushData() return t } func getZeroInt64() *int64 { var zero int64 zero = 0 return &zero } func getEmptyTelemetryRecord() *xray.TelemetryRecord { return &xray.TelemetryRecord{ SegmentsReceivedCount: getZeroInt64(), SegmentsRejectedCount: 
getZeroInt64(), SegmentsSentCount: getZeroInt64(), SegmentsSpilloverCount: getZeroInt64(), BackendConnectionErrors: &xray.BackendConnectionErrors{ HTTPCode4XXCount: getZeroInt64(), HTTPCode5XXCount: getZeroInt64(), ConnectionRefusedCount: getZeroInt64(), OtherCount: getZeroInt64(), TimeoutCount: getZeroInt64(), UnknownHostCount: getZeroInt64(), }, } } func (t *Telemetry) pushData() { for { quit := false select { case <-t.Quit: quit = true break case <-t.timerChan: } emptyRecord := getEmptyTelemetryRecord() recordToReport := unsafe.Pointer(emptyRecord) recordToPushPointer := unsafe.Pointer(t.currentRecord) // Rotation Logic: // Swap current record to record to report. // Record to report is set to empty record which is set to current record t.currentRecord = (*xray.TelemetryRecord)(atomic.SwapPointer(&recordToReport, recordToPushPointer)) currentTime := time.Now() record := (*xray.TelemetryRecord)(recordToReport) record.Timestamp = ¤tTime t.add(record) t.sendAll() if quit { close(t.recordChan) log.Debug("telemetry: done!") t.Done <- true break } else { t.timerChan = getDataCutoffDelay(t.timer) } } } func (t *Telemetry) add(record *xray.TelemetryRecord) { // Only send telemetry data when we receive first segment or else do not send any telemetry data. if t.postTelemetry { select { case t.recordChan <- record: default: select { case <-t.recordChan: log.Debug("Telemetry Buffers truncated") t.add(record) default: log.Debug("Telemetry Buffers dequeued") } } } else { log.Debug("Skipped telemetry data as no segments found") } } func (t *Telemetry) sendAll() { records := t.collectAllRecords() recordsNoSend, err := t.sendRecords(records) if err != nil { log.Debugf("Failed to send telemetry %v record(s). Re-queue records. %v", len(records), err) // There might be possibility that new records might be archived during re-queue records. 
// But as timer is set after records are send this will not happen for _, record := range recordsNoSend { t.add(record) } } } func (t *Telemetry) collectAllRecords() []*xray.TelemetryRecord { records := make([]*xray.TelemetryRecord, bufferSize) records = records[:0] var record *xray.TelemetryRecord done := false for !done { select { case record = <-t.recordChan: recordLen := len(records) if recordLen < bufferSize { records = append(records, record) } default: done = true } } return records } func (t *Telemetry) sendRecords(records []*xray.TelemetryRecord) ([]*xray.TelemetryRecord, error) { if len(records) > 0 { for i := 0; i < len(records); i = i + requestSize { endIndex := len(records) if endIndex > i+requestSize { endIndex = i + requestSize } recordsToSend := records[i:endIndex] input := xray.PutTelemetryRecordsInput{ EC2InstanceId: &t.instanceID, Hostname: &t.hostname, ResourceARN: &t.resourceARN, TelemetryRecords: recordsToSend, } _, err := t.client.PutTelemetryRecords(&input) if err != nil { EvaluateConnectionError(err) return records[i:], err } } log.Debugf("Send %v telemetry record(s)", len(records)) } return nil, nil } func getDataCutoffDelay(timer timer.Timer) <-chan time.Time { return timer.After(time.Duration(time.Second * dataCutoffIntervalSecs)) }