# Copyright (c) 2022, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, as # published by the Free Software Foundation. # # This program is also distributed with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, # as designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an # additional permission to link the program and your derivative works # with the separately licensed software that they have included with # MySQL. # # Without limiting anything contained in the foregoing, this file, # which is part of MySQL Connector/Python, is also subject to the # Universal FOSS Exception, version 1.0, a copy of which can be found at # http://oss.oracle.com/licenses/universal-foss-exception. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # mypy: disable-error-code="arg-type,union-attr,call-arg" """OCI Authentication Plugin.""" import json import os from base64 import b64encode from pathlib import Path from typing import Any, Dict, Optional from .. import errors from ..logger import logger try: from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric.types import PRIVATE_KEY_TYPES except ImportError: raise errors.ProgrammingError("Package 'cryptography' is not installed") from None try: from oci import config, exceptions except ImportError: raise errors.ProgrammingError( "Package 'oci' (Oracle Cloud Infrastructure Python SDK) is not installed" ) from None from . import BaseAuthPlugin AUTHENTICATION_PLUGIN_CLASS = "MySQLOCIAuthPlugin" OCI_SECURITY_TOKEN_MAX_SIZE = 10 * 1024 # In bytes OCI_SECURITY_TOKEN_TOO_LARGE = "Ephemeral security token is too large (10KB max)" OCI_SECURITY_TOKEN_FILE_NOT_AVAILABLE = ( "Ephemeral security token file ('security_token_file') could not be read" ) OCI_PROFILE_MISSING_PROPERTIES = ( "OCI configuration file does not contain a 'fingerprint' or 'key_file' entry" ) class MySQLOCIAuthPlugin(BaseAuthPlugin): """Implement the MySQL OCI IAM authentication plugin.""" plugin_name: str = "authentication_oci_client" requires_ssl: bool = False context: Any = None oci_config_profile: str = "DEFAULT" oci_config_file: str = config.DEFAULT_LOCATION @staticmethod def _prepare_auth_response(signature: bytes, oci_config: Dict[str, Any]) -> str: """Prepare client's authentication response Prepares client's authentication response in JSON format Args: signature (bytes): server's nonce to be signed by client. oci_config (dict): OCI configuration object. Returns: str: JSON string with the following format: {"fingerprint": str, "signature": str, "token": base64.base64.base64} Raises: ProgrammingError: If the ephemeral security token file can't be open or the token is too large. """ signature_64 = b64encode(signature) auth_response = { "fingerprint": oci_config["fingerprint"], "signature": signature_64.decode(), } # The security token, if it exists, should be a JWT (JSON Web Token), consisted # of a base64-encoded header, body, and signature, separated by '.', # e.g. "Base64.Base64.Base64", stored in a file at the path specified by the # security_token_file configuration property if oci_config.get("security_token_file"): try: security_token_file = Path(oci_config["security_token_file"]) # Check if token exceeds the maximum size if security_token_file.stat().st_size > OCI_SECURITY_TOKEN_MAX_SIZE: raise errors.ProgrammingError(OCI_SECURITY_TOKEN_TOO_LARGE) auth_response["token"] = security_token_file.read_text(encoding="utf-8") except (OSError, UnicodeError) as err: raise errors.ProgrammingError( OCI_SECURITY_TOKEN_FILE_NOT_AVAILABLE ) from err return json.dumps(auth_response, separators=(",", ":")) @staticmethod def _get_private_key(key_path: str) -> PRIVATE_KEY_TYPES: """Get the private_key form the given location""" try: with open(os.path.expanduser(key_path), "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None, ) except (TypeError, OSError, ValueError, UnsupportedAlgorithm) as err: raise errors.ProgrammingError( "An error occurred while reading the API_KEY from " f'"{key_path}": {err}' ) return private_key def _get_valid_oci_config(self) -> Dict[str, Any]: """Get a valid OCI config from the given configuration file path""" error_list = [] req_keys = { "fingerprint": (lambda x: len(x) > 32), "key_file": (lambda x: os.path.exists(os.path.expanduser(x))), } oci_config: Dict[str, Any] = {} try: # key_file is validated by oci.config if present oci_config = config.from_file( self.oci_config_file or config.DEFAULT_LOCATION, self.oci_config_profile or "DEFAULT", ) for req_key, req_value in req_keys.items(): try: # Verify parameter in req_key is present and valid if oci_config[req_key] and not req_value(oci_config[req_key]): error_list.append(f'Parameter "{req_key}" is invalid') except KeyError: error_list.append(f"Does not contain parameter {req_key}") except ( exceptions.ConfigFileNotFound, exceptions.InvalidConfig, exceptions.InvalidKeyFilePath, exceptions.InvalidPrivateKey, exceptions.ProfileNotFound, ) as err: error_list.append(str(err)) # Raise errors if any if error_list: raise errors.ProgrammingError( f"Invalid oci-config-file: {self.oci_config_file}. " f"Errors found: {error_list}" ) return oci_config def auth_response(self, auth_data: Optional[bytes] = None) -> bytes: """Prepare authentication string for the server.""" logger.debug("server nonce: %s, len %d", self._auth_data, len(self._auth_data)) oci_config = self._get_valid_oci_config() private_key = self._get_private_key(oci_config["key_file"]) signature = private_key.sign( self._auth_data, padding.PKCS1v15(), hashes.SHA256() ) auth_response = self._prepare_auth_response(signature, oci_config) logger.debug("authentication response: %s", auth_response) return auth_response.encode()