- [Configuration Properties](#configuration-properties)
  - [Required](#required)
  - [Essential](#essential)
- [Map/Reduce](#mapreduce)
  - ['Old' (`org.apache.hadoop.mapred`) API](#old-orgapachehadoopmapred-api)
  - [Reading](#reading)
  - [Writing](#writing)
  - ['New' (`org.apache.hadoop.mapreduce`) API](#new-orgapachehadoopmapreduce-api)
  - [Reading](#reading-1)
  - [Writing](#writing-1)
  - [Signing requests for IAM authentication](#signing-requests-for-iam-authentication)
- [Apache Hive](#apache-hive)
  - [Reading](#reading-2)
  - [Writing](#writing-2)
  - [Signing requests for IAM authentication](#signing-requests-for-iam-authentication-1)
- [Apache Spark](#apache-spark)
  - [Scala](#scala)
  - [Reading](#reading-3)
    - [Spark SQL](#spark-sql)
  - [Writing](#writing-3)
    - [Spark SQL](#spark-sql-1)
  - [Java](#java)
  - [Reading](#reading-4)
    - [Spark SQL](#spark-sql-2)
  - [Writing](#writing-4)
    - [Spark SQL](#spark-sql-3)
  - [Signing requests for IAM authentication](#signing-requests-for-iam-authentication-2)

## Configuration Properties

All configuration properties start with the `opensearch` prefix. Note that the `opensearch.internal` namespace is reserved for the library's internal use and should _not_ be used by the user at any point. The properties are read mainly from the Hadoop configuration, but the user can specify (some of) them directly, depending on the library used.

### Required

```
opensearch.resource=<index (or indices) used for reading and writing>
```

### Essential

```
opensearch.query=      # defaults to {"query":{"match_all":{}}}
opensearch.nodes=      # defaults to localhost
opensearch.port=       # defaults to 9200
```

## [Map/Reduce][]

For basic, low-level or performance-sensitive environments, OpenSearch-Hadoop provides dedicated `InputFormat` and `OutputFormat` implementations that read and write data to OpenSearch. To use them, add the `opensearch-hadoop` jar to your job classpath (either by bundling the library along - it's ~300kB and has no dependencies - by using the [DistributedCache][], or by provisioning the cluster manually).

Note that opensearch-hadoop supports both the so-called 'old' and the 'new' API through its `OpenSearchInputFormat` and `OpenSearchOutputFormat` classes.

### 'Old' (`org.apache.hadoop.mapred`) API

### Reading

To read data from OpenSearch, configure the `OpenSearchInputFormat` on your job configuration along with the relevant [properties](#configuration-properties):

```java
JobConf conf = new JobConf();
conf.setInputFormat(OpenSearchInputFormat.class);
conf.set("opensearch.resource", "radio/artists");
conf.set("opensearch.query", "?q=me*");           // replace this with the relevant query
...
JobClient.runJob(conf);
```

### Writing

The same configuration template can be used for writing, but with `OpenSearchOutputFormat`:

```java
JobConf conf = new JobConf();
conf.setOutputFormat(OpenSearchOutputFormat.class);
conf.set("opensearch.resource", "radio/artists"); // index or indices used for storing data
...
JobClient.runJob(conf);
```

### 'New' (`org.apache.hadoop.mapreduce`) API

### Reading

```java
Configuration conf = new Configuration();
conf.set("opensearch.resource", "radio/artists");
conf.set("opensearch.query", "?q=me*");           // replace this with the relevant query
Job job = new Job(conf);
job.setInputFormatClass(OpenSearchInputFormat.class);
...
job.waitForCompletion(true);
```
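The job's mapper then receives the documents returned by the query. Below is a minimal, hypothetical sketch, assuming the input format hands each hit to the mapper as a (document id, document) pair with the document exposed as a map of field names to values; the `name` field is just the illustrative one used throughout this document:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

// Assumption: keys are document ids, values are documents exposed as Map<Writable, Writable>.
public class ArtistNameMapper extends Mapper<Object, Object, Text, NullWritable> {

    private final Text name = new Text();

    @Override
    @SuppressWarnings("unchecked")
    protected void map(Object key, Object value, Context context)
            throws IOException, InterruptedException {
        Map<Writable, Writable> doc = (Map<Writable, Writable>) value;
        // pick a single field out of the returned document
        Writable field = doc.get(new Text("name"));
        if (field != null) {
            name.set(field.toString());
            context.write(name, NullWritable.get());
        }
    }
}
```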
### Writing

```java
Configuration conf = new Configuration();
conf.set("opensearch.resource", "radio/artists"); // index or indices used for storing data
Job job = new Job(conf);
job.setOutputFormatClass(OpenSearchOutputFormat.class);
...
job.waitForCompletion(true);
```
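The example above configures the output format but does not show the map (or reduce) output itself. A minimal, hypothetical sketch follows, assuming the output format ignores the key and indexes each `MapWritable` value as one document (the TSV layout and field names are purely illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Assumption: each MapWritable emitted by the job becomes one indexed document;
// the key is not used by the output format.
public class ArtistDocMapper extends Mapper<LongWritable, Text, NullWritable, MapWritable> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // hypothetical input: a two-column TSV file of "name<TAB>url"
        String[] fields = line.toString().split("\t");
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        MapWritable doc = new MapWritable();
        doc.put(new Text("name"), new Text(fields[0]));
        doc.put(new Text("url"), new Text(fields[1]));
        context.write(NullWritable.get(), doc);
    }
}
```

For a map-only job of this shape, the map output key/value classes (`NullWritable`/`MapWritable`) and `job.setNumReduceTasks(0)` would be set accordingly.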
### Signing requests for IAM authentication

Signing requests requires the [aws-sdk-bundle](https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle) on your job classpath.

```java
Configuration conf = new Configuration();
conf.set("opensearch.resource", "radio/artists");
conf.set("opensearch.query", "?q=me*");           // replace this with the relevant query
conf.set("opensearch.nodes", "https://search-xxx.us-east-1.es.amazonaws.com");
conf.set("opensearch.port", "443");
conf.set("opensearch.net.ssl", "true");
conf.set("opensearch.nodes.wan.only", "true");
conf.set("opensearch.aws.sigv4.enabled", "true");
conf.set("opensearch.aws.sigv4.region", "us-east-1");
Job job = new Job(conf);
job.setInputFormatClass(OpenSearchInputFormat.class);
...
job.waitForCompletion(true);
```

## [Apache Hive][]

OpenSearch-Hadoop provides a Hive storage handler for OpenSearch, meaning one can define an [external table][] on top of OpenSearch.

Add `opensearch-hadoop-<version>.jar` to `hive.aux.jars.path` or register it manually in your Hive script (recommended):

```
ADD JAR /path_to_jar/opensearch-hadoop-<version>.jar;
```

### Reading

To read data from OpenSearch, define a table backed by the desired index:

```SQL
CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.opensearch.hadoop.hive.OpenSearchStorageHandler'
TBLPROPERTIES('opensearch.resource' = 'radio/artists', 'opensearch.query' = '?q=me*');
```

The fields defined in the table are mapped to the JSON when communicating with OpenSearch. Notice the use of `TBLPROPERTIES` to define the location, that is, the query used for reading from this table. Once defined, the table can be used just like any other:

```SQL
SELECT * FROM artists;
```

### Writing

To write data, a similar definition is used but with a different `opensearch.resource`:

```SQL
CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.opensearch.hadoop.hive.OpenSearchStorageHandler'
TBLPROPERTIES('opensearch.resource' = 'radio/artists');
```

Any data passed to the table is then passed down to OpenSearch; for example, considering a table `source` (aliased `s` below) mapped to a TSV/CSV file, one can index it to OpenSearch like this:

```SQL
INSERT OVERWRITE TABLE artists
    SELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture) FROM source s;
```

As one can note, reading and writing are currently treated separately, but we're working on unifying the two and automatically translating [HiveQL][] to OpenSearch queries.

### Signing requests for IAM authentication

Signing requests requires the [aws-sdk-bundle](https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle) on your job classpath.

```SQL
CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.opensearch.hadoop.hive.OpenSearchStorageHandler'
TBLPROPERTIES(
    'opensearch.nodes' = 'https://search-xxx.us-east-1.es.amazonaws.com',
    'opensearch.port' = '443',
    'opensearch.net.ssl' = 'true',
    'opensearch.resource' = 'artists',
    'opensearch.nodes.wan.only' = 'true',
    'opensearch.aws.sigv4.enabled' = 'true',
    'opensearch.aws.sigv4.region' = 'us-east-1'
);
```

## [Apache Spark][]

OpenSearch-Hadoop provides native (Java and Scala) integration with Spark: for reading, a dedicated `RDD`, and for writing, methods that work on any `RDD`. Spark SQL is also supported.

### Scala

### Reading

To read data from OpenSearch, create a dedicated `RDD` and specify the query as an argument:

```scala
import org.opensearch.spark._

...

val conf = ...
val sc = new SparkContext(conf)

sc.opensearchRDD("radio/artists", "?q=me*")
```

#### Spark SQL

```scala
import org.opensearch.spark.sql._

// DataFrame schema automatically inferred
val df = sqlContext.read.format("opensearch").load("buckethead/albums")

// operations get pushed down and translated at runtime to OpenSearch QueryDSL
val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))
```

### Writing

Import the `org.opensearch.spark._` package to gain `saveToOpenSearch` methods on your `RDD`s:

```scala
import org.opensearch.spark._

val conf = ...
val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD(Seq(numbers, airports)).saveToOpenSearch("spark/docs")
```

#### Spark SQL

```scala
import org.opensearch.spark.sql._

val df = sqlContext.read.json("examples/people.json")
df.saveToOpenSearch("spark/people")
```

### Java

In a Java environment, use the `org.opensearch.spark.rdd.api.java` package, in particular the `JavaOpenSearchSpark` class.

### Reading

To read data from OpenSearch, create a dedicated `RDD` and specify the query as an argument.

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

JavaPairRDD<String, Map<String, Object>> opensearchRDD = JavaOpenSearchSpark.opensearchRDD(jsc, "radio/artists");
```

#### Spark SQL

```java
SQLContext sql = new SQLContext(sc);
DataFrame df = sql.read().format("opensearch").load("buckethead/albums");
DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)));
```

### Writing

Use `JavaOpenSearchSpark` to index any `RDD` to OpenSearch:

```java
import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

Map<String, Object> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, Object> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, Object>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaOpenSearchSpark.saveToOpenSearch(javaRDD, "spark/docs");
```
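Per-call settings can also be passed when saving (the Scala IAM example at the end of this document passes such a map). The sketch below is hypothetical: it assumes a `saveToOpenSearch(rdd, resource, settings)` overload taking a settings map, and it assumes the `opensearch.mapping.id` setting designates which field supplies the document id; the `iata` field and index name are purely illustrative.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

import java.util.Map;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

Map<String, Object> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, Object> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");
JavaRDD<Map<String, Object>> airports = jsc.parallelize(ImmutableList.of(otp, sfo));

// assumed setting: use the "iata" field of each document as its OpenSearch document id
JavaOpenSearchSpark.saveToOpenSearch(airports, "spark/airports",
        ImmutableMap.of("opensearch.mapping.id", "iata"));
```

If such an overload is not available in your version, the same settings can be placed on the Spark/Hadoop configuration instead, as the other examples in this document do.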
#### Spark SQL

```java
import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;

DataFrame df = sqlContext.read().json("examples/people.json");
JavaOpenSearchSparkSQL.saveToOpenSearch(df, "spark/docs");
```

### Signing requests for IAM authentication

Signing requests requires the [aws-sdk-bundle](https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle) on your job classpath.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.opensearch.spark.sql._

val sql = new SQLContext(sc)
val accountsRead = sql.read.json("/Users/user/release/maximus/data/accounts.json")
val options = Map(
  "pushdown" -> "true",
  "opensearch.nodes" -> "https://aos-2.3-domain",
  "opensearch.aws.sigv4.enabled" -> "true",
  "opensearch.aws.sigv4.region" -> "us-west-2",
  "opensearch.nodes.resolve.hostname" -> "false",
  "opensearch.nodes.wan.only" -> "true",
  "opensearch.net.ssl" -> "true")

accountsRead.saveToOpenSearch("accounts-00001", options)
```

[Map/Reduce]: http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
[Apache Hive]: http://hive.apache.org
[Apache Spark]: http://spark.apache.org
[HiveQL]: http://cwiki.apache.org/confluence/display/Hive/LanguageManual
[external table]: http://cwiki.apache.org/Hive/external-tables.html
[DistributedCache]: http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/filecache/DistributedCache.html