//go:build !solaris // +build !solaris package tail import ( "bytes" "context" "errors" "io" "strings" "sync" "time" "github.com/dimchansky/utfbom" "github.com/influxdata/tail" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" ) const ( defaultWatchMethod = "inotify" ) var ( offsets = make(map[string]int64) offsetsMutex = new(sync.Mutex) ) type empty struct{} type semaphore chan empty type Tail struct { Files []string `toml:"files"` FromBeginning bool `toml:"from_beginning"` Pipe bool `toml:"pipe"` WatchMethod string `toml:"watch_method"` MaxUndeliveredLines int `toml:"max_undelivered_lines"` CharacterEncoding string `toml:"character_encoding"` PathTag string `toml:"path_tag"` Log telegraf.Logger `toml:"-"` tailers map[string]*tail.Tail offsets map[string]int64 parserFunc parsers.ParserFunc wg sync.WaitGroup acc telegraf.TrackingAccumulator MultilineConfig MultilineConfig `toml:"multiline"` multiline *Multiline ctx context.Context cancel context.CancelFunc sem semaphore decoder *encoding.Decoder } func NewTail() *Tail { offsetsMutex.Lock() offsetsCopy := make(map[string]int64, len(offsets)) for k, v := range offsets { offsetsCopy[k] = v } offsetsMutex.Unlock() return &Tail{ FromBeginning: false, MaxUndeliveredLines: 1000, offsets: offsetsCopy, PathTag: "path", } } const sampleConfig = ` ## File names or a pattern to tail. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: ## "/var/log/**.log" -> recursively find all .log files in /var/log ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log ## "/var/log/apache.log" -> just tail the apache log file ## "/var/log/log[!1-2]* -> tail files without 1-2 ## "/var/log/log[^1-2]* -> identical behavior as above ## See https://github.com/gobwas/glob for more examples ## files = ["/var/mymetrics.out"] ## Read file from beginning. # from_beginning = false ## Whether file is a named pipe # pipe = false ## Method used to watch for file updates. Can be either "inotify" or "poll". # watch_method = "inotify" ## Maximum lines of the file to process that have not yet be written by the ## output. For best throughput set based on the number of metrics on each ## line and the size of the output's metric_batch_size. # max_undelivered_lines = 1000 ## Character encoding to use when interpreting the file contents. Invalid ## characters are replaced using the unicode replacement character. When set ## to the empty string the data is not decoded to text. ## ex: character_encoding = "utf-8" ## character_encoding = "utf-16le" ## character_encoding = "utf-16be" ## character_encoding = "" # character_encoding = "" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ## Set the tag that will contain the path of the tailed file. If you don't want this tag, set it to an empty string. # path_tag = "path" ## multiline parser/codec ## https://www.elastic.co/guide/en/logstash/2.4/plugins-filters-multiline.html #[inputs.tail.multiline] ## The pattern should be a regexp which matches what you believe to be an ## indicator that the field is part of an event consisting of multiple lines of log data. #pattern = "^\s" ## This field must be either "previous" or "next". ## If a line matches the pattern, "previous" indicates that it belongs to the previous line, ## whereas "next" indicates that the line belongs to the next one. #match_which_line = "previous" ## The invert_match field can be true or false (defaults to false). ## If true, a message not matching the pattern will constitute a match of the multiline ## filter and the what will be applied. (vice-versa is also true) #invert_match = false ## After the specified timeout, this plugin sends a multiline event even if no new pattern ## is found to start a new event. The default timeout is 5s. #timeout = 5s ` func (t *Tail) SampleConfig() string { return sampleConfig } func (t *Tail) Description() string { return "Parse the new lines appended to a file" } func (t *Tail) Init() error { if t.MaxUndeliveredLines == 0 { return errors.New("max_undelivered_lines must be positive") } t.sem = make(semaphore, t.MaxUndeliveredLines) var err error t.decoder, err = encoding.NewDecoder(t.CharacterEncoding) return err } func (t *Tail) Gather(_ telegraf.Accumulator) error { return t.tailNewFiles(true) } func (t *Tail) Start(acc telegraf.Accumulator) error { t.acc = acc.WithTracking(t.MaxUndeliveredLines) t.ctx, t.cancel = context.WithCancel(context.Background()) t.wg.Add(1) go func() { defer t.wg.Done() for { select { case <-t.ctx.Done(): return case <-t.acc.Delivered(): <-t.sem } } }() var err error t.multiline, err = t.MultilineConfig.NewMultiline() if err != nil { return err } t.tailers = make(map[string]*tail.Tail) err = t.tailNewFiles(t.FromBeginning) // clear offsets t.offsets = make(map[string]int64) // assumption that once Start is called, all parallel plugins have already been initialized offsetsMutex.Lock() offsets = make(map[string]int64) offsetsMutex.Unlock() return err } func (t *Tail) tailNewFiles(fromBeginning bool) error { var poll bool if t.WatchMethod == "poll" { poll = true } // Create a "tailer" for each file for _, filepath := range t.Files { g, err := globpath.Compile(filepath) if err != nil { t.Log.Errorf("Glob %q failed to compile: %s", filepath, err.Error()) } for _, file := range g.Match() { if _, ok := t.tailers[file]; ok { // we're already tailing this file continue } var seek *tail.SeekInfo if !t.Pipe && !fromBeginning { if offset, ok := t.offsets[file]; ok { t.Log.Debugf("Using offset %d for %q", offset, file) seek = &tail.SeekInfo{ Whence: 0, Offset: offset, } } else { seek = &tail.SeekInfo{ Whence: 2, Offset: 0, } } } tailer, err := tail.TailFile(file, tail.Config{ ReOpen: true, Follow: true, Location: seek, MustExist: true, Poll: poll, Pipe: t.Pipe, Logger: tail.DiscardingLogger, OpenReaderFunc: func(rd io.Reader) io.Reader { r, _ := utfbom.Skip(t.decoder.Reader(rd)) return r }, }) if err != nil { t.Log.Debugf("Failed to open file (%s): %v", file, err) continue } t.Log.Debugf("Tail added for %q", file) parser, err := t.parserFunc() if err != nil { t.Log.Errorf("Creating parser: %s", err.Error()) continue } // create a goroutine for each "tailer" t.wg.Add(1) go func() { defer t.wg.Done() t.receiver(parser, tailer) t.Log.Debugf("Tail removed for %q", tailer.Filename) if err := tailer.Err(); err != nil { t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error()) } }() t.tailers[tailer.Filename] = tailer } } return nil } // ParseLine parses a line of text. func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) { switch parser.(type) { case *csv.Parser: m, err := parser.Parse([]byte(line)) if err != nil { if errors.Is(err, io.EOF) { return nil, nil } return nil, err } return m, err default: return parser.Parse([]byte(line)) } } // Receiver is launched as a goroutine to continuously watch a tailed logfile // for changes, parse any incoming msgs, and add to the accumulator. func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { // holds the individual lines of multi-line log entries. var buffer bytes.Buffer var timer *time.Timer var timeout <-chan time.Time // The multiline mode requires a timer in order to flush the multiline buffer // if no new lines are incoming. if t.multiline.IsEnabled() { timer = time.NewTimer(time.Duration(*t.MultilineConfig.Timeout)) timeout = timer.C } channelOpen := true tailerOpen := true var line *tail.Line for { line = nil if timer != nil { timer.Reset(time.Duration(*t.MultilineConfig.Timeout)) } select { case <-t.ctx.Done(): channelOpen = false case line, tailerOpen = <-tailer.Lines: if !tailerOpen { channelOpen = false } case <-timeout: } var text string if line != nil { // Fix up files with Windows line endings. text = strings.TrimRight(line.Text, "\r") if t.multiline.IsEnabled() { if text = t.multiline.ProcessLine(text, &buffer); text == "" { continue } } } if line == nil || !channelOpen || !tailerOpen { if text += t.multiline.Flush(&buffer); text == "" { if !channelOpen { return } continue } } if line != nil && line.Err != nil { t.Log.Errorf("Tailing %q: %s", tailer.Filename, line.Err.Error()) continue } metrics, err := parseLine(parser, text) if err != nil { t.Log.Errorf("Malformed log line in %q: [%q]: %s", tailer.Filename, text, err.Error()) continue } if t.PathTag != "" { for _, metric := range metrics { metric.AddTag(t.PathTag, tailer.Filename) } } // try writing out metric first without blocking select { case t.sem <- empty{}: t.acc.AddTrackingMetricGroup(metrics) if t.ctx.Err() != nil { return // exit! } continue // next loop default: // no room. switch to blocking write. } // Block until plugin is stopping or room is available to add metrics. select { case <-t.ctx.Done(): return case t.sem <- empty{}: t.acc.AddTrackingMetricGroup(metrics) } } } func (t *Tail) Stop() { for _, tailer := range t.tailers { if !t.Pipe && !t.FromBeginning { // store offset for resume offset, err := tailer.Tell() if err == nil { t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename) } else { t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error()) } } err := tailer.Stop() if err != nil { t.Log.Errorf("Stopping tail on %q: %s", tailer.Filename, err.Error()) } } t.cancel() t.wg.Wait() // persist offsets offsetsMutex.Lock() for k, v := range t.offsets { offsets[k] = v } offsetsMutex.Unlock() } func (t *Tail) SetParserFunc(fn parsers.ParserFunc) { t.parserFunc = fn } func init() { inputs.Add("tail", func() telegraf.Input { return NewTail() }) }