package opentelemetry import ( "context" "time" "github.com/influxdata/influxdb-observability/common" "github.com/influxdata/influxdb-observability/influx2otel" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "go.opentelemetry.io/collector/model/otlpgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" // This causes the gRPC library to register gzip compression. _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" ) type OpenTelemetry struct { ServiceAddress string `toml:"service_address"` tls.ClientConfig Timeout config.Duration `toml:"timeout"` Compression string `toml:"compression"` Headers map[string]string `toml:"headers"` Attributes map[string]string `toml:"attributes"` Log telegraf.Logger `toml:"-"` metricsConverter *influx2otel.LineProtocolToOtelMetrics grpcClientConn *grpc.ClientConn metricsServiceClient otlpgrpc.MetricsClient callOptions []grpc.CallOption } const sampleConfig = ` ## Override the default (localhost:4317) OpenTelemetry gRPC service ## address:port # service_address = "localhost:4317" ## Override the default (5s) request timeout # timeout = "5s" ## Optional TLS Config. ## ## Root certificates for verifying server certificates encoded in PEM format. # tls_ca = "/etc/telegraf/ca.pem" ## The public and private keypairs for the client encoded in PEM format. ## May contain intermediate certificates. # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS, but skip TLS chain and host verification. # insecure_skip_verify = false ## Send the specified TLS server name via SNI. # tls_server_name = "foo.example.com" ## Override the default (gzip) compression used to send data. ## Supports: "gzip", "none" # compression = "gzip" ## Additional OpenTelemetry resource attributes # [outputs.opentelemetry.attributes] # "service.name" = "demo" ## Additional gRPC request metadata # [outputs.opentelemetry.headers] # key1 = "value1" ` func (o *OpenTelemetry) SampleConfig() string { return sampleConfig } func (o *OpenTelemetry) Description() string { return "Send OpenTelemetry metrics over gRPC" } func (o *OpenTelemetry) Connect() error { logger := &otelLogger{o.Log} if o.ServiceAddress == "" { o.ServiceAddress = defaultServiceAddress } if o.Timeout <= 0 { o.Timeout = defaultTimeout } if o.Compression == "" { o.Compression = defaultCompression } metricsConverter, err := influx2otel.NewLineProtocolToOtelMetrics(logger) if err != nil { return err } var grpcTLSDialOption grpc.DialOption if tlsConfig, err := o.ClientConfig.TLSConfig(); err != nil { return err } else if tlsConfig != nil { grpcTLSDialOption = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } else { grpcTLSDialOption = grpc.WithInsecure() } grpcClientConn, err := grpc.Dial(o.ServiceAddress, grpcTLSDialOption) if err != nil { return err } metricsServiceClient := otlpgrpc.NewMetricsClient(grpcClientConn) o.metricsConverter = metricsConverter o.grpcClientConn = grpcClientConn o.metricsServiceClient = metricsServiceClient if o.Compression != "" && o.Compression != "none" { o.callOptions = append(o.callOptions, grpc.UseCompressor(o.Compression)) } return nil } func (o *OpenTelemetry) Close() error { if o.grpcClientConn != nil { err := o.grpcClientConn.Close() o.grpcClientConn = nil return err } return nil } func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error { batch := o.metricsConverter.NewBatch() for _, metric := range metrics { var vType common.InfluxMetricValueType switch metric.Type() { case telegraf.Gauge: vType = common.InfluxMetricValueTypeGauge case telegraf.Untyped: vType = common.InfluxMetricValueTypeUntyped case telegraf.Counter: vType = common.InfluxMetricValueTypeSum case telegraf.Histogram: vType = common.InfluxMetricValueTypeHistogram case telegraf.Summary: vType = common.InfluxMetricValueTypeSummary default: o.Log.Warnf("unrecognized metric type %Q", metric.Type()) continue } err := batch.AddPoint(metric.Name(), metric.Tags(), metric.Fields(), metric.Time(), vType) if err != nil { o.Log.Warnf("failed to add point: %s", err) continue } } md := otlpgrpc.NewMetricsRequest() md.SetMetrics(batch.GetMetrics()) if md.Metrics().ResourceMetrics().Len() == 0 { return nil } if len(o.Attributes) > 0 { for i := 0; i < md.Metrics().ResourceMetrics().Len(); i++ { for k, v := range o.Attributes { md.Metrics().ResourceMetrics().At(i).Resource().Attributes().UpsertString(k, v) } } } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout)) if len(o.Headers) > 0 { ctx = metadata.NewOutgoingContext(ctx, metadata.New(o.Headers)) } defer cancel() _, err := o.metricsServiceClient.Export(ctx, md, o.callOptions...) return err } const ( defaultServiceAddress = "localhost:4317" defaultTimeout = config.Duration(5 * time.Second) defaultCompression = "gzip" ) func init() { outputs.Add("opentelemetry", func() telegraf.Output { return &OpenTelemetry{ ServiceAddress: defaultServiceAddress, Timeout: defaultTimeout, Compression: defaultCompression, } }) }