// File generated by the kubebuilder framework: // https://github.com/kubernetes-sigs/kubebuilder /* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "math" "reflect" "sort" "strconv" "strings" "time" "github.com/go-logr/logr" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/integer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" opsv1 "github.com/aws/zone-aware-controllers-for-k8s/api/v1" "github.com/aws/zone-aware-controllers-for-k8s/pkg/metrics" "github.com/aws/zone-aware-controllers-for-k8s/pkg/podzone" utils "github.com/aws/zone-aware-controllers-for-k8s/pkg/utils" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" ) const requeueInterval = 10 * time.Second type CloudWatchAPI interface { DescribeAlarms(ctx context.Context, params *cloudwatch.DescribeAlarmsInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.DescribeAlarmsOutput, error) } // ZoneAwareUpdateReconciler reconciles a ZoneAwareUpdate object type ZoneAwareUpdateReconciler struct { client.Client Scheme *runtime.Scheme Logger logr.Logger PodZoneHelper *podzone.Helper AlarmStateProvider utils.AlarmStateProvider } //+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zoneawareupdates,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zoneawareupdates/status,verbs=get;update;patch //+kubebuilder:rbac:groups=zonecontrol.k8s.aws,resources=zoneawareupdates/finalizers,verbs=update //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete //+kubebuilder:rbac:groups="",resources=pods/status,verbs=get //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch //+kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch //+kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get;update // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile func (r *ZoneAwareUpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var zau opsv1.ZoneAwareUpdate if err := r.Get(ctx, req.NamespacedName, &zau); err != nil { r.Logger.Error(err, "Unable to fetch ZAU") return ctrl.Result{}, client.IgnoreNotFound(err) } r.Logger.Info("Begin to process ZAU", "zau", zau.Name) var sts apps.StatefulSet if err := r.Get(ctx, types.NamespacedName{Name: zau.Spec.StatefulSet, Namespace: zau.Namespace}, &sts); err != nil { r.Logger.Error(err, "Unable to fetch statefulset") return ctrl.Result{}, client.IgnoreNotFound(err) } recheck, err := r.updateStatefulSet(ctx, &zau, &sts) if err != nil { return ctrl.Result{}, err } metrics.PublishZauStatusMetrics(&zau) if recheck { return ctrl.Result{RequeueAfter: requeueInterval}, nil } return ctrl.Result{}, nil } func (r *ZoneAwareUpdateReconciler) updateStatefulSet(ctx context.Context, zau *opsv1.ZoneAwareUpdate, sts *apps.StatefulSet) (bool, error) { if !zau.Spec.DryRun && sts.Spec.UpdateStrategy.Type != apps.OnDeleteStatefulSetStrategyType { r.Logger.Info("Statefulset update strategy is not OnDelete") return false, nil } pods, err := r.getStatefulSetPods(ctx, sts) if err != nil { r.Logger.Error(err, "Unable to fetch statefulset pods") return false, err } var oldPods, oldNotReadyPods []*v1.Pod oldPodsCountMap := map[string]int32{} for i := range pods { pod := pods[i] // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. if utils.IsTerminating(pod) { r.Logger.Info("There are pods getting terminated, skipping", "pod", pod.Name) return false, nil } podRev, ok := pod.Labels[apps.ControllerRevisionHashLabelKey] if !ok { r.Logger.Info("Pod revision not found", "pod", pod.Name) continue } // If we have updated Pod that has been created but are not running and ready we can not make progress. if podRev == sts.Status.UpdateRevision && !utils.IsRunningAndReady(pod) { r.Logger.Info("There are pods in the new revision that are not ready, skipping", "pod", pod.Name) return false, nil } if podRev != sts.Status.UpdateRevision { oldPods = append(oldPods, pod) if !utils.IsRunningAndReady(pod) { oldNotReadyPods = append(oldNotReadyPods, pod) } } } if len(oldPods) == 0 { r.Logger.Info("No pods to update") err := r.updateStatefulSetRevision(ctx, sts) if err != nil { return false, err } return false, r.updateZauStatus(ctx, zau, sts, int32(0), int32(0), oldPodsCountMap, false) } r.sortPods(oldPods) r.sortPods(oldNotReadyPods) zonePodsMap := r.PodZoneHelper.GetZonePodsMap(ctx, oldPods) zones := r.PodZoneHelper.GetSortedZonesFromMap(zonePodsMap) for zone := range zonePodsMap { podCount := len(zonePodsMap[zone]) oldPodsCountMap[zone] = int32(podCount) } firstZone := zones[0] if sts.Status.Replicas != sts.Status.ReadyReplicas || len(oldNotReadyPods) > 0 { notReadyMap := r.PodZoneHelper.GetZonePodsMap(ctx, oldNotReadyPods) // Do not progress if there are unhealthy pods in multiple zones. if len(notReadyMap) > 1 { r.Logger.Info("There are unhealthy replicas in multiple zones, skipping") return false, nil } // If unhealthy pod is in the first zone, update it first. if _, found := notReadyMap[firstZone]; found { zonePodsMap = notReadyMap } else { // Do not progress if the unhealthy pods are not in the first zone. r.Logger.Info("There are unhealthy replicas which are not in the first zone, skipping") return false, nil } } else { // Check PauseAlarm when all replicas are ready pause, err := r.pauseRollout(ctx, zau) if err != nil { return false, err } if pause { r.Logger.Info("PauseRolloutAlarm is in alarm", "alarm", zau.Spec.PauseRolloutAlarm) r.updateZauStatus(ctx, zau, sts, zau.Status.UpdateStep, zau.Status.DeletedReplicas, oldPodsCountMap, true) return true, nil } } r.Logger.Info("Proceeding with zone update", "zone", firstZone) return false, r.deletePods(ctx, zau, sts, zonePodsMap[firstZone], oldPodsCountMap) } func (r *ZoneAwareUpdateReconciler) getStatefulSetPods(ctx context.Context, sts *apps.StatefulSet) ([]*v1.Pod, error) { labelSelector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector) if err != nil { r.Logger.Error(err, "GetLabelSelector failed", "sts", sts.Name) return []*v1.Pod{}, err } listOptions := &client.ListOptions{Namespace: sts.Namespace, LabelSelector: labelSelector} podList := &v1.PodList{} if err := r.List(ctx, podList, listOptions); err != nil { r.Logger.Error(err, "Unable to list pods") return []*v1.Pod{}, err } pods := make([]*v1.Pod, 0, len(podList.Items)) for i := range podList.Items { pods = append(pods, &podList.Items[i]) } return pods, nil } // Sorting pods so updates are always done in a consistent order. // Pod N -> 0 func (r *ZoneAwareUpdateReconciler) sortPods(pods []*v1.Pod) { sort.Slice(pods, func(i, j int) bool { parts := strings.Split(pods[i].Name, "-") idI, err := strconv.Atoi(parts[len(parts)-1]) if err != nil { return false } parts = strings.Split(pods[j].Name, "-") idJ, err := strconv.Atoi(parts[len(parts)-1]) if err != nil { return false } return idI > idJ }) } func (r *ZoneAwareUpdateReconciler) updateStatefulSetRevision(ctx context.Context, sts *apps.StatefulSet) error { if sts.Status.CurrentRevision != sts.Status.UpdateRevision { r.Logger.Info("Updating statefulset current revision", "current", sts.Status.CurrentRevision, "update", sts.Status.UpdateRevision) sts.Status.CurrentRevision = sts.Status.UpdateRevision return r.Client.Status().Update(ctx, sts) } return nil } func (r *ZoneAwareUpdateReconciler) updateZauStatus(ctx context.Context, zau *opsv1.ZoneAwareUpdate, sts *apps.StatefulSet, step int32, deletedPods int32, oldPodsCountMap map[string]int32, pausedRollout bool) error { if reflect.DeepEqual(zau.Status.CurrentRevision, sts.Status.CurrentRevision) && reflect.DeepEqual(zau.Status.UpdateRevision, sts.Status.UpdateRevision) && reflect.DeepEqual(zau.Status.UpdateStep, step) && reflect.DeepEqual(zau.Status.DeletedReplicas, deletedPods) && reflect.DeepEqual(zau.Status.OldReplicas, oldPodsCountMap) && reflect.DeepEqual(zau.Status.PausedRollout, pausedRollout) { return nil } zau.Status.CurrentRevision = sts.Status.CurrentRevision zau.Status.UpdateRevision = sts.Status.UpdateRevision zau.Status.UpdateStep = step zau.Status.DeletedReplicas = deletedPods zau.Status.PausedRollout = pausedRollout if zau.Status.OldReplicas == nil || len(zau.Status.OldReplicas) == 0 { zau.Status.OldReplicas = oldPodsCountMap } else { for zone := range zau.Status.OldReplicas { if count, ok := oldPodsCountMap[zone]; ok { zau.Status.OldReplicas[zone] = count } else { // reset zones that were already updated zau.Status.OldReplicas[zone] = 0 } } // check if there is a new zone with old replicas for zone := range oldPodsCountMap { if _, ok := zau.Status.OldReplicas[zone]; !ok { zau.Status.OldReplicas[zone] = oldPodsCountMap[zone] } } } err := r.Client.Status().Update(ctx, zau) if err != nil { return err } r.Logger.Info("ZAU status updated", "spec", zau.Spec, "status", zau.Status) return nil } func (r *ZoneAwareUpdateReconciler) pauseRollout(ctx context.Context, zau *opsv1.ZoneAwareUpdate) (bool, error) { if zau.Spec.PauseRolloutAlarm == "" { return false, nil } if zau.Spec.IgnoreAlarm { r.Logger.Info("Ignoring PauseRolloutAlarm") return false, nil } state, err := r.AlarmStateProvider.AlarmState(ctx, zau.Spec.PauseRolloutAlarm) if err != nil { return true, err } if state == cwtypes.StateValueAlarm { return true, nil } return false, nil } func (r *ZoneAwareUpdateReconciler) deletePods(ctx context.Context, zau *opsv1.ZoneAwareUpdate, sts *apps.StatefulSet, pods []*v1.Pod, oldPodsCountMap map[string]int32) error { maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(zau.Spec.MaxUnavailable, int(sts.Status.Replicas), true) if err != nil { r.Logger.Error(err, "Failed to compute maxUnavailable") return err } updateStep := zau.Status.UpdateStep if updateStep > 0 && zau.Status.UpdateRevision != sts.Status.UpdateRevision { r.Logger.Info("New update revision found, reseting UpdateStep counter", "previousValue", updateStep) updateStep = 0 } maxToDelete, err := r.maxPodsToDelete(maxUnavailable, updateStep, zau.Spec.ExponentialFactor) if err != nil { r.Logger.Error(err, "Failed to compute the max number of pods to be deleted") return err } numPodsToDelete := integer.IntMin(len(pods), maxToDelete) for i := 0; i < numPodsToDelete; i++ { pod := pods[i] r.Logger.Info("Found a candidate pod to be deleted", "pod", pod.Name, "revision", pod.Labels[apps.ControllerRevisionHashLabelKey]) if zau.Spec.DryRun { r.Logger.Info("DryRun option enabled, ignoring deletion", "pod", pod.Name) } else { if err := r.Delete(ctx, pod); err != nil { r.Logger.Error(err, "Failed to delete pod") return err } } } return r.updateZauStatus(ctx, zau, sts, updateStep+1, int32(numPodsToDelete), oldPodsCountMap, false) } func (r *ZoneAwareUpdateReconciler) maxPodsToDelete(maxUnavailable int, updateStep int32, exponentialFactor string) (int, error) { factor, err := strconv.ParseFloat(exponentialFactor, 64) if err != nil { return 0, err } if factor == 0 { return maxUnavailable, nil } maxToDelete := math.Pow(factor, float64(updateStep)) if maxToDelete > math.MaxInt32 { maxToDelete = math.MaxInt32 } numPodsToDelete := integer.IntMin(int(maxToDelete), maxUnavailable) return numPodsToDelete, nil } // SetupWithManager sets up the controller with the Manager. func (r *ZoneAwareUpdateReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&opsv1.ZoneAwareUpdate{}). // watch for changes to pods Watches( &source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(r.findZauForPod), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). // watch for changes to statefulsets Watches( &source.Kind{Type: &apps.StatefulSet{}}, handler.EnqueueRequestsFromMapFunc(r.findZauForStatefulSet), ). Complete(r) } func (r *ZoneAwareUpdateReconciler) findZauForPod(obj client.Object) []reconcile.Request { pod, ok := obj.(*v1.Pod) if !ok { r.Logger.Info("Failed to covert object to pod", "obj", obj) return []reconcile.Request{} } sts, err := r.getPodStatefulSet(pod) if err != nil { r.Logger.Error(err, "Unable to fetch statefulset", "pod", pod.Name) return []reconcile.Request{} } if sts == nil { // No statefulset associated to the pod return []reconcile.Request{} } return r.findZauForStatefulSet(sts) } func (r *ZoneAwareUpdateReconciler) getPodStatefulSet(pod *v1.Pod) (*apps.StatefulSet, error) { controllerRef := metav1.GetControllerOf(pod) if controllerRef == nil { return nil, nil } if controllerRef.Kind != utils.ControllerKindSS.Kind { return nil, nil } var sts apps.StatefulSet if err := r.Get(context.TODO(), types.NamespacedName{Name: controllerRef.Name, Namespace: pod.GetNamespace()}, &sts); err != nil { return nil, client.IgnoreNotFound(err) } return &sts, nil } func (r *ZoneAwareUpdateReconciler) findZauForStatefulSet(obj client.Object) []reconcile.Request { sts, ok := obj.(*apps.StatefulSet) if !ok { r.Logger.Info("Failed to convert object to statefulset", "obj", obj) return []reconcile.Request{} } zau, err := getZauForStatefulSet(r.Client, r.Logger, sts) if err != nil { r.Logger.Error(err, "Error getting ZAU for statefulset", "sts", sts.Name) return []reconcile.Request{} } if zau == nil { return []reconcile.Request{} } request := reconcile.Request{ NamespacedName: types.NamespacedName{ Name: zau.Name, Namespace: zau.Namespace, }, } return []reconcile.Request{request} } func getZauForStatefulSet(c client.Client, logger logr.Logger, sts *apps.StatefulSet) (*opsv1.ZoneAwareUpdate, error) { zauList := &opsv1.ZoneAwareUpdateList{} if err := c.List(context.TODO(), zauList, &client.ListOptions{Namespace: sts.GetNamespace()}); err != nil { return nil, err } var matchedZaus []opsv1.ZoneAwareUpdate for _, strategy := range zauList.Items { if strategy.Spec.StatefulSet == sts.GetName() { matchedZaus = append(matchedZaus, strategy) } } if len(matchedZaus) == 0 { return nil, nil } if len(matchedZaus) > 1 { logger.Info("StatefulSet matches multiple ZAUs. Choosing first matched ZAU.", "sset", sts.Name, "zau", matchedZaus[0].Name) } return &matchedZaus[0], nil }