# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from smspark.config import Configuration


def test_core_site_xml() -> None:
    configuration = Configuration("core-site", {"hadoop.security.groups.cache.secs": "250"})
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == "  <property>\n    <name>hadoop.security.groups.cache.secs</name>\n    <value>250</value>\n  </property>\n"
    )


def test_hadoop_env_sh() -> None:
    configuration = Configuration(
        "hadoop-env",
        {},
        Configurations=[
            Configuration(
                "export",
                {"HADOOP_DATANODE_HEAPSIZE": "2048", "HADOOP_NAMENODE_OPTS": "-XX:GCTimeRatio=19"},
            )
        ],
    )
    serialized_conf = configuration.serialized
    assert serialized_conf == "export HADOOP_DATANODE_HEAPSIZE=2048\nexport HADOOP_NAMENODE_OPTS=-XX:GCTimeRatio=19\n"


def test_hadoop_log4j() -> None:
    configuration = Configuration(
        "hadoop-log4j",
        {
            "hadoop.root.logger": "INFO,console",
            "hadoop.log.dir": "/var/log/hadoop",
            "hadoop.log.file": "hadoop.log",
        },
    )
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == "hadoop.root.logger=INFO,console\nhadoop.log.dir=/var/log/hadoop\nhadoop.log.file=hadoop.log\n"
    )


def test_hive_env() -> None:
    configuration = Configuration(
        "hive-env",
        {},
        Configurations=[Configuration("export", {"HADOOP_HEAPSIZE": "1000", "USE_HADOOP_SLF4J_BINDING": "false"})],
    )
    serialized_conf = configuration.serialized
    assert serialized_conf == "export HADOOP_HEAPSIZE=1000\nexport USE_HADOOP_SLF4J_BINDING=false\n"


def test_hive_log4j() -> None:
    configuration = Configuration("hive-log4j", {"property.hive.log.level": "INFO"})
    serialized_conf = configuration.serialized
    assert serialized_conf == "property.hive.log.level=INFO\n"


def test_hive_exec_log4j() -> None:
    configuration = Configuration(
        "hive-exec-log4j",
        {"loggers": "NIOServerCnxn,ClientCnxnSocketNIO,DataNucleus,Datastore,JPOX"},
    )
    serialized_conf = configuration.serialized
    assert serialized_conf == "loggers=NIOServerCnxn,ClientCnxnSocketNIO,DataNucleus,Datastore,JPOX\n"


def test_hive_site() -> None:
    configuration = Configuration(
        "hive-site",
        {
            "hive.execution.engine": "tez",
            "hive.security.metastore.authorization.manager": "org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider",
        },
    )
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == "  <property>\n    <name>hive.execution.engine</name>\n    <value>tez</value>\n  </property>\n  <property>\n    <name>hive.security.metastore.authorization.manager</name>\n    <value>org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider</value>\n  </property>\n"
    )


def test_spark_defaults_conf() -> None:
    configuration = Configuration("spark-defaults", {"spark.master": "yarn", "spark.executor.memory": "4096M"})
    serialized_conf = configuration.serialized
    assert serialized_conf == "spark.master yarn\nspark.executor.memory 4096M\n"


def test_spark_env() -> None:
    configuration = Configuration(
        "hadoop-env",
        {},
        Configurations=[
            Configuration(
                "export",
                {"SPARK_MASTER_PORT": "7077", "SPARK_MASTER_IP": "$STANDALONE_SPARK_MASTER_HOST"},
            )
        ],
    )
    serialized_conf = configuration.serialized
    assert serialized_conf == "export SPARK_MASTER_PORT=7077\nexport SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST\n"
def test_spark_log4j_properties() -> None:
    configuration = Configuration(
        "spark-log4j", {"spark.yarn.app.container.log.dir": "/var/log/spark/user/${user.name}"}
    )
    serialized_conf = configuration.serialized
    assert serialized_conf == "spark.yarn.app.container.log.dir=/var/log/spark/user/${user.name}\n"


def test_spark_hive_site() -> None:
    configuration = Configuration(
        "spark-hive-site",
        {"javax.jdo.option.ConnectionUserName": "hive"},
    )
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == "  <property>\n    <name>javax.jdo.option.ConnectionUserName</name>\n    <value>hive</value>\n  </property>\n"
    )


def test_spark_metrics_properties() -> None:
    configuration = Configuration("spark-metrics", {"*.sink.statsd.prefix": "spark"})
    serialized_conf = configuration.serialized
    assert serialized_conf == "*.sink.statsd.prefix=spark\n"


def test_yarn_env() -> None:
    configuration = Configuration(
        "yarn-env",
        {},
        Configurations=[
            Configuration(
                "export",
                {
                    "YARN_OPTS": "\"$YARN_OPTS -XX:OnOutOfMemoryError='kill -9 %p'\"",
                    "YARN_PROXYSERVER_HEAPSIZE": "2416",
                    "YARN_NODEMANAGER_HEAPSIZE": "2048",
                    "YARN_RESOURCEMANAGER_HEAPSIZE": "2416",
                },
            )
        ],
    )
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == """export YARN_OPTS="$YARN_OPTS -XX:OnOutOfMemoryError='kill -9 %p'"\nexport YARN_PROXYSERVER_HEAPSIZE=2416\nexport YARN_NODEMANAGER_HEAPSIZE=2048\nexport YARN_RESOURCEMANAGER_HEAPSIZE=2416\n"""
    )


def test_yarn_site() -> None:
    configuration = Configuration(
        "yarn-site",
        {"yarn.log-aggregation-enable": "true"},
    )
    serialized_conf = configuration.serialized
    assert (
        serialized_conf
        == "  <property>\n    <name>yarn.log-aggregation-enable</name>\n    <value>true</value>\n  </property>\n"
    )
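

# A minimal usage sketch (an illustration, not part of the test suite): the tests
# above exercise only the Configuration constructor and its `serialized` property,
# and those are the only APIs assumed here. Running this module directly prints
# the serialized form of a single-property "spark-defaults" classification, which
# by the same serialization rule asserted in test_spark_defaults_conf should be
# "spark.master yarn\n".
if __name__ == "__main__":
    example = Configuration("spark-defaults", {"spark.master": "yarn"})
    print(example.serialized)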