package snow import ( "context" "fmt" "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/eks-anywhere/pkg/api/v1alpha1" "github.com/aws/eks-anywhere/pkg/bootstrapper" "github.com/aws/eks-anywhere/pkg/clients/kubernetes" "github.com/aws/eks-anywhere/pkg/cluster" "github.com/aws/eks-anywhere/pkg/constants" "github.com/aws/eks-anywhere/pkg/logger" "github.com/aws/eks-anywhere/pkg/providers" providerValidator "github.com/aws/eks-anywhere/pkg/providers/validator" "github.com/aws/eks-anywhere/pkg/retrier" "github.com/aws/eks-anywhere/pkg/templater" "github.com/aws/eks-anywhere/pkg/types" releasev1alpha1 "github.com/aws/eks-anywhere/release/api/v1alpha1" ) const ( eksaSnowCredentialsFileKey = "EKSA_AWS_CREDENTIALS_FILE" eksaSnowCABundlesFileKey = "EKSA_AWS_CA_BUNDLES_FILE" snowCredentialsKey = "AWS_B64ENCODED_CREDENTIALS" snowCertsKey = "AWS_B64ENCODED_CA_BUNDLES" maxRetries = 30 backOffPeriod = 5 * time.Second ) var ( snowDatacenterResourceType = fmt.Sprintf("snowdatacenterconfigs.%s", v1alpha1.GroupVersion.Group) snowMachineResourceType = fmt.Sprintf("snowmachineconfigs.%s", v1alpha1.GroupVersion.Group) ) type SnowProvider struct { kubeUnAuthClient KubeUnAuthClient retrier *retrier.Retrier configManager *ConfigManager ipValidator *providerValidator.IPValidator skipIpCheck bool log logr.Logger } type KubeUnAuthClient interface { KubeconfigClient(kubeconfig string) kubernetes.Client Apply(ctx context.Context, kubeconfig string, obj runtime.Object) error } func NewProvider(kubeUnAuthClient KubeUnAuthClient, configManager *ConfigManager, skipIpCheck bool) *SnowProvider { retrier := retrier.NewWithMaxRetries(maxRetries, backOffPeriod) return &SnowProvider{ kubeUnAuthClient: kubeUnAuthClient, retrier: retrier, configManager: configManager, ipValidator: providerValidator.NewIPValidator(), skipIpCheck: skipIpCheck, log: logger.Get(), } } func (p *SnowProvider) Name() string { return constants.SnowProviderName } func (p *SnowProvider) SetupAndValidateCreateCluster(ctx context.Context, clusterSpec *cluster.Spec) error { if err := p.validateUpgradeRolloutStrategy(clusterSpec); err != nil { return fmt.Errorf("failed setup and validations: %v", err) } if err := p.configManager.SetDefaultsAndValidate(ctx, clusterSpec.Config); err != nil { return fmt.Errorf("setting defaults and validate snow config: %v", err) } if !p.skipIpCheck { if err := p.ipValidator.ValidateControlPlaneIPUniqueness(clusterSpec.Cluster); err != nil { return err } } else { logger.Info("Skipping check for whether control plane ip is in use") } return nil } func (p *SnowProvider) SetupAndValidateUpgradeCluster(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec, _ *cluster.Spec) error { if err := p.validateUpgradeRolloutStrategy(clusterSpec); err != nil { return fmt.Errorf("failed setup and validations: %v", err) } if err := p.configManager.SetDefaultsAndValidate(ctx, clusterSpec.Config); err != nil { return fmt.Errorf("setting defaults and validate snow config: %v", err) } return nil } func (p *SnowProvider) SetupAndValidateDeleteCluster(ctx context.Context, _ *types.Cluster, clusterSpec *cluster.Spec) error { if err := p.validateUpgradeRolloutStrategy(clusterSpec); err != nil { return fmt.Errorf("failed setup and validations: %v", err) } if err := SetupEksaCredentialsSecret(clusterSpec.Config); err != nil { return fmt.Errorf("setting up credentials: %v", err) } return nil } func (p *SnowProvider) UpdateSecrets(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) error { if err := p.kubeUnAuthClient.Apply(ctx, cluster.KubeconfigFile, clusterSpec.SnowCredentialsSecret); err != nil { return fmt.Errorf("applying eks-a snow credentials secret in cluster: %v", err) } return nil } // CAPIObjects generates the control plane and worker nodes objects for snow provider from clusterSpec. func CAPIObjects(ctx context.Context, log logr.Logger, clusterSpec *cluster.Spec, kubeClient kubernetes.Client) (controlPlaneSpec, workersSpec []byte, err error) { controlPlaneObjs, err := ControlPlaneObjects(ctx, log, clusterSpec, kubeClient) if err != nil { return nil, nil, err } controlPlaneSpec, err = templater.ObjectsToYaml(kubernetesToRuntimeObjects(controlPlaneObjs)...) if err != nil { return nil, nil, err } workersObjs, err := WorkersObjects(ctx, log, clusterSpec, kubeClient) if err != nil { return nil, nil, err } workersSpec, err = templater.ObjectsToYaml(kubernetesToRuntimeObjects(workersObjs)...) if err != nil { return nil, nil, err } return controlPlaneSpec, workersSpec, nil } func kubernetesToRuntimeObjects(objs []kubernetes.Object) []runtime.Object { runtimeObjs := make([]runtime.Object, 0, len(objs)) for _, o := range objs { runtimeObjs = append(runtimeObjs, o) } return runtimeObjs } func (p *SnowProvider) generateCAPISpec(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) (controlPlaneSpec, workersSpec []byte, err error) { kubeconfigClient := p.kubeUnAuthClient.KubeconfigClient(cluster.KubeconfigFile) return CAPIObjects(ctx, p.log, clusterSpec, kubeconfigClient) } func (p *SnowProvider) GenerateCAPISpecForCreate(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) (controlPlaneSpec, workersSpec []byte, err error) { return p.generateCAPISpec(ctx, cluster, clusterSpec) } func (p *SnowProvider) GenerateCAPISpecForUpgrade(ctx context.Context, bootstrapCluster, _ *types.Cluster, _ *cluster.Spec, clusterSpec *cluster.Spec) (controlPlaneSpec, workersSpec []byte, err error) { return p.generateCAPISpec(ctx, bootstrapCluster, clusterSpec) } func (p *SnowProvider) PreCAPIInstallOnBootstrap(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) error { return nil } func (p *SnowProvider) PostBootstrapSetup(ctx context.Context, clusterConfig *v1alpha1.Cluster, cluster *types.Cluster) error { return nil } // PostBootstrapDeleteForUpgrade runs any provider-specific operations after bootstrap cluster has been deleted. func (p *SnowProvider) PostBootstrapDeleteForUpgrade(ctx context.Context, cluster *types.Cluster) error { return nil } func (p *SnowProvider) PostBootstrapSetupUpgrade(ctx context.Context, clusterConfig *v1alpha1.Cluster, cluster *types.Cluster) error { return nil } func (p *SnowProvider) PostWorkloadInit(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) error { return nil } func (p *SnowProvider) BootstrapClusterOpts(_ *cluster.Spec) ([]bootstrapper.BootstrapClusterOption, error) { return nil, nil } func (p *SnowProvider) UpdateKubeConfig(content *[]byte, clusterName string) error { return nil } func (p *SnowProvider) Version(clusterSpec *cluster.Spec) string { versionsBundle := clusterSpec.ControlPlaneVersionsBundle() return versionsBundle.Snow.Version } func (p *SnowProvider) EnvMap(clusterSpec *cluster.Spec) (map[string]string, error) { envMap := make(map[string]string) envMap[snowCredentialsKey] = string(clusterSpec.SnowCredentialsSecret.Data[v1alpha1.SnowCredentialsKey]) envMap[snowCertsKey] = string(clusterSpec.SnowCredentialsSecret.Data[v1alpha1.SnowCertificatesKey]) versionsBundle := clusterSpec.ControlPlaneVersionsBundle() envMap["SNOW_CONTROLLER_IMAGE"] = versionsBundle.Snow.Manager.VersionedImage() return envMap, nil } func (p *SnowProvider) GetDeployments() map[string][]string { return map[string][]string{ constants.CapasSystemNamespace: {"capas-controller-manager"}, } } func (p *SnowProvider) GetInfrastructureBundle(clusterSpec *cluster.Spec) *types.InfrastructureBundle { versionsBundle := clusterSpec.ControlPlaneVersionsBundle() folderName := fmt.Sprintf("infrastructure-snow/%s/", versionsBundle.Snow.Version) infraBundle := types.InfrastructureBundle{ FolderName: folderName, Manifests: []releasev1alpha1.Manifest{ versionsBundle.Snow.Components, versionsBundle.Snow.Metadata, }, } return &infraBundle } func (p *SnowProvider) DatacenterConfig(clusterSpec *cluster.Spec) providers.DatacenterConfig { return clusterSpec.SnowDatacenter } func (p *SnowProvider) DatacenterResourceType() string { return snowDatacenterResourceType } func (p *SnowProvider) MachineResourceType() string { return snowMachineResourceType } func (p *SnowProvider) MachineConfigs(clusterSpec *cluster.Spec) []providers.MachineConfig { configs := make([]providers.MachineConfig, 0, len(clusterSpec.SnowMachineConfigs)) for _, mc := range clusterSpec.SnowMachineConfigs { configs = append(configs, mc) } return configs } func (p *SnowProvider) ValidateNewSpec(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) error { return nil } func (p *SnowProvider) ChangeDiff(currentSpec, newSpec *cluster.Spec) *types.ComponentChangeDiff { currentVersionsBundle := currentSpec.ControlPlaneVersionsBundle() newVersionsBundle := newSpec.ControlPlaneVersionsBundle() if currentVersionsBundle.Snow.Version == newVersionsBundle.Snow.Version { return nil } return &types.ComponentChangeDiff{ ComponentName: constants.SnowProviderName, NewVersion: newVersionsBundle.Snow.Version, OldVersion: currentVersionsBundle.Snow.Version, } } func (p *SnowProvider) RunPostControlPlaneUpgrade(ctx context.Context, oldClusterSpec *cluster.Spec, clusterSpec *cluster.Spec, workloadCluster *types.Cluster, managementCluster *types.Cluster) error { return nil } func bundleImagesEqual(new, old releasev1alpha1.SnowBundle) bool { return new.Manager.ImageDigest == old.Manager.ImageDigest && new.KubeVip.ImageDigest == old.KubeVip.ImageDigest } func (p *SnowProvider) machineConfigsChanged(ctx context.Context, cluster *types.Cluster, spec *cluster.Spec) (bool, error) { client := p.kubeUnAuthClient.KubeconfigClient(cluster.KubeconfigFile) for _, new := range spec.SnowMachineConfigs { old := &v1alpha1.SnowMachineConfig{} err := client.Get(ctx, new.Name, namespaceOrDefault(new), old) if apierrors.IsNotFound(err) { return true, nil } if err != nil { return false, err } if len(new.Spec.Devices) != len(old.Spec.Devices) || !equality.Semantic.DeepDerivative(new.Spec, old.Spec) { return true, nil } } return false, nil } func (p *SnowProvider) datacenterChanged(ctx context.Context, cluster *types.Cluster, spec *cluster.Spec) (bool, error) { client := p.kubeUnAuthClient.KubeconfigClient(cluster.KubeconfigFile) new := spec.SnowDatacenter old := &v1alpha1.SnowDatacenterConfig{} err := client.Get(ctx, new.Name, namespaceOrDefault(new), old) if apierrors.IsNotFound(err) { return true, nil } if err != nil { return false, err } return !equality.Semantic.DeepDerivative(new.Spec, old.Spec), nil } // namespaceOrDefault return the object namespace or default if it's empty. func namespaceOrDefault(obj client.Object) string { ns := obj.GetNamespace() if ns == "" { ns = "default" } return ns } func (p *SnowProvider) validateUpgradeRolloutStrategy(clusterSpec *cluster.Spec) error { if clusterSpec.Cluster.Spec.ControlPlaneConfiguration.UpgradeRolloutStrategy != nil { return fmt.Errorf("Upgrade rollout strategy customization is not supported for snow provider") } for _, workerNodeGroupConfiguration := range clusterSpec.Cluster.Spec.WorkerNodeGroupConfigurations { if workerNodeGroupConfiguration.UpgradeRolloutStrategy != nil { return fmt.Errorf("Upgrade rollout strategy customization is not supported for snow provider") } } return nil } // UpgradeNeeded compares the new snow version bundle and objects with the existing ones in the cluster and decides whether // to trigger a cluster upgrade or not. // TODO: revert the change once cluster.BuildSpec is used in cluster_manager to replace the deprecated cluster.BuildSpecForCluster func (p *SnowProvider) UpgradeNeeded(ctx context.Context, newSpec, oldSpec *cluster.Spec, c *types.Cluster) (bool, error) { oldVersionBundle := oldSpec.ControlPlaneVersionsBundle() newVersionsBundle := newSpec.ControlPlaneVersionsBundle() if !bundleImagesEqual(newVersionsBundle.Snow, oldVersionBundle.Snow) { return true, nil } datacenterChanged, err := p.datacenterChanged(ctx, c, newSpec) if err != nil { return false, err } if datacenterChanged { return true, nil } return p.machineConfigsChanged(ctx, c, newSpec) } func (p *SnowProvider) DeleteResources(ctx context.Context, clusterSpec *cluster.Spec) error { client := p.kubeUnAuthClient.KubeconfigClient(clusterSpec.ManagementCluster.KubeconfigFile) for _, mc := range clusterSpec.SnowMachineConfigs { mc.Namespace = namespaceOrDefault(mc) if err := client.Delete(ctx, mc); err != nil && !apierrors.IsNotFound(err) { return err } } clusterSpec.SnowDatacenter.Namespace = namespaceOrDefault(clusterSpec.SnowDatacenter) if err := client.Delete(ctx, clusterSpec.SnowDatacenter); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("deleting snow datacenter: %v", err) } return nil } func (p *SnowProvider) PostClusterDeleteValidate(_ context.Context, _ *types.Cluster) error { // No validations return nil } func (p *SnowProvider) PostMoveManagementToBootstrap(_ context.Context, _ *types.Cluster) error { // NOOP return nil } func (p *SnowProvider) InstallCustomProviderComponents(ctx context.Context, kubeconfigFile string) error { return nil } // PreCoreComponentsUpgrade staisfies the Provider interface. func (p *SnowProvider) PreCoreComponentsUpgrade( ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec, ) error { return nil }