package opcua_client import ( "context" "fmt" "net/url" "sort" "strconv" "strings" "time" "github.com/gopcua/opcua" "github.com/gopcua/opcua/ua" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/selfstat" ) type OpcuaWorkarounds struct { AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"` } // OpcUA type type OpcUA struct { MetricName string `toml:"name"` Endpoint string `toml:"endpoint"` SecurityPolicy string `toml:"security_policy"` SecurityMode string `toml:"security_mode"` Certificate string `toml:"certificate"` PrivateKey string `toml:"private_key"` Username string `toml:"username"` Password string `toml:"password"` Timestamp string `toml:"timestamp"` AuthMethod string `toml:"auth_method"` ConnectTimeout config.Duration `toml:"connect_timeout"` RequestTimeout config.Duration `toml:"request_timeout"` RootNodes []NodeSettings `toml:"nodes"` Groups []GroupSettings `toml:"group"` Workarounds OpcuaWorkarounds `toml:"workarounds"` Log telegraf.Logger `toml:"-"` nodes []Node nodeData []OPCData nodeIDs []*ua.NodeID nodeIDerror []error state ConnectionState // status ReadSuccess selfstat.Stat `toml:"-"` ReadError selfstat.Stat `toml:"-"` // internal values client *opcua.Client req *ua.ReadRequest opts []opcua.Option codes []ua.StatusCode } type NodeSettings struct { FieldName string `toml:"name"` Namespace string `toml:"namespace"` IdentifierType string `toml:"identifier_type"` Identifier string `toml:"identifier"` DataType string `toml:"data_type"` // Kept for backward compatibility but was never used. Description string `toml:"description"` // Kept for backward compatibility but was never used. TagsSlice [][]string `toml:"tags"` } type Node struct { tag NodeSettings idStr string metricName string metricTags map[string]string } type GroupSettings struct { MetricName string `toml:"name"` // Overrides plugin's setting Namespace string `toml:"namespace"` // Can be overridden by node setting IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting Nodes []NodeSettings `toml:"nodes"` TagsSlice [][]string `toml:"tags"` } // OPCData type type OPCData struct { TagName string Value interface{} Quality ua.StatusCode ServerTime time.Time SourceTime time.Time DataType ua.TypeID } // ConnectionState used for constants type ConnectionState int const ( //Disconnected constant state 0 Disconnected ConnectionState = iota //Connecting constant state 1 Connecting //Connected constant state 2 Connected ) const description = `Retrieve data from OPCUA devices` const sampleConfig = ` ## Metric name # name = "opcua" # ## OPC UA Endpoint URL # endpoint = "opc.tcp://localhost:4840" # ## Maximum time allowed to establish a connect to the endpoint. # connect_timeout = "10s" # ## Maximum time allowed for a request over the estabilished connection. # request_timeout = "5s" # ## Security policy, one of "None", "Basic128Rsa15", "Basic256", ## "Basic256Sha256", or "auto" # security_policy = "auto" # ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto" # security_mode = "auto" # ## Path to cert.pem. Required when security mode or policy isn't "None". ## If cert path is not supplied, self-signed cert and key will be generated. # certificate = "/etc/telegraf/cert.pem" # ## Path to private key.pem. Required when security mode or policy isn't "None". ## If key path is not supplied, self-signed cert and key will be generated. # private_key = "/etc/telegraf/key.pem" # ## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To ## authenticate using a specific ID, select 'Certificate' or 'UserName' # auth_method = "Anonymous" # ## Username. Required for auth_method = "UserName" # username = "" # ## Password. Required for auth_method = "UserName" # password = "" # ## Option to select the metric timestamp to use. Valid options are: ## "gather" -- uses the time of receiving the data in telegraf ## "server" -- uses the timestamp provided by the server ## "source" -- uses the timestamp provided by the source # timestamp = "gather" # ## Node ID configuration ## name - field name to use in the output ## namespace - OPC UA namespace of the node (integer value 0 thru 3) ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) ## identifier - OPC UA ID (tag as shown in opcua browser) ## Example: ## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262"} # nodes = [ # {name="", namespace="", identifier_type="", identifier=""}, # {name="", namespace="", identifier_type="", identifier=""}, #] # ## Node Group ## Sets defaults for OPC UA namespace and ID type so they aren't required in ## every node. A group can also have a metric name that overrides the main ## plugin metric name. ## ## Multiple node groups are allowed #[[inputs.opcua.group]] ## Group Metric name. Overrides the top level name. If unset, the ## top level name is used. # name = # ## Group default namespace. If a node in the group doesn't set its ## namespace, this is used. # namespace = # ## Group default identifier type. If a node in the group doesn't set its ## namespace, this is used. # identifier_type = # ## Node ID Configuration. Array of nodes with the same settings as above. # nodes = [ # {name="", namespace="", identifier_type="", identifier=""}, # {name="", namespace="", identifier_type="", identifier=""}, #] ## Enable workarounds required by some devices to work correctly # [inputs.opcua.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid # additional_valid_status_codes = ["0xC0"] ` // Description will appear directly above the plugin definition in the config file func (o *OpcUA) Description() string { return description } // SampleConfig will populate the sample configuration portion of the plugin's configuration func (o *OpcUA) SampleConfig() string { return sampleConfig } // Init will initialize all tags func (o *OpcUA) Init() error { o.state = Disconnected err := choice.Check(o.Timestamp, []string{"", "gather", "server", "source"}) if err != nil { return err } err = o.validateEndpoint() if err != nil { return err } err = o.InitNodes() if err != nil { return err } err = o.setupOptions() if err != nil { return err } err = o.setupWorkarounds() if err != nil { return err } tags := map[string]string{ "endpoint": o.Endpoint, } o.ReadError = selfstat.Register("opcua", "read_error", tags) o.ReadSuccess = selfstat.Register("opcua", "read_success", tags) return nil } func (o *OpcUA) validateEndpoint() error { if o.MetricName == "" { return fmt.Errorf("device name is empty") } if o.Endpoint == "" { return fmt.Errorf("endpoint url is empty") } _, err := url.Parse(o.Endpoint) if err != nil { return fmt.Errorf("endpoint url is invalid") } //search security policy type switch o.SecurityPolicy { case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto": // Valid security policy type - do nothing. default: return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.MetricName) } //search security mode type switch o.SecurityMode { case "None", "Sign", "SignAndEncrypt", "auto": // Valid security mode type - do nothing. default: return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.MetricName) } return nil } func tagsSliceToMap(tags [][]string) (map[string]string, error) { m := make(map[string]string) for i, tag := range tags { if len(tag) != 2 { return nil, fmt.Errorf("tag %d needs 2 values, has %d: %v", i+1, len(tag), tag) } if tag[0] == "" { return nil, fmt.Errorf("tag %d has empty name", i+1) } if tag[1] == "" { return nil, fmt.Errorf("tag %d has empty value", i+1) } if _, ok := m[tag[0]]; ok { return nil, fmt.Errorf("tag %d has duplicate key: %v", i+1, tag[0]) } m[tag[0]] = tag[1] } return m, nil } //InitNodes Method on OpcUA func (o *OpcUA) InitNodes() error { for _, node := range o.RootNodes { o.nodes = append(o.nodes, Node{ metricName: o.MetricName, tag: node, }) } for _, group := range o.Groups { if group.MetricName == "" { group.MetricName = o.MetricName } groupTags, err := tagsSliceToMap(group.TagsSlice) if err != nil { return err } for _, node := range group.Nodes { if node.Namespace == "" { node.Namespace = group.Namespace } if node.IdentifierType == "" { node.IdentifierType = group.IdentifierType } nodeTags, err := tagsSliceToMap(node.TagsSlice) if err != nil { return err } mergedTags := make(map[string]string) for k, v := range groupTags { mergedTags[k] = v } for k, v := range nodeTags { mergedTags[k] = v } o.nodes = append(o.nodes, Node{ metricName: group.MetricName, tag: node, metricTags: mergedTags, }) } } err := o.validateOPCTags() if err != nil { return err } return nil } type metricParts struct { metricName string fieldName string tags string // sorted by tag name and in format tag1=value1, tag2=value2 } func newMP(n *Node) metricParts { var keys []string for key := range n.metricTags { keys = append(keys, key) } sort.Strings(keys) var sb strings.Builder for i, key := range keys { if i != 0 { // Writes to a string-builder will always succeed //nolint:errcheck,revive sb.WriteString(", ") } // Writes to a string-builder will always succeed //nolint:errcheck,revive sb.WriteString(key) // Writes to a string-builder will always succeed //nolint:errcheck,revive sb.WriteString("=") // Writes to a string-builder will always succeed //nolint:errcheck,revive sb.WriteString(n.metricTags[key]) } x := metricParts{ metricName: n.metricName, fieldName: n.tag.FieldName, tags: sb.String(), } return x } func (o *OpcUA) validateOPCTags() error { nameEncountered := map[metricParts]struct{}{} for _, node := range o.nodes { mp := newMP(&node) //check empty name if node.tag.FieldName == "" { return fmt.Errorf("empty name in '%s'", node.tag.FieldName) } //search name duplicate if _, ok := nameEncountered[mp]; ok { return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')", mp.fieldName, mp.metricName, mp.tags) } //add it to the set nameEncountered[mp] = struct{}{} //search identifier type switch node.tag.IdentifierType { case "s", "i", "g", "b": // Valid identifier type - do nothing. default: return fmt.Errorf("invalid identifier type '%s' in '%s'", node.tag.IdentifierType, node.tag.FieldName) } node.idStr = BuildNodeID(node.tag) //parse NodeIds and NodeIds errors nid, niderr := ua.ParseNodeID(node.idStr) // build NodeIds and Errors o.nodeIDs = append(o.nodeIDs, nid) o.nodeIDerror = append(o.nodeIDerror, niderr) // Grow NodeData for later input o.nodeData = append(o.nodeData, OPCData{}) } return nil } // BuildNodeID build node ID from OPC tag func BuildNodeID(tag NodeSettings) string { return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier } // Connect to a OPCUA device func Connect(o *OpcUA) error { u, err := url.Parse(o.Endpoint) if err != nil { return err } switch u.Scheme { case "opc.tcp": o.state = Connecting if o.client != nil { if err := o.client.Close(); err != nil { // Only log the error but to not bail-out here as this prevents // reconnections for multiple parties (see e.g. #9523). o.Log.Errorf("Closing connection failed: %v", err) } } o.client = opcua.NewClient(o.Endpoint, o.opts...) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout)) defer cancel() if err := o.client.Connect(ctx); err != nil { return fmt.Errorf("error in Client Connection: %s", err) } regResp, err := o.client.RegisterNodes(&ua.RegisterNodesRequest{ NodesToRegister: o.nodeIDs, }) if err != nil { return fmt.Errorf("registerNodes failed: %v", err) } o.req = &ua.ReadRequest{ MaxAge: 2000, NodesToRead: readvalues(regResp.RegisteredNodeIDs), TimestampsToReturn: ua.TimestampsToReturnBoth, } err = o.getData() if err != nil { return fmt.Errorf("get Data Failed: %v", err) } default: return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme) } return nil } func (o *OpcUA) setupOptions() error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout)) defer cancel() // Get a list of the endpoints for our target server endpoints, err := opcua.GetEndpoints(ctx, o.Endpoint) if err != nil { return err } if o.Certificate == "" && o.PrivateKey == "" { if o.SecurityPolicy != "None" || o.SecurityMode != "None" { o.Certificate, o.PrivateKey, err = generateCert("urn:telegraf:gopcua:client", 2048, o.Certificate, o.PrivateKey, 365*24*time.Hour) if err != nil { return err } } } o.opts, err = o.generateClientOpts(endpoints) return err } func (o *OpcUA) setupWorkarounds() error { if len(o.Workarounds.AdditionalValidStatusCodes) != 0 { for _, c := range o.Workarounds.AdditionalValidStatusCodes { val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion if err != nil { return err } o.codes = append(o.codes, ua.StatusCode(uint32(val))) } } return nil } func (o *OpcUA) checkStatusCode(code ua.StatusCode) bool { for _, val := range o.codes { if val == code { return true } } return false } func (o *OpcUA) getData() error { resp, err := o.client.Read(o.req) if err != nil { o.ReadError.Incr(1) return fmt.Errorf("RegisterNodes Read failed: %v", err) } o.ReadSuccess.Incr(1) for i, d := range resp.Results { o.nodeData[i].Quality = d.Status if !o.checkStatusCode(d.Status) { mp := newMP(&o.nodes[i]) o.Log.Errorf("status not OK for node '%s'(metric name '%s', tags '%s')", mp.fieldName, mp.metricName, mp.tags) continue } o.nodeData[i].TagName = o.nodes[i].tag.FieldName if d.Value != nil { o.nodeData[i].Value = d.Value.Value() o.nodeData[i].DataType = d.Value.Type() } o.nodeData[i].Quality = d.Status o.nodeData[i].ServerTime = d.ServerTimestamp o.nodeData[i].SourceTime = d.SourceTimestamp } return nil } func readvalues(ids []*ua.NodeID) []*ua.ReadValueID { rvids := make([]*ua.ReadValueID, len(ids)) for i, v := range ids { rvids[i] = &ua.ReadValueID{NodeID: v} } return rvids } func disconnect(o *OpcUA) error { u, err := url.Parse(o.Endpoint) if err != nil { return err } switch u.Scheme { case "opc.tcp": o.state = Disconnected o.client.Close() o.client = nil return nil default: return fmt.Errorf("invalid controller") } } // Gather defines what data the plugin will gather. func (o *OpcUA) Gather(acc telegraf.Accumulator) error { if o.state == Disconnected { o.state = Connecting err := Connect(o) if err != nil { o.state = Disconnected return err } } o.state = Connected err := o.getData() if err != nil && o.state == Connected { o.state = Disconnected // Ignore returned error to not mask the original problem //nolint:errcheck,revive disconnect(o) return err } for i, n := range o.nodes { if o.checkStatusCode(o.nodeData[i].Quality) { fields := make(map[string]interface{}) tags := map[string]string{ "id": n.idStr, } for k, v := range n.metricTags { tags[k] = v } fields[o.nodeData[i].TagName] = o.nodeData[i].Value fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.nodeData[i].Quality)) switch o.Timestamp { case "server": acc.AddFields(n.metricName, fields, tags, o.nodeData[i].ServerTime) case "source": acc.AddFields(n.metricName, fields, tags, o.nodeData[i].SourceTime) default: acc.AddFields(n.metricName, fields, tags) } } } return nil } // Add this plugin to telegraf func init() { inputs.Add("opcua", func() telegraf.Input { return &OpcUA{ MetricName: "opcua", Endpoint: "opc.tcp://localhost:4840", SecurityPolicy: "auto", SecurityMode: "auto", Timestamp: "gather", RequestTimeout: config.Duration(5 * time.Second), ConnectTimeout: config.Duration(10 * time.Second), Certificate: "/etc/telegraf/cert.pem", PrivateKey: "/etc/telegraf/key.pem", AuthMethod: "Anonymous", codes: []ua.StatusCode{ua.StatusOK}, } }) }