package starlark import ( "fmt" "github.com/influxdata/telegraf" common "github.com/influxdata/telegraf/plugins/common/starlark" "github.com/influxdata/telegraf/plugins/processors" "go.starlark.net/starlark" ) const ( description = "Process metrics using a Starlark script" sampleConfig = ` ## The Starlark source can be set as a string in this configuration file, or ## by referencing a file containing the script. Only one source or script ## should be set at once. ## ## Source of the Starlark script. source = ''' def apply(metric): return metric ''' ## File containing a Starlark script. # script = "/usr/local/bin/myscript.star" ## The constants of the Starlark script. # [processors.starlark.constants] # max_size = 10 # threshold = 0.75 # default_name = "Julia" # debug_mode = true ` ) type Starlark struct { common.StarlarkCommon results []telegraf.Metric } func (s *Starlark) Init() error { err := s.StarlarkCommon.Init() if err != nil { return err } // The source should define an apply function. err = s.AddFunction("apply", &common.Metric{}) if err != nil { return err } // Preallocate a slice for return values. s.results = make([]telegraf.Metric, 0, 10) return nil } func (s *Starlark) SampleConfig() string { return sampleConfig } func (s *Starlark) Description() string { return description } func (s *Starlark) Start(_ telegraf.Accumulator) error { return nil } func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { parameters, found := s.GetParameters("apply") if !found { return fmt.Errorf("The parameters of the apply function could not be found") } parameters[0].(*common.Metric).Wrap(metric) rv, err := s.Call("apply") if err != nil { s.LogError(err) metric.Reject() return err } switch rv := rv.(type) { case *starlark.List: iter := rv.Iterate() defer iter.Done() var v starlark.Value for iter.Next(&v) { switch v := v.(type) { case *common.Metric: m := v.Unwrap() if containsMetric(s.results, m) { s.Log.Errorf("Duplicate metric reference detected") continue } s.results = append(s.results, m) acc.AddMetric(m) default: s.Log.Errorf("Invalid type returned in list: %s", v.Type()) } } // If the script didn't return the original metrics, mark it as // successfully handled. if !containsMetric(s.results, metric) { metric.Accept() } // clear results for i := range s.results { s.results[i] = nil } s.results = s.results[:0] case *common.Metric: m := rv.Unwrap() // If the script returned a different metric, mark this metric as // successfully handled. if m != metric { metric.Accept() } acc.AddMetric(m) case starlark.NoneType: metric.Drop() default: return fmt.Errorf("Invalid type returned: %T", rv) } return nil } func (s *Starlark) Stop() error { return nil } func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { for _, m := range metrics { if m == metric { return true } } return false } func init() { processors.AddStreaming("starlark", func() telegraf.StreamingProcessor { return &Starlark{ StarlarkCommon: common.StarlarkCommon{ StarlarkLoadFunc: common.LoadFunc, }, } }) }