"""EMR Serverless module.""" import logging import pprint import time from typing import Any, Dict, List, Literal, Optional, TypedDict, Union import boto3 from typing_extensions import NotRequired, Required from awswrangler import _utils, exceptions from awswrangler._config import apply_configs from awswrangler.annotations import Experimental _logger: logging.Logger = logging.getLogger(__name__) _EMR_SERVERLESS_JOB_WAIT_POLLING_DELAY: float = 5 # SECONDS _EMR_SERVERLESS_JOB_FINAL_STATES: List[str] = ["SUCCESS", "FAILED", "CANCELLED"] class SparkSubmitJobArgs(TypedDict): """Typed dictionary defining the Spark submit job arguments.""" entryPoint: Required[str] """The entry point for the Spark submit job run.""" entryPointArguments: NotRequired[List[str]] """The arguments for the Spark submit job run.""" sparkSubmitParameters: NotRequired[str] """The parameters for the Spark submit job run.""" class HiveRunJobArgs(TypedDict): """Typed dictionary defining the Hive job run arguments.""" query: Required[str] """The S3 location of the query file for the Hive job run.""" initQueryFile: NotRequired[str] """The S3 location of the query file for the Hive job run.""" parameters: NotRequired[str] """The parameters for the Hive job run.""" @Experimental def create_application( name: str, release_label: str, application_type: Literal["Spark", "Hive"] = "Spark", initial_capacity: Optional[Dict[str, str]] = None, maximum_capacity: Optional[Dict[str, str]] = None, tags: Optional[Dict[str, str]] = None, autostart: bool = True, autostop: bool = True, idle_timeout: int = 15, network_configuration: Optional[Dict[str, str]] = None, architecture: Literal["ARM64", "X86_64"] = "X86_64", image_uri: Optional[str] = None, worker_type_specifications: Optional[Dict[str, str]] = None, boto3_session: Optional[boto3.Session] = None, ) -> str: """ Create an EMR Serverless application. https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/emr-serverless.html Parameters ---------- name : str Name of EMR Serverless appliation release_label : str Release label e.g. `emr-6.10.0` application_type : str, optional Application type: "Spark" or "Hive". Defaults to "Spark". initial_capacity : Dict[str, str], optional The capacity to initialize when the application is created. maximum_capacity : Dict[str, str], optional The maximum capacity to allocate when the application is created. This is cumulative across all workers at any given point in time, not just when an application is created. No new resources will be created once any one of the defined limits is hit. tags : Dict[str, str], optional Key/Value collection to put tags on the application. e.g. {"foo": "boo", "bar": "xoo"}) autostart : bool, optional Enables the application to automatically start on job submission. Defaults to true. autostop : bool, optional Enables the application to automatically stop after a certain amount of time being idle. Defaults to true. idle_timeout : int, optional The amount of idle time in minutes after which your application will automatically stop. Defaults to 15 minutes. network_configuration : Dict[str, str], optional The network configuration for customer VPC connectivity. architecture : str, optional The CPU architecture of an application: "ARM64" or "X86_64". Defaults to "X86_64". image_uri : str, optional The URI of an image in the Amazon ECR registry. worker_type_specifications : Dict[str, str], optional The key-value pairs that specify worker type. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- str Application Id. """ emr_serverless = _utils.client(service_name="emr-serverless", session=boto3_session) application_args: Dict[str, Any] = { "name": name, "releaseLabel": release_label, "type": application_type, "autoStartConfiguration": { "enabled": autostart, }, "autoStopConfiguration": { "enabled": autostop, "idleTimeoutMinutes": idle_timeout, }, "architecture": architecture, } if initial_capacity: application_args["initialCapacity"] = initial_capacity if maximum_capacity: application_args["maximumCapacity"] = maximum_capacity if tags: application_args["tags"] = tags if network_configuration: application_args["networkConfiguration"] = network_configuration if worker_type_specifications: application_args["workerTypeSpecifications"] = worker_type_specifications if image_uri: application_args["imageConfiguration"] = { "imageUri": image_uri, } response: Dict[str, str] = emr_serverless.create_application(**application_args) # type: ignore[assignment] _logger.debug("response: \n%s", pprint.pformat(response)) return response["applicationId"] @Experimental @apply_configs def run_job( application_id: str, execution_role_arn: str, job_driver_args: Union[Dict[str, Any], SparkSubmitJobArgs, HiveRunJobArgs], job_type: Literal["Spark", "Hive"] = "Spark", wait: bool = True, configuration_overrides: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, str]] = None, execution_timeout: Optional[int] = None, name: Optional[str] = None, emr_serverless_job_wait_polling_delay: float = _EMR_SERVERLESS_JOB_WAIT_POLLING_DELAY, boto3_session: Optional[boto3.Session] = None, ) -> Union[str, Dict[str, Any]]: """ Run an EMR serverless job. https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/emr-serverless.html Parameters ---------- application_id : str The id of the application on which to run the job. execution_role_arn : str The execution role ARN for the job run. job_driver_args : Union[Dict[str, str], SparkSubmitJobArgs, HiveRunJobArgs] The job driver arguments for the job run. job_type : str, optional Type of the job: "Spark" or "Hive". Defaults to "Spark". wait : bool, optional Whether to wait for the job completion or not. Defaults to true. configuration_overrides : Dict[str, str], optional The configuration overrides for the job run. tags : Dict[str, str], optional Key/Value collection to put tags on the application. e.g. {"foo": "boo", "bar": "xoo"}) execution_timeout : int, optional The maximum duration for the job run to run. If the job run runs beyond this duration, it will be automatically cancelled. name : str, optional Name of the job. emr_serverless_job_wait_polling_delay : int, optional Time to wait between polling attempts. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Union[str, Dict[str, Any]] Job Id if wait=False, or job run details. """ emr_serverless = _utils.client(service_name="emr-serverless", session=boto3_session) job_args: Dict[str, Any] = { "applicationId": application_id, "executionRoleArn": execution_role_arn, } if job_type == "Spark": job_args["jobDriver"] = { "sparkSubmit": job_driver_args, } elif job_type == "Hive": job_args["jobDriver"] = { "hive": job_driver_args, } else: raise exceptions.InvalidArgumentValue(f"Unsupported job type `{job_type}`") if configuration_overrides: job_args["configurationOverrides"] = configuration_overrides if tags: job_args["tags"] = tags if execution_timeout: job_args["executionTimeoutMinutes"] = execution_timeout if name: job_args["name"] = name response = emr_serverless.start_job_run(**job_args) _logger.debug("Job run response: %s", response) job_run_id: str = response["jobRunId"] if wait: return wait_job( application_id=application_id, job_run_id=job_run_id, emr_serverless_job_wait_polling_delay=emr_serverless_job_wait_polling_delay, ) return job_run_id @Experimental @apply_configs def wait_job( application_id: str, job_run_id: str, emr_serverless_job_wait_polling_delay: float = _EMR_SERVERLESS_JOB_WAIT_POLLING_DELAY, boto3_session: Optional[boto3.Session] = None, ) -> Dict[str, Any]: """ Wait for the EMR Serverless job to finish. https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/emr-serverless.html Parameters ---------- application_id : str The id of the application on which the job is running. job_run_id : str The id of the job. emr_serverless_job_wait_polling_delay : int, optional Time to wait between polling attempts. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Dict[str, Any] Job run details. """ emr_serverless = _utils.client(service_name="emr-serverless", session=boto3_session) response = emr_serverless.get_job_run( applicationId=application_id, jobRunId=job_run_id, ) state = response["jobRun"]["state"] while state not in _EMR_SERVERLESS_JOB_FINAL_STATES: time.sleep(emr_serverless_job_wait_polling_delay) response = emr_serverless.get_job_run( applicationId=application_id, jobRunId=job_run_id, ) state = response["jobRun"]["state"] _logger.debug("Job state: %s", state) if state != "SUCCESS": _logger.debug("Job run response: %s", response) raise exceptions.EMRServerlessJobError(response.get("jobRun", {}).get("stateDetails")) return response # type: ignore[return-value]