# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import os
import logging

from botocore.compat import OrderedDict

from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import uni_print
from awscli.customizations.eks.exceptions import EKSClusterError
from awscli.customizations.eks.kubeconfig import (Kubeconfig,
                                                  KubeconfigError,
                                                  KubeconfigLoader,
                                                  KubeconfigWriter,
                                                  KubeconfigValidator,
                                                  KubeconfigAppender)
from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump

LOG = logging.getLogger(__name__)

DEFAULT_PATH = os.path.expanduser("~/.kube/config")

# At the time EKS no longer supports Kubernetes v1.21 (probably ~Dec 2023),
# this can be safely changed to default to writing "v1"
API_VERSION = "client.authentication.k8s.io/v1beta1"


class UpdateKubeconfigCommand(BasicCommand):
    """
    Implementation of the ``update-kubeconfig`` command.

    Builds a cluster entry and a user entry from the described EKS cluster,
    inserts them into the selected kubeconfig and either prints the result
    (``--dry-run``) or writes it back to the kubeconfig file.
    """
    NAME = 'update-kubeconfig'

    DESCRIPTION = BasicCommand.FROM_FILE(
        'eks',
        'update-kubeconfig',
        '_description.rst'
    )

    ARG_TABLE = [
        {
            'name': 'name',
            'help_text': ("The name of the cluster for which "
                          "to create a kubeconfig entry. "
                          "This cluster must exist in your account and in the "
                          "specified or configured default Region "
                          "for your AWS CLI installation."),
            'required': True
        },
        {
            'name': 'kubeconfig',
            'help_text': ("Optionally specify a kubeconfig file to append "
                          "with your configuration. "
                          "By default, the configuration is written to the "
                          "first file path in the KUBECONFIG "
                          "environment variable (if it is set) "
                          "or the default kubeconfig path (.kube/config) "
                          "in your home directory."),
            'required': False
        },
        {
            'name': 'role-arn',
            'help_text': ("To assume a role for cluster authentication, "
                          "specify an IAM role ARN with this option. "
                          "For example, if you created a cluster "
                          "while assuming an IAM role, "
                          "then you must also assume that role to "
                          "connect to the cluster the first time."),
            'required': False
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print the merged kubeconfig to stdout instead of "
                          "writing it to the specified file."),
            'required': False
        },
        {
            'name': 'verbose',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print more detailed output "
                          "when writing to the kubeconfig file, "
                          "including the appended entries.")
        },
        {
            'name': 'alias',
            'help_text': ("Alias for the cluster context name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'user-alias',
            'help_text': ("Alias for the generated user name. "
                          "Defaults to match cluster ARN."),
            'required': False
        }
    ]

    def _display_entries(self, entries):
        """
        Display entries in yaml format

        :param entries: a list of OrderedDicts to be printed
        :type entries: list
        """
        uni_print("Entries:\n\n")
        for entry in entries:
            uni_print(ordered_yaml_dump(entry))
            uni_print("\n")

    def _run_main(self, parsed_args, parsed_globals):
        """
        Generate the cluster and user entries and merge them into the
        chosen kubeconfig.

        :param parsed_args: The parsed arguments declared in ARG_TABLE
        :param parsed_globals: The parsed global CLI arguments; forwarded
            to EKSClient for client creation
        """
        client = EKSClient(self._session,
                           parsed_args.name,
                           parsed_args.role_arn,
                           parsed_globals)
        new_cluster_dict = client.get_cluster_entry()
        new_user_dict = client.get_user_entry(
            user_alias=parsed_args.user_alias
        )

        config_selector = KubeconfigSelector(
            os.environ.get("KUBECONFIG", ""),
            parsed_args.kubeconfig
        )
        config = config_selector.choose_kubeconfig(
            new_cluster_dict["name"]
        )
        # Checked before the pair is inserted so the message printed below
        # reflects the state of the file as it was loaded.
        updating_existing = config.has_cluster(new_cluster_dict["name"])
        appender = KubeconfigAppender()
        new_context_dict = appender.insert_cluster_user_pair(
            config,
            new_cluster_dict,
            new_user_dict,
            parsed_args.alias
        )

        if parsed_args.dry_run:
            uni_print(config.dump_content())
        else:
            writer = KubeconfigWriter()
            writer.write_kubeconfig(config)

            if updating_existing:
                uni_print("Updated context {0} in {1}\n".format(
                    new_context_dict["name"], config.path
                ))
            else:
                uni_print("Added new context {0} to {1}\n".format(
                    new_context_dict["name"], config.path
                ))

            if parsed_args.verbose:
                self._display_entries([
                    new_context_dict,
                    new_user_dict,
                    new_cluster_dict
                ])


class KubeconfigSelector(object):
    """
    Decide which kubeconfig file the new entries should be merged into.
    """

    def __init__(self, env_variable, path_in, validator=None,
                 loader=None):
        """
        Parse KUBECONFIG into a list of absolute paths.
        Also replace the empty list with DEFAULT_PATH

        :param env_variable: KUBECONFIG as a long string
        :type env_variable: string

        :param path_in: The path passed in through the CLI
        :type path_in: string or None

        :param validator: Validator handed to the default loader;
            a KubeconfigValidator is created when omitted
        :param loader: Object used to load kubeconfig files;
            a KubeconfigLoader is created when omitted
        """
        if validator is None:
            validator = KubeconfigValidator()
        self._validator = validator

        if loader is None:
            loader = KubeconfigLoader(validator)
        self._loader = loader

        if path_in is not None:
            # Override environment variable
            self._paths = [self._expand_path(path_in)]
        else:
            # Get the list of paths from the environment variable
            if env_variable == "":
                env_variable = DEFAULT_PATH
            self._paths = [self._expand_path(element)
                           for element in env_variable.split(os.pathsep)
                           if len(element.strip()) > 0]
            # KUBECONFIG made only of separators/whitespace yields no paths
            if len(self._paths) == 0:
                self._paths = [DEFAULT_PATH]

    def choose_kubeconfig(self, cluster_name):
        """
        Choose which kubeconfig file to read from.
        If name is already an entry in one of the $KUBECONFIG files,
        choose that one.
        Otherwise choose the first file.

        :param cluster_name: The name of the cluster which is going to be added
        :type cluster_name: String

        :return: a chosen Kubeconfig based on above rules
        :rtype: Kubeconfig
        """
        # Search for an existing entry to update
        for candidate_path in self._paths:
            try:
                loaded_config = self._loader.load_kubeconfig(candidate_path)

                if loaded_config.has_cluster(cluster_name):
                    LOG.debug("Found entry to update at {0}".format(
                        candidate_path
                    ))
                    return loaded_config
            except KubeconfigError as e:
                # A file that cannot be loaded is skipped, not fatal here
                LOG.warning("Passing {0}:{1}".format(candidate_path, e))

        # No entry was found, use the first file in KUBECONFIG
        #
        # Note: This could raise KubeconfigErrors if paths[0] is corrupted
        return self._loader.load_kubeconfig(self._paths[0])

    def _expand_path(self, path):
        """ A helper to expand a path to a full absolute path. """
        return os.path.abspath(os.path.expanduser(path))


class EKSClient(object):
    """
    Describe an EKS cluster and build kubeconfig entries from the result.
    """

    def __init__(self, session, cluster_name, role_arn, parsed_globals=None):
        """
        :param session: Session used to create the eks client; its
            ``profile`` attribute is also read by get_user_entry
        :param cluster_name: Name passed to describe-cluster
        :param role_arn: Role ARN added to the get-token arguments,
            or None
        :param parsed_globals: Optional parsed global arguments supplying
            region, endpoint_url and verify_ssl for client creation
        """
        self._session = session
        self._cluster_name = cluster_name
        self._role_arn = role_arn
        self._cluster_description = None
        self._globals = parsed_globals

    def _get_cluster_description(self):
        """
        Use an eks describe-cluster call to get the cluster description
        Cache the response in self._cluster_description.
        describe-cluster is not called again once a description has been
        validated and cached.

        :raises EKSClusterError: if the description has no status, or the
            status is neither ACTIVE nor UPDATING
        """
        if self._cluster_description is None:
            if self._globals is None:
                client = self._session.create_client("eks")
            else:
                client = self._session.create_client(
                    "eks",
                    region_name=self._globals.region,
                    endpoint_url=self._globals.endpoint_url,
                    verify=self._globals.verify_ssl
                )
            full_description = client.describe_cluster(name=self._cluster_name)
            description = full_description["cluster"]

            if "status" not in description:
                raise EKSClusterError("Cluster not found")
            if description["status"] not in ["ACTIVE", "UPDATING"]:
                raise EKSClusterError("Cluster status is {0}".format(
                    description["status"]
                ))

            # Cache only after validation: caching first would let a later
            # call return a rejected description without re-checking it.
            self._cluster_description = description

        return self._cluster_description

    def get_cluster_entry(self):
        """
        Return a cluster entry generated using
        the previously obtained description.

        :return: An entry with "cluster" (certificate-authority-data and
            server) and "name" (the cluster ARN)
        :rtype: OrderedDict
        """
        cluster_description = self._get_cluster_description()

        # A description without certificateAuthority yields empty cert data
        cert_data = cluster_description.get("certificateAuthority",
                                            {"data": ""})["data"]
        endpoint = cluster_description.get("endpoint")
        arn = cluster_description.get("arn")

        return OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", cert_data),
                ("server", endpoint)
            ])),
            ("name", arn)
        ])

    def get_user_entry(self, user_alias=None):
        """
        Return a user entry generated using
        the previously obtained description.

        :param user_alias: Name for the user entry; the cluster ARN is
            used when this is None or empty
        :type user_alias: string or None

        :return: An entry with "name" and a "user" holding the exec
            configuration that runs ``aws eks get-token``
        :rtype: OrderedDict
        """
        cluster_description = self._get_cluster_description()
        # The region is taken from the fourth colon-separated ARN field
        region = cluster_description.get("arn").split(":")[3]
        outpost_config = cluster_description.get("outpostConfig")

        if outpost_config is None:
            cluster_identification_parameter = "--cluster-name"
            cluster_identification_value = self._cluster_name
        else:
            # If cluster contains outpostConfig, use id for identification
            cluster_identification_parameter = "--cluster-id"
            cluster_identification_value = cluster_description.get("id")

        generated_user = OrderedDict([
            ("name", user_alias or cluster_description.get("arn", "")),
            ("user", OrderedDict([
                ("exec", OrderedDict([
                    ("apiVersion", API_VERSION),
                    ("args",
                        [
                            "--region",
                            region,
                            "eks",
                            "get-token",
                            cluster_identification_parameter,
                            cluster_identification_value,
                            "--output",
                            "json",
                        ]),
                    ("command", "aws"),
                ]))
            ]))
        ])

        if self._role_arn is not None:
            generated_user["user"]["exec"]["args"].extend([
                "--role",
                self._role_arn
            ])

        if self._session.profile:
            generated_user["user"]["exec"]["env"] = [OrderedDict([
                ("name", "AWS_PROFILE"),
                ("value", self._session.profile)
            ])]

        return generated_user