// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"). You may // not use this file except in compliance with the License. A copy of the // License is located at // // http://aws.amazon.com/apache2.0/ // // or in the "license" file accompanying this file. This file is distributed // on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either // express or implied. See the License for the specific language governing // permissions and limitations under the License. package observability import ( "context" "fmt" "strings" "github.com/aws/aws-node-termination-handler/pkg/ec2metadata" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/rs/zerolog/log" corev1 "k8s.io/api/core/v1" kErr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" ) // Kubernetes event types, reasons and messages const ( Normal = corev1.EventTypeNormal Warning = corev1.EventTypeWarning MonitorErrReason = "MonitorError" MonitorErrMsgFmt = "There was a problem monitoring for events in monitor '%s'" UncordonErrReason = "UncordonError" UncordonErrMsgFmt = "There was a problem while trying to uncordon the node: %s" UncordonReason = "Uncordon" UncordonMsg = "Node successfully uncordoned" PreDrainErrReason = "PreDrainError" PreDrainErrMsgFmt = "There was a problem executing the pre-drain task: %s" PreDrainReason = "PreDrain" PreDrainMsg = "Pre-drain task successfully executed" CordonErrReason = "CordonError" CordonErrMsgFmt = "There was a problem while trying to cordon the node: %s" CordonReason = "Cordon" CordonMsg = "Node successfully cordoned" CordonAndDrainErrReason = "CordonAndDrainError" CordonAndDrainErrMsgFmt = "There was a problem while trying to cordon and drain the node: %s" CordonAndDrainReason = "CordonAndDrain" CordonAndDrainMsg = "Node successfully cordoned and drained" PostDrainErrReason = "PostDrainError" PostDrainErrMsgFmt = "There was a problem executing the post-drain task: %s" PostDrainReason = "PostDrain" PostDrainMsg = "Post-drain task successfully executed" ) // Interruption event reasons const ( scheduledEventReason = "ScheduledEvent" spotITNReason = "SpotInterruption" sqsTerminationReason = "SQSTermination" rebalanceRecommendationReason = "RebalanceRecommendation" stateChangeReason = "StateChange" asgLifecycleReason = "ASGLifecycle" unknownReason = "UnknownInterruption" ) // K8sEventRecorder wraps a Kubernetes event recorder with some extra information type K8sEventRecorder struct { annotations map[string]string clientSet *kubernetes.Clientset enabled bool sqsMode bool record.EventRecorder } // InitK8sEventRecorder creates a Kubernetes event recorder func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetadata ec2metadata.NodeMetadata, extraAnnotationsStr string) (K8sEventRecorder, error) { if !enabled { return K8sEventRecorder{}, nil } annotations := make(map[string]string) annotations["account-id"] = nodeMetadata.AccountId if !sqsMode { annotations["availability-zone"] = nodeMetadata.AvailabilityZone annotations["instance-id"] = nodeMetadata.InstanceID annotations["instance-life-cycle"] = nodeMetadata.InstanceLifeCycle annotations["instance-type"] = nodeMetadata.InstanceType annotations["local-hostname"] = nodeMetadata.LocalHostname annotations["local-ipv4"] = nodeMetadata.LocalIP annotations["public-hostname"] = nodeMetadata.PublicHostname annotations["public-ipv4"] = nodeMetadata.PublicIP annotations["region"] = nodeMetadata.Region } var err error if extraAnnotationsStr != "" { annotations, err = parseExtraAnnotations(annotations, extraAnnotationsStr) if err != nil { return K8sEventRecorder{}, err } } config, err := rest.InClusterConfig() if err != nil { return K8sEventRecorder{}, err } clientSet, err := kubernetes.NewForConfig(config) if err != nil { return K8sEventRecorder{}, err } broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) return K8sEventRecorder{ annotations: annotations, clientSet: clientSet, enabled: true, sqsMode: sqsMode, EventRecorder: broadcaster.NewRecorder( scheme.Scheme, corev1.EventSource{ Component: "aws-node-termination-handler", Host: nodeName, }, ), }, nil } // Emit a Kubernetes event for the given node and with the given event type, reason and message func (r K8sEventRecorder) Emit(nodeName string, eventType, eventReason, eventMsgFmt string, eventMsgArgs ...interface{}) { if r.enabled { var node *corev1.Node var annotations map[string]string if r.sqsMode { var err error node, err = r.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) if err != nil { if kErr.IsNotFound(err) { return } log.Err(err).Msg("Emitting Kubernetes event failed") return } annotations = generateNodeAnnotations(node, r.annotations) } else { node = &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Namespace: "default", }, } annotations = r.annotations } r.AnnotatedEventf(node, annotations, eventType, eventReason, eventMsgFmt, eventMsgArgs...) //nolint:all } } // getReasonForKindV1 returns a Kubernetes event reason for the given interruption event kind. // Compatible with log format version 1. func getReasonForKindV1(eventKind, monitorKind string) string { // In v1 all events received from SQS were given the same reason. if monitorKind == monitor.SQSTerminateKind { return sqsTerminationReason } // However, events received from IMDS could be more specific. switch eventKind { case monitor.ScheduledEventKind: return scheduledEventReason case monitor.SpotITNKind: return spotITNReason case monitor.RebalanceRecommendationKind: return rebalanceRecommendationReason default: return unknownReason } } // getReasonForKindV2 returns a Kubernetes event reason for the given interruption event kind. // Compatible with log format version 2. func getReasonForKindV2(eventKind, _ string) string { // v2 added reasons for more event kinds for both IMDS and SQS events. switch eventKind { case monitor.ScheduledEventKind: return scheduledEventReason case monitor.SpotITNKind: return spotITNReason case monitor.RebalanceRecommendationKind: return rebalanceRecommendationReason case monitor.StateChangeKind: return stateChangeReason case monitor.ASGLifecycleKind: return asgLifecycleReason default: return unknownReason } } var GetReasonForKind func(kind, monitor string) string = getReasonForKindV1 func SetReasonForKindVersion(version int) error { switch version { case 1: GetReasonForKind = getReasonForKindV1 return nil case 2: GetReasonForKind = getReasonForKindV2 return nil default: GetReasonForKind = getReasonForKindV1 return fmt.Errorf("Unrecognized 'reason for kind' version: %d, using version 1", version) } } // Parse the given extra annotations string into a map func parseExtraAnnotations(annotations map[string]string, extraAnnotationsStr string) (map[string]string, error) { parts := strings.Split(extraAnnotationsStr, ",") for _, part := range parts { keyValue := strings.Split(part, "=") if len(keyValue) != 2 { return nil, fmt.Errorf("error parsing annotations") } annotations[keyValue[0]] = keyValue[1] } return annotations, nil } // Generate annotations for an event occurred on the given node func generateNodeAnnotations(node *corev1.Node, annotations map[string]string) map[string]string { nodeAnnotations := make(map[string]string) for k, v := range annotations { nodeAnnotations[k] = v } nodeAnnotations["availability-zone"] = node.Labels["topology.kubernetes.io/zone"] nodeAnnotations["instance-id"] = node.Spec.ProviderID[strings.LastIndex(node.Spec.ProviderID, "/")+1:] nodeAnnotations["instance-type"] = node.Labels["node.kubernetes.io/instance-type"] nodeAnnotations["local-hostname"] = node.Name for _, address := range node.Status.Addresses { // If there's more than one address of the same type, use the first one switch address.Type { case corev1.NodeInternalIP: if _, exist := annotations["local-ipv4"]; !exist { nodeAnnotations["local-ipv4"] = address.Address } case corev1.NodeExternalDNS: if _, exist := annotations["public-hostname"]; !exist { nodeAnnotations["public-hostname"] = address.Address } case corev1.NodeExternalIP: if _, exist := annotations["public-ipv4"]; !exist { nodeAnnotations["public-ipv4"] = address.Address } } } nodeAnnotations["region"] = node.Labels["topology.kubernetes.io/region"] return nodeAnnotations }