package fishapp

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"sync"
	"time"

	"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/references"
	"k8s.io/apimachinery/pkg/util/sets"

	appmesh "github.com/aws/aws-app-mesh-controller-for-k8s/apis/appmesh/v1beta2"
	"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/algorithm"
	"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework"
	"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework/k8s"
	"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework/manifest"
	"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework/utils"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/servicediscovery"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"gonum.org/v1/gonum/stat"
	"gonum.org/v1/gonum/stat/distuv"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
	// Settings for the connectivity check.
	// NOTE(review): their consumers are not in this part of the file — confirm exact usage.
	connectivityCheckRate      = time.Second / 100
	connectivityCheckProxyPort = 8899
	// Significance level for testing that traffic to targets is uniformly distributed.
	connectivityCheckUniformDistributionSL = 0.001

	// AppContainerPort is the port of the "app" container; it is also used as the
	// Service port and as the VirtualNode / VirtualRouter listener port.
	AppContainerPort = 9080
	// HttpProxyContainerPort is the port the "http-proxy" container listens on.
	HttpProxyContainerPort = 8899

	// Images for the "app" and "http-proxy" containers of every node Deployment.
	defaultAppImage       = "public.ecr.aws/e6v3k1j4/colorteller:v1"
	defaultHTTPProxyImage = "abhinavsingh/proxy.py:latest"

	// Scripts and file-name parts used to generate per-node TLS material.
	// All paths are relative, so they resolve against the working directory of the test process.
	caCertScript           = "certs/ca_certs.sh"
	nodeCertScript         = "certs/node_certs.sh"
	genericNodeCertCfgFile = "certs/node_cert.cfg"
	certsBasePath          = "certs/"
	certsCfgFileSuffix     = "_cert.cfg"
	certChainSuffix        = "_cert_chain.pem"
	certKeySuffix          = "_key.pem"
	caCertFile             = "ca_cert.pem"
	// envoyCACertPath is the CA certificate path used as the file trust of the client TLS policy.
	envoyCACertPath   = "/certs/ca_cert.pem"
	certCleanupScript = "certs/cleanup.sh"

	// Scripts used by the mTLS flavour: deploy/delete the SDS provider and register identities with it.
	sdsDeployScript          = "certs/sds_provider.sh"
	registerAgentIdentity    = "certs/register_agent_entry.sh"
	registerWorkloadIdentity = "certs/register_workload_entry.sh"
)

var (
	// mTLSe2eValidationContext is the SDS secret name of the mTLS validation context and the
	// prefix of every node's SVID. It is a variable rather than a constant because its address
	// is taken when building the VirtualNode specs.
	mTLSe2eValidationContext = "spiffe://mtls-e2e.aws"
)

// A dynamically generated stack designed to test app mesh integration :D
// Suppose the configuration below is given:
//
//	5 VirtualServicesCount
//	10 VirtualNodesCount
//	2 RoutesCountPerVirtualRouter
//	2 TargetsCountPerRoute
//	4 BackendsCountPerVirtualNode
//
// We will generate the virtual service configuration & virtual node configuration as follows:
// =======virtual services =========
//
//	vs1 -> /path1 -> vn1(50)
//	              -> vn2(50)
//	    -> /path2 -> vn3(50)
//	              -> vn4(50)
//	vs2 -> /path1 -> vn5(50)
//	              -> vn6(50)
//	    -> /path2 -> vn7(50)
//	              -> vn8(50)
//	vs3 -> /path1 -> vn9(50)
//	              -> vn10(50)
//	    -> /path2 -> vn1(50)
//	              -> vn2(50)
//	vs4 -> /path1 -> vn3(50)
//	              -> vn4(50)
//	    -> /path2 -> vn5(50)
//	              -> vn6(50)
//	vs5 -> /path1 -> vn7(50)
//	              -> vn8(50)
//	    -> /path2 -> vn9(50)
//	              -> vn10(50)
//
// =======virtual nodes =========
//
//	vn1 -> vs1,vs2,vs3,vs4
//	vn2 -> vs5,vs1,vs2,vs3
//	vn3 -> vs4,vs5,vs1,vs2
//	...
// // then we validate each virtual node can access each virtual service at every path, and calculates the target distribution type DynamicStack struct { // service discovery type ServiceDiscoveryType manifest.ServiceDiscoveryType // tls IsTLSEnabled bool //mtls IsmTLSEnabled bool // number of virtual service VirtualServicesCount int // number of virtual nodes count VirtualNodesCount int // number of routes per virtual router RoutesCountPerVirtualRouter int // number of targets per route TargetsCountPerRoute int // number of backends per virtual node BackendsCountPerVirtualNode int // number of replicas per virtual node ReplicasPerVirtualNode int32 // how many time to check connectivity per URL ConnectivityCheckPerURL int // ====== runtime variables ====== mesh *appmesh.Mesh namespace *corev1.Namespace cloudMapNamespace string createdNodeVNs []*appmesh.VirtualNode createdNodeDPs []*appsv1.Deployment createdNodeSVCs []*corev1.Service createdServiceVSs []*appmesh.VirtualService createdServiceVRs []*appmesh.VirtualRouter createdServiceSVCs []*corev1.Service BackendVNsByVR map[string][]string VNReferenceMap map[string][]*string } // expects the stack can be deployed to namespace successfully func (s *DynamicStack) Deploy(ctx context.Context, f *framework.Framework) { s.createMeshAndNamespace(ctx, f) if s.ServiceDiscoveryType == manifest.CloudMapServiceDiscovery { s.createCloudMapNamespace(ctx, f) time.Sleep(1 * time.Minute) } mb := &manifest.ManifestBuilder{ Namespace: s.namespace.Name, ServiceDiscoveryType: s.ServiceDiscoveryType, CloudMapNamespace: s.cloudMapNamespace, } if s.IsTLSEnabled { err := s.createCertificateAuthority(ctx, f) if err != nil { return } s.createResourcesForNodesWithTLS(ctx, f, mb) } else if s.IsmTLSEnabled { err := s.deploySDSProvider(ctx, f) if err != nil { f.Logger.Error("error creating sds provider") return } err = s.registerAgentSDSEntry() if err != nil { return } s.createResourcesForNodesWithmTLS(ctx, f, mb) } else { 
s.createResourcesForNodes(ctx, f, mb) } s.createResourcesForServices(ctx, f, mb) s.grantVirtualNodesBackendAccess(ctx, f) if s.IsmTLSEnabled { s.updateListenerSANsForNodes(ctx, f) } } // expects the stack can be cleaned up from namespace successfully func (s *DynamicStack) Cleanup(ctx context.Context, f *framework.Framework) { var deletionErrors []error if errs := s.revokeVirtualNodeBackendAccess(ctx, f); len(errs) != 0 { deletionErrors = append(deletionErrors, errs...) } if errs := s.deleteResourcesForServices(ctx, f); len(errs) != 0 { deletionErrors = append(deletionErrors, errs...) } if errs := s.deleteResourcesForNodes(ctx, f); len(errs) != 0 { deletionErrors = append(deletionErrors, errs...) } if s.ServiceDiscoveryType == manifest.CloudMapServiceDiscovery { if errs := s.deleteCloudMapNamespace(ctx, f); len(errs) != 0 { deletionErrors = append(deletionErrors, errs...) } } if errs := s.deleteMeshAndNamespace(ctx, f); len(errs) != 0 { deletionErrors = append(deletionErrors, errs...) } if s.IsTLSEnabled { if err := s.deleteCerts(); err != nil { f.Logger.Error("Certs clean up failed", zap.Error(err)) } } else if s.IsmTLSEnabled { if err := s.deleteSDSProvider(); err != nil { f.Logger.Error("Certs clean up failed", zap.Error(err)) } } for _, err := range deletionErrors { f.Logger.Error("clean up failed", zap.Error(err)) } Expect(len(deletionErrors)).To(BeZero()) } // Check connectivity and routing works correctly func (s *DynamicStack) Check(ctx context.Context, f *framework.Framework) { // TODO: we can just record the mapping when allocate vn->vs, instead of re-compute it here vsIndexByKey := make(map[types.NamespacedName]int, len(s.createdServiceVSs)) for i := 0; i != s.VirtualServicesCount; i++ { vsKey := k8s.NamespacedName(s.createdServiceVSs[i]) vsIndexByKey[vsKey] = i } var checkErrors []error for i := 0; i != s.VirtualNodesCount; i++ { dp := s.createdNodeDPs[i] vn := s.createdNodeVNs[i] vsIndexes := sets.NewInt() for _, backend := range vn.Spec.Backends { 
vsKey := references.ObjectKeyForVirtualServiceReference(vn, *backend.VirtualService.VirtualServiceRef) vsIndex := vsIndexByKey[vsKey] vsIndexes.Insert(vsIndex) } if errs := s.checkDeploymentToVirtualServiceConnectivity(ctx, f, dp, vsIndexes); len(errs) != 0 { checkErrors = append(checkErrors, errs...) } } for _, err := range checkErrors { f.Logger.Error("connectivity check failed", zap.Error(err)) } Expect(len(checkErrors)).To(BeZero()) } func (s *DynamicStack) createMeshAndNamespace(ctx context.Context, f *framework.Framework) { By("create a mesh", func() { meshName := fmt.Sprintf("%s-%s", f.Options.ClusterName, utils.RandomDNS1123Label(6)) mesh := &appmesh.Mesh{ ObjectMeta: metav1.ObjectMeta{ Name: meshName, }, Spec: appmesh.MeshSpec{ NamespaceSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "mesh": meshName, }, }, }, } err := f.K8sClient.Create(ctx, mesh) Expect(err).NotTo(HaveOccurred()) s.mesh = mesh }) By(fmt.Sprintf("wait for mesh %s become active", s.mesh.Name), func() { mesh, err := f.MeshManager.WaitUntilMeshActive(ctx, s.mesh) Expect(err).NotTo(HaveOccurred()) s.mesh = mesh }) By("allocates a namespace", func() { if s.IsTLSEnabled { namespace := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: "tls-e2e", }, } err := f.K8sClient.Create(ctx, namespace) Expect(err).NotTo(HaveOccurred()) s.namespace = namespace } else if s.IsmTLSEnabled { namespace := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: "mtls-e2e", }, } err := f.K8sClient.Create(ctx, namespace) Expect(err).NotTo(HaveOccurred()) s.namespace = namespace } else { namespace, err := f.NSManager.AllocateNamespace(ctx, "appmesh") Expect(err).NotTo(HaveOccurred()) s.namespace = namespace } }) By("label namespace with appMesh inject", func() { oldNS := s.namespace.DeepCopy() s.namespace.Labels = algorithm.MergeStringMap(map[string]string{ "appmesh.k8s.aws/sidecarInjectorWebhook": "enabled", "mesh": s.mesh.Name, }, s.namespace.Labels) err := f.K8sClient.Patch(ctx, 
s.namespace, client.MergeFrom(oldNS)) Expect(err).NotTo(HaveOccurred()) }) } func (s *DynamicStack) deleteMeshAndNamespace(ctx context.Context, f *framework.Framework) []error { var deletionErrors []error if s.namespace != nil { By(fmt.Sprintf("delete namespace: %s", s.namespace.Name), func() { if err := f.K8sClient.Delete(ctx, s.namespace, client.PropagationPolicy(metav1.DeletePropagationForeground), client.GracePeriodSeconds(0)); err != nil { f.Logger.Error("failed to delete namespace", zap.String("namespace", s.namespace.Name), zap.Error(err)) deletionErrors = append(deletionErrors, err) return } By(fmt.Sprintf("wait namespace to be deleted: %s", s.namespace.Namespace), func() { if err := f.NSManager.WaitUntilNamespaceDeleted(ctx, s.namespace); err != nil { f.Logger.Error("failed to wait namespace deletion", zap.String("namespace", s.namespace.Name), zap.Error(err)) deletionErrors = append(deletionErrors, err) } }) }) } if s.mesh != nil { By(fmt.Sprintf("delete mesh %s", s.mesh.Name), func() { if err := f.K8sClient.Delete(ctx, s.mesh, client.PropagationPolicy(metav1.DeletePropagationForeground), client.GracePeriodSeconds(0)); err != nil { f.Logger.Error("failed to delete mesh", zap.String("mesh", s.mesh.Name), zap.Error(err)) deletionErrors = append(deletionErrors, err) return } By(fmt.Sprintf("wait mesh to be deleted: %s", s.mesh.Name), func() { if err := f.MeshManager.WaitUntilMeshDeleted(ctx, s.mesh); err != nil { f.Logger.Error("failed to wait mesh deletion", zap.String("mesh", s.mesh.Name), zap.Error(err)) deletionErrors = append(deletionErrors, err) } }) }) } return deletionErrors } func (s *DynamicStack) createCloudMapNamespace(ctx context.Context, f *framework.Framework) { cmNamespace := fmt.Sprintf("%s-%s", f.Options.ClusterName, utils.RandomDNS1123Label(6)) if s.IsTLSEnabled { cmNamespace = "tls-e2e.svc.cluster.local" } By(fmt.Sprintf("create cloudMap namespace %s", cmNamespace), func() { resp, err := 
f.CloudMapClient.CreatePrivateDnsNamespaceWithContext(ctx, &servicediscovery.CreatePrivateDnsNamespaceInput{ Name: aws.String(cmNamespace), Vpc: aws.String(f.Options.AWSVPCID), }) Expect(err).NotTo(HaveOccurred()) s.cloudMapNamespace = cmNamespace f.Logger.Info("created cloudMap namespace", zap.String("namespace", cmNamespace), zap.String("operationID", aws.StringValue(resp.OperationId)), ) }) } func (s *DynamicStack) deleteCloudMapNamespace(ctx context.Context, f *framework.Framework) []error { var deletionErrors []error if s.cloudMapNamespace != "" { By(fmt.Sprintf("delete cloudMap namespace %s", s.cloudMapNamespace), func() { var cmNamespaceID string f.CloudMapClient.ListNamespacesPagesWithContext(ctx, &servicediscovery.ListNamespacesInput{}, func(output *servicediscovery.ListNamespacesOutput, b bool) bool { for _, ns := range output.Namespaces { if aws.StringValue(ns.Name) == s.cloudMapNamespace { cmNamespaceID = aws.StringValue(ns.Id) return true } } return false }) if cmNamespaceID == "" { err := errors.Errorf("cannot find cloudMap namespace with name %s", s.cloudMapNamespace) f.Logger.Error("failed to delete cloudMap namespace", zap.String("namespace", s.cloudMapNamespace), zap.Error(err)) deletionErrors = append(deletionErrors, err) return } // hummm, let's fix the controller bug in test cases first xD: // https://github.com/aws/aws-app-mesh-controller-for-k8s/issues/107 // https://github.com/aws/aws-app-mesh-controller-for-k8s/issues/131 By(fmt.Sprintf("[bug workaround] clean up resources in cloudMap namespace %s", s.cloudMapNamespace), func() { // give controller a break to deregister instance xD time.Sleep(1 * time.Minute) var cmServiceIDs []string f.CloudMapClient.ListServicesPagesWithContext(ctx, &servicediscovery.ListServicesInput{ Filters: []*servicediscovery.ServiceFilter{ { Condition: aws.String(servicediscovery.FilterConditionEq), Name: aws.String("NAMESPACE_ID"), Values: aws.StringSlice([]string{cmNamespaceID}), }, }, }, func(output 
*servicediscovery.ListServicesOutput, b bool) bool { for _, svc := range output.Services { cmServiceIDs = append(cmServiceIDs, aws.StringValue(svc.Id)) } return false }) for _, cmServiceID := range cmServiceIDs { var cmInstanceIDs []string f.CloudMapClient.ListInstancesPagesWithContext(ctx, &servicediscovery.ListInstancesInput{ ServiceId: aws.String(cmServiceID), }, func(output *servicediscovery.ListInstancesOutput, b bool) bool { for _, ins := range output.Instances { cmInstanceIDs = append(cmInstanceIDs, aws.StringValue(ins.Id)) } return false }) for _, cmInstanceID := range cmInstanceIDs { if _, err := f.CloudMapClient.DeregisterInstanceWithContext(ctx, &servicediscovery.DeregisterInstanceInput{ ServiceId: aws.String(cmServiceID), InstanceId: aws.String(cmInstanceID), }); err != nil { f.Logger.Error("failed to deregister cloudMap instance", zap.String("namespaceID", cmNamespaceID), zap.String("serviceID", cmServiceID), zap.String("instanceID", cmInstanceID), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } } time.Sleep(30 * time.Second) if _, err := f.CloudMapClient.DeleteServiceWithContext(ctx, &servicediscovery.DeleteServiceInput{ Id: aws.String(cmServiceID), }); err != nil { f.Logger.Error("failed to delete cloudMap service", zap.String("namespaceID", cmNamespaceID), zap.String("serviceID", cmServiceID), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } } }) time.Sleep(30 * time.Second) if _, err := f.CloudMapClient.DeleteNamespaceWithContext(ctx, &servicediscovery.DeleteNamespaceInput{ Id: aws.String(cmNamespaceID), }); err != nil { f.Logger.Error("failed to delete cloudMap namespace", zap.String("namespaceID", cmNamespaceID), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } }) } return deletionErrors } func (s *DynamicStack) createCertificateAuthority(ctx context.Context, f *framework.Framework) error { _, err := exec.Command("/bin/sh", caCertScript).Output() if err != nil { return fmt.Errorf("error: %s", err) 
}
	return nil
}

// runShellScript runs script through /bin/sh with the given arguments and discards its
// standard output. A failure is returned wrapped with the script name and, when the script
// exited non-zero, with whatever it wrote to standard error.
func runShellScript(script string, args ...string) error {
	cmdArgs := append([]string{script}, args...)
	if _, err := exec.Command("/bin/sh", cmdArgs...).Output(); err != nil {
		// Output captures stderr into the ExitError when the command's Stderr is unset.
		if exitErr, ok := err.(*exec.ExitError); ok && len(exitErr.Stderr) != 0 {
			return fmt.Errorf("running %s: %w: %s", script, err, exitErr.Stderr)
		}
		return fmt.Errorf("running %s: %w", script, err)
	}
	return nil
}

// runPerNodeConcurrently calls fn once for every index in [0, count), each call in its own
// goroutine, waits for all of them and returns the non-nil errors in completion order.
func runPerNodeConcurrently(count int, fn func(nodeIndex int) error) []error {
	var (
		errs []error
		mu   sync.Mutex
		wg   sync.WaitGroup
	)
	for i := 0; i != count; i++ {
		wg.Add(1)
		go func(nodeIndex int) {
			defer wg.Done()
			if err := fn(nodeIndex); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return errs
}

// createTLSCertsForNodes renders the generic node certificate config for nodeName (every
// "node" in the template is replaced by the node name via sed) and then runs the node
// certificate script for that node.
func (s *DynamicStack) createTLSCertsForNodes(nodeName string) error {
	nodeCertCfgFile := certsBasePath + nodeName + certsCfgFileSuffix
	// Truncate instead of append: a config left over from an earlier run must be replaced,
	// not extended with a second copy of the template.
	certFile, err := os.OpenFile(nodeCertCfgFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return fmt.Errorf("opening %s: %w", nodeCertCfgFile, err)
	}
	defer certFile.Close()

	cmd := exec.Command("sed", "-e", "s/node/"+nodeName+"/g", genericNodeCertCfgFile)
	cmd.Stdout = certFile
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("rendering %s: %w", nodeCertCfgFile, err)
	}
	return runShellScript(nodeCertScript, nodeName)
}

// deleteCerts runs the certificate clean up script.
func (s *DynamicStack) deleteCerts() error {
	fmt.Println("Delete Certs")
	return runShellScript(certCleanupScript)
}

// createSecretsForNodeResource creates the Secret nodeSecretName holding the CA certificate,
// the certificate chain and the private key of nodeName, read from the certs directory.
func (s *DynamicStack) createSecretsForNodeResource(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder, nodeName string, nodeSecretName string) error {
	frontendTLSFiles := []string{
		caCertFile,
		nodeName + certChainSuffix,
		nodeName + certKeySuffix,
	}
	secret := mb.BuildK8SSecretsFromPemFile(certsBasePath, frontendTLSFiles, nodeSecretName, f)
	return f.K8sClient.Create(ctx, secret)
}

// deploySDSProvider runs the SDS provider script in "deploy" mode and then waits a fixed
// 30 seconds. ctx and f are currently unused.
func (s *DynamicStack) deploySDSProvider(ctx context.Context, f *framework.Framework) error {
	if err := runShellScript(sdsDeployScript, "deploy"); err != nil {
		return err
	}
	// TODO - Convert this to a watch
	time.Sleep(30 * time.Second)
	return nil
}

// deleteSDSProvider runs the SDS provider script in "delete" mode.
func (s *DynamicStack) deleteSDSProvider() error {
	return runShellScript(sdsDeployScript, "delete")
}

// registerAgentSDSEntry runs the script that registers the agent identity.
func (s *DynamicStack) registerAgentSDSEntry() error {
	return runShellScript(registerAgentIdentity)
}

// registerVirtualNodeSDSEntry runs the script that registers the workload identity of nodeName.
func (s *DynamicStack) registerVirtualNodeSDSEntry(nodeName string) error {
	return runShellScript(registerWorkloadIdentity, nodeName)
}

// initNodeResources allocates the per-node bookkeeping slices (one slot per virtual node) and
// returns the VirtualNode builder shared by all nodes of the stack.
func (s *DynamicStack) initNodeResources() *manifest.VNBuilder {
	s.createdNodeVNs = make([]*appmesh.VirtualNode, s.VirtualNodesCount)
	s.createdNodeDPs = make([]*appsv1.Deployment, s.VirtualNodesCount)
	s.createdNodeSVCs = make([]*corev1.Service, s.VirtualNodesCount)
	return &manifest.VNBuilder{
		ServiceDiscoveryType: s.ServiceDiscoveryType,
		Namespace:            s.namespace.Name,
		CloudMapNamespace:    s.cloudMapNamespace,
	}
}

// nodeContainersInfo describes the two containers every node Deployment runs: the app, which
// is told its port and its colour (the instance name) through the environment, and an HTTP proxy.
func nodeContainersInfo(instanceName string) []manifest.ContainerInfo {
	return []manifest.ContainerInfo{
		{
			Name:          "app",
			AppImage:      defaultAppImage,
			ContainerPort: AppContainerPort,
			Env: []corev1.EnvVar{
				{
					Name:  "SERVER_PORT",
					Value: fmt.Sprintf("%d", AppContainerPort),
				},
				{
					Name:  "COLOR",
					Value: instanceName,
				},
			},
		},
		{
			Name:          "http-proxy",
			AppImage:      defaultHTTPProxyImage,
			ContainerPort: HttpProxyContainerPort,
			Args: []string{
				"--hostname=0.0.0.0",
				fmt.Sprintf("--port=%d", HttpProxyContainerPort),
			},
		},
	}
}

// createNodeVirtualNode creates vn in the cluster and records it as the VirtualNode of node #i.
func (s *DynamicStack) createNodeVirtualNode(ctx context.Context, f *framework.Framework, i int, vn *appmesh.VirtualNode) {
	err := f.K8sClient.Create(ctx, vn)
	Expect(err).NotTo(HaveOccurred())
	s.createdNodeVNs[i] = vn
}

// createNodeWorkload creates the Deployment (with the given annotations) and the Service of
// node #i and records both.
func (s *DynamicStack) createNodeWorkload(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder, i int, instanceName string, annotations map[string]string) {
	By(fmt.Sprintf("create Deployment for node #%d", i), func() {
		containers := mb.BuildContainerSpec(nodeContainersInfo(instanceName))
		dp := mb.BuildDeployment(instanceName, s.ReplicasPerVirtualNode, containers, annotations)
		err := f.K8sClient.Create(ctx, dp)
		Expect(err).NotTo(HaveOccurred())
		s.createdNodeDPs[i] = dp
	})

	By(fmt.Sprintf("create Service for node #%d", i), func() {
		svc := mb.BuildServiceWithSelector(instanceName, AppContainerPort, AppContainerPort)
		err := f.K8sClient.Create(ctx, svc)
		Expect(err).NotTo(HaveOccurred())
		s.createdNodeSVCs[i] = svc
	})
}

// waitNodeResourcesReady waits, for all nodes concurrently, until every VirtualNode is active
// and every Deployment is ready, then checks every VirtualNode in AWS. The recorded objects
// are replaced by the refreshed ones returned by the managers.
func (s *DynamicStack) waitNodeResourcesReady(ctx context.Context, f *framework.Framework) {
	By("wait all VirtualNodes become active", func() {
		waitErrors := runPerNodeConcurrently(s.VirtualNodesCount, func(nodeIndex int) error {
			vn := s.createdNodeVNs[nodeIndex]
			// Keep vn untouched: the value returned next to a non-nil error must not be
			// used, and the object's name is still needed for the error message.
			activeVN, err := f.VNManager.WaitUntilVirtualNodeActive(ctx, vn)
			if err != nil {
				return errors.Wrapf(err, "VirtualNode: %v", k8s.NamespacedName(vn).String())
			}
			s.createdNodeVNs[nodeIndex] = activeVN
			return nil
		})
		for _, waitErr := range waitErrors {
			f.Logger.Error("failed to wait all VirtualNodes become active", zap.Error(waitErr))
		}
		Expect(len(waitErrors)).To(BeZero())
	})

	By("wait all deployments become ready", func() {
		waitErrors := runPerNodeConcurrently(s.VirtualNodesCount, func(nodeIndex int) error {
			dp := s.createdNodeDPs[nodeIndex]
			readyDP, err := f.DPManager.WaitUntilDeploymentReady(ctx, dp)
			if err != nil {
				return errors.Wrapf(err, "Deployment: %v", k8s.NamespacedName(dp).String())
			}
			s.createdNodeDPs[nodeIndex] = readyDP
			return nil
		})
		for _, waitErr := range waitErrors {
			f.Logger.Error("failed to wait all Deployments become active", zap.Error(waitErr))
		}
		Expect(len(waitErrors)).To(BeZero())
	})

	By("check all VirtualNode in aws", func() {
		checkErrors := runPerNodeConcurrently(s.VirtualNodesCount, func(nodeIndex int) error {
			vn := s.createdNodeVNs[nodeIndex]
			if err := f.VNManager.CheckVirtualNodeInAWS(ctx, s.mesh, vn); err != nil {
				return errors.Wrapf(err, "VirtualNode: %v", k8s.NamespacedName(vn).String())
			}
			return nil
		})
		for _, checkErr := range checkErrors {
			f.Logger.Error("failed to check all VirtualNodes in aws", zap.Error(checkErr))
		}
		Expect(len(checkErrors)).To(BeZero())
	})
}

// createResourcesForNodes creates, for every virtual node, a plain HTTP VirtualNode, its
// Deployment and its Service, then waits for all of them to become ready.
func (s *DynamicStack) createResourcesForNodes(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder) {
	By("create all resources for nodes", func() {
		vnBuilder := s.initNodeResources()

		for i := 0; i != s.VirtualNodesCount; i++ {
			instanceName := fmt.Sprintf("node-%d", i)
			By(fmt.Sprintf("create VirtualNode for node #%d", i), func() {
				listeners := []appmesh.Listener{vnBuilder.BuildListener("http", AppContainerPort)}
				backends := []types.NamespacedName{}
				vn := vnBuilder.BuildVirtualNode(instanceName, backends, listeners, &appmesh.BackendDefaults{})
				s.createNodeVirtualNode(ctx, f, i, vn)
			})
			s.createNodeWorkload(ctx, f, mb, i, instanceName, map[string]string{})
		}

		s.waitNodeResourcesReady(ctx, f)
	})
}

// createResourcesForNodesWithTLS is the TLS flavour of createResourcesForNodes: every node
// additionally gets its own certificates, packed into a Secret that is mounted under /certs/
// through the secretMounts annotation, a STRICT TLS listener using those files and a client
// policy that enforces TLS and trusts the CA certificate file.
func (s *DynamicStack) createResourcesForNodesWithTLS(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder) {
	By("create all resources for nodes", func() {
		vnBuilder := s.initNodeResources()

		for i := 0; i != s.VirtualNodesCount; i++ {
			instanceName := fmt.Sprintf("node-%d", i)
			nodeSecretName := fmt.Sprintf("node-%d-tls", i)

			By(fmt.Sprintf("create certs for node #%d", i), func() {
				err := s.createTLSCertsForNodes(instanceName)
				Expect(err).NotTo(HaveOccurred())
			})

			By(fmt.Sprintf("create secrets for node #%d", i), func() {
				err := s.createSecretsForNodeResource(ctx, f, mb, instanceName, nodeSecretName)
				Expect(err).NotTo(HaveOccurred())
			})

			By(fmt.Sprintf("create VirtualNode for node #%d", i), func() {
				tlsEnforce := true
				nodeBackendDefaults := &appmesh.BackendDefaults{
					ClientPolicy: &appmesh.ClientPolicy{
						TLS: &appmesh.ClientPolicyTLS{
							Enforce: &tlsEnforce,
							Validation: appmesh.TLSValidationContext{
								Trust: appmesh.TLSValidationContextTrust{
									File: &appmesh.TLSValidationContextFileTrust{CertificateChain: envoyCACertPath},
								},
							},
						},
					},
				}
				nodeListenerTLS := &appmesh.ListenerTLS{
					Certificate: appmesh.ListenerTLSCertificate{
						File: &appmesh.ListenerTLSFileCertificate{
							CertificateChain: "/certs/" + instanceName + certChainSuffix,
							PrivateKey:       "/certs/" + instanceName + certKeySuffix,
						},
					},
					Mode: "STRICT",
				}
				listeners := []appmesh.Listener{vnBuilder.BuildListenerWithTLS("http", AppContainerPort, nodeListenerTLS)}
				vn := vnBuilder.BuildVirtualNode(instanceName, nil, listeners, nodeBackendDefaults)
				s.createNodeVirtualNode(ctx, f, i, vn)
			})

			s.createNodeWorkload(ctx, f, mb, i, instanceName, map[string]string{
				"appmesh.k8s.aws/secretMounts": nodeSecretName + ":/certs/",
			})
		}

		s.waitNodeResourcesReady(ctx, f)
	})
}

// createResourcesForNodesWithmTLS is the mTLS flavour of createResourcesForNodes: every node
// is registered with the SDS provider and its VirtualNode uses SDS secrets, named after the
// node's SVID, for the listener certificate, the client certificate and the validation
// contexts on both sides.
func (s *DynamicStack) createResourcesForNodesWithmTLS(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder) {
	By("create all resources for nodes", func() {
		vnBuilder := s.initNodeResources()

		for i := 0; i != s.VirtualNodesCount; i++ {
			instanceName := fmt.Sprintf("node-%d", i)

			By(fmt.Sprintf("register workload entry with SDS Provider for node #%d", i), func() {
				err := s.registerVirtualNodeSDSEntry(instanceName)
				Expect(err).NotTo(HaveOccurred())
			})

			By(fmt.Sprintf("create VirtualNode for node #%d", i), func() {
				tlsEnforce := true
				nodeSVID := mTLSe2eValidationContext + "/" + instanceName
				nodeBackendDefaults := &appmesh.BackendDefaults{
					ClientPolicy: &appmesh.ClientPolicy{
						TLS: &appmesh.ClientPolicyTLS{
							Enforce: &tlsEnforce,
							Validation: appmesh.TLSValidationContext{
								Trust: appmesh.TLSValidationContextTrust{
									SDS: &appmesh.TLSValidationContextSDSTrust{
										SecretName: &mTLSe2eValidationContext,
									},
								},
								SubjectAlternativeNames: &appmesh.SubjectAlternativeNames{
									Match: &appmesh.SubjectAlternativeNameMatchers{
										Exact: []*string{
											&nodeSVID,
										},
									},
								},
							},
							Certificate: &appmesh.ClientTLSCertificate{
								SDS: &appmesh.ListenerTLSSDSCertificate{
									SecretName: &nodeSVID,
								},
							},
						},
					},
				}
				nodeListenerTLS := &appmesh.ListenerTLS{
					Certificate: appmesh.ListenerTLSCertificate{
						SDS: &appmesh.ListenerTLSSDSCertificate{
							SecretName: &nodeSVID,
						},
					},
					Validation: &appmesh.ListenerTLSValidationContext{
						Trust: appmesh.ListenerTLSValidationContextTrust{
							SDS: &appmesh.TLSValidationContextSDSTrust{
								SecretName: &mTLSe2eValidationContext,
							},
						},
						SubjectAlternativeNames: &appmesh.SubjectAlternativeNames{
							Match: &appmesh.SubjectAlternativeNameMatchers{
								Exact: []*string{
									&nodeSVID,
								},
							},
						},
					},
					Mode: "STRICT",
				}
				listeners := []appmesh.Listener{vnBuilder.BuildListenerWithTLS("http", AppContainerPort, nodeListenerTLS)}
				vn := vnBuilder.BuildVirtualNode(instanceName, nil, listeners, nodeBackendDefaults)
				s.createNodeVirtualNode(ctx, f, i, vn)
			})

			s.createNodeWorkload(ctx, f, mb, i, instanceName, nil)
		}

		s.waitNodeResourcesReady(ctx, f)
	})
}

// deleteResourcesForNodes deletes the Services, Deployments and VirtualNodes of all nodes
// (skipping slots that were never filled) and waits until the Deployments and VirtualNodes
// are gone. Errors of the delete calls are returned; errors while waiting fail the spec.
func (s *DynamicStack) deleteResourcesForNodes(ctx context.Context, f *framework.Framework) []error {
	var deletionErrors []error
	By("delete all resources for nodes", func() {
		for i, svc := range s.createdNodeSVCs {
			if svc == nil {
				continue
			}
			By(fmt.Sprintf("delete Service for node #%d", i), func() {
				if err := f.K8sClient.Delete(ctx, svc); err != nil {
					f.Logger.Error("failed to delete Service",
						zap.String("namespace", svc.Namespace),
						zap.String("name", svc.Name),
						zap.Error(err),
					)
					deletionErrors = append(deletionErrors, err)
				}
			})
		}

		for i, dp := range s.createdNodeDPs {
			if dp == nil {
				continue
			}
			By(fmt.Sprintf("delete Deployment for node #%d", i), func() {
				if err := f.K8sClient.Delete(ctx, dp,
					client.PropagationPolicy(metav1.DeletePropagationForeground), client.GracePeriodSeconds(0)); err != nil {
					f.Logger.Error("failed to delete Deployment",
						zap.String("namespace", dp.Namespace),
						zap.String("name", dp.Name),
						zap.Error(err),
					)
					deletionErrors = append(deletionErrors, err)
				}
			})
		}

		for i, vn := range s.createdNodeVNs {
			if vn == nil {
				continue
			}
			By(fmt.Sprintf("delete VirtualNode for node #%d", i), func() {
				if err := f.K8sClient.Delete(ctx, vn); err != nil {
					f.Logger.Error("failed to delete VirtualNode",
						zap.String("namespace", vn.Namespace),
						zap.String("name", vn.Name),
						zap.Error(err),
					)
					deletionErrors = append(deletionErrors, err)
				}
			})
		}

		By("wait all deployments become deleted", func() {
			waitErrors := runPerNodeConcurrently(len(s.createdNodeDPs), func(nodeIndex int) error {
				dp := s.createdNodeDPs[nodeIndex]
				if dp == nil {
					return nil
				}
				if err := f.DPManager.WaitUntilDeploymentDeleted(ctx, dp); err != nil {
					return errors.Wrapf(err, "Deployment: %v", k8s.NamespacedName(dp).String())
				}
				return nil
			})
			for _, waitErr := range waitErrors {
				f.Logger.Error("failed to wait all Deployments become deleted", zap.Error(waitErr))
			}
			Expect(len(waitErrors)).To(BeZero())
		})

		By("wait all VirtualNodes become deleted", func() {
			waitErrors := runPerNodeConcurrently(len(s.createdNodeVNs), func(nodeIndex int) error {
				vn := s.createdNodeVNs[nodeIndex]
				if vn == nil {
					return nil
				}
				if err := f.VNManager.WaitUntilVirtualNodeDeleted(ctx, vn); err != nil {
					return errors.Wrapf(err, "VirtualNode: %v", k8s.NamespacedName(vn).String())
				}
				return nil
			})
			for _, waitErr := range waitErrors {
				f.Logger.Error("failed to wait all VirtualNode become deleted", zap.Error(waitErr))
			}
			Expect(len(waitErrors)).To(BeZero())
		})
	})
	return deletionErrors
}

func (s *DynamicStack) createResourcesForServices(ctx context.Context, f *framework.Framework, mb *manifest.ManifestBuilder) {
By("create all resources for services", func() { s.createdServiceVSs = make([]*appmesh.VirtualService, s.VirtualServicesCount) s.createdServiceVRs = make([]*appmesh.VirtualRouter, s.VirtualServicesCount) s.createdServiceSVCs = make([]*corev1.Service, s.VirtualServicesCount) s.BackendVNsByVR = make(map[string][]string) vrBuilder := &manifest.VRBuilder{ Namespace: s.namespace.Name, } vsBuilder := &manifest.VSBuilder{ Namespace: s.namespace.Name, } nextVirtualNodeIndex := 0 for i := 0; i != s.VirtualServicesCount; i++ { instanceName := fmt.Sprintf("service-%d", i) var VRBackends []string By(fmt.Sprintf("create VirtualRouter for service #%d", i), func() { var routeCfgs []manifest.RouteToWeightedVirtualNodes for routeIndex := 0; routeIndex != s.RoutesCountPerVirtualRouter; routeIndex++ { var weightedTargets []manifest.WeightedVirtualNode for targetIndex := 0; targetIndex != s.TargetsCountPerRoute; targetIndex++ { weightedTargets = append(weightedTargets, manifest.WeightedVirtualNode{ VirtualNode: k8s.NamespacedName(s.createdNodeVNs[nextVirtualNodeIndex]), Weight: 1, }) VRBackends = append(VRBackends, s.createdNodeVNs[nextVirtualNodeIndex].Name) nextVirtualNodeIndex = (nextVirtualNodeIndex + 1) % s.VirtualNodesCount } routeCfgs = append(routeCfgs, manifest.RouteToWeightedVirtualNodes{ Path: fmt.Sprintf("/path-%d", routeIndex), WeightedTargets: weightedTargets, }) } routes := vrBuilder.BuildRoutes(routeCfgs) vrBuilder.Listeners = []appmesh.VirtualRouterListener{vrBuilder.BuildVirtualRouterListener("http", AppContainerPort)} vr := vrBuilder.BuildVirtualRouter(instanceName, routes) err := f.K8sClient.Create(ctx, vr) Expect(err).NotTo(HaveOccurred()) s.createdServiceVRs[i] = vr s.BackendVNsByVR[instanceName] = VRBackends }) By(fmt.Sprintf("create VirtualService for service #%d", i), func() { vs := vsBuilder.BuildVirtualServiceWithRouterBackend(instanceName, instanceName) err := f.K8sClient.Create(ctx, vs) Expect(err).NotTo(HaveOccurred()) s.createdServiceVSs[i] = vs }) 
By(fmt.Sprintf("create Service for service #%d", i), func() { svc := mb.BuildServiceWithSelector(instanceName, AppContainerPort, AppContainerPort) err := f.K8sClient.Create(ctx, svc) Expect(err).NotTo(HaveOccurred()) s.createdServiceSVCs[i] = svc }) } By("wait all VirtualService become active", func() { var waitErrors []error waitErrorsMutex := &sync.Mutex{} var wg sync.WaitGroup for i := 0; i != s.VirtualServicesCount; i++ { wg.Add(1) go func(serviceIndex int) { defer wg.Done() vs, err := f.VSManager.WaitUntilVirtualServiceActive(ctx, s.createdServiceVSs[serviceIndex]) if err != nil { waitErrorsMutex.Lock() waitErrors = append(waitErrors, errors.Wrapf(err, "VirtualService: %v", k8s.NamespacedName(vs).String())) waitErrorsMutex.Unlock() return } s.createdServiceVSs[serviceIndex] = vs }(i) } wg.Wait() for _, waitErr := range waitErrors { f.Logger.Error("failed to wait all VirtualService become active", zap.Error(waitErr)) } Expect(len(waitErrors)).To(BeZero()) }) By("check all VirtualRouters in AWS", func() { var checkErrors []error checkErrorsMutex := &sync.Mutex{} var wg sync.WaitGroup for i := 0; i != s.VirtualServicesCount; i++ { wg.Add(1) go func(serviceIndex int) { defer wg.Done() vr := s.createdServiceVRs[serviceIndex] err := f.VRManager.CheckVirtualRouterInAWS(ctx, s.mesh, vr) if err != nil { checkErrorsMutex.Lock() checkErrors = append(checkErrors, errors.Wrapf(err, "VirtualRouter: %v", k8s.NamespacedName(vr).String())) checkErrorsMutex.Unlock() return } }(i) } wg.Wait() for _, checkErr := range checkErrors { f.Logger.Error("failed to check all VirtualRouters in AWS", zap.Error(checkErr)) } Expect(len(checkErrors)).To(BeZero()) }) By("check all VirtualService in AWS", func() { var checkErrors []error checkErrorsMutex := &sync.Mutex{} var wg sync.WaitGroup for i := 0; i != s.VirtualServicesCount; i++ { wg.Add(1) go func(serviceIndex int) { defer wg.Done() vs := s.createdServiceVSs[serviceIndex] err := f.VSManager.CheckVirtualServiceInAWS(ctx, s.mesh, vs) if 
err != nil { checkErrorsMutex.Lock() checkErrors = append(checkErrors, errors.Wrapf(err, "VirtualService: %v", k8s.NamespacedName(vs).String())) checkErrorsMutex.Unlock() return } }(i) } wg.Wait() for _, checkErr := range checkErrors { f.Logger.Error("failed to check all VirtualService in AWS", zap.Error(checkErr)) } Expect(len(checkErrors)).To(BeZero()) }) }) } func (s *DynamicStack) deleteResourcesForServices(ctx context.Context, f *framework.Framework) []error { var deletionErrors []error By("delete all resources for services", func() { for i, svc := range s.createdServiceSVCs { if svc == nil { continue } By(fmt.Sprintf("delete Service for service #%d", i), func() { if err := f.K8sClient.Delete(ctx, svc); err != nil { f.Logger.Error("failed to delete Service", zap.String("namespace", svc.Namespace), zap.String("name", svc.Name), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } }) } for i, vs := range s.createdServiceVSs { if vs == nil { continue } By(fmt.Sprintf("delete VirtualService for service #%d", i), func() { if err := f.K8sClient.Delete(ctx, vs); err != nil { f.Logger.Error("failed to delete VirtualService", zap.String("namespace", vs.Namespace), zap.String("name", vs.Name), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } }) } for i, vr := range s.createdServiceVRs { if vr == nil { continue } By(fmt.Sprintf("delete VirtualRouter for service #%d", i), func() { if err := f.K8sClient.Delete(ctx, vr); err != nil { f.Logger.Error("failed to delete VirtualRouter", zap.String("namespace", vr.Namespace), zap.String("name", vr.Name), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } }) } By("wait all VirtualService become deleted", func() { var waitErrors []error waitErrorsMutex := &sync.Mutex{} var wg sync.WaitGroup for i, vs := range s.createdServiceVSs { if vs == nil { continue } wg.Add(1) go func(serviceIndex int) { defer wg.Done() vs := s.createdServiceVSs[serviceIndex] if err := 
f.VSManager.WaitUntilVirtualServiceDeleted(ctx, vs); err != nil { waitErrorsMutex.Lock() waitErrors = append(waitErrors, errors.Wrapf(err, "VirtualService: %v", k8s.NamespacedName(vs).String())) waitErrorsMutex.Unlock() return } }(i) } wg.Wait() for _, waitErr := range waitErrors { f.Logger.Error("failed to wait all VirtualService become deleted", zap.Error(waitErr)) } Expect(len(waitErrors)).To(BeZero()) }) By("wait all VirtualRouter become deleted", func() { var waitErrors []error waitErrorsMutex := &sync.Mutex{} var wg sync.WaitGroup for i, vr := range s.createdServiceVRs { if vr == nil { continue } wg.Add(1) go func(serviceIndex int) { defer wg.Done() vr := s.createdServiceVRs[serviceIndex] if err := f.VRManager.WaitUntilVirtualRouterDeleted(ctx, vr); err != nil { waitErrorsMutex.Lock() waitErrors = append(waitErrors, errors.Wrapf(err, "VirtualRouter: %v", k8s.NamespacedName(vr).String())) waitErrorsMutex.Unlock() return } }(i) } wg.Wait() for _, waitErr := range waitErrors { f.Logger.Error("failed to wait all VirtualRouter become deleted", zap.Error(waitErr)) } Expect(len(waitErrors)).To(BeZero()) }) }) return deletionErrors } func (s *DynamicStack) grantVirtualNodesBackendAccess(ctx context.Context, f *framework.Framework) { By("granting VirtualNodes backend access", func() { s.VNReferenceMap = make(map[string][]*string) nextVirtualServiceIndex := 0 for i, vn := range s.createdNodeVNs { if vn == nil { continue } By(fmt.Sprintf("granting VirtualNode backend access for node #%d", i), func() { var vnBackends []appmesh.Backend var backendSANs []*string backendSANsMap := make(map[string]bool) for backendIndex := 0; backendIndex != s.BackendsCountPerVirtualNode; backendIndex++ { vs := s.createdServiceVSs[nextVirtualServiceIndex] vnBackends = append(vnBackends, appmesh.Backend{ VirtualService: appmesh.VirtualServiceBackend{ VirtualServiceRef: &appmesh.VirtualServiceReference{ Namespace: aws.String(vs.Namespace), Name: vs.Name, }, }, }) if s.IsmTLSEnabled { for _, 
backendVN := range s.BackendVNsByVR[vs.Name] { if _, ok := backendSANsMap[backendVN]; ok { continue } backendSANsMap[backendVN] = true listenerSVID := mTLSe2eValidationContext + "/" + vn.Name backendSVID := mTLSe2eValidationContext + "/" + backendVN s.VNReferenceMap[backendVN] = append(s.VNReferenceMap[backendVN], &listenerSVID) backendSANs = append(backendSANs, &backendSVID) } } nextVirtualServiceIndex = (nextVirtualServiceIndex + 1) % s.VirtualServicesCount } vnNew := vn.DeepCopy() vnNew.Spec.Backends = vnBackends if s.IsmTLSEnabled { vnNew.Spec.BackendDefaults.ClientPolicy.TLS.Validation.SubjectAlternativeNames.Match.Exact = backendSANs } err := f.K8sClient.Patch(ctx, vnNew, client.MergeFrom(vn)) Expect(err).NotTo(HaveOccurred()) s.createdNodeVNs[i] = vnNew }) } }) } func (s *DynamicStack) revokeVirtualNodeBackendAccess(ctx context.Context, f *framework.Framework) []error { var deletionErrors []error By("revoking VirtualNodes backend access", func() { for i, vn := range s.createdNodeVNs { if vn == nil || len(vn.Spec.Backends) == 0 { continue } By(fmt.Sprintf("revoking VirtualNode backend access for node #%d", i), func() { vnNew := vn.DeepCopy() vnNew.Spec.Backends = nil err := f.K8sClient.Patch(ctx, vnNew, client.MergeFrom(vn)) if err != nil { f.Logger.Error("failed to revoke VirtualNode backend access", zap.String("namespace", vn.Namespace), zap.String("name", vn.Name), zap.Error(err), ) deletionErrors = append(deletionErrors, err) } }) } }) return deletionErrors } func (s *DynamicStack) updateListenerSANsForNodes(ctx context.Context, f *framework.Framework) { By("updating VirtualNodes listener SAN values", func() { for i, vn := range s.createdNodeVNs { if vn == nil { continue } By(fmt.Sprintf("updating VirtualNodes listener SAN values for node #%d", i), func() { vnNew := vn.DeepCopy() listenerSANs := s.VNReferenceMap[vn.Name] vnNew.Spec.Listeners[0].TLS.Validation.SubjectAlternativeNames.Match.Exact = listenerSANs err := f.K8sClient.Patch(ctx, vnNew, 
client.MergeFrom(vn)) Expect(err).NotTo(HaveOccurred()) }) } }) } func (s *DynamicStack) checkDeploymentToVirtualServiceConnectivity(ctx context.Context, f *framework.Framework, dp *appsv1.Deployment, vsIndexes sets.Int) []error { sel := labels.Set(dp.Spec.Selector.MatchLabels) podList := &corev1.PodList{} err := f.K8sClient.List(ctx, podList, client.InNamespace(dp.Namespace), client.MatchingLabelsSelector{Selector: sel.AsSelector()}) if err != nil { return []error{errors.Wrapf(err, "failed to get pods for Deployment: %v", k8s.NamespacedName(dp).String())} } if len(podList.Items) == 0 { return []error{errors.Wrapf(err, "Deployment have zero pods: %v", k8s.NamespacedName(dp).String())} } var checkErrors []error for i := range podList.Items { pod := podList.Items[i].DeepCopy() By(fmt.Sprintf("check pod %s/%s connectivity to services", pod.Namespace, pod.Name), func() { if errs := s.checkPodToVirtualServiceConnectivity(ctx, f, pod, vsIndexes); len(errs) != 0 { checkErrors = append(checkErrors, errs...) 
} }) } return checkErrors } func (s *DynamicStack) checkPodToVirtualServiceConnectivity(ctx context.Context, f *framework.Framework, pod *corev1.Pod, vsIndexes sets.Int) []error { connectivityCheckEntries, err := s.obtainPodToVirtualServiceConnectivityEntries(ctx, f, pod, vsIndexes) if err != nil { return []error{err} } retErrCounterByRUL := make(map[string]map[string]int) retStatusNotOKCounterByURL := make(map[string]map[int]int) retBodyCounterByURL := make(map[string]map[string]int) for _, checkEntry := range connectivityCheckEntries { if _, ok := retBodyCounterByURL[checkEntry.dstURL]; !ok { retBodyCounterByURL[checkEntry.dstURL] = make(map[string]int) } if checkEntry.retErr != nil { if _, ok := retErrCounterByRUL[checkEntry.dstURL]; !ok { retErrCounterByRUL[checkEntry.dstURL] = make(map[string]int) } retErrCounterByRUL[checkEntry.dstURL][checkEntry.retErr.Error()] += 1 continue } if checkEntry.retHTTPStatusCode != http.StatusOK { if _, ok := retStatusNotOKCounterByURL[checkEntry.dstURL]; !ok { retStatusNotOKCounterByURL[checkEntry.dstURL] = make(map[int]int) } retStatusNotOKCounterByURL[checkEntry.dstURL][checkEntry.retHTTPStatusCode] += 1 continue } retBodyCounterByURL[checkEntry.dstURL][checkEntry.retHTTPBody] += 1 } var checkErrors []error for url, retErrCounter := range retErrCounterByRUL { for retErr, count := range retErrCounter { f.Logger.Warn("expect traffic from pod to URL succeed", zap.String("pod", k8s.NamespacedName(pod).String()), zap.String("url", url), zap.String("error", retErr), zap.Int("count", count), ) checkErrors = append(checkErrors, errors.Errorf("expect traffic from pod %v to URL %v succeed, got err: %v, count: %v", k8s.NamespacedName(pod).String(), url, retErr, count, )) } } for url, retStatusNotOKCounter := range retStatusNotOKCounterByURL { for retStatusNotOK, count := range retStatusNotOKCounter { f.Logger.Warn("expect traffic from pod to URL succeed", zap.String("pod", k8s.NamespacedName(pod).String()), zap.String("url", url), 
zap.Int("status_code", retStatusNotOK), zap.Int("count", count), ) checkErrors = append(checkErrors, errors.Errorf("expect traffic from pod %v to URL %v succeed, got status_code: %v, count: %v", k8s.NamespacedName(pod).String(), url, retStatusNotOK, count, )) } } uniformDist := distuv.ChiSquared{K: float64(s.TargetsCountPerRoute - 1)} var expectedHTTPRetCounts []float64 for i := 0; i != s.TargetsCountPerRoute; i++ { expectedHTTPRetCounts = append(expectedHTTPRetCounts, float64(s.ConnectivityCheckPerURL)/float64(s.TargetsCountPerRoute)) } for url, retBodyCounter := range retBodyCounterByURL { var actualHTTPRetCounts []float64 actualHTTPRetCountLogFields := []zap.Field{zap.Namespace("distribution")} for retBody, count := range retBodyCounter { actualHTTPRetCounts = append(actualHTTPRetCounts, float64(count)) actualHTTPRetCountLogFields = append(actualHTTPRetCountLogFields, zap.Int(retBody, count)) } f.Logger.With(actualHTTPRetCountLogFields...).Info("traffic from pod to URL", zap.String("pod", k8s.NamespacedName(pod).String()), zap.String("url", url), ) httpRetCountsDiff := len(expectedHTTPRetCounts) - len(actualHTTPRetCounts) for i := 0; i < httpRetCountsDiff; i++ { actualHTTPRetCounts = append(actualHTTPRetCounts, 0) } chiSqStatics := stat.ChiSquare(actualHTTPRetCounts, expectedHTTPRetCounts) pv := 1 - uniformDist.CDF(chiSqStatics) if pv < connectivityCheckUniformDistributionSL { f.Logger.Warn("expect traffic from pod to URL to be even distributed", zap.String("pod", k8s.NamespacedName(pod).String()), zap.String("url", url), zap.Float64("significance level", connectivityCheckUniformDistributionSL), zap.Float64("pValue", pv), ) } } return checkErrors } // one entry of connectivity check result. 
type connectivityCheckEntry struct { dstVirtualService types.NamespacedName dstURL string retHTTPStatusCode int retHTTPBody string retErr error } func (s *DynamicStack) obtainPodToVirtualServiceConnectivityEntries(ctx context.Context, f *framework.Framework, pod *corev1.Pod, vsIndexes sets.Int) ([]connectivityCheckEntry, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() var checkEntries []connectivityCheckEntry for vsIndex := range vsIndexes { vs := s.createdServiceVSs[vsIndex] vr := s.createdServiceVRs[vsIndex] for _, route := range vr.Spec.Routes { path := aws.StringValue(route.HTTPRoute.Match.Prefix) checkEntries = append(checkEntries, connectivityCheckEntry{ dstVirtualService: k8s.NamespacedName(vs), dstURL: fmt.Sprintf("http://%s:%d%s", aws.StringValue(vs.Spec.AWSName), AppContainerPort, path), }) } } pfErrChan := make(chan error) pfReadyChan := make(chan struct{}) portForwarder, err := k8s.NewPortForwarder(ctx, f.RestCfg, pod, []string{fmt.Sprintf("%d:%d", connectivityCheckProxyPort, HttpProxyContainerPort)}, pfReadyChan) if err != nil { return nil, err } go func() { pfErrChan <- portForwarder.ForwardPorts() }() proxyURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", connectivityCheckProxyPort)) if err != nil { return nil, err } proxyClient := &http.Client{Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)}} checkEntriesRetChan := make(chan connectivityCheckEntry) go func() { var wg sync.WaitGroup throttle := time.Tick(connectivityCheckRate) for _, entry := range checkEntries { for i := 0; i != s.ConnectivityCheckPerURL; i++ { <-throttle wg.Add(1) go func(entry connectivityCheckEntry) { defer wg.Done() <-pfReadyChan resp, err := proxyClient.Get(entry.dstURL) if err != nil { entry.retErr = err checkEntriesRetChan <- entry return } entry.retHTTPStatusCode = resp.StatusCode body, err := ioutil.ReadAll(resp.Body) if err != nil { entry.retErr = err checkEntriesRetChan <- entry return } entry.retHTTPBody = string(body) checkEntriesRetChan 
<- entry }(entry) } } wg.Wait() close(checkEntriesRetChan) }() var checkEntriesRet []connectivityCheckEntry for ret := range checkEntriesRetChan { checkEntriesRet = append(checkEntriesRet, ret) } cancel() return checkEntriesRet, <-pfErrChan }