/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import org.opensearch.gradle.test.AntFixture
import org.opensearch.gradle.testclusters.DefaultTestClustersTask
import org.opensearch.hadoop.gradle.fixture.hadoop.HadoopFixturePlugin
import org.opensearch.hadoop.gradle.fixture.hadoop.ServiceDescriptor
import org.opensearch.hadoop.gradle.fixture.hadoop.conf.HadoopClusterConfiguration
import org.opensearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration
import org.opensearch.hadoop.gradle.fixture.hadoop.conf.RoleConfiguration
import org.opensearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration
import org.opensearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer
import org.opensearch.hadoop.gradle.fixture.hadoop.tasks.HadoopMRJob
import org.opensearch.hadoop.gradle.fixture.hadoop.tasks.HiveBeeline
import org.opensearch.hadoop.gradle.fixture.hadoop.tasks.SparkApp

apply plugin: 'opensearch.hadoop.build'
apply plugin: 'scala'
apply plugin: HadoopFixturePlugin

configurations {
    kdcFixture
}

tasks.withType(ScalaCompile) { ScalaCompile task ->
    task.sourceCompatibility = project.ext.minimumRuntimeVersion
    task.targetCompatibility = project.ext.minimumRuntimeVersion
}

boolean localRepo = project.getProperties().containsKey("localRepo")

// Gradle's java library plugin adds a variant to each project that offers the classes dir as an artifact that can be
// used in other projects instead of requiring a jar operation to happen. Kerberos depends on the spark integration to
// compile some code, but due to how the current spark compilation works it's likely that the classes directories in
// the spark projects no longer exist when the kerberos compilation happens during a distribution build.
// TODO: Clean this up when we get to variants, they should persist their classes dirs in a sane way
configurations {
    compileClasspath {
        beforeLocking {
            attributes {
                attribute(LibraryElements.LIBRARY_ELEMENTS_ATTRIBUTE, project.objects.named(LibraryElements, LibraryElements.JAR))
            }
        }
    }
}

dependencies {
    implementation(project(":opensearch-hadoop-mr"))
    implementation("org.scala-lang:scala-library:${scala212Version}")
    implementation("org.scala-lang:scala-reflect:${scala212Version}")
    implementation(project(":opensearch-spark-30")) {
        capabilities {
            // Spark 3.x on Scala 2.12
            requireCapability("org.opensearch.spark.sql.variant:spark30scala212:$project.version")
        }
    }
    compileOnly("com.fasterxml.jackson.module:jackson-module-scala_2.12:2.15.2")
    compileOnly("com.fasterxml.jackson.core:jackson-annotations:2.15.2")
    compileOnly("org.json4s:json4s-jackson_2.12:4.0.6")
    compileOnly("org.slf4j:slf4j-api:2.0.7")
    implementation("org.apache.hadoop:hadoop-client:${HadoopClusterConfiguration.HADOOP.defaultVersion()}")
    implementation("org.apache.spark:spark-sql_2.12:$project.ext.spark30Version")
    implementation(project(path: ':opensearch-hadoop-mr', configuration: 'itestJarConfig'))
    kdcFixture project(':test:fixtures:minikdc')
}

// =============================================================================
// Testing jars
// =============================================================================

// Disable the integration tests for Kerberos until we can find a solution to the failures due to a + sign
// in the file path on CI.
boolean disableTests = false
if (disableTests) {
    // Disable the integration tests for Kerberos until we can find a solution to the failures due to a + sign
    // in the file path on CI.
    Test integrationTest = project.tasks.findByName('integrationTest') as Test
    integrationTest.setEnabled(false)
} else if (!localRepo) {
    // We don't want to attempt compiling integration or test code when using a local repo because it depends on
    // the opensearch jar which will not be reachable.
    // Additionally, when using a local repo, the esCluster plugin is not applied, and thus, cannot be referenced in the
    // build script.

    // =============================================================================
    // Kerberos configuration
    // =============================================================================

    String realm = '@BUILD.CI.OPENSEARCH.ORG'
    String hostPart = '/build.ci.opensearch.org'

    String opensearchPrincipal = 'HTTP' + hostPart
    String clientPrincipal = 'client' + hostPart
    String namenodePrincipal = 'nn' + hostPart
    String datanodePrincipal = 'dn' + hostPart
    String resourceManagerPrincipal = 'rm' + hostPart
    String nodeManagerPrincipal = 'nm' + hostPart
    String historyServerPrincipal = 'jhs' + hostPart
    String mapredPrincipal = 'mapred' + hostPart
    String hivePrincipalName = 'hive' + hostPart

    String opensearchKeytabName = 'opensearch.keytab'
    String clientKeytabName = 'client.keytab'
    String hadoopKeytabName = 'hadoop.keytab'
    String hiveKeytabName = 'hive.keytab'

    Map users = ["client": "password", "gmarx": "swordfish"]

    Map keytabUsers = [:]
    keytabUsers.put(opensearchPrincipal, opensearchKeytabName)
    keytabUsers.put(clientPrincipal, clientKeytabName)
    keytabUsers.put('HTTP/hadoop.build.opensearch.org', hadoopKeytabName)
    keytabUsers.put(namenodePrincipal, hadoopKeytabName)
    keytabUsers.put(datanodePrincipal, hadoopKeytabName)
    keytabUsers.put(resourceManagerPrincipal, hadoopKeytabName)
    keytabUsers.put(nodeManagerPrincipal, hadoopKeytabName)
    keytabUsers.put(historyServerPrincipal, hadoopKeytabName)
    keytabUsers.put(mapredPrincipal, hadoopKeytabName)
    keytabUsers.put(hivePrincipalName, hiveKeytabName)

    // Configure MiniKDC
    AntFixture kdcFixture = project.tasks.create('kdcFixture', AntFixture) {
        dependsOn project.configurations.kdcFixture
        executable = new File(project.runtimeJavaHome, 'bin/java')
        env 'CLASSPATH', "${ -> project.configurations.kdcFixture.asPath }"
        waitCondition = { fixture, ant ->
            // the kdc wrapper writes the ports file when
            // it's ready, so we can just wait for the file to exist
            return fixture.portsFile.exists()
        }
        final List kdcFixtureArgs = []
        // Add provisioning lines first for sys properties
        users.forEach { k, v ->
            kdcFixtureArgs.add("-Des.fixture.kdc.user.${k}.pwd=${v}")
        }
        keytabUsers.forEach { k, v ->
            kdcFixtureArgs.add("-Des.fixture.kdc.user.${k}.keytab=${v}")
        }
        // Common options
        kdcFixtureArgs.add('org.opensearch.hadoop.test.fixture.minikdc.MiniKdcFixture')
        kdcFixtureArgs.add(baseDir.toString())
        args kdcFixtureArgs.toArray()
    }

    // KDC Fixture artifacts
    java.nio.file.Path fixtureDir = project.buildDir.toPath().resolve("fixtures").resolve("kdcFixture")
    java.nio.file.Path krb5Conf = fixtureDir.resolve("krb5.conf")
    java.nio.file.Path opensearchKeytab = fixtureDir.resolve(opensearchKeytabName)
    java.nio.file.Path clientKeytab = fixtureDir.resolve(clientKeytabName)
    java.nio.file.Path hadoopKeytab = fixtureDir.resolve(hadoopKeytabName)
    java.nio.file.Path hiveKeytab = fixtureDir.resolve(hiveKeytabName)
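
    // For illustration only (not executed by the build): given the maps above, the kdcFixture task launches
    // the MiniKDC wrapper JVM roughly as
    //   java -Des.fixture.kdc.user.client.pwd=password \
    //        -Des.fixture.kdc.user.<principal>.keytab=<keytab file name> ... \
    //        org.opensearch.hadoop.test.fixture.minikdc.MiniKdcFixture <fixture base dir>
    // provisioning one principal per map entry; the krb5.conf and keytab paths above are the artifacts the
    // fixture is expected to write into its base directory.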

    // =============================================================================
    // OpenSearch cluster configuration
    // =============================================================================

    // Configure OpenSearch with Kerberos Auth
    testClusters.integTest {
        // This may be needed if we ever run against java 9:
        // --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED

        // Set kerberos conf on JVM
        systemProperty("java.security.krb5.conf", krb5Conf.toString())
        //systemProperty("sun.security.krb5.debug", "true")

        // force localhost IPv4 otherwise it is a chicken and egg problem where we need the keytab for the
        // hostname when starting the cluster but do not know the exact address that is first in the http
        // ports file
        setting 'http.host', '127.0.0.1'
        extraConfigFile('opensearch.keytab', opensearchKeytab.toFile())
    }

    def opensearchAddress = "${-> testClusters.integTest.getAllHttpSocketURI().get(0)}"

    // Configure Integration Test Task
    Test integrationTest = project.tasks.findByName('integrationTest') as Test
    integrationTest.dependsOn(kdcFixture)
    integrationTest.finalizedBy(kdcFixture.getStopTask())
    integrationTest.systemProperty("java.security.krb5.conf", krb5Conf.toString())
    //integrationTest.systemProperty("sun.security.krb5.debug", "true")
    integrationTest.systemProperty("opensearch.net.http.auth.user", "test_admin")
    integrationTest.systemProperty("opensearch.net.http.auth.pass", "x-pack-test-password")
    integrationTest.systemProperty("tests.hive.principal", "${hivePrincipalName}${realm}")
    integrationTest.systemProperty("tests.hive.keytab", hiveKeytab.toString())

    // Fixtures will depend on the jar and test jar artifacts
    def jar = project.tasks.getByName('jar') as org.gradle.jvm.tasks.Jar
    def kerberosItestJar = project.tasks.findByName('itestJar') as Jar

    // TODO: We should be acquiring these jars via a configuration instead of breaking the glass here.
    def mrJar = project(':opensearch-hadoop-mr').tasks.getByName('jar') as Jar
    def hiveJar = project(':opensearch-hadoop-hive').tasks.getByName('jar') as Jar
    def sparkJar = project(':opensearch-spark-30').tasks.getByName('jar') as Jar

    // Need these for SSL items, test data, and scripts
    File resourceDir = project.sourceSets.main.resources.getSrcDirs().head()
    File itestResourceDir = project.sourceSets.itest.resources.getSrcDirs().head()

    Task setupUsers = project.tasks.create("setupUsers", DefaultTestClustersTask) {
        useCluster(testClusters.integTest)
        doLast {
            project.javaexec {
                executable = project.runtimeJavaHome.toString() + "/bin/java"
                mainClass = 'org.opensearch.hadoop.qa.kerberos.setup.SetupKerberosUsers'
                classpath = sourceSets.main.runtimeClasspath
                systemProperty('opensearch.nodes', opensearchAddress)
                systemProperty('opensearch.net.http.auth.user', 'test_admin')
                systemProperty('opensearch.net.http.auth.pass', 'x-pack-test-password')
                systemProperty('principals', "$clientPrincipal$realm")
                systemProperty('users', "client")
                systemProperty('proxiers', "$hivePrincipalName$realm")
            }
        }
    }
    setupUsers.dependsOn(kdcFixture, jar)
    integrationTest.dependsOn(setupUsers)

    // =============================================================================
    // Hadoop test cluster configuration
    // =============================================================================

    // Project instance available implicitly
    HadoopClusterConfiguration config = project.hadoop.cluster("hadoopFixture") { HadoopClusterConfiguration config ->
        // Hadoop cluster depends on KDC Fixture being up and running
        config.addDependency(kdcFixture)
        config.useOpenSearchCluster(testClusters.integTest)
        config.service('hadoop') { ServiceConfiguration s ->
            s.addSystemProperty("java.security.krb5.conf", krb5Conf.toString())
            s.setJvmArgs("-Xmx200m")
            // Core Site Config
            s.settingsFile('core-site.xml') { SettingsContainer.FileSettings f ->
                // Enable Security
                f.addSetting("hadoop.security.authentication", "kerberos")
                f.addSetting("hadoop.security.authorization", "true")
                f.addSetting("hadoop.rpc.protection", "authentication")
                f.addSetting("hadoop.ssl.require.client.cert", "false")
                f.addSetting("hadoop.ssl.hostname.verifier", "DEFAULT")
                f.addSetting("hadoop.ssl.keystores.factory.class", "org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory")
                f.addSetting("hadoop.ssl.server.conf", "ssl-server.xml")
"ssl-server.xml") f.addSetting("hadoop.ssl.client.conf", "ssl-client.xml") f.addSetting("hadoop.proxyuser.hive.hosts", "*") f.addSetting("hadoop.proxyuser.hive.groups", "*") // Add OpenSearch Security settings here because without them Spark will not obtain tokens f.addSetting('opensearch.security.authentication', 'kerberos') f.addSetting('opensearch.net.spnego.auth.opensearch.principal', "${opensearchPrincipal}${realm}") } // SSL Server Config s.settingsFile('ssl-server.xml') { SettingsContainer.FileSettings f -> f.addSetting("ssl.server.keystore.type", "jks") f.addSetting("ssl.server.keystore.location", "${resourceDir.getAbsolutePath()}/ssl/server.jks") f.addSetting("ssl.server.keystore.password", "bigdata") f.addSetting("ssl.server.keystore.keypassword", "bigdata") } // HDFS Site Config s.settingsFile('hdfs-site.xml') { SettingsContainer.FileSettings f -> f.addSetting("dfs.http.policy", "HTTPS_ONLY") f.addSetting("dfs.web.authentication.kerberos.principal", "HTTP/hadoop.build.opensearch.org$realm") f.addSetting("dfs.web.authentication.kerberos.keytab", "$hadoopKeytab") f.addSetting("dfs.block.access.token.enable", "true") f.addSetting("dfs.namenode.kerberos.principal", "$namenodePrincipal$realm") f.addSetting("dfs.namenode.keytab.file", "$hadoopKeytab") f.addSetting("dfs.namenode.kerberos.internal.spnego.principal", "HTTP/hadoop.build.opensearch.org") f.addSetting("dfs.datanode.data.dir.perm", "700") f.addSetting("dfs.datanode.kerberos.principal", "$datanodePrincipal$realm") f.addSetting("dfs.datanode.keytab.file", "$hadoopKeytab") f.addSetting("dfs.encrypt.data.transfer", "false") f.addSetting("dfs.data.transfer.protection", "authentication") } // Yarn Site Config s.settingsFile('yarn-site.xml') { SettingsContainer.FileSettings f -> f.addSetting("yarn.resourcemanager.principal", "$resourceManagerPrincipal$realm") f.addSetting("yarn.resourcemanager.keytab", "$hadoopKeytab") f.addSetting("yarn.nodemanager.principal", "$nodeManagerPrincipal$realm") f.addSetting("yarn.nodemanager.keytab", "$hadoopKeytab") } // Mapred Site Config s.settingsFile('mapred-site.xml') { SettingsContainer.FileSettings f -> f.addSetting("mapreduce.framework.name", "yarn") f.addSetting("mapreduce.shuffle.ssl.enabled", "false") f.addSetting("mapreduce.jobhistory.principal", "$historyServerPrincipal$realm") f.addSetting("mapreduce.jobhistory.keytab", "$hadoopKeytab") f.addSetting("yarn.resourcemanager.principal", "$resourceManagerPrincipal$realm") } // Add the OpenSearch-Hadoop jar to the resource manager classpath so that it can load the token renewer implementation // for OpenSearch tokens. Otherwise, tokens may not be cancelled at the end of the job. s.role('resourcemanager') { RoleConfiguration r -> r.addEnvironmentVariable('YARN_USER_CLASSPATH', mrJar.archiveFile.get().asFile.toString()) r.settingsFile('yarn-site.xml') { SettingsContainer.FileSettings f -> // Add settings specifically for OpenSearch Node to allow for cancelling the tokens f.addSetting('opensearch.nodes', opensearchAddress) } } } config.service('spark') config.service('hive') { ServiceConfiguration s -> s.setJvmArgs("-Xmx200m") s.addSystemProperty("java.security.krb5.conf", krb5Conf.toString()) s.addSetting('hive.execution.engine', 'mr') // Hive attempts to establish a tez session at start up, but there are no Tez libraries distributed with Hive. 
            s.addSetting('hive.server2.active.passive.ha.enable', 'true')
            s.addSetting('hive.server2.authentication', 'kerberos')
            s.addSetting('hive.server2.authentication.kerberos.principal', "$hivePrincipalName$realm")
            s.addSetting('hive.server2.authentication.kerberos.keytab', "$hiveKeytab")
            // s.addSetting('hive.server2.logging.operation.level', "VERBOSE")
            s.addSetting('yarn.app.mapreduce.am.command-opts', "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}")
            s.addSetting('mapreduce.map.java.opts', "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}")
            s.addSetting('mapreduce.reduce.java.opts', "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}")
            s.addSetting('opensearch.nodes', opensearchAddress)
        }
        config.addDependency(jar)
        config.addDependency(kerberosItestJar)
        config.addDependency(mrJar)
        config.addDependency(hiveJar)
        config.addDependency(sparkJar)
    }

    // We need to create a tmp directory in hadoop before the history server does, because the history server
    // will set the wrong permissions.
    HadoopMRJob createTmp = config.service('hadoop').role('datanode').createClusterTask('createTmp', HadoopMRJob.class) {
        clusterConfiguration = config
        // Run on namenode since the gateway is not yet set up
        runOn(config.service('hadoop').role('namenode').instance(0))
        dependsOn(jar)
        jobJar = jar.getArchiveFile().get().getAsFile()
        jobClass = 'org.opensearch.hadoop.qa.kerberos.dfs.SecureFsShell'
        systemProperties([
            "test.krb5.principal": namenodePrincipal,
            "test.krb5.keytab": hadoopKeytab.toString(),
        ])
        args = ['-mkdir', '-p', '/tmp/hadoop-yarn/staging/history']
    }

    HadoopMRJob prepareTmp = config.service('hadoop').role('datanode').createClusterTask('prepareTmp', HadoopMRJob.class) {
        clusterConfiguration = config
        // Run on namenode since the gateway is not yet set up
        runOn(config.service('hadoop').role('namenode').instance(0))
        dependsOn(jar, createTmp)
        jobJar = jar.getArchiveFile().get().getAsFile()
        jobClass = 'org.opensearch.hadoop.qa.kerberos.dfs.SecureFsShell'
        systemProperties([
            "test.krb5.principal": namenodePrincipal,
            "test.krb5.keytab": hadoopKeytab.toString(),
        ])
        // Recursive should be fine here since it's before anything is on the FS
        args = ['-chmod', '-R', '777', '/tmp']
    }
    config.service('hadoop').role('historyserver').addDependency(prepareTmp)
    config.service('hive').addDependency(prepareTmp)

    // We must create the data directory before copying test data. DfsCopy task would normally do this
    // automatically, but we have to wrap the fs shell for Kerberos.
    HadoopMRJob createDataDir = config.createClusterTask('createDataDir', HadoopMRJob.class) {
        clusterConfiguration = config
        executedOn = config.service('hadoop').role('namenode').instance(0)
        dependsOn(jar)
        jobJar = jar.getArchiveFile().get().getAsFile()
        jobClass = 'org.opensearch.hadoop.qa.kerberos.dfs.SecureFsShell'
        systemProperties([
            "test.krb5.principal": clientPrincipal,
            "test.krb5.keytab": clientKeytab.toString(),
            "java.security.krb5.conf": krb5Conf.toString()
        ])
        args = ['-mkdir', '-p', '/data/artists/']
    }

    // Copy the test data to HDFS using the SecureFsShell instead of dfs copy. We could use the
    // DfsCopy task, but with Kerberos, we would have to kinit before running it. Instead we wrap.
    HadoopMRJob copyData = config.createClusterTask('copyData', HadoopMRJob.class) {
        clusterConfiguration = config
        dependsOn(createDataDir, jar)
        jobJar = jar.getArchiveFile().get().getAsFile()
        jobClass = 'org.opensearch.hadoop.qa.kerberos.dfs.SecureFsShell'
        systemProperties([
            "test.krb5.principal": clientPrincipal,
            "test.krb5.keytab": clientKeytab.toString(),
            "java.security.krb5.conf": krb5Conf.toString()
        ])
        args = ['-copyFromLocal', project.file(new File(itestResourceDir, 'artists.dat')), "/data/artists/artists.dat"]
    }

    // =============================================================================
    // Map Reduce Jobs
    // =============================================================================

    // Run the MR job to load data to OpenSearch. Ensure Kerberos settings are available.
    HadoopMRJob mrLoadData = config.createClusterTask('mrLoadData', HadoopMRJob.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(copyData, setupUsers)
        jobJar = jar.getArchiveFile().get().getAsFile()
        libJars(mrJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        jobClass = 'org.opensearch.hadoop.qa.kerberos.mr.LoadToOpenSearch'
        jobSettings([
            'opensearch.resource': 'qa_kerberos_mr_data',
            'opensearch.nodes': opensearchAddress,
            'opensearch.security.authentication': 'kerberos',
            'opensearch.net.spnego.auth.opensearch.principal': "${opensearchPrincipal}${realm}",
            'load.field.names': 'number,name,url,picture,@timestamp,tag',
            'mapreduce.map.java.opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}",
            'mapreduce.reduce.java.opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}",
            'yarn.app.mapreduce.am.command-opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}"
        ])
        systemProperties([
            "test.krb5.principal": clientPrincipal,
            "test.krb5.keytab": clientKeytab.toString(),
            "java.security.krb5.conf": krb5Conf.toString()
        ])
        args = ['/data/artists/artists.dat']
    }
    integrationTest.dependsOn(mrLoadData)

    // Run the MR job to read data out of OpenSearch. Ensure Kerberos settings are available.
    HadoopMRJob mrReadData = config.createClusterTask('mrReadData', HadoopMRJob.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(mrLoadData)
        jobJar = jar.getArchiveFile().get().getAsFile()
        libJars(mrJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        jobClass = 'org.opensearch.hadoop.qa.kerberos.mr.ReadFromOpenSearch'
        jobSettings([
            'opensearch.resource': 'qa_kerberos_mr_data',
            'opensearch.nodes': opensearchAddress,
            'opensearch.security.authentication': 'kerberos',
            'opensearch.net.spnego.auth.opensearch.principal': "${opensearchPrincipal}${realm}",
            'mapreduce.map.java.opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}",
            'mapreduce.reduce.java.opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}",
            'yarn.app.mapreduce.am.command-opts': "-Xmx200m -Djava.security.krb5.conf=${krb5Conf.toString()}"
        ])
        systemProperties([
            "test.krb5.principal": clientPrincipal,
            "test.krb5.keytab": clientKeytab.toString(),
            "java.security.krb5.conf": krb5Conf.toString()
        ])
        args = ['/data/output/mr']
    }
    integrationTest.dependsOn(mrReadData)
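
    // Note: the krb5.conf path is repeated in mapreduce.map.java.opts, mapreduce.reduce.java.opts, and
    // yarn.app.mapreduce.am.command-opts above because the task and application master JVMs launched by
    // YARN do not inherit the submitting client's system properties, so each child JVM is pointed at the
    // test KDC explicitly.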

    // =============================================================================
    // Spark Jobs
    // =============================================================================

    // Run the Spark job to load data to OpenSearch. Ensure Kerberos settings are available.
    SparkApp sparkLoadData = config.createClusterTask('sparkLoadData', SparkApp.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(jar, setupUsers, copyData)
        // deployModeCluster()
        // principal = clientPrincipal + realm
        // keytab = clientKeytab.toString()
        jobJar = jar.getArchiveFile().get().getAsFile()
        libJars(sparkJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        jobClass = 'org.opensearch.hadoop.qa.kerberos.spark.LoadToOpenSearch'
        jobSettings([
            'spark.opensearch.resource': 'qa_kerberos_spark_data',
            'spark.opensearch.nodes': opensearchAddress,
            'spark.opensearch.security.authentication': 'kerberos',
            'spark.opensearch.net.spnego.auth.opensearch.principal': "${opensearchPrincipal}${realm}",
            'spark.load.field.names': 'number,name,url,picture,@timestamp,tag',
            'spark.yarn.am.memory': "200m",
            'spark.driver.memory': "600m",
            'spark.yarn.am.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.yarn.principal': "${clientPrincipal}${realm}",
            'spark.yarn.keytab': clientKeytab.toString()
        ])
        environmentVariables.put('SPARK_SUBMIT_OPTS',
                "-Djava.security.krb5.conf=${krb5Conf.toString()} " +
                "-Dtest.krb5.principal=$clientPrincipal$realm " +
                "-Dtest.krb5.keytab=${clientKeytab.toString()}")
        args = ['/data/artists/artists.dat']
    }
    integrationTest.dependsOn(sparkLoadData)

    // Run the Spark job to read data out of OpenSearch. Ensure Kerberos settings are available.
    SparkApp sparkReadData = config.createClusterTask('sparkReadData', SparkApp.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(sparkLoadData)
        // deployModeCluster()
        // principal = clientPrincipal + realm
        // keytab = clientKeytab.toString()
        jobJar = jar.getArchiveFile().get().getAsFile()
        libJars(sparkJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        jobClass = 'org.opensearch.hadoop.qa.kerberos.spark.ReadFromOpenSearch'
        jobSettings([
            'spark.opensearch.resource': 'qa_kerberos_spark_data',
            'spark.opensearch.nodes': opensearchAddress,
            'spark.opensearch.security.authentication': 'kerberos',
            'spark.opensearch.net.spnego.auth.opensearch.principal': "${opensearchPrincipal}${realm}",
            'spark.yarn.am.memory': "200m",
            'spark.driver.memory': "600m",
            'spark.yarn.am.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}",
            'spark.yarn.principal': "${clientPrincipal}${realm}",
            'spark.yarn.keytab': clientKeytab.toString()
        ])
        environmentVariables.put('SPARK_SUBMIT_OPTS',
                "-Djava.security.krb5.conf=${krb5Conf.toString()} " +
                "-Dtest.krb5.principal=$clientPrincipal$realm " +
                "-Dtest.krb5.keytab=${clientKeytab.toString()}")
        args = ['/data/output/spark']
    }
    integrationTest.dependsOn(sparkReadData)
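
    // For reference: spark.yarn.principal and spark.yarn.keytab used above are the older property names;
    // Spark 3.x also accepts the same values under the newer names spark.kerberos.principal and
    // spark.kerberos.keytab.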

    // =============================================================================
    // Hive Jobs
    // =============================================================================

    // Replace the regular beeline script with our own shimmed one. All we do is perform a keytab login and
    // then delegate on as usual. This is easier than re-implementing most of beeline.
    Copy patchBeeline = config.createClusterTask('patchBeeline', Copy.class) {
        InstanceConfiguration i = config.service('hive').role('hiveserver').instance(0)
        ServiceDescriptor hive = i.getServiceDescriptor()
        Object lazyEvalBeelineScript = "${->i.getBaseDir().toPath().resolve(hive.homeDirName(i)).resolve(hive.scriptDir(i)).resolve('ext').toString()}"
        String lazyPatchedBeeline = "${->resourceDir.toPath().resolve('hive').resolve('patches').resolve(i.getServiceConf().getVersion().toString()).resolve('beeline.sh').toString()}"
        // Just in case any of the directories in Hive are shifted around, before running the fixture, we resolve
        // the path to the beeline ext script lazily.
        from lazyPatchedBeeline
        into lazyEvalBeelineScript
        setDuplicatesStrategy(DuplicatesStrategy.INCLUDE)
    }

    HiveBeeline hiveLoadData = config.createClusterTask('hiveLoadData', HiveBeeline.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(jar, setupUsers, copyData, patchBeeline)
        hivePrincipal = hivePrincipalName + realm
        script = new File(resourceDir, 'hive/load_to_opensearch.sql')
        libJars(hiveJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        environmentVariables.putAll([
            'HADOOP_CLIENT_OPTS':
                "-Djava.security.krb5.conf=${krb5Conf.toString()} " +
                "-Dtest.krb5.principal=$clientPrincipal$realm " +
                "-Dtest.krb5.keytab=${clientKeytab.toString()} ",
            'TEST_LIB': jar.getArchiveFile().get().getAsFile().toString()
        ])
    }
    integrationTest.dependsOn(hiveLoadData)

    HiveBeeline hiveReadData = config.createClusterTask('hiveReadData', HiveBeeline.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)
        dependsOn(hiveLoadData)
        hivePrincipal = hivePrincipalName + realm
        script = new File(resourceDir, 'hive/read_from_opensearch.sql')
        libJars(hiveJar.getArchiveFile().get().getAsFile(), kerberosItestJar.getArchiveFile().get().getAsFile())
        environmentVariables.putAll([
            'HADOOP_CLIENT_OPTS':
                "-Djava.security.krb5.conf=${krb5Conf.toString()} " +
                "-Dtest.krb5.principal=$clientPrincipal$realm " +
                "-Dtest.krb5.keytab=${clientKeytab.toString()} ",
            'TEST_LIB': jar.getArchiveFile().get().getAsFile().toString()
        ])
    }
    integrationTest.dependsOn(hiveReadData)

    // =============================================================================
    // Copy job outputs
    // =============================================================================

    File outputDataDir = project.file(new File(project.buildDir, 'data'))
    Task createOutputDataDir = project.tasks.create("createOutputDataDir") {
        doLast {
            if (outputDataDir.exists()) {
                // Clean out any prior results
                outputDataDir.deleteDir()
            }
            outputDataDir.mkdirs()
        }
    }

    Map readJobs = [
        'mr': mrReadData,
        'sparkRDD': sparkReadData,
        'sparkDF': sparkReadData,
        'sparkDS': sparkReadData,
        'hive': hiveReadData,
    ]

    readJobs.forEach { integrationName, integrationReadTask ->
        // Copy the job output out of HDFS using the SecureFsShell instead of dfs copy. We could use the
        // DfsCopy task, but with Kerberos, we would have to kinit before running it. Instead we wrap.
        HadoopMRJob copyOutputTask = config.createClusterTask("copy${integrationName.capitalize()}Output".toString(), HadoopMRJob.class) {
            clusterConfiguration = config
            dependsOn(integrationReadTask, createOutputDataDir)
            jobJar = jar.getArchiveFile().get().getAsFile()
            jobClass = 'org.opensearch.hadoop.qa.kerberos.dfs.SecureFsShell'
            systemProperties([
                "test.krb5.principal": clientPrincipal,
                "test.krb5.keytab": clientKeytab.toString(),
                "java.security.krb5.conf": krb5Conf.toString()
            ])
            args = ['-copyToLocal', "/data/output/$integrationName", outputDataDir]
        }
        // Integration test needs to depend on copy output tasks
        // to ensure all integrations have their output files on
        // disk
        integrationTest.dependsOn(copyOutputTask)
    }
}
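
// Note: the loop above registers copyMrOutput, copySparkRDDOutput, copySparkDFOutput, copySparkDSOutput, and
// copyHiveOutput, each pulling /data/output/<integration name> from HDFS into the build's data directory
// before integrationTest runs.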