// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
//     http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package worker

import (
	"context"
	"errors"
	"time"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// Prometheus metrics, labeled with the resource name the worker pool serves.
var (
	// prometheusRegistered guards against registering the collectors more than once
	// (MustRegister panics on duplicate registration).
	prometheusRegistered = false

	// jobsSubmittedCount counts jobs added via SubmitJob/SubmitJobAfter.
	jobsSubmittedCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "jobs_submitted_count",
			Help: "The number of jobs submitted to the buffer",
		},
		[]string{"resource"},
	)

	// jobsCompletedCount counts jobs that the worker function finished without error.
	jobsCompletedCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "jobs_completed_count",
			Help: "The number of jobs completed by worker routines",
		},
		[]string{"resource"},
	)

	// jobsFailedCount counts jobs dropped after exhausting maxRetriesOnErr requeues.
	jobsFailedCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "jobs_failed_count",
			Help: "The number of jobs that failed to complete after retries",
		},
		[]string{"resource"},
	)

	// jobsNotFoundCount counts jobs dropped because their resource no longer exists.
	jobsNotFoundCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "jobs_resource_not_found_count",
			Help: "The number of jobs whose resources were not found",
		},
		[]string{"resource"},
	)
)

// Errors
var (
	// WorkersAlreadyStartedError is returned by StartWorkerPool when the pool is already running.
	WorkersAlreadyStartedError = errors.New("failed to start the workers as they are already running")
)

// Worker processes jobs submitted to a rate limited work queue using a pool of
// goroutines.
type Worker interface {
	// StartWorkerPool starts the worker routines with the given job handler.
	// Returns an error if the pool has already been started.
	StartWorkerPool(func(interface{}) (ctrl.Result, error)) error
	// SubmitJob adds the job to the work queue immediately.
	SubmitJob(job interface{})
	// SubmitJobAfter adds the job to the work queue after the given delay.
	SubmitJobAfter(job interface{}, submitAfter time.Duration)
}

type worker struct {
	// resourceName that the worker belongs to
	resourceName string
	// workersStarted is the flag to prevent starting duplicate set of workers
	workersStarted bool
	// workerFunc is the function that will be invoked with the job by the worker routine
	workerFunc func(interface{}) (ctrl.Result, error)
	// maxRetriesOnErr is the number of times to retry an item in case of failure
	maxRetriesOnErr int
	// maxWorkerCount represents the maximum number of workers that will be started
	maxWorkerCount int
	// ctx is the background context; its cancellation triggers queue shutdown
	ctx context.Context
	// Log is the structured logger set to log with resource name
	Log logr.Logger
	// queue is the k8s rate limiting queue to store the submitted jobs
	queue workqueue.RateLimitingInterface
}

// NewDefaultWorkerPool returns a new worker pool for a given resource type with
// the given configuration. Metrics are registered on first use.
func NewDefaultWorkerPool(resourceName string, workerCount int, maxRequeue int, logger logr.Logger, ctx context.Context) Worker {
	prometheusRegister()
	return &worker{
		resourceName:    resourceName,
		maxRetriesOnErr: maxRequeue,
		maxWorkerCount:  workerCount,
		Log:             logger,
		queue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		ctx:             ctx,
	}
}

// prometheusRegister registers the metrics with the controller-runtime registry,
// at most once per process.
func prometheusRegister() { if !prometheusRegistered { metrics.Registry.MustRegister( jobsSubmittedCount, jobsCompletedCount, jobsFailedCount, jobsNotFoundCount) prometheusRegistered = true } } func (w *worker) SetWorkerFunc(workerFunc func(interface{}) (ctrl.Result, error)) { w.workerFunc = workerFunc } // SubmitJob adds the job to the rate limited queue func (w *worker) SubmitJob(job interface{}) { // in theory, only health check endpoint should send a nil job to test periodically if job == nil { queueLen := w.queue.Len() w.Log.V(1).Info("For informational / health check purpose only to check worker queue availability", "WorkerQueueLen", queueLen) return } w.queue.Add(job) jobsSubmittedCount.WithLabelValues(w.resourceName).Inc() } // SubmitJobAfter submits the job to the work queue after the given time period func (w *worker) SubmitJobAfter(job interface{}, submitAfter time.Duration) { w.queue.AddAfter(job, submitAfter) jobsSubmittedCount.WithLabelValues(w.resourceName).Inc() } // runWorker runs a worker that listens on new item on the worker queue func (w *worker) runWorker() { for w.processNextItem() { } } // processNextItem returns false if the queue is shut down, otherwise processes the job and returns true func (w *worker) processNextItem() (cont bool) { job, quit := w.queue.Get() if quit { return } defer w.queue.Done(job) log := w.Log.WithValues("job", job) cont = true if result, err := w.workerFunc(job); err != nil { if w.queue.NumRequeues(job) >= w.maxRetriesOnErr { log.Error(err, "exceeded maximum retries", "max retries", w.maxRetriesOnErr) w.queue.Forget(job) jobsFailedCount.WithLabelValues(w.resourceName).Inc() return } else if apierrors.IsNotFound(err) { //similar to upstream https://github.com/kubernetes-sigs/controller-runtime/issues/377#issue-426207628 log.Error(err, "won't requeue a not found errored job", "job", job) w.queue.Forget(job) jobsNotFoundCount.WithLabelValues(w.resourceName).Inc() return } log.Error(err, "re-queuing job", "retry 
count", w.queue.NumRequeues(job)) w.queue.AddRateLimited(job) return } else if result.Requeue { log.V(1).Info("timed retry", "retry after", result.RequeueAfter) w.queue.AddAfter(job, result.RequeueAfter) return } log.V(1).Info("completed job successfully") w.queue.Forget(job) jobsCompletedCount.WithLabelValues(w.resourceName).Inc() return } // StartWorkerPool starts the worker pool that starts the worker routines that concurrently listen on the channel func (w *worker) StartWorkerPool(workerFunc func(interface{}) (ctrl.Result, error)) error { if w.workersStarted { return WorkersAlreadyStartedError } w.workerFunc = workerFunc w.workersStarted = true go func() { w.Log.Info("starting routine to listen on chanel for termination signal") <-w.ctx.Done() w.queue.ShutDown() w.Log.Info("shut down the queue after receiving termination signal") }() w.Log.Info("starting worker routines", "worker count", w.maxWorkerCount) // Start a new go routine to listen on the chanel and allocate jobs to go routines for workerCount := 1; workerCount <= w.maxWorkerCount; workerCount++ { go w.runWorker() } return nil }