package locking

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/go-logr/logr"
	"github.com/pkg/errors"
	apicorev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// semaphoreInformationKey is the ConfigMap data key under which the
// JSON-encoded lock holder information is stored.
const semaphoreInformationKey = "lock-information"

// EtcdadmInitMutex uses a ConfigMap to synchronize cluster initialization.
type EtcdadmInitMutex struct {
	log    logr.Logger
	client client.Client
}

// NewEtcdadmInitMutex returns a ConfigMap-backed lock that a machine can hold
// before running init. The given logger and client are used for every
// subsequent Lock and Unlock call.
func NewEtcdadmInitMutex(log logr.Logger, client client.Client) *EtcdadmInitMutex {
	return &EtcdadmInitMutex{
		log:    log,
		client: client,
	}
}

// Lock tries to acquire the init lock of cluster on behalf of machine.
//
// It returns true when the lock ConfigMap was created by this call, or when it
// already exists and records machine as its holder. It returns false when the
// lock is held by a different machine, or when any API or (un)marshalling
// error occurs; errors are logged, not returned.
func (c *EtcdadmInitMutex) Lock(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) bool {
	sema := newSemaphore()
	cmName := configMapName(cluster.Name)
	log := c.log.WithValues("namespace", cluster.Namespace, "cluster-name", cluster.Name, "configmap-name", cmName, "machine-name", machine.Name)

	err := c.client.Get(ctx, client.ObjectKey{
		Namespace: cluster.Namespace,
		Name:      cmName,
	}, sema.ConfigMap)
	switch {
	case apierrors.IsNotFound(err):
		// No lock exists yet: continue below and try to create it.
	case err != nil:
		log.Error(err, "Failed to acquire lock")
		return false
	default: // successfully found an existing config map
		info, err := sema.information()
		if err != nil {
			log.Error(err, "Failed to get information about the existing lock")
			return false
		}
		// The machine requesting the lock is the machine that created the
		// lock, therefore the lock is already acquired.
		if info.MachineName == machine.Name {
			return true
		}
		log.Info("Waiting on another machine to initialize", "init-machine", info.MachineName)
		return false
	}

	// Adds owner reference, namespace and name.
	sema.setMetadata(cluster)
	// Adds the additional information (the name of the holding machine).
	if err := sema.setInformation(&information{MachineName: machine.Name}); err != nil {
		log.Error(err, "Failed to acquire lock while setting semaphore information")
		return false
	}

	log.Info("Attempting to acquire the lock")
	err = c.client.Create(ctx, sema.ConfigMap)
	switch {
	case apierrors.IsAlreadyExists(err):
		// Another caller created the ConfigMap between our Get and Create.
		log.Info("Cannot acquire the lock. The lock has been acquired by someone else")
		return false
	case err != nil:
		log.Error(err, "Error acquiring the lock")
		return false
	default:
		return true
	}
}

// Unlock releases the init lock of cluster by deleting its ConfigMap.
//
// It returns true when the ConfigMap was deleted or did not exist (on either
// the Get or the Delete), and false when the Get or Delete failed with any
// other error; errors are logged, not returned.
func (c *EtcdadmInitMutex) Unlock(ctx context.Context, cluster *clusterv1.Cluster) bool {
	sema := newSemaphore()
	cmName := configMapName(cluster.Name)
	log := c.log.WithValues("namespace", cluster.Namespace, "cluster-name", cluster.Name, "configmap-name", cmName)

	log.Info("Checking for lock")
	err := c.client.Get(ctx, client.ObjectKey{
		Namespace: cluster.Namespace,
		Name:      cmName,
	}, sema.ConfigMap)
	switch {
	case apierrors.IsNotFound(err):
		log.Info("Control plane init lock not found, it may have been released already")
		return true
	case err != nil:
		log.Error(err, "Error unlocking the control plane init lock")
		return false
	default:
		// Delete the config map semaphore if there is no error fetching it.
		if err := c.client.Delete(ctx, sema.ConfigMap); err != nil {
			// Deleted by someone else in the meantime: still released.
			if apierrors.IsNotFound(err) {
				return true
			}
			log.Error(err, "Error deleting the config map underlying the control plane init lock")
			return false
		}
		return true
	}
}

// information is the payload stored in the lock ConfigMap; it records which
// machine holds the lock.
type information struct {
	MachineName string `json:"machineName"`
}

// semaphore wraps the ConfigMap that backs the lock.
type semaphore struct {
	*apicorev1.ConfigMap
}

// newSemaphore returns a semaphore wrapping an empty ConfigMap.
func newSemaphore() *semaphore {
	return &semaphore{&apicorev1.ConfigMap{}}
}

// configMapName returns the name of the lock ConfigMap for the given cluster.
func configMapName(clusterName string) string {
	return fmt.Sprintf("%s-etcd-lock", clusterName)
}

// information decodes the lock holder information stored in the ConfigMap
// under semaphoreInformationKey. It returns an error when the stored value is
// not valid JSON (which includes the key being absent or empty).
func (s *semaphore) information() (*information, error) {
	li := &information{}
	if err := json.Unmarshal([]byte(s.Data[semaphoreInformationKey]), li); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal semaphore information")
	}
	return li, nil
}

// setInformation JSON-encodes information and stores it as the only entry of
// the ConfigMap's Data; any previous Data is replaced.
func (s *semaphore) setInformation(information *information) error {
	b, err := json.Marshal(information)
	if err != nil {
		return errors.Wrap(err, "failed to marshal semaphore information")
	}
	s.Data = map[string]string{
		semaphoreInformationKey: string(b),
	}
	return nil
}

// setMetadata overwrites the ConfigMap's ObjectMeta with the lock's namespace
// and name, a cluster-name label, and an owner reference to cluster.
func (s *semaphore) setMetadata(cluster *clusterv1.Cluster) {
	// cluster.APIVersion and cluster.Kind are empty when the caller did not
	// populate TypeMeta; fall back to the group/version and kind of
	// clusterv1.Cluster so the owner reference is still well-formed.
	apiVersion, kind := cluster.APIVersion, cluster.Kind
	if apiVersion == "" {
		apiVersion = clusterv1.GroupVersion.String()
	}
	if kind == "" {
		kind = "Cluster"
	}

	s.ObjectMeta = metav1.ObjectMeta{
		Namespace: cluster.Namespace,
		Name:      configMapName(cluster.Name),
		Labels: map[string]string{
			clusterv1.ClusterNameLabel: cluster.Name,
		},
		OwnerReferences: []metav1.OwnerReference{
			{
				APIVersion: apiVersion,
				Kind:       kind,
				Name:       cluster.Name,
				UID:        cluster.UID,
			},
		},
	}
}