/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
*/

package org.opensearch.hadoop.rest;

import org.opensearch.hadoop.cfg.PropertiesSettings;
import org.opensearch.hadoop.rest.PartitionDefinition.PartitionDefinitionBuilder;
import org.opensearch.hadoop.serialization.dto.mapping.FieldParser;
import org.opensearch.hadoop.serialization.dto.mapping.Mapping;
import org.opensearch.hadoop.thirdparty.codehaus.jackson.JsonParser;
import org.opensearch.hadoop.thirdparty.codehaus.jackson.map.ObjectMapper;
import org.opensearch.hadoop.util.BytesArray;
import org.opensearch.hadoop.util.FastByteArrayInputStream;
import org.opensearch.hadoop.util.FastByteArrayOutputStream;
import org.junit.Test;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

import static org.junit.Assert.*;

/**
 * Round-trip tests for {@link PartitionDefinition}: a partition is written out through either
 * its {@code write(DataOutput)} method or Java object serialization, read back, and compared
 * with the instance it was produced from.
 */
public class PartitionDefinitionTest {

    /** Classpath location of the typeless mapping fixture used by every test. */
    private static final String BASIC_MAPPING_RESOURCE =
            "/org/opensearch/hadoop/serialization/dto/mapping/typeless/basic.json";

    /**
     * Loads the JSON mapping fixture from the classpath and returns its resolved view.
     *
     * @return the resolved {@link Mapping} parsed from the fixture
     * @throws IOException if the fixture cannot be read or parsed
     */
    private Mapping getTestMapping() throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        JsonParser jsonParser = mapper.getJsonFactory()
                .createJsonParser(getClass().getResourceAsStream(BASIC_MAPPING_RESOURCE));
        try {
            Map map = mapper.readValue(jsonParser, Map.class);
            return FieldParser.parseTypelessMappings(map).getResolvedView();
        } finally {
            // readValue(JsonParser, ...) leaves a caller-supplied parser open, so release it
            // (and the resource stream behind it) here.
            jsonParser.close();
        }
    }

    /**
     * Builds the two-property settings object shared by all tests.
     *
     * @return a fresh {@link PropertiesSettings} holding {@code setting1} and {@code setting2}
     */
    private static PropertiesSettings newTestSettings() {
        PropertiesSettings settings = new PropertiesSettings();
        settings.setProperty("setting1", "value1");
        settings.setProperty("setting2", "value2");
        return settings;
    }

    /**
     * Returns the node locations handed to every partition under test.
     *
     * @return a new array on each call, so no two partitions share the same array instance
     */
    private static String[] testLocations() {
        return new String[] {"localhost:9200", "otherhost:9200"};
    }

    /** A partition without a slice survives a {@code write}/read round trip. */
    @Test
    public void testWritable() throws IOException {
        Mapping mapping = getTestMapping();
        PropertiesSettings settings = newTestSettings();

        PartitionDefinition expected =
                PartitionDefinition.builder(settings, mapping).build("foo", 12, testLocations());

        BytesArray bytes = writeWritablePartition(expected);
        PartitionDefinition def = readWritablePartition(bytes);
        assertPartitionEquals(expected, def);
    }

    /** A partition without a slice survives a Java serialization round trip. */
    @Test
    public void testSerializable() throws IOException, ClassNotFoundException {
        Mapping mapping = getTestMapping();
        PropertiesSettings settings = newTestSettings();

        PartitionDefinition expected =
                PartitionDefinition.builder(settings, mapping).build("bar", 37, testLocations());

        BytesArray bytes = writeSerializablePartition(expected);
        PartitionDefinition def = readSerializablePartition(bytes);
        assertPartitionEquals(expected, def);
    }

    /** A partition carrying a slice survives a {@code write}/read round trip. */
    @Test
    public void testWritableWithSlice() throws IOException {
        Mapping mapping = getTestMapping();
        PropertiesSettings settings = newTestSettings();

        PartitionDefinition expected = PartitionDefinition.builder(settings, mapping)
                .build("foo", 12, new PartitionDefinition.Slice(10, 27), testLocations());

        BytesArray bytes = writeWritablePartition(expected);
        PartitionDefinition def = readWritablePartition(bytes);
        assertPartitionEquals(expected, def);
    }

    /** A partition carrying a slice survives a Java serialization round trip. */
    @Test
    public void testSerializableWithSlice() throws IOException, ClassNotFoundException {
        Mapping mapping = getTestMapping();
        PropertiesSettings settings = newTestSettings();

        PartitionDefinition expected = PartitionDefinition.builder(settings, mapping)
                .build("bar", 37, new PartitionDefinition.Slice(13, 35), testLocations());

        BytesArray bytes = writeSerializablePartition(expected);
        PartitionDefinition def = readSerializablePartition(bytes);
        assertPartitionEquals(expected, def);
    }

    /**
     * Partitions built from one builder share the very same serialized settings and mapping
     * instances, whereas a partition that has been written and read back holds its own copies.
     */
    @Test
    public void testNonDuplicationOfConfiguration() throws IOException {
        Mapping mapping = getTestMapping();
        PropertiesSettings settings = newTestSettings();

        PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, mapping);
        PartitionDefinition first = partitionBuilder.build("foo", 11, testLocations());
        PartitionDefinition second = partitionBuilder.build("foo", 12, testLocations());

        assertNotEquals(first, second);
        assertSame(first.getSerializedSettings(), second.getSerializedSettings());
        assertSame(first.getSerializedMapping(), second.getSerializedMapping());

        BytesArray bytes = writeWritablePartition(first);
        PartitionDefinition def = readWritablePartition(bytes);
        assertPartitionEquals(first, def);
        assertNotSame(first.getSerializedSettings(), def.getSerializedSettings());
        assertNotSame(first.getSerializedMapping(), def.getSerializedMapping());
    }

    /**
     * Reads a partition back from bytes produced by {@link #writeSerializablePartition}.
     *
     * @param bytes the Java-serialized form of a partition
     * @return the deserialized partition
     * @throws IOException if the stream cannot be read
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    static PartitionDefinition readSerializablePartition(BytesArray bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new FastByteArrayInputStream(bytes));
        try {
            return (PartitionDefinition) ois.readObject();
        } finally {
            // Closed in finally, like the other read/write helpers in this class.
            ois.close();
        }
    }

    /**
     * Serializes a partition with Java object serialization.
     *
     * @param def the partition to serialize
     * @return the bytes written to the underlying buffer
     * @throws IOException if serialization fails
     */
    static BytesArray writeSerializablePartition(PartitionDefinition def) throws IOException {
        FastByteArrayOutputStream out = new FastByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(out);
        try {
            oos.writeObject(def);
            oos.flush();
            return out.bytes();
        } finally {
            oos.close();
        }
    }

    /**
     * Reads a partition back from bytes produced by {@link #writeWritablePartition}, using the
     * {@code PartitionDefinition} constructor that takes a data input.
     *
     * @param bytes the written form of a partition
     * @return the partition read from {@code bytes}
     * @throws IOException if the data cannot be read
     */
    static PartitionDefinition readWritablePartition(BytesArray bytes) throws IOException {
        FastByteArrayInputStream in = new FastByteArrayInputStream(bytes);
        try {
            DataInputStream di = new DataInputStream(in);
            return new PartitionDefinition(di);
        } finally {
            in.close();
        }
    }

    /**
     * Writes a partition through its own {@code write} method.
     *
     * @param def the partition to write
     * @return the bytes written to the underlying buffer
     * @throws IOException if writing fails
     */
    static BytesArray writeWritablePartition(PartitionDefinition def) throws IOException {
        FastByteArrayOutputStream out = new FastByteArrayOutputStream();
        DataOutputStream da = new DataOutputStream(out);
        try {
            def.write(da);
            da.flush();
            return out.bytes();
        } finally {
            da.close();
        }
    }

    /**
     * Asserts that two partitions are equal, including the parts that {@code equals} skips.
     *
     * @param p1 the expected partition
     * @param p2 the actual partition
     */
    static void assertPartitionEquals(PartitionDefinition p1, PartitionDefinition p2) {
        assertEquals(p1, p2);
        // the settings, the mapping and the locations are ignored in PartitionDefinition#equals
        // we need to test them separately
        assertArrayEquals(p1.getLocations(), p2.getLocations());
        assertEquals(p1.getSerializedSettings(), p2.getSerializedSettings());
        assertEquals(p1.getSerializedMapping(), p2.getSerializedMapping());
    }
}