package syslog

import (
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
	"unicode"

	"github.com/influxdata/go-syslog/v3"
	"github.com/influxdata/go-syslog/v3/nontransparent"
	"github.com/influxdata/go-syslog/v3/octetcounting"
	"github.com/influxdata/go-syslog/v3/rfc3164"
	"github.com/influxdata/go-syslog/v3/rfc5424"
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	framing "github.com/influxdata/telegraf/internal/syslog"
	tlsConfig "github.com/influxdata/telegraf/plugins/common/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
)

// syslogRFC names the syslog message standard selected through the
// SyslogStandard option.
type syslogRFC string

// defaultReadTimeout is the ReadTimeout installed by the plugin factory in init.
const defaultReadTimeout = time.Second * 5

// ipMaxPacketSize is the size of the buffer listenPacket reads datagrams into.
const ipMaxPacketSize = 64 * 1024

// syslogRFC3164 and syslogRFC5424 are the SyslogStandard values that
// listenPacket knows how to build a parser for.
const syslogRFC3164 = "RFC3164"
const syslogRFC5424 = "RFC5424"

// Syslog is a syslog plugin
//
// The exported fields are the user-facing options; the unexported fields hold
// the runtime state created by Start and torn down by Stop.
type Syslog struct {
	// Embedded TLS server options; Start calls its TLSConfig method for stream listeners.
	tlsConfig.ServerConfig
	// Address is the configured "scheme://address" string. Start later rewrites
	// it to the bare listen address returned by getAddressParts.
	Address string `toml:"server"`
	// KeepAlivePeriod: nil leaves keep-alive untouched, 0 disables it, any other
	// value enables it with that period (see setKeepAlive).
	KeepAlivePeriod *config.Duration
	// MaxConnections, when > 0, caps the number of tracked stream connections;
	// listenStream closes connections accepted beyond the cap.
	MaxConnections int
	// ReadTimeout, when non-nil and > 0, is the offset used by handle when it
	// sets the read deadline on a stream connection.
	ReadTimeout *config.Duration
	// Framing selects the stream parser in handle: octet counting, otherwise
	// non-transparent.
	Framing framing.Framing
	// SyslogStandard selects the parser built by listenPacket; handle does not
	// consult it.
	SyslogStandard syslogRFC
	// Trailer is passed to the non-transparent parser in handle.
	Trailer nontransparent.TrailerType
	// BestEffort enables the parsers' best-effort option.
	BestEffort bool
	// Separator is placed between SD-ID and parameter name when fields builds
	// structured-data field keys.
	Separator string `toml:"sdparam_separator"`

	// now is the clock used by currentTime; lastTime is the last value it returned.
	now      func() time.Time
	lastTime time.Time

	// mu is held by Start and Stop; wg counts the listener goroutine Start launches.
	mu sync.Mutex
	wg sync.WaitGroup
	// Closer is the active listener (wrapped in unixCloser for unix schemes);
	// Stop closes it.
	io.Closer

	// Stream-transport state. connections is keyed by the remote address string
	// and is guarded by connectionsMu.
	isStream      bool
	tcpListener   net.Listener
	tlsConfig     *tls.Config
	connections   map[string]net.Conn
	connectionsMu sync.Mutex

	// Packet-transport state.
	udpListener net.PacketConn
}

// sampleConfig is the text returned by SampleConfig.
//
// NOTE(review): the literal is reproduced exactly as it appears in this file;
// it reads as if its original line breaks were collapsed into spaces — confirm
// against the canonical sample configuration before relying on its layout.
var sampleConfig = ` ## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514 ## Protocol, address and port to host the syslog receiver. ## If no host is specified, then localhost is used. ## If no port is specified, 6514 is used (RFC5425#section-4.1). server = "tcp://:6514" ## TLS Config # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Period between keep alive probes. ## 0 disables keep alive probes. ## Defaults to the OS configuration. ## Only applies to stream sockets (e.g. TCP). # keep_alive_period = "5m" ## Maximum number of concurrent connections (default = 0). 
## 0 means unlimited. ## Only applies to stream sockets (e.g. TCP). # max_connections = 1024 ## Read timeout is the maximum time allowed for reading a single message (default = 5s). ## 0 means unlimited. # read_timeout = "5s" ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), ## or the non-transparent framing technique (RFC6587#section-3.4.2). ## Must be one of "octet-counting", "non-transparent". # framing = "octet-counting" ## The trailer to be expected in case of non-transparent framing (default = "LF"). ## Must be one of "LF", or "NUL". # trailer = "LF" ## Whether to parse in best effort mode or not (default = false). ## By default best effort parsing is off. # best_effort = false ## The RFC standard to use for message parsing ## By default RFC5424 is used. RFC3164 only supports UDP transport (no streaming support) ## Must be one of "RFC5424", or "RFC3164". # syslog_standard = "RFC5424" ## Character to prepend to SD-PARAMs (default = "_"). ## A syslog message can contain multiple parameters and multiple identifiers within structured data section. ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"] ## For each combination a field is created. ## Its name is created concatenating identifier, sdparam_separator, and parameter name. # sdparam_separator = "_" `

// SampleConfig returns sample configuration message
func (s *Syslog) SampleConfig() string {
	return sampleConfig
}

// Description returns the plugin description
func (s *Syslog) Description() string {
	return "Accepts syslog messages following RFC5424 format with transports as per RFC5426, RFC5425, or RFC6587"
}

// Gather is a no-op that ignores its accumulator and always returns nil;
// metrics are added by the listener goroutines that Start launches.
func (s *Syslog) Gather(_ telegraf.Accumulator) error {
	return nil
}

// Start starts the service.
func (s *Syslog) Start(acc telegraf.Accumulator) error { s.mu.Lock() defer s.mu.Unlock() scheme, host, err := getAddressParts(s.Address) if err != nil { return err } s.Address = host switch scheme { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": s.isStream = true case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": s.isStream = false default: return fmt.Errorf("unknown protocol '%s' in '%s'", scheme, s.Address) } if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { // Accept success and failure in case the file does not exist //nolint:errcheck,revive os.Remove(s.Address) } if s.isStream { l, err := net.Listen(scheme, s.Address) if err != nil { return err } s.Closer = l s.tcpListener = l s.tlsConfig, err = s.TLSConfig() if err != nil { return err } s.wg.Add(1) go s.listenStream(acc) } else { l, err := net.ListenPacket(scheme, s.Address) if err != nil { return err } s.Closer = l s.udpListener = l s.wg.Add(1) go s.listenPacket(acc) } if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { s.Closer = unixCloser{path: s.Address, closer: s.Closer} } return nil } // Stop cleans up all resources func (s *Syslog) Stop() { s.mu.Lock() defer s.mu.Unlock() if s.Closer != nil { // Ignore the returned error as we cannot do anything about it anyway //nolint:errcheck,revive s.Close() } s.wg.Wait() } // getAddressParts returns the address scheme and host // it also sets defaults for them when missing // when the input address does not specify the protocol it returns an error func getAddressParts(a string) (scheme string, host string, err error) { parts := strings.SplitN(a, "://", 2) if len(parts) != 2 { return "", "", fmt.Errorf("missing protocol within address '%s'", a) } u, err := url.Parse(filepath.ToSlash(a)) //convert backslashes to slashes (to make Windows path a valid URL) if err != nil { return "", "", fmt.Errorf("could not parse address '%s': %v", a, err) } switch u.Scheme { case "unix", "unixpacket", "unixgram": return 
parts[0], parts[1], nil } if u.Hostname() != "" { host = u.Hostname() } host += ":" if u.Port() == "" { host += "6514" } else { host += u.Port() } return u.Scheme, host, nil } func (s *Syslog) listenPacket(acc telegraf.Accumulator) { defer s.wg.Done() b := make([]byte, ipMaxPacketSize) var p syslog.Machine switch { case !s.BestEffort && s.SyslogStandard == syslogRFC5424: p = rfc5424.NewParser() case s.BestEffort && s.SyslogStandard == syslogRFC5424: p = rfc5424.NewParser(rfc5424.WithBestEffort()) case !s.BestEffort && s.SyslogStandard == syslogRFC3164: p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{})) case s.BestEffort && s.SyslogStandard == syslogRFC3164: p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}), rfc3164.WithBestEffort()) } for { n, _, err := s.udpListener.ReadFrom(b) if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { acc.AddError(err) } break } message, err := p.Parse(b[:n]) if message != nil { acc.AddFields("syslog", fields(message, s), tags(message), s.currentTime()) } if err != nil { acc.AddError(err) } } } func (s *Syslog) listenStream(acc telegraf.Accumulator) { defer s.wg.Done() s.connections = map[string]net.Conn{} for { conn, err := s.tcpListener.Accept() if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { acc.AddError(err) } break } var tcpConn, _ = conn.(*net.TCPConn) if s.tlsConfig != nil { conn = tls.Server(conn, s.tlsConfig) } s.connectionsMu.Lock() if s.MaxConnections > 0 && len(s.connections) >= s.MaxConnections { s.connectionsMu.Unlock() if err := conn.Close(); err != nil { acc.AddError(err) } continue } s.connections[conn.RemoteAddr().String()] = conn s.connectionsMu.Unlock() if err := s.setKeepAlive(tcpConn); err != nil { acc.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", s.Address, err)) } go s.handle(conn, acc) } s.connectionsMu.Lock() for _, c := range s.connections { if err := c.Close(); err != nil { 
acc.AddError(err) } } s.connectionsMu.Unlock() } func (s *Syslog) removeConnection(c net.Conn) { s.connectionsMu.Lock() delete(s.connections, c.RemoteAddr().String()) s.connectionsMu.Unlock() } func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { defer func() { s.removeConnection(conn) // Ignore the returned error as we cannot do anything about it anyway //nolint:errcheck,revive conn.Close() }() var p syslog.Parser emit := func(r *syslog.Result) { s.store(*r, acc) if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 { if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil { acc.AddError(fmt.Errorf("setting read deadline failed: %v", err)) } } } // Create parser options opts := []syslog.ParserOption{ syslog.WithListener(emit), } if s.BestEffort { opts = append(opts, syslog.WithBestEffort()) } // Select the parser to use depending on transport framing if s.Framing == framing.OctetCounting { // Octet counting transparent framing p = octetcounting.NewParser(opts...) } else { // Non-transparent framing opts = append(opts, nontransparent.WithTrailer(s.Trailer)) p = nontransparent.NewParser(opts...) 
} p.Parse(conn) if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 { if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil { acc.AddError(fmt.Errorf("setting read deadline failed: %v", err)) } } } func (s *Syslog) setKeepAlive(c *net.TCPConn) error { if s.KeepAlivePeriod == nil { return nil } if *s.KeepAlivePeriod == 0 { return c.SetKeepAlive(false) } if err := c.SetKeepAlive(true); err != nil { return err } return c.SetKeepAlivePeriod(time.Duration(*s.KeepAlivePeriod)) } func (s *Syslog) store(res syslog.Result, acc telegraf.Accumulator) { if res.Error != nil { acc.AddError(res.Error) } if res.Message != nil { acc.AddFields("syslog", fields(res.Message, s), tags(res.Message), s.currentTime()) } } func tags(msg syslog.Message) map[string]string { ts := map[string]string{} // Not checking assuming a minimally valid message ts["severity"] = *msg.SeverityShortLevel() ts["facility"] = *msg.FacilityLevel() switch m := msg.(type) { case *rfc5424.SyslogMessage: populateCommonTags(&m.Base, ts) case *rfc3164.SyslogMessage: populateCommonTags(&m.Base, ts) } return ts } func fields(msg syslog.Message, s *Syslog) map[string]interface{} { flds := map[string]interface{}{} switch m := msg.(type) { case *rfc5424.SyslogMessage: populateCommonFields(&m.Base, flds) // Not checking assuming a minimally valid message flds["version"] = m.Version if m.StructuredData != nil { for sdid, sdparams := range *m.StructuredData { if len(sdparams) == 0 { // When SD-ID does not have params we indicate its presence with a bool flds[sdid] = true continue } for name, value := range sdparams { // Using whitespace as separator since it is not allowed by the grammar within SDID flds[sdid+s.Separator+name] = value } } } case *rfc3164.SyslogMessage: populateCommonFields(&m.Base, flds) } return flds } func populateCommonFields(msg *syslog.Base, flds map[string]interface{}) { flds["facility_code"] = int(*msg.Facility) flds["severity_code"] = int(*msg.Severity) 
if msg.Timestamp != nil { flds["timestamp"] = (*msg.Timestamp).UnixNano() } if msg.ProcID != nil { flds["procid"] = *msg.ProcID } if msg.MsgID != nil { flds["msgid"] = *msg.MsgID } if msg.Message != nil { flds["message"] = strings.TrimRightFunc(*msg.Message, func(r rune) bool { return unicode.IsSpace(r) }) } } func populateCommonTags(msg *syslog.Base, ts map[string]string) { if msg.Hostname != nil { ts["hostname"] = *msg.Hostname } if msg.Appname != nil { ts["appname"] = *msg.Appname } } type unixCloser struct { path string closer io.Closer } func (uc unixCloser) Close() error { err := uc.closer.Close() // Accept success and failure in case the file does not exist //nolint:errcheck,revive os.Remove(uc.path) return err } func (s *Syslog) currentTime() time.Time { t := s.now() if t == s.lastTime { t = t.Add(time.Nanosecond) } s.lastTime = t return t } func getNanoNow() time.Time { return time.Unix(0, time.Now().UnixNano()) } func init() { defaultTimeout := config.Duration(defaultReadTimeout) inputs.Add("syslog", func() telegraf.Input { return &Syslog{ Address: ":6514", now: getNanoNow, ReadTimeout: &defaultTimeout, Framing: framing.OctetCounting, SyslogStandard: syslogRFC5424, Trailer: nontransparent.LF, Separator: "_", } }) }