//go:build linux // +build linux package dpdk import ( "encoding/json" "fmt" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) const ( description = "Reads metrics from DPDK applications using v2 telemetry interface." sampleConfig = ` ## Path to DPDK telemetry socket. This shall point to v2 version of DPDK telemetry interface. # socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2" ## Duration that defines how long the connected socket client will wait for a response before terminating connection. ## This includes both writing to and reading from socket. Since it's local socket access ## to a fast packet processing application, the timeout should be sufficient for most users. ## Setting the value to 0 disables the timeout (not recommended) # socket_access_timeout = "200ms" ## Enables telemetry data collection for selected device types. ## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status). ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats). # device_types = ["ethdev"] ## List of custom, application-specific telemetry commands to query ## The list of available commands depend on the application deployed. Applications can register their own commands ## via telemetry library API http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands ## For e.g. L3 Forwarding with Power Management Sample Application this could be: ## additional_commands = ["/l3fwd-power/stats"] # additional_commands = [] ## Allows turning off collecting data for individual "ethdev" commands. ## Remove "/ethdev/link_status" from list to start getting link status metrics. [inputs.dpdk.ethdev] exclude_commands = ["/ethdev/link_status"] ## When running multiple instances of the plugin it's recommended to add a unique tag to each instance to identify ## metrics exposed by an instance of DPDK application. This is useful when multiple DPDK apps run on a single host. ## [inputs.dpdk.tags] ## dpdk_instance = "my-fwd-app" ` defaultPathToSocket = "/var/run/dpdk/rte/dpdk_telemetry.v2" defaultAccessTimeout = config.Duration(200 * time.Millisecond) maxCommandLength = 56 maxCommandLengthWithParams = 1024 pluginName = "dpdk" ethdevListCommand = "/ethdev/list" rawdevListCommand = "/rawdev/list" ) type dpdk struct { SocketPath string `toml:"socket_path"` AccessTimeout config.Duration `toml:"socket_access_timeout"` DeviceTypes []string `toml:"device_types"` EthdevConfig ethdevConfig `toml:"ethdev"` AdditionalCommands []string `toml:"additional_commands"` Log telegraf.Logger `toml:"-"` connector *dpdkConnector rawdevCommands []string ethdevCommands []string ethdevExcludedCommandsFilter filter.Filter } type ethdevConfig struct { EthdevExcludeCommands []string `toml:"exclude_commands"` } func init() { inputs.Add(pluginName, func() telegraf.Input { dpdk := &dpdk{ // Setting it here (rather than in `Init()`) to distinguish between "zero" value, // default value and don't having value in config at all. AccessTimeout: defaultAccessTimeout, } return dpdk }) } func (dpdk *dpdk) SampleConfig() string { return sampleConfig } func (dpdk *dpdk) Description() string { return description } // Performs validation of all parameters from configuration func (dpdk *dpdk) Init() error { if dpdk.SocketPath == "" { dpdk.SocketPath = defaultPathToSocket dpdk.Log.Debugf("using default '%v' path for socket_path", defaultPathToSocket) } if dpdk.DeviceTypes == nil { dpdk.DeviceTypes = []string{"ethdev"} } var err error if err = isSocket(dpdk.SocketPath); err != nil { return err } dpdk.rawdevCommands = []string{"/rawdev/xstats"} dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/link_status"} if err = dpdk.validateCommands(); err != nil { return err } if dpdk.AccessTimeout < 0 { return fmt.Errorf("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)") } if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 { return fmt.Errorf("plugin was configured with nothing to read") } dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands) if err != nil { return fmt.Errorf("error occurred during filter prepation for ethdev excluded commands - %v", err) } dpdk.connector = newDpdkConnector(dpdk.SocketPath, dpdk.AccessTimeout) initMessage, err := dpdk.connector.connect() if initMessage != nil { dpdk.Log.Debugf("Successfully connected to %v running as process with PID %v with len %v", initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen) } return err } // Checks that user-supplied commands are unique and match DPDK commands format func (dpdk *dpdk) validateCommands() error { dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands) for _, commandWithParams := range dpdk.AdditionalCommands { if len(commandWithParams) == 0 { return fmt.Errorf("got empty command") } if commandWithParams[0] != '/' { return fmt.Errorf("'%v' command should start with '/'", commandWithParams) } if commandWithoutParams := stripParams(commandWithParams); len(commandWithoutParams) >= maxCommandLength { return fmt.Errorf("'%v' command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength) } if len(commandWithParams) >= maxCommandLengthWithParams { return fmt.Errorf("command with parameters '%v' shall be less than %v characters", commandWithParams, maxCommandLengthWithParams) } } return nil } // Gathers all unique commands and processes each command sequentially // Parallel processing could be achieved by running several instances of this plugin with different settings func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { // This needs to be done during every `Gather(...)`, because DPDK can be restarted between consecutive // `Gather(...)` cycles which can cause that it will be exposing different set of metrics. commands := dpdk.gatherCommands(acc) for _, command := range commands { dpdk.processCommand(acc, command) } return nil } // Gathers all unique commands func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator) []string { var commands []string if choice.Contains("ethdev", dpdk.DeviceTypes) { ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter) ethdevCommands, err := dpdk.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands) if err != nil { acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", ethdevListCommand, err)) } commands = append(commands, ethdevCommands...) } if choice.Contains("rawdev", dpdk.DeviceTypes) { rawdevCommands, err := dpdk.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands) if err != nil { acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", rawdevListCommand, err)) } commands = append(commands, rawdevCommands...) } commands = append(commands, dpdk.AdditionalCommands...) return uniqueValues(commands) } // Fetches all identifiers of devices and then creates all possible combinations of commands for each device func (dpdk *dpdk) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) { response, err := dpdk.connector.getCommandResponse(listCommand) if err != nil { return nil, err } params, err := jsonToArray(response, listCommand) if err != nil { return nil, err } result := make([]string, 0, len(commands)*len(params)) for _, command := range commands { for _, param := range params { result = append(result, commandWithParams(command, param)) } } return result, nil } // Executes command, parses response and creates/writes metric from response func (dpdk *dpdk) processCommand(acc telegraf.Accumulator, commandWithParams string) { buf, err := dpdk.connector.getCommandResponse(commandWithParams) if err != nil { acc.AddError(err) return } var parsedResponse map[string]interface{} err = json.Unmarshal(buf, &parsedResponse) if err != nil { acc.AddError(fmt.Errorf("failed to unmarshall json response from %v command - %v", commandWithParams, err)) return } command := stripParams(commandWithParams) value := parsedResponse[command] if isEmpty(value) { acc.AddError(fmt.Errorf("got empty json on '%v' command", commandWithParams)) return } jf := jsonparser.JSONFlattener{} err = jf.FullFlattenJSON("", value, true, true) if err != nil { acc.AddError(fmt.Errorf("failed to flatten response - %v", err)) return } acc.AddFields(pluginName, jf.Fields, map[string]string{ "command": command, "params": getParams(commandWithParams), }) }