// File generated by the kubebuilder framework:
// https://github.com/kubernetes-sigs/kubebuilder

/*
Copyright 2016 The Kubernetes Authors.
Modifications Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"github.com/go-logr/logr"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"

	opsv1 "github.com/aws/zone-aware-controllers-for-k8s/api/v1"
	"github.com/aws/zone-aware-controllers-for-k8s/pkg/metrics"
	"github.com/aws/zone-aware-controllers-for-k8s/pkg/podzone"
	utils "github.com/aws/zone-aware-controllers-for-k8s/pkg/utils"
)

// The ZoneDisruptionBudget (ZDB) controller watches for pod changes and checks which pods are
// unavailable in order to calculate the number of disruptions allowed per zone at any time.
//
// The logic applied here was copied and adapted from the Kubernetes PDB controller (DisruptionController):
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go

// Same timeout used on PDBs:
// https://github.com/kubernetes/kubernetes/blob/d7123a65/pkg/controller/disruption/disruption.go#L68
const (
	DeletionTimeout = 2 * 60 * time.Second
)

// ZoneDisruptionBudgetReconciler reconciles a ZoneDisruptionBudget object.
type ZoneDisruptionBudgetReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	Logger logr.Logger

	PodZoneHelper *podzone.Helper
}

//+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zonedisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zonedisruptionbudgets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zonedisruptionbudgets/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *ZoneDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var zdb opsv1.ZoneDisruptionBudget
	if err := r.Get(ctx, req.NamespacedName, &zdb); err != nil {
		r.Logger.Error(err, "Unable to fetch ZDB")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	r.Logger.Info("Begin to process ZDB", "zdb", zdb.Name)

	recheckTime, err := r.sync(ctx, &zdb)
	if err != nil {
		return ctrl.Result{}, err
	}

	if recheckTime != nil {
		return ctrl.Result{RequeueAfter: time.Until(*recheckTime)}, nil
	}
	return ctrl.Result{}, nil
}

// Adapted from `DisruptionController.trySync()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a652/pkg/controller/disruption/disruption.go#L588
func (r *ZoneDisruptionBudgetReconciler) sync(ctx context.Context, zdb *opsv1.ZoneDisruptionBudget) (*time.Time, error) {
	pods, err := r.getPodsForZdb(ctx, zdb)
	if err != nil {
		return nil, err
	}
	if len(pods) == 0 {
		r.Logger.Info("No matching pods found")
	}

	expectedCount, desiredHealthy, zonePodsMap, err := r.getExpectedPodCount(ctx, zdb, pods)
	if err != nil {
		r.Logger.Error(err, "Failed to calculate the number of expected pods")
		return nil, err
	}

	currentTime := time.Now()
	disruptedPods, recheckTime := r.buildDisruptedPodMap(pods, zdb, currentTime)
	currentHealthy, currentUnhealthy := countHealthyPods(zonePodsMap, disruptedPods, currentTime)

	err = r.updateStatus(ctx, zdb, currentHealthy, currentUnhealthy, desiredHealthy, expectedCount, disruptedPods)
	if err != nil {
		r.Logger.Error(err, "Unable to update zdb status")
	}

	return recheckTime, err
}

func (r *ZoneDisruptionBudgetReconciler) getPodsForZdb(ctx context.Context, zdb *opsv1.ZoneDisruptionBudget) ([]*v1.Pod, error) {
	labelSelector, err := metav1.LabelSelectorAsSelector(zdb.Spec.Selector)
	if err != nil {
		r.Logger.Error(err, "GetLabelSelector failed", "zdb", zdb.Name)
		return []*v1.Pod{}, err
	}

	listOptions := &client.ListOptions{Namespace: zdb.Namespace, LabelSelector: labelSelector}
	podList := &v1.PodList{}
	if err := r.List(ctx, podList, listOptions); err != nil {
		r.Logger.Error(err, "Unable to list pods")
		return []*v1.Pod{}, err
	}

	pods := make([]*v1.Pod, 0, len(podList.Items))
	for i := range podList.Items {
		pods = append(pods, &podList.Items[i])
	}
	return pods, nil
}

// Adapted from `DisruptionController.getExpectedPodCount()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go#L623
func (r *ZoneDisruptionBudgetReconciler) getExpectedPodCount(ctx context.Context, zdb *opsv1.ZoneDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy map[string]int32, zonePodsMap map[string][]*v1.Pod, err error) {
	expectedCount = map[string]int32{}
	desiredHealthy = map[string]int32{}
	err = nil

	totalExpectedCount, unmanagedPods, err := r.getExpectedScale(zdb, pods)
	if err != nil {
		return
	}
	if len(unmanagedPods) > 0 {
		r.Logger.Info("Found unmanaged pods associated with this ZDB", "unmanagedPods", unmanagedPods)
	}

	maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(zdb.Spec.MaxUnavailable, int(totalExpectedCount), true)
	if err != nil {
		return
	}

	zonePodsMap = r.PodZoneHelper.GetZonePodsMap(ctx, pods)
	totalPods := int32(0)
	for zone := range zonePodsMap {
		podCount := int32(len(zonePodsMap[zone]))
		expectedCount[zone] = podCount
		totalPods += podCount
	}

	// For PDBs, the expectedCount can be retrieved directly from the StatefulSet's desired
	// number of replicas. In our case we can't do that, because the number of pods may not be
	// an exact multiple of the number of zones. Given that, we assume that pods that have not
	// been created yet will be evenly distributed across the zones, and keep incrementing
	// expectedCount by one per zone until the sum equals totalExpectedCount.
	sortedExpectedCount := utils.SortMapByValue(expectedCount)
	for totalExpectedCount > totalPods && len(sortedExpectedCount) > 0 {
		r.Logger.Info("Fewer observed pods than expected", "expected", totalExpectedCount, "observed", totalPods)
		for _, pair := range sortedExpectedCount {
			expectedCount[pair.Key] = expectedCount[pair.Key] + 1
			totalPods++
			if totalPods >= totalExpectedCount {
				break
			}
		}
	}

	for zone := range expectedCount {
		desiredHealthy[zone] = expectedCount[zone] - int32(maxUnavailable)
		if desiredHealthy[zone] < 0 {
			desiredHealthy[zone] = 0
		}
	}

	return
}

// Adapted from `DisruptionController.buildDisruptedPodMap()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go#L749
func (r *ZoneDisruptionBudgetReconciler) buildDisruptedPodMap(pods []*v1.Pod, zdb *opsv1.ZoneDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := zdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil {
		return result, recheckTime
	}
	for _, pod := range pods {
		if utils.IsTerminating(pod) {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod not on the list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			r.Logger.Info("Pod was expected to be deleted but it wasn't", "pod", pod.Name, "disruptionTime", disruptionTime.String())
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// Adapted from `DisruptionController.countHealthyPods()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go#L729
func countHealthyPods(zonePodsMap map[string][]*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy, currentUnhealthy map[string]int32) {
	currentHealthy = map[string]int32{}
	currentUnhealthy = map[string]int32{}

	for zone := range zonePodsMap {
		healthy := int32(0)
		unhealthy := int32(0)
		for _, pod := range zonePodsMap[zone] {
			// Pod is being deleted.
			if utils.IsTerminating(pod) {
				unhealthy++
				continue
			}
			// Pod is expected to be deleted soon.
			if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
				unhealthy++
				continue
			}
			if utils.IsRunningAndReady(pod) {
				healthy++
			} else {
				unhealthy++
			}
		}
		currentHealthy[zone] = healthy
		currentUnhealthy[zone] = unhealthy
	}
	return
}

// Adapted from `DisruptionController.getExpectedScale()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go#L665
func (r *ZoneDisruptionBudgetReconciler) getExpectedScale(zdb *opsv1.ZoneDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
	// A mapping from controllers to their scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod.
	// With ControllerRef, a pod can only have 1 controller.
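	// Note: the RBAC markers above only grant read access to StatefulSets in the "apps" group,
	// so the finders returned by utils.Finders() are presumably limited to resolving StatefulSet
	// owners; supporting other workload kinds would require additional finders and RBAC rules.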
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			unmanagedPods = append(unmanagedPods, pod.Name)
			continue
		}

		// If we already know the scale of the controller, there is no need to do anything.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range utils.Finders() {
			var controllerNScale *utils.ControllerAndScale
			controllerNScale, err = finder(r.Client, controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.Scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			return
		}
	}

	// 2. Add up all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

// Adapted from `DisruptionController.updatePdbStatus()`:
// https://github.com/kubernetes/kubernetes/blob/d7123a65248/pkg/controller/disruption/disruption.go#L806
func (r *ZoneDisruptionBudgetReconciler) updateStatus(ctx context.Context, zdb *opsv1.ZoneDisruptionBudget,
	currentHealthy, currentUnhealthy, desiredHealthy, expectedCount map[string]int32,
	disruptedPods map[string]metav1.Time) error {

	disruptedZones := []string{}
	for zone := range currentUnhealthy {
		if currentUnhealthy[zone] > 0 {
			disruptedZones = append(disruptedZones, zone)
		}
	}

	disruptionsAllowed := map[string]int32{}
	disruptedZoneCount := len(disruptedZones)
	for zone := range desiredHealthy {
		if disruptedZoneCount > 1 {
			disruptionsAllowed[zone] = 0
		} else if disruptedZoneCount == 1 {
			if disruptedZones[0] == zone {
				disruptionsAllowed[zone] = currentHealthy[zone] - desiredHealthy[zone]
			} else {
				disruptionsAllowed[zone] = 0
			}
		} else {
			disruptionsAllowed[zone] = currentHealthy[zone] - desiredHealthy[zone]
		}
		if disruptionsAllowed[zone] < 0 {
			disruptionsAllowed[zone] = 0
		}
	}

	if reflect.DeepEqual(zdb.Status.CurrentHealthy, currentHealthy) &&
		reflect.DeepEqual(zdb.Status.CurrentUnhealthy, currentUnhealthy) &&
		reflect.DeepEqual(zdb.Status.DesiredHealthy, desiredHealthy) &&
		reflect.DeepEqual(zdb.Status.ExpectedPods, expectedCount) &&
		reflect.DeepEqual(zdb.Status.DisruptedPods, disruptedPods) &&
		reflect.DeepEqual(zdb.Status.DisruptionsAllowed, disruptionsAllowed) &&
		zdb.Status.ObservedGeneration == zdb.Generation {
		return nil
	}

	zdb.Status.CurrentHealthy = currentHealthy
	zdb.Status.CurrentUnhealthy = currentUnhealthy
	zdb.Status.DesiredHealthy = desiredHealthy
	zdb.Status.ExpectedPods = expectedCount
	zdb.Status.DisruptedPods = disruptedPods
	zdb.Status.DisruptionsAllowed = disruptionsAllowed
	zdb.Status.ObservedGeneration = zdb.Generation

	err := r.Client.Status().Update(ctx, zdb)
	if err != nil {
		return err
	}
	r.Logger.Info("ZDB status updated", "spec", zdb.Spec, "status", zdb.Status)

	metrics.PublishZdbStatusMetrics(zdb)
	return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ZoneDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&opsv1.ZoneDisruptionBudget{}).
		// Watch for changes to pods.
		Watches(
			&source.Kind{Type: &v1.Pod{}},
			handler.EnqueueRequestsFromMapFunc(r.findZdbForPod),
		).
		Complete(r)
}

func (r *ZoneDisruptionBudgetReconciler) findZdbForPod(obj client.Object) []reconcile.Request {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		r.Logger.Info("Failed to convert object to pod", "obj", obj)
		return []reconcile.Request{}
	}

	zdb, err := utils.GetZdbForPod(r.Client, r.Logger, pod)
	if err != nil {
		r.Logger.Error(err, "Error getting zdb for pod", "pod", pod.Name)
		return []reconcile.Request{}
	}
	if zdb == nil {
		return []reconcile.Request{}
	}

	request := reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name:      zdb.GetName(),
			Namespace: zdb.GetNamespace(),
		},
	}
	return []reconcile.Request{request}
}
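// Worked example (illustrative numbers only, not taken from the source): consider a StatefulSet
// with 9 replicas spread evenly over 3 zones, so expectedCount is 3 per zone, and a ZDB with
// maxUnavailable = 2, giving desiredHealthy = 3 - 2 = 1 per zone. If zone "a" has one unhealthy
// pod while zones "b" and "c" are fully healthy, "a" is the only disrupted zone and keeps a
// budget of currentHealthy(2) - desiredHealthy(1) = 1, while "b" and "c" get 0 so that
// disruptions stay confined to a single zone. With no disrupted zones every zone would get
// currentHealthy(3) - desiredHealthy(1) = 2, and with more than one disrupted zone all budgets
// drop to 0.
//
// Minimal wiring sketch (assumptions: a controller-runtime manager `mgr` and some way to build
// a *podzone.Helper, neither of which is defined in this file):
//
//	if err := (&ZoneDisruptionBudgetReconciler{
//		Client: mgr.GetClient(),
//		Scheme: mgr.GetScheme(),
//		Logger: ctrl.Log.WithName("controllers").WithName("ZoneDisruptionBudget"),
//		// PodZoneHelper: construct a *podzone.Helper here.
//	}).SetupWithManager(mgr); err != nil {
//		// Handle the error, e.g. log it and exit.
//	}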