package fluentd import ( "bytes" "context" "errors" "fmt" "html/template" "strings" "time" "github.com/aws/aws-sdk-go/aws" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/exec" ) const ( fluentdServiceAccountName = "fluentd-service-account" fluentdRBACRoleName = "fluentd-rbac-role" fluentdRBACClusterRoleBindingName = "fluentd-rbac-role-binding" fluentdConfigMapNameClusterInfo = "fluentd-configmap-cluster-info" fluentdConfigMapNameConfig = "fluentd-configmap-config" fluentdConfigMapFileNameFluentConf = "fluent.conf" fluentdConfigMapFileNameContainersConf = "containers.conf" fluentdConfigMapFileNameSystemdConf = "systemd.conf" fluentdConfigMapFileNameHostConf = "host.conf" fluentdAppName = "fluentd-cloudwatch" fluentdDaemonSetName = "fluentd-cloudwatch" ) // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) createFluentdServiceAccount() error { ts.cfg.Logger.Info("creating fluentd ServiceAccount") ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ServiceAccounts(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Create( ctx, &v1.ServiceAccount{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ServiceAccount", }, ObjectMeta: metav1.ObjectMeta{ Name: fluentdServiceAccountName, Namespace: ts.cfg.EKSConfig.AddOnFluentd.Namespace, Labels: map[string]string{ "app.kubernetes.io/name": fluentdAppName, }, }, }, metav1.CreateOptions{}, ) cancel() if err != nil { return fmt.Errorf("failed to create fluentd ServiceAccount (%v)", err) } ts.cfg.Logger.Info("created fluentd ServiceAccount") ts.cfg.EKSConfig.Sync() return nil } // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) deleteFluentdServiceAccount() error { ts.cfg.Logger.Info("deleting fluentd ServiceAccount") foreground := metav1.DeletePropagationForeground ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err := ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ServiceAccounts(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Delete( ctx, fluentdServiceAccountName, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil && !apierrs.IsNotFound(err) && !strings.Contains(err.Error(), "not found") { ts.cfg.Logger.Warn("failed to delete", zap.Error(err)) return fmt.Errorf("failed to delete fluentd ServiceAccount (%v)", err) } ts.cfg.Logger.Info("deleted fluentd ServiceAccount", zap.Error(err)) ts.cfg.EKSConfig.Sync() return nil } // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) createFluentdRBACClusterRole() error { ts.cfg.Logger.Info("creating fluentd RBAC ClusterRole") ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.K8SClient.KubernetesClientSet(). RbacV1(). ClusterRoles(). Create( ctx, &rbacv1.ClusterRole{ TypeMeta: metav1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "ClusterRole", }, // "ClusterRole" is a non-namespaced resource. // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole ObjectMeta: metav1.ObjectMeta{ Name: fluentdRBACRoleName, Namespace: "default", Labels: map[string]string{ "app.kubernetes.io/name": fluentdAppName, }, }, Rules: []rbacv1.PolicyRule{ { // "" indicates the core API group // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole APIGroups: []string{ "", }, Resources: []string{ "namespaces", "pods", "pods/logs", }, Verbs: []string{ "get", "list", "watch", }, }, }, }, metav1.CreateOptions{}, ) cancel() if err != nil { return fmt.Errorf("failed to create fluentd RBAC ClusterRole (%v)", err) } ts.cfg.Logger.Info("created fluentd RBAC ClusterRole") ts.cfg.EKSConfig.Sync() return nil } // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) deleteFluentdRBACClusterRole() error { ts.cfg.Logger.Info("deleting fluentd RBAC ClusterRole") foreground := metav1.DeletePropagationForeground ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err := ts.cfg.K8SClient.KubernetesClientSet(). RbacV1(). ClusterRoles(). Delete( ctx, fluentdRBACRoleName, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil && !apierrs.IsNotFound(err) && !strings.Contains(err.Error(), "not found") { ts.cfg.Logger.Warn("failed to delete", zap.Error(err)) return fmt.Errorf("failed to delete fluentd RBAC ClusterRole (%v)", err) } ts.cfg.Logger.Info("deleted fluentd RBAC ClusterRole", zap.Error(err)) ts.cfg.EKSConfig.Sync() return nil } // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) createFluentdRBACClusterRoleBinding() error { ts.cfg.Logger.Info("creating fluentd RBAC ClusterRoleBinding") ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.K8SClient.KubernetesClientSet(). RbacV1(). ClusterRoleBindings(). Create( ctx, &rbacv1.ClusterRoleBinding{ TypeMeta: metav1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "ClusterRoleBinding", }, ObjectMeta: metav1.ObjectMeta{ Name: fluentdRBACClusterRoleBindingName, Namespace: "default", Labels: map[string]string{ "app.kubernetes.io/name": fluentdAppName, }, }, RoleRef: rbacv1.RoleRef{ APIGroup: "rbac.authorization.k8s.io", Kind: "ClusterRole", Name: fluentdRBACRoleName, }, Subjects: []rbacv1.Subject{ { APIGroup: "", Kind: "ServiceAccount", Name: fluentdServiceAccountName, Namespace: ts.cfg.EKSConfig.AddOnFluentd.Namespace, }, }, }, metav1.CreateOptions{}, ) cancel() if err != nil { return fmt.Errorf("failed to create fluentd RBAC ClusterRoleBinding (%v)", err) } ts.cfg.Logger.Info("created fluentd RBAC ClusterRoleBinding") ts.cfg.EKSConfig.Sync() return nil } // ref. https://github.com/kubernetes/client-go/tree/master/examples/in-cluster-client-configuration // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/ func (ts *tester) deleteFluentdRBACClusterRoleBinding() error { ts.cfg.Logger.Info("deleting fluentd RBAC ClusterRoleBinding") foreground := metav1.DeletePropagationForeground ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err := ts.cfg.K8SClient.KubernetesClientSet(). RbacV1(). ClusterRoleBindings(). Delete( ctx, fluentdRBACClusterRoleBindingName, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil && !apierrs.IsNotFound(err) && !strings.Contains(err.Error(), "not found") { ts.cfg.Logger.Warn("failed to delete", zap.Error(err)) return fmt.Errorf("failed to delete fluentd RBAC ClusterRoleBinding (%v)", err) } ts.cfg.Logger.Info("deleted fluentd RBAC ClusterRoleBinding", zap.Error(err)) ts.cfg.EKSConfig.Sync() return nil } func (ts *tester) createFluentdConfigMapClusterInfo() (err error) { ts.cfg.Logger.Info("creating fluentd ConfigMap cluster-info") ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err = ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ConfigMaps(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Create( ctx, &v1.ConfigMap{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ConfigMap", }, ObjectMeta: metav1.ObjectMeta{ Name: fluentdConfigMapNameClusterInfo, Namespace: ts.cfg.EKSConfig.AddOnFluentd.Namespace, Labels: map[string]string{ "name": fluentdConfigMapNameClusterInfo, }, }, Data: map[string]string{ "cluster.name": ts.cfg.EKSConfig.Name, "logs.region": ts.cfg.EKSConfig.Region, }, }, metav1.CreateOptions{}, ) cancel() if err != nil { return err } ts.cfg.Logger.Info("created fluentd ConfigMap cluster-info") ts.cfg.EKSConfig.Sync() return nil } func (ts *tester) deleteFluentdConfigMapClusterInfo() error { ts.cfg.Logger.Info("deleting fluentd ConfigMap cluster-info") foreground := metav1.DeletePropagationForeground ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err := ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ConfigMaps(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Delete( ctx, fluentdConfigMapNameClusterInfo, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil { return err } ts.cfg.Logger.Info("deleted fluentd ConfigMap cluster-info") ts.cfg.EKSConfig.Sync() return nil } const TemplateFluentdConf = ` @include containers.conf @include systemd.conf @include host.conf @type kubernetes_metadata @id filter_kube_metadata_application @log_level "{{.MetadataLogLevel}}" skip_labels {{.MetadataSkipLabels}} skip_container_metadata {{.MetadataSkipContainerMetadata}} skip_master_url {{.MetadataSkipMasterURL}} skip_namespace_metadata {{.MetadataSkipNamespaceMetadata}} cache_size {{.MetadataCacheSize}} watch {{.MetadataWatch}} de_dot false @type null num_threads {{.Threads}} ` type templateFluentdConf struct { Threads int MetadataLogLevel string MetadataCacheSize int MetadataWatch bool MetadataSkipLabels bool MetadataSkipMasterURL bool MetadataSkipContainerMetadata bool MetadataSkipNamespaceMetadata bool } const TemplateContainersConf = ` @type tail @id in_tail_container_logs @label @containers path /var/log/containers/*.log exclude_path ["/var/log/containers/cloudwatch-agent*", "/var/log/containers/fluentd*"] pos_file /var/log/fluentd-containers.log.pos tag * read_from_head true @type json time_format %Y-%m-%dT%H:%M:%S.%NZ @type tail @id in_tail_cwagent_logs @label @cwagentlogs path /var/log/containers/cloudwatch-agent* pos_file /var/log/cloudwatch-agent.log.pos tag * read_from_head true @type json time_format %Y-%m-%dT%H:%M:%S.%NZ @type tail @id in_tail_fluentd_logs @label @fluentdlogs path /var/log/containers/fluentd* pos_file /var/log/fluentd.log.pos tag * read_from_head true @type json time_format %Y-%m-%dT%H:%M:%S.%NZ ` const TemplateSystemdConf = ` @type systemd @id in_systemd_kubelet @label @systemd filters [{ "_SYSTEMD_UNIT": "kubelet.service" }] field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"} field_map_strict true path /run/log/journal @type local persistent true path /var/log/fluentd-journald-kubelet-pos.json read_from_head true tag kubelet.service @type systemd @id in_systemd_kubeproxy @label @systemd filters [{ "_SYSTEMD_UNIT": "kubeproxy.service" }] field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"} field_map_strict true path /run/log/journal @type local persistent true path /var/log/fluentd-journald-kubeproxy-pos.json read_from_head true tag kubeproxy.service @type systemd @id in_systemd_docker @label @systemd filters [{ "_SYSTEMD_UNIT": "docker.service" }] field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"} field_map_strict true @type local persistent true path /var/log/fluentd-journald-docker-pos.json read_from_head true tag docker.service ` const TemplateHostConf = ` @type tail @id in_tail_dmesg @label @hostlogs path /var/log/dmesg pos_file /var/log/dmesg.log.pos tag host.dmesg read_from_head true @type syslog @type tail @id in_tail_secure @label @hostlogs path /var/log/secure pos_file /var/log/secure.log.pos tag host.secure read_from_head true @type syslog @type tail @id in_tail_messages @label @hostlogs path /var/log/messages pos_file /var/log/messages.log.pos tag host.messages read_from_head true @type syslog ` func (ts *tester) createFluentdConfigMapConfig() (err error) { ts.cfg.Logger.Info("creating fluentd ConfigMap config") buf := bytes.NewBuffer(nil) fdConf := templateFluentdConf{ Threads: ts.cfg.EKSConfig.AddOnFluentd.Threads, MetadataLogLevel: ts.cfg.EKSConfig.AddOnFluentd.MetadataLogLevel, MetadataCacheSize: ts.cfg.EKSConfig.AddOnFluentd.MetadataCacheSize, MetadataWatch: ts.cfg.EKSConfig.AddOnFluentd.MetadataWatch, MetadataSkipLabels: ts.cfg.EKSConfig.AddOnFluentd.MetadataSkipLabels, MetadataSkipMasterURL: ts.cfg.EKSConfig.AddOnFluentd.MetadataSkipMasterURL, MetadataSkipContainerMetadata: ts.cfg.EKSConfig.AddOnFluentd.MetadataSkipContainerMetadata, MetadataSkipNamespaceMetadata: ts.cfg.EKSConfig.AddOnFluentd.MetadataSkipNamespaceMetadata, } fdConfTmpl := template.Must(template.New("TemplateFluentdConf").Parse(TemplateFluentdConf)) if err := fdConfTmpl.Execute(buf, fdConf); err != nil { return err } fdConfBody := buf.String() buf.Reset() ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err = ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ConfigMaps(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Create( ctx, &v1.ConfigMap{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ConfigMap", }, ObjectMeta: metav1.ObjectMeta{ Name: fluentdConfigMapNameConfig, Namespace: ts.cfg.EKSConfig.AddOnFluentd.Namespace, Labels: map[string]string{ "name": fluentdConfigMapNameConfig, }, }, Data: map[string]string{ fluentdConfigMapFileNameFluentConf: fdConfBody, fluentdConfigMapFileNameContainersConf: TemplateContainersConf, fluentdConfigMapFileNameSystemdConf: TemplateSystemdConf, fluentdConfigMapFileNameHostConf: TemplateHostConf, }, }, metav1.CreateOptions{}, ) cancel() if err != nil { return err } ts.cfg.Logger.Info("created fluentd ConfigMap config") ts.cfg.EKSConfig.Sync() return nil } func (ts *tester) deleteFluentdConfigMapConfig() error { ts.cfg.Logger.Info("deleting fluentd ConfigMap config") foreground := metav1.DeletePropagationForeground ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err := ts.cfg.K8SClient.KubernetesClientSet(). CoreV1(). ConfigMaps(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Delete( ctx, fluentdConfigMapNameConfig, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil { return err } ts.cfg.Logger.Info("deleted fluentd ConfigMap config") ts.cfg.EKSConfig.Sync() return nil } // FluentdImageName is the image name of Fluentd daemon set. // ref. https://github.com/fluent/fluentd-kubernetes-daemonset const FluentdImageName = "fluent/fluentd-kubernetes-daemonset:v1.7.3-debian-cloudwatch-1.0" func (ts *tester) createFluentdDaemonSet() (err error) { dirOrCreate := v1.HostPathDirectoryOrCreate podSpec := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "app.kubernetes.io/name": fluentdAppName, }, }, Spec: v1.PodSpec{ ServiceAccountName: fluentdServiceAccountName, TerminationGracePeriodSeconds: aws.Int64(30), // Unsupported value: "OnFailure": supported values: "Always" RestartPolicy: v1.RestartPolicyAlways, // image's entrypoint requires to write on /fluentd/etc // but we mount configmap there as read-only // this init container copy workaround is required // https://github.com/fluent/fluentd-kubernetes-daemonset/issues/90 InitContainers: []v1.Container{ { Name: "copy-fluentd-config", Image: ts.busyboxImg, Command: []string{ "sh", "-c", "cp /config-volume/..data/* /fluentd/etc", }, // ref. https://kubernetes.io/docs/concepts/cluster-administration/logging/ VolumeMounts: []v1.VolumeMount{ { // to execute Name: fluentdConfigMapNameConfig, MountPath: "/config-volume", }, { Name: "fluentdconf", MountPath: "/fluentd/etc", }, }, }, // TODO: do we need this? // ref. https://github.com/aws-samples/amazon-cloudwatch-container-insights/tree/master/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/quickstart // { // Name: "update-log-driver", // Image: ts.busyboxImg, // Command: []string{"sh", "-c", "''"}, // }, }, // https://www.eksworkshop.com/intermediate/230_logging/deploy/ Containers: []v1.Container{ { Name: fluentdAppName, Image: FluentdImageName, ImagePullPolicy: v1.PullAlways, Resources: v1.ResourceRequirements{ Limits: v1.ResourceList{ v1.ResourceMemory: resource.MustParse("400Mi"), }, Requests: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("100m"), v1.ResourceMemory: resource.MustParse("200Mi"), }, }, Env: []v1.EnvVar{ { Name: "REGION", ValueFrom: &v1.EnvVarSource{ ConfigMapKeyRef: &v1.ConfigMapKeySelector{ LocalObjectReference: v1.LocalObjectReference{ Name: fluentdConfigMapNameClusterInfo, }, Key: "logs.region", }, }, }, { Name: "CLUSTER_NAME", ValueFrom: &v1.EnvVarSource{ ConfigMapKeyRef: &v1.ConfigMapKeySelector{ LocalObjectReference: v1.LocalObjectReference{ Name: fluentdConfigMapNameClusterInfo, }, Key: "cluster.name", }, }, }, { Name: "CI_VERSION", Value: "k8s/1.1.1", }, }, // ref. https://kubernetes.io/docs/concepts/cluster-administration/logging/ VolumeMounts: []v1.VolumeMount{ { Name: fluentdConfigMapNameConfig, MountPath: "/config-volume", }, { Name: "fluentdconf", MountPath: "/fluentd/etc", }, { Name: "varlog", MountPath: "/var/log", }, { Name: "varlibdockercontainers", MountPath: "/var/lib/docker/containers", ReadOnly: true, }, { Name: "runlogjournal", MountPath: "/run/log/journal", ReadOnly: true, }, { Name: "dmesg", MountPath: "/var/log/dmesg", ReadOnly: true, }, }, }, }, // ref. https://kubernetes.io/docs/concepts/cluster-administration/logging/ Volumes: []v1.Volume{ { Name: fluentdConfigMapNameConfig, VolumeSource: v1.VolumeSource{ ConfigMap: &v1.ConfigMapVolumeSource{ LocalObjectReference: v1.LocalObjectReference{ Name: fluentdConfigMapNameConfig, }, DefaultMode: aws.Int32(0666), }, }, }, { Name: "fluentdconf", VolumeSource: v1.VolumeSource{ EmptyDir: &v1.EmptyDirVolumeSource{}, }, }, { // to write Name: "varlog", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/var/log", Type: &dirOrCreate, }, }, }, { Name: "varlibdockercontainers", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/var/lib/docker/containers", Type: &dirOrCreate, }, }, }, { Name: "runlogjournal", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/run/log/journal", Type: &dirOrCreate, }, }, }, { Name: "dmesg", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/var/log/dmesg", }, }, }, }, NodeSelector: map[string]string{ // do not deploy in fake nodes, obviously "NodeType": "regular", }, }, } dsObj := appsv1.DaemonSet{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", Kind: "DaemonSet", }, ObjectMeta: metav1.ObjectMeta{ Name: fluentdDaemonSetName, Namespace: ts.cfg.EKSConfig.AddOnFluentd.Namespace, }, Spec: appsv1.DaemonSetSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app.kubernetes.io/name": fluentdAppName, }, }, Template: podSpec, }, } ts.cfg.Logger.Info("creating fluentd DaemonSet", zap.String("name", fluentdDaemonSetName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err = ts.cfg.K8SClient.KubernetesClientSet(). AppsV1(). DaemonSets(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Create(ctx, &dsObj, metav1.CreateOptions{}) cancel() if err != nil { return fmt.Errorf("failed to create fluentd DaemonSet (%v)", err) } ts.cfg.Logger.Info("created fluentd DaemonSet") return nil } func (ts *tester) deleteFluentdDaemonSet() (err error) { foreground := metav1.DeletePropagationForeground ts.cfg.Logger.Info("deleting fluentd DaemonSet", zap.String("name", fluentdDaemonSetName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) err = ts.cfg. K8SClient.KubernetesClientSet(). AppsV1(). DaemonSets(ts.cfg.EKSConfig.AddOnFluentd.Namespace). Delete( ctx, fluentdDaemonSetName, metav1.DeleteOptions{ GracePeriodSeconds: aws.Int64(0), PropagationPolicy: &foreground, }, ) cancel() if err != nil && !apierrs.IsNotFound(err) && !strings.Contains(err.Error(), "not found") { ts.cfg.Logger.Warn("failed to delete fluentd DaemonSet", zap.Error(err)) return fmt.Errorf("failed to delete fluentd DaemonSet (%v)", err) } return nil } func (ts *tester) checkFluentdPods() (err error) { waitDur := 10 * time.Minute retryStart := time.Now() for time.Since(retryStart) < waitDur { select { case <-ts.cfg.Stopc: return errors.New("check aborted") case <-time.After(15 * time.Second): } if err = ts._checkFluentdPods(); err == nil { break } ts.cfg.Logger.Info("failed to check fluentd pods; retrying", zap.Error(err)) } return err } func (ts *tester) _checkFluentdPods() error { pods, err := ts.cfg.K8SClient.ListPods(ts.cfg.EKSConfig.AddOnFluentd.Namespace, 1000, 5*time.Second) if err != nil { ts.cfg.Logger.Warn("listing pods failed", zap.Error(err)) return err } if len(pods) > 0 { ts.cfg.Logger.Info("pods found", zap.Int("pods", len(pods))) fmt.Fprintf(ts.cfg.LogWriter, "\n") for _, pod := range pods { fmt.Fprintf(ts.cfg.LogWriter, "%q Pod using client-go: %q\n", ts.cfg.EKSConfig.AddOnFluentd.Namespace, pod.Name) } fmt.Fprintf(ts.cfg.LogWriter, "\n") } else { ts.cfg.Logger.Info("no pod found", zap.String("namespace", ts.cfg.EKSConfig.AddOnFluentd.Namespace)) return errors.New("no pod found in " + ts.cfg.EKSConfig.AddOnFluentd.Namespace) } targetPods := int32(1) if ts.cfg.EKSConfig.TotalNodes > 1 { targetPods = ts.cfg.EKSConfig.TotalNodes / int32(2) } ts.cfg.Logger.Info("checking fluentd pods", zap.Int32("target-ready-pods", targetPods), zap.Int32("total-nodes", ts.cfg.EKSConfig.TotalNodes), ) readyPods := int32(0) for _, pod := range pods { appName, ok := pod.Labels["app.kubernetes.io/name"] if !ok || appName != fluentdAppName { ts.cfg.Logger.Info("skipping pod, not fluentd", zap.String("labels", fmt.Sprintf("%+v", pod.Labels))) continue } descArgsPods := []string{ ts.cfg.EKSConfig.KubectlPath, "--kubeconfig=" + ts.cfg.EKSConfig.KubeConfigPath, "--namespace=" + ts.cfg.EKSConfig.AddOnFluentd.Namespace, "describe", "pods/" + pod.Name, } descCmdPods := strings.Join(descArgsPods, " ") logArgs := []string{ ts.cfg.EKSConfig.KubectlPath, "--kubeconfig=" + ts.cfg.EKSConfig.KubeConfigPath, "--namespace=" + ts.cfg.EKSConfig.AddOnFluentd.Namespace, "logs", "pods/" + pod.Name, "--all-containers=true", "--timestamps", } logsCmd := strings.Join(logArgs, " ") ts.cfg.Logger.Debug("checking Pod", zap.String("pod-name", pod.Name), zap.String("app-name", appName), zap.String("command-describe", descCmdPods), zap.String("command-logs", logsCmd), ) ready := false statusType, status := "", "" for _, cond := range pod.Status.Conditions { if cond.Status != v1.ConditionTrue { continue } statusType = fmt.Sprintf("%s", cond.Type) status = fmt.Sprintf("%s", cond.Status) if cond.Type == v1.PodInitialized || cond.Type == v1.PodReady { ready = true readyPods++ } break } if !ready { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) output, err := exec.New().CommandContext(ctx, descArgsPods[0], descArgsPods[1:]...).CombinedOutput() cancel() outDesc := string(output) if err != nil { ts.cfg.Logger.Warn("'kubectl describe' failed", zap.Error(err)) } fmt.Fprintf(ts.cfg.LogWriter, "\n'%s' output:\n\n%s\n\n", descCmdPods, outDesc) ts.cfg.Logger.Warn("pod is not ready yet", zap.Int32("current-ready-pods", readyPods), zap.Int32("target-ready-pods", targetPods), zap.Int32("total-nodes", ts.cfg.EKSConfig.TotalNodes), zap.String("pod-name", pod.Name), zap.String("app-name", appName), zap.String("status-type", statusType), zap.String("status", status), ) continue } if readyPods < 3 { // only first 3 nodes ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) output, err := exec.New().CommandContext(ctx, descArgsPods[0], descArgsPods[1:]...).CombinedOutput() cancel() outDesc := string(output) if err != nil { ts.cfg.Logger.Warn("'kubectl describe' failed", zap.Error(err)) continue } ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second) output, err = exec.New().CommandContext(ctx, logArgs[0], logArgs[1:]...).CombinedOutput() cancel() outLogs := string(output) if err != nil { ts.cfg.Logger.Warn("'kubectl logs' failed", zap.Error(err)) continue } fmt.Fprintf(ts.cfg.LogWriter, "\n'%s' output:\n\n%s\n\n", descCmdPods, outDesc) logLines := strings.Split(outLogs, "\n") logLinesN := len(logLines) if logLinesN > 15 { logLines = logLines[logLinesN-15:] outLogs = strings.Join(logLines, "\n") } fmt.Fprintf(ts.cfg.LogWriter, "\n'%s' output:\n\n%s\n\n", logsCmd, outLogs) } if readyPods%100 == 0 { ts.cfg.Logger.Info("found a ready pod", zap.Int32("current-ready-pods", readyPods), zap.Int32("target-ready-pods", targetPods), zap.Int32("total-nodes", ts.cfg.EKSConfig.TotalNodes), zap.String("pod-name", pod.Name), zap.String("app-name", appName), zap.String("status-type", statusType), zap.String("status", status), ) } } ts.cfg.Logger.Info("checking fluentd pods", zap.Int32("current-ready-pods", readyPods), zap.Int32("target-ready-pods", targetPods), zap.Int32("total-nodes", ts.cfg.EKSConfig.TotalNodes), ) if readyPods < targetPods { return errors.New("not enough fluentd pods ready") } return nil }