package controllers import ( "context" etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1" "github.com/go-logr/logr" "github.com/pkg/errors" "k8s.io/klog/v2" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/util/collections" "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/controller-runtime/pkg/client" ) // TODO(g-gaston): remove this once we have a stable CAPI repo that contains this, // MachineEtcdReadyLabelName is the label set on machines that have succesfully joined the etcd cluster. const MachineEtcdReadyLabelName = "cluster.x-k8s.io/etcd-ready" type etcdMachines map[string]etcdMachine // endpoints returns all the API endpoints for the machines that have one available. func (e etcdMachines) endpoints() []string { endpoints := make([]string, 0, len(e)) for _, m := range e { if m.endpoint != "" { endpoints = append(endpoints, m.endpoint) } } return endpoints } // etcdMachine represents a Machine that should be a member of an etcd cluster. type etcdMachine struct { *clusterv1.Machine endpoint string listening bool healthError error } func (e etcdMachine) healthy() bool { return e.listening && e.healthError == nil } // updateMachinesEtcdReadyLabel adds the etcd-ready label to the machines that have joined the etcd cluster. func (r *EtcdadmClusterReconciler) updateMachinesEtcdReadyLabel(ctx context.Context, log logr.Logger, machines etcdMachines) error { for _, m := range machines { if _, ok := m.Labels[MachineEtcdReadyLabelName]; ok { continue } if !m.healthy() { log.Info("Machine not healthy yet", "machine", klog.KObj(m.Machine), "listening", m.listening, "healthError", m.healthError, "endpoint", m.endpoint) continue } m.Labels[MachineEtcdReadyLabelName] = "true" if err := r.Client.Update(ctx, m.Machine); err != nil { return errors.Wrapf(err, "adding etcd ready label to machine %s", m.Name) } } return nil } // checkOwnedMachines verifies the health of all etcd members. func (r *EtcdadmClusterReconciler) checkOwnedMachines(ctx context.Context, log logr.Logger, etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (etcdMachines, error) { ownedMachines, err := r.getCurrentOwnedMachines(ctx, etcdadmCluster, cluster) if err != nil { return nil, err } machines := make(etcdMachines, len(ownedMachines)) for k, machine := range ownedMachines { m := etcdMachine{Machine: machine} endpoint := getMachineEtcdEndpoint(machine) if endpoint == "" { machines[k] = m continue } err := r.performEndpointHealthCheck(ctx, cluster, endpoint, true) // This is not ideal, performEndpointHealthCheck uses an error to signal both a not ready/unhealthy member // and also transient errors when performing such check. // Ideally we would separate these 2 so we can abort on error and mark as unhealthy separetly m.healthError = err if errors.Is(err, portNotOpenErr) { log.Info("Machine is not listening yet, this is probably transient, while etcd starts", "endpoint", endpoint) } else { m.endpoint = endpoint m.listening = true } machines[k] = m } return machines, nil } // getCurrentOwnedMachines lists all the owned machines by the etcdadm cluster. func (r *EtcdadmClusterReconciler) getCurrentOwnedMachines(ctx context.Context, etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (collections.Machines, error) { var client client.Reader if conditions.IsFalse(etcdadmCluster, etcdv1.EtcdMachinesSpecUpToDateCondition) { // During upgrade with current logic, outdated machines don't get deleted right away. // the controller removes their etcdadmCluster ownerRef and updates the Machine. So using uncachedClient here will fetch those changes client = r.uncachedClient } else { client = r.Client } etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, client, cluster, EtcdClusterMachines(cluster.Name, etcdadmCluster.Name)) if err != nil { return nil, errors.Wrap(err, "reading machines for etcd cluster") } ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdadmCluster)) return ownedMachines, nil } // getMachineEtcdEndpoint constructs the full API url for an etcd member Machine. // If the Machine doesn't have yet the right address, it returns empty string. func getMachineEtcdEndpoint(machine *clusterv1.Machine) string { address := getEtcdMachineAddress(machine) if address == "" { return "" } return getMemberClientURL(address) }