package fluent_bit import ( "context" "fmt" "strings" "time" "github.com/aws/aws-k8s-tester/client" "github.com/aws/aws-sdk-go/aws" "github.com/onsi/ginkgo" "go.uber.org/zap" apps_v1 "k8s.io/api/apps/v1" core_v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" rbac_v1 "k8s.io/api/rbac/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/exec" ) const ( appName = "fluent-bit" appServiceAccountName = "fluentbit-service-account" appRBACRoleName = "fluentbit-rbac-role" appRBACRoleBindingName = "fluentbit-rbac-role-binding" appConfigMapNameClusterInfo = "fluentbit-configmap-cluster-info" appConfigMapNameConfig = "fluentbit-configmap-config" appConfigMapFileNameFluentConf = "fluent-bit.conf" appDaemonSetName = "fluentbit-cloudwatch" appContainerImage = "fluent/fluent-bit:1.5" appHTTPClient = "127.0.0.1" containerHTTPClient = "http-client" loggingPod = "fake-logger-pod" ) var dirOrCreate = v1.HostPathDirectoryOrCreate func (ts *tester) createServiceAccount() error { ts.cfg.Logger.Info("creating: ", zap.String("ServiceAccount", appName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.Client.KubernetesClient(). CoreV1().ServiceAccounts(ts.cfg.Namespace).Create( ctx, &v1.ServiceAccount{ TypeMeta: meta_v1.TypeMeta{ APIVersion: "v1", Kind: "ServiceAccount", }, ObjectMeta: meta_v1.ObjectMeta{ Name: appServiceAccountName, Namespace: ts.cfg.Namespace, Labels: map[string]string{ "app.kubernetes.io/name": appName, }, }, }, meta_v1.CreateOptions{}, ) cancel() if err != nil { if k8s_errors.IsAlreadyExists(err) { ts.cfg.Logger.Info("resource already exists", zap.String("ServiceAccount", appName)) return nil } return fmt.Errorf("failed to create %s: %s (%v)", "ServiceAccount", appName, err) } ts.cfg.Logger.Info("Create resource", zap.String("ServiceAccount", appName)) return nil } func (ts *tester) createRBACRole() error { ts.cfg.Logger.Info("creating: ", zap.String("Role", appName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.Client.KubernetesClient(). RbacV1(). Roles(ts.cfg.Namespace). Create( ctx, &rbac_v1.Role{ TypeMeta: meta_v1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "Role", }, // "Role" is a non-namespaced resource. // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole ObjectMeta: meta_v1.ObjectMeta{ Name: appRBACRoleName, Namespace: ts.cfg.Namespace, Labels: map[string]string{ "app.kubernetes.io/name": appName, }, }, Rules: []rbac_v1.PolicyRule{ { // "" indicates the core API group // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole APIGroups: []string{ "", }, Resources: []string{ "pods", "pods/logs", }, Verbs: []string{ "get", "list", "watch", }, }, }, }, meta_v1.CreateOptions{}, ) cancel() if err != nil { if k8s_errors.IsAlreadyExists(err) { ts.cfg.Logger.Info("resource already exists", zap.String("Role", appName)) return nil } return fmt.Errorf("failed to create %s: %s (%v)", "Role", appName, err) } ts.cfg.Logger.Info("Create resource", zap.String("Role", appName)) return nil } func (ts *tester) createRBACRoleBinding() error { ts.cfg.Logger.Info("creating: ", zap.String("RoleBinding", appName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.Client.KubernetesClient(). RbacV1(). RoleBindings(ts.cfg.Namespace). Create( ctx, &rbac_v1.RoleBinding{ TypeMeta: meta_v1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "RoleBindings", }, ObjectMeta: meta_v1.ObjectMeta{ Name: appRBACRoleBindingName, Namespace: ts.cfg.Namespace, Labels: map[string]string{ "app.kubernetes.io/name": appName, }, }, RoleRef: rbac_v1.RoleRef{ APIGroup: "rbac.authorization.k8s.io", Kind: "Role", Name: appRBACRoleName, }, Subjects: []rbac_v1.Subject{ { APIGroup: "", Kind: "ServiceAccount", Name: appServiceAccountName, Namespace: ts.cfg.Namespace, }, }, }, meta_v1.CreateOptions{}, ) cancel() if err != nil { if k8s_errors.IsAlreadyExists(err) { ts.cfg.Logger.Info("resource already exists", zap.String("RoleBinding", appName)) return nil } return fmt.Errorf("failed to create %s: %s (%v)", "RoleBinding", appName, err) } ts.cfg.Logger.Info("Create resource", zap.String("RoleBinding", appName)) return nil } func (ts *tester) createRBACClusterRole() error { ts.cfg.Logger.Info("creating: ", zap.String("ClusterRole", appName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.Client.KubernetesClient(). RbacV1(). ClusterRoles(). Create( ctx, &rbac_v1.ClusterRole{ TypeMeta: meta_v1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "ClusterRole", }, // "ClusterRole" is a non-namespaced resource. // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole ObjectMeta: meta_v1.ObjectMeta{ Name: appRBACRoleName, Namespace: "default", Labels: map[string]string{ "app.kubernetes.io/name": appName, }, }, Rules: []rbac_v1.PolicyRule{ { // "" indicates the core API group // ref. https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole APIGroups: []string{ "", }, Resources: []string{ "namespaces", "pods", "pods/logs", }, Verbs: []string{ "get", "list", "watch", }, }, }, }, meta_v1.CreateOptions{}, ) cancel() if err != nil { if k8s_errors.IsAlreadyExists(err) { ts.cfg.Logger.Info("resource already exists", zap.String("ClusterRole", appName)) return nil } return fmt.Errorf("failed to create %s: %s (%v)", "ClusterRole", appName, err) } ts.cfg.Logger.Info("Create resource", zap.String("ClusterRole", appName)) return nil } func (ts *tester) createRBACClusterRoleBinding() error { ts.cfg.Logger.Info("creating: ", zap.String("ClusterRoleBinding", appName)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) _, err := ts.cfg.Client.KubernetesClient(). RbacV1(). ClusterRoleBindings(). Create( ctx, &rbac_v1.ClusterRoleBinding{ TypeMeta: meta_v1.TypeMeta{ APIVersion: "rbac.authorization.k8s.io/v1", Kind: "ClusterRoleBinding", }, ObjectMeta: meta_v1.ObjectMeta{ Name: appRBACRoleBindingName, Namespace: "default", Labels: map[string]string{ "app.kubernetes.io/name": appName, }, }, RoleRef: rbac_v1.RoleRef{ APIGroup: "rbac.authorization.k8s.io", Kind: "ClusterRole", Name: appRBACRoleName, }, Subjects: []rbac_v1.Subject{ { APIGroup: "", Kind: "ServiceAccount", Name: appServiceAccountName, Namespace: ts.cfg.Namespace, }, }, }, meta_v1.CreateOptions{}, ) cancel() if err != nil { if k8s_errors.IsAlreadyExists(err) { ts.cfg.Logger.Info("resource already exists", zap.String("ClusterRoleBinding", appName)) return nil } return fmt.Errorf("failed to create %s: %s (%v)", "ClusterRoleBinding", appName, err) } ts.cfg.Logger.Info("Create resource", zap.String("ClusterRoleBinding", appName)) return nil } const FluentBitConf = ` [SERVICE] Flush 1 Log_Level info Daemon off Parsers_File parsers.conf HTTP_Server On HTTP_Listen 0.0.0.0 HTTP_Port 2020 @INCLUDE input-kubernetes.conf @INCLUDE output.conf ` const InputConf = ` [INPUT] Name tail Path /var/log/suite/* Refresh_Interval 5 ` const OutputConf = ` [OUTPUT] Name stdout Match * ` const ParsersConf = ` [PARSER] Name nginx Format regex Regex ^(?[^ ]*) (?[^ ]*) (?[^ ]*) \[(?