package addon_tests import ( "fmt" "testing" "github.com/aws/amazon-vpc-cni-k8s/test/framework" "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/aws/services" "github.com/aws/amazon-vpc-cni-k8s/test/framework/utils" v1 "k8s.io/api/core/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) const InstanceTypeNodeLabelKey = "beta.kubernetes.io/instance-type" var f *framework.Framework var maxIPPerInterface int var primaryNode v1.Node var secondaryNode v1.Node var vpcCIDRs []string var clusterVersion string var latestAddonVersion string func TestCNIPodNetworking(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "CNI Pod Networking Suite") } var _ = BeforeSuite(func() { f = framework.New(framework.GlobalOptions) By("creating test namespace") f.K8sResourceManagers.NamespaceManager(). CreateNamespace(utils.DefaultTestNamespace) By(fmt.Sprintf("getting the node with the node label key %s and value %s", f.Options.NgNameLabelKey, f.Options.NgNameLabelVal)) nodes, err := f.K8sResourceManagers.NodeManager().GetNodes(f.Options.NgNameLabelKey, f.Options.NgNameLabelVal) Expect(err).ToNot(HaveOccurred()) By("verifying that atleast 1 nodes is present for the test") Expect(len(nodes.Items)).Should(BeNumerically(">", 0)) // Set the primary and secondary node for testing primaryNode = nodes.Items[0] secondaryNode = nodes.Items[1] By("getting the instance type from node label " + InstanceTypeNodeLabelKey) instanceType := primaryNode.Labels[InstanceTypeNodeLabelKey] By("getting the network interface details from ec2") instanceOutput, err := f.CloudServices.EC2().DescribeInstanceType(instanceType) Expect(err).ToNot(HaveOccurred()) maxIPPerInterface = int(*instanceOutput[0].NetworkInfo.Ipv4AddressesPerInterface) By("describing the VPC to get the VPC CIDRs") describeVPCOutput, err := f.CloudServices.EC2().DescribeVPC(f.Options.AWSVPCID) Expect(err).ToNot(HaveOccurred()) for _, cidrBlockAssociationSet := range describeVPCOutput.Vpcs[0].CidrBlockAssociationSet { vpcCIDRs = append(vpcCIDRs, *cidrBlockAssociationSet.CidrBlock) } By("getting current cluster version") clusterOutput, err := f.CloudServices.EKS().DescribeCluster(f.Options.ClusterName) Expect(err).NotTo(HaveOccurred()) clusterVersion = *clusterOutput.Cluster.Version By("getting latest vpc-cni addon version") latestAddonVersion, err = f.CloudServices.EKS().GetLatestVersion(&services.AddonInput{ AddonName: "vpc-cni", K8sVersion: clusterVersion, }) Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { By("deleting test namespace") f.K8sResourceManagers.NamespaceManager(). DeleteAndWaitTillNamespaceDeleted(utils.DefaultTestNamespace) })