# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Contains configuration classes and serializers for configuration."""
# https://docs.python.org/3/whatsnew/3.7.html#pep-563-postponed-evaluation-of-annotations
from __future__ import annotations
import pathlib
from collections import namedtuple
from dataclasses import dataclass
from typing import ClassVar, Mapping, Sequence
# Record tying a classification name to the file it is written to and the
# function used to render it.
_ClassificationData = namedtuple(
    "_ClassificationData",
    ("classification", "path", "serializer"),
)

# Documentation page that the validation error messages in this module point to.
EMR_CONFIGURE_APPS_URL = (
    "https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html"
)
def properties_serializer(configuration: Configuration) -> str:
    """Render a configuration's properties in ``.properties`` file syntax.

    Args:
        configuration: Object whose ``Properties`` mapping is serialized.

    Returns:
        One ``key=value`` entry per property, joined by newlines and always
        terminated by a newline (a lone newline when there are no properties).
    """
    entries = configuration.Properties.items()
    body = "\n".join(f"{name}={value}" for name, value in entries)
    return f"{body}\n"
def xml_serializer(configuration: Configuration) -> str:
    """Serialize configuration to properties to insert into .xml files.

    Every entry of ``configuration.Properties`` becomes one ``<property>``
    element with ``<name>`` and ``<value>`` children. Only the property
    elements are produced; the enclosing document (XML declaration and root
    element) is left to the caller.

    Args:
        configuration: Object whose ``Properties`` mapping is serialized.

    Returns:
        The concatenated ``<property>`` elements, each terminated by a
        newline, or an empty string when there are no properties.
    """
    # Imported locally so the function carries its own dependency.
    from xml.sax.saxutils import escape

    elements = []
    for name, value in configuration.Properties.items():
        # Escape markup characters (&, <, >) so that a value such as a URL
        # with query parameters cannot produce a malformed document.
        # str() keeps non-string values working, as plain formatting did.
        elements.append(
            "  <property>\n"
            f"    <name>{escape(str(name))}</name>\n"
            f"    <value>{escape(str(value))}</value>\n"
            "  </property>\n"
        )
    # join() avoids repeated string concatenation inside the loop.
    return "".join(elements)
def env_serializer(configuration: Configuration) -> str:
    """Render a configuration as shell ``export`` statements for .env files.

    The properties are not read from ``configuration`` itself but from each
    nested entry in ``configuration.Configurations``, and every nested entry
    must carry the ``"export"`` classification.

    Args:
        configuration: Object whose nested ``Configurations`` are serialized.

    Returns:
        One ``export KEY=VALUE`` line per nested property, joined by newlines
        and always terminated by a newline.

    Raises:
        ValueError: If a nested configuration is not classified ``"export"``.
    """
    exports = []
    for nested in configuration.Configurations:
        if nested.Classification == "export":
            exports.extend(f"export {name}={value}" for name, value in nested.Properties.items())
            continue
        # Any other sub-classification is rejected outright.
        message = (
            "env classifications must use the 'export' sub-classification. Please refer to {} for more information."
        )
        raise ValueError(message.format(EMR_CONFIGURE_APPS_URL))
    return "\n".join(exports) + "\n"
def conf_serializer(configuration: Configuration) -> str:
    """Render a configuration's properties in ``.conf`` file syntax.

    Args:
        configuration: Object whose ``Properties`` mapping is serialized.

    Returns:
        One ``key value`` entry (space separated) per property, joined by
        newlines and always terminated by a newline.
    """
    entries = configuration.Properties.items()
    body = "\n".join(f"{name} {value}" for name, value in entries)
    return f"{body}\n"
@dataclass
class Configuration:
    """Dataclass representing configuration for Spark, Yarn, Hive, and Hadoop.

    Attributes:
        Classification: Name identifying the configuration file, e.g.
            ``"spark-defaults"``. Must be one of the names listed in
            ``classification_data``, or ``"export"`` for the nested
            configuration of an ``*-env`` classification.
        Properties: Key/value pairs to serialize.
        Configurations: Nested configurations. Required (non-empty) for
            classifications whose name contains ``"env"``.
    """

    Classification: str
    Properties: Mapping[str, str]
    Configurations: Sequence[Configuration] = ()

    # Supported classifications, each with the file it is written to and the
    # serializer that renders it.
    classification_data: ClassVar = [
        _ClassificationData("core-site", "/usr/lib/hadoop/etc/hadoop/core-site.xml", xml_serializer),
        _ClassificationData("hadoop-env", "/usr/lib/hadoop/etc/hadoop/hadoop-env.sh", env_serializer),
        _ClassificationData("hadoop-log4j", "/usr/lib/hadoop/etc/hadoop/log4j.properties", properties_serializer),
        _ClassificationData("hive-env", "/usr/lib/hive/conf/hive-env.sh", env_serializer),
        _ClassificationData("hive-log4j", "/usr/lib/hive/conf/hive-log4j2.properties", properties_serializer),
        _ClassificationData(
            "hive-exec-log4j",
            "/usr/lib/hive/conf/hive-exec-log4j2.properties",
            properties_serializer,
        ),
        _ClassificationData("hive-site", "/usr/lib/hive/conf/hive-site.xml", xml_serializer),
        _ClassificationData("spark-defaults", "/usr/lib/spark/conf/spark-defaults.conf", conf_serializer),
        _ClassificationData("spark-env", "/usr/lib/spark/conf/spark-env.sh", env_serializer),
        _ClassificationData("spark-log4j", "/usr/lib/spark/conf/log4j.properties", properties_serializer),
        _ClassificationData("spark-hive-site", "/usr/lib/spark/conf/hive-site.xml", xml_serializer),
        _ClassificationData("spark-metrics", "/usr/lib/spark/conf/metrics.properties", properties_serializer),
        _ClassificationData("yarn-env", "/usr/lib/hadoop/etc/hadoop/yarn-env.sh", env_serializer),
        _ClassificationData("yarn-site", "/usr/lib/hadoop/etc/hadoop/yarn-site.xml", xml_serializer),
    ]

    def __post_init__(self) -> None:
        """Perform basic validation on values.

        Also records the matching ``classification_data`` entry in
        ``self._data``. ``"export"`` configurations have no entry, so
        ``_data`` is left unset for them.

        Raises:
            ValueError: If ``Classification`` is neither a supported name nor
                ``"export"``, or if an ``env`` classification has no nested
                configurations.
        """
        valid_classifications = [data.classification for data in self.classification_data]
        for data in self.classification_data:
            if self.Classification == data.classification:
                self._data: _ClassificationData = data
                break

        # special case for "*-env" classifications, whose inner nested Configurations list use "export"
        if self.Classification not in valid_classifications and self.Classification != "export":
            raise ValueError(
                "Invalid classification: {}. Must be one of {}. Please refer to {} for more information.".format(
                    self.Classification, valid_classifications, EMR_CONFIGURE_APPS_URL
                )
            )

        if "env" in self.Classification and not self.Configurations:
            raise ValueError(
                "'env' classifications require a sub-configuration."
                + " Please refer to {} for more information".format(EMR_CONFIGURE_APPS_URL)
            )

    @property
    def path(self) -> pathlib.Path:
        """Get a path to where the config should be written to and read from.

        Raises:
            AttributeError: For ``"export"`` configurations, which have no
                ``classification_data`` entry and therefore no ``_data``.
        """
        return pathlib.Path(self._data.path)

    @property
    def serialized(self) -> str:
        """Serialize Configuration to string representation for the configuration's filetype."""
        serializer = self._data.serializer
        serialized_conf: str = serializer(self)
        return serialized_conf

    def write_config(self) -> str:
        """Write or update configuration file.

        For XML classifications the serialized properties are inserted just
        before the closing ``</configuration>`` tag of an existing file; if
        the file is missing or blank, a new document is created around them.
        For every other classification the serialized text is appended to the
        file.

        Returns:
            The full contents of the configuration file after writing.

        Raises:
            ValueError: If an existing, non-empty XML file has no closing
                ``</configuration>`` tag to insert before.
        """
        try:
            self.path.parent.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            # Even with exist_ok=True, mkdir raises when the parent path
            # exists but is not a directory; the open() calls below will
            # report that condition.
            pass

        if self._data.serializer == xml_serializer:
            closing_tag = "</configuration>"
            existing = ""
            if self.path.exists():
                with open(self.path, "r") as f:
                    existing = f.read()

            if existing.strip():
                # inserts properties before the closing tag. rpartition
                # targets the last occurrence only, so the properties are
                # added exactly once.
                head, tag, tail = existing.rpartition(closing_tag)
                if not tag:
                    # Failing loudly beats silently dropping the properties.
                    raise ValueError(
                        "Cannot update {}: no closing {} tag found.".format(self.path, closing_tag)
                    )
                contents = f"{head}\n{self.serialized}{tag}{tail}"
            else:
                # No usable file yet: emit a complete document skeleton.
                contents = """<?xml version="1.0" encoding="UTF-8"?>\n"""
                contents += """<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>\n"""
                contents += "<configuration>\n"
                contents += f"{self.serialized}\n"
                contents += closing_tag

            with open(self.path, "w") as f:
                f.write(contents)
        else:
            with open(self.path, "a") as f:
                f.write(self.serialized)

        # Read the file back so the caller sees exactly what is on disk.
        with open(self.path, "r") as f:
            conf_string = f.read()
        return conf_string