package statsd

import (
	"fmt"
	"sync"
	"time"
)

// telemetryInterval is the interval at which telemetry will be sent by the client.
const telemetryInterval = 10 * time.Second

// clientTelemetryTag is a tag identifying this specific client.
var clientTelemetryTag = "client:go"

// clientVersionTelemetryTag is a tag identifying this specific client version.
var clientVersionTelemetryTag = "client_version:5.1.1"

// Telemetry represents internal metrics about the client behavior since it started.
type Telemetry struct {
	//
	// These are produced by the 'Client'
	//

	// TotalMetrics is the total number of metrics sent by the client before aggregation and sampling.
	TotalMetrics uint64
	// TotalMetricsGauge is the total number of gauges sent by the client before aggregation and sampling.
	TotalMetricsGauge uint64
	// TotalMetricsCount is the total number of counts sent by the client before aggregation and sampling.
	TotalMetricsCount uint64
	// TotalMetricsHistogram is the total number of histograms sent by the client before aggregation and sampling.
	TotalMetricsHistogram uint64
	// TotalMetricsDistribution is the total number of distributions sent by the client before aggregation and
	// sampling.
	TotalMetricsDistribution uint64
	// TotalMetricsSet is the total number of sets sent by the client before aggregation and sampling.
	TotalMetricsSet uint64
	// TotalMetricsTiming is the total number of timings sent by the client before aggregation and sampling.
	TotalMetricsTiming uint64
	// TotalEvents is the total number of events sent by the client before aggregation and sampling.
	TotalEvents uint64
	// TotalServiceChecks is the total number of service_checks sent by the client before aggregation and sampling.
	TotalServiceChecks uint64

	// TotalDroppedOnReceive is the total number of metrics/events/service_checks dropped when using ChannelMode
	// (see the WithChannelMode option).
	TotalDroppedOnReceive uint64

	//
	// These are produced by the 'sender'
	//

	// TotalPayloadsSent is the total number of payloads (packets on the network) successfully sent by the client.
	// When using UDP we don't know whether packets were dropped or not, so all packets are considered successfully
	// sent.
	TotalPayloadsSent uint64
	// TotalPayloadsDropped is the total number of payloads dropped by the client. This includes all causes of
	// drops (TotalPayloadsDroppedQueueFull and TotalPayloadsDroppedWriter). When using UDP this won't include
	// packets dropped by the network.
	TotalPayloadsDropped uint64
	// TotalPayloadsDroppedWriter is the total number of payloads dropped by the writer (when using UDS or named
	// pipes) due to a network timeout or error.
	TotalPayloadsDroppedWriter uint64
	// TotalPayloadsDroppedQueueFull is the total number of payloads dropped internally because the queue of
	// payloads waiting to be sent on the wire is full. This means the client is generating more metrics than can
	// be sent on the wire. If your app sends metrics in bursts, look at the WithSenderQueueSize option to increase
	// the queue size.
	TotalPayloadsDroppedQueueFull uint64

	// TotalBytesSent is the total number of bytes successfully sent by the client. When using UDP we don't know
	// whether packets were dropped or not, so all packets are considered successfully sent.
	TotalBytesSent uint64
	// TotalBytesDropped is the total number of bytes dropped by the client. This includes all causes of drops
	// (TotalBytesDroppedQueueFull and TotalBytesDroppedWriter). When using UDP this won't include bytes dropped
	// by the network.
	TotalBytesDropped uint64
	// TotalBytesDroppedWriter is the total number of bytes dropped by the writer (when using UDS or named pipes)
	// due to a network timeout or error.
	TotalBytesDroppedWriter uint64
	// TotalBytesDroppedQueueFull is the total number of bytes dropped internally because the queue of payloads
	// waiting to be sent on the wire is full. This means the client is generating more metrics than can be sent on
	// the wire. If your app sends metrics in bursts, look at the WithSenderQueueSize option to increase the queue
	// size.
	TotalBytesDroppedQueueFull uint64

	//
	// These are produced by the 'aggregator'
	//

	// AggregationNbContext is the total number of contexts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContext uint64
	// AggregationNbContextGauge is the total number of contexts for gauges flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextGauge uint64
	// AggregationNbContextCount is the total number of contexts for counts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextCount uint64
	// AggregationNbContextSet is the total number of contexts for sets flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextSet uint64
	// AggregationNbContextHistogram is the total number of contexts for histograms flushed by the aggregator when
	// either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextHistogram uint64
	// AggregationNbContextDistribution is the total number of contexts for distributions flushed by the aggregator
	// when either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextDistribution uint64
	// AggregationNbContextTiming is the total number of contexts for timings flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextTiming uint64
}
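
// Usage sketch (illustrative only): callers don't read these counters from
// this struct directly; they are exposed through an accessor on the Client
// (GetTelemetry, defined elsewhere in this package). Assuming a client built
// with statsd.New, a caller could derive a payload drop ratio like this:
//
//	client, err := statsd.New("127.0.0.1:8125")
//	if err != nil {
//		return err
//	}
//	// ... emit metrics ...
//	tlm := client.GetTelemetry()
//	total := tlm.TotalPayloadsSent + tlm.TotalPayloadsDropped
//	if total > 0 {
//		dropRatio := float64(tlm.TotalPayloadsDropped) / float64(total)
//		fmt.Printf("dogstatsd payload drop ratio: %.2f%%\n", dropRatio*100)
//	}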

// telemetryClient periodically samples the client's internal counters and
// reports them as regular dogstatsd metrics.
type telemetryClient struct {
	c          *Client
	tags       []string
	aggEnabled bool // is aggregation enabled and should we send aggregation telemetry.
	tagsByType map[metricType][]string
	sender     *sender
	worker     *worker
	lastSample Telemetry // The previous sample of telemetry sent
}

func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *telemetryClient {
	t := &telemetryClient{
		c:          c,
		tags:       append(c.tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
		aggEnabled: aggregationEnabled,
		tagsByType: map[metricType][]string{},
	}

	t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
	t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
	t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
	t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
	t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
	t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
	return t
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
	telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
	if err != nil {
		return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
	}

	t := newTelemetryClient(c, transport, aggregationEnabled)

	// Create a custom sender/worker with 1 worker in mutex mode for the
	// telemetry; it shares the same bufferPool as the main client.
	// FIXME: due to a performance pitfall, we're always using UDP defaults
	// even for UDS.
	t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool)
	t.worker = newWorker(pool, t.sender)
	return t, nil
}
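
// Configuration sketch (illustrative only): which of the constructors above is
// used is driven by client options. Assuming the WithoutTelemetry and
// WithTelemetryAddr options defined in this package's options, telemetry can be
// disabled entirely or routed to a dedicated endpoint while metrics keep going
// to the main address:
//
//	// Disable client telemetry.
//	client, err := statsd.New("127.0.0.1:8125", statsd.WithoutTelemetry())
//
//	// Send telemetry to a separate address every telemetryInterval.
//	client, err = statsd.New("127.0.0.1:8125",
//		statsd.WithTelemetryAddr("127.0.0.1:8126"))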

func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(telemetryInterval)
		for {
			select {
			case <-ticker.C:
				t.sendTelemetry()
			case <-stop:
				ticker.Stop()
				if t.sender != nil {
					t.sender.close()
				}
				return
			}
		}
	}()
}

func (t *telemetryClient) sendTelemetry() {
	for _, m := range t.flush() {
		if t.worker != nil {
			t.worker.processMetric(m)
		} else {
			t.c.send(m)
		}
	}

	if t.worker != nil {
		t.worker.flush()
	}
}

func (t *telemetryClient) getTelemetry() Telemetry {
	if t == nil {
		// telemetry was disabled through the WithoutTelemetry option
		return Telemetry{}
	}

	tlm := Telemetry{}
	t.c.flushTelemetryMetrics(&tlm)
	t.c.sender.flushTelemetryMetrics(&tlm)
	t.c.agg.flushTelemetryMetrics(&tlm)

	tlm.TotalMetrics = tlm.TotalMetricsGauge +
		tlm.TotalMetricsCount +
		tlm.TotalMetricsSet +
		tlm.TotalMetricsHistogram +
		tlm.TotalMetricsDistribution +
		tlm.TotalMetricsTiming

	tlm.TotalPayloadsDropped = tlm.TotalPayloadsDroppedQueueFull + tlm.TotalPayloadsDroppedWriter
	tlm.TotalBytesDropped = tlm.TotalBytesDroppedQueueFull + tlm.TotalBytesDroppedWriter

	if t.aggEnabled {
		tlm.AggregationNbContext = tlm.AggregationNbContextGauge +
			tlm.AggregationNbContextCount +
			tlm.AggregationNbContextSet +
			tlm.AggregationNbContextHistogram +
			tlm.AggregationNbContextDistribution +
			tlm.AggregationNbContextTiming
	}
	return tlm
}

// flush returns the Telemetry metrics to be flushed. It's its own function to ease testing.
func (t *telemetryClient) flush() []metric {
	m := []metric{}

	// same as Count but without the global namespace
	telemetryCount := func(name string, value int64, tags []string) {
		m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
	}

	tlm := t.getTelemetry()

	// We send the diff between now and the previous telemetry flush. This keeps the same telemetry behavior as V4
	// so users' dashboards aren't broken when upgrading to V5. It also allows graphing a mix of V4 and V5 apps on
	// the same dashboard.
	telemetryCount("datadog.dogstatsd.client.metrics", int64(tlm.TotalMetrics-t.lastSample.TotalMetrics), t.tags)
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsGauge-t.lastSample.TotalMetricsGauge), t.tagsByType[gauge])
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsCount-t.lastSample.TotalMetricsCount), t.tagsByType[count])
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsHistogram-t.lastSample.TotalMetricsHistogram), t.tagsByType[histogram])
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsDistribution-t.lastSample.TotalMetricsDistribution), t.tagsByType[distribution])
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsSet-t.lastSample.TotalMetricsSet), t.tagsByType[set])
	telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsTiming-t.lastSample.TotalMetricsTiming), t.tagsByType[timing])

	telemetryCount("datadog.dogstatsd.client.events", int64(tlm.TotalEvents-t.lastSample.TotalEvents), t.tags)
	telemetryCount("datadog.dogstatsd.client.service_checks", int64(tlm.TotalServiceChecks-t.lastSample.TotalServiceChecks), t.tags)

	telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(tlm.TotalDroppedOnReceive-t.lastSample.TotalDroppedOnReceive), t.tags)

	telemetryCount("datadog.dogstatsd.client.packets_sent", int64(tlm.TotalPayloadsSent-t.lastSample.TotalPayloadsSent), t.tags)
	telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(tlm.TotalPayloadsDropped-t.lastSample.TotalPayloadsDropped), t.tags)
	telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(tlm.TotalPayloadsDroppedQueueFull-t.lastSample.TotalPayloadsDroppedQueueFull), t.tags)
	telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(tlm.TotalPayloadsDroppedWriter-t.lastSample.TotalPayloadsDroppedWriter), t.tags)

	telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(tlm.TotalBytesDropped-t.lastSample.TotalBytesDropped), t.tags)
	telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(tlm.TotalBytesSent-t.lastSample.TotalBytesSent), t.tags)
	telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(tlm.TotalBytesDroppedQueueFull-t.lastSample.TotalBytesDroppedQueueFull), t.tags)
	telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(tlm.TotalBytesDroppedWriter-t.lastSample.TotalBytesDroppedWriter), t.tags)

	if t.aggEnabled {
		telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(tlm.AggregationNbContext-t.lastSample.AggregationNbContext), t.tags)
		telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextGauge-t.lastSample.AggregationNbContextGauge), t.tagsByType[gauge])
		telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextSet-t.lastSample.AggregationNbContextSet), t.tagsByType[set])
		telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextCount-t.lastSample.AggregationNbContextCount), t.tagsByType[count])
		telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextHistogram-t.lastSample.AggregationNbContextHistogram), t.tagsByType[histogram])
telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextDistribution-t.lastSample.AggregationNbContextDistribution), t.tagsByType[distribution]) telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextTiming-t.lastSample.AggregationNbContextTiming), t.tagsByType[timing]) } t.lastSample = tlm return m }