/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.opensearch.hadoop.rest;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.opensearch.hadoop.OpenSearchHadoopIllegalStateException;
import org.opensearch.hadoop.cfg.Settings;
import org.opensearch.hadoop.rest.bulk.BulkProcessor;
import org.opensearch.hadoop.rest.bulk.BulkResponse;
import org.opensearch.hadoop.rest.query.QueryUtils;
import org.opensearch.hadoop.rest.stats.Stats;
import org.opensearch.hadoop.rest.stats.StatsAware;
import org.opensearch.hadoop.serialization.ScrollReader;
import org.opensearch.hadoop.serialization.ScrollReader.Scroll;
import org.opensearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.opensearch.hadoop.serialization.builder.JdkValueReader;
import org.opensearch.hadoop.serialization.bulk.BulkCommands;
import org.opensearch.hadoop.serialization.bulk.BulkEntryWriter;
import org.opensearch.hadoop.serialization.bulk.MetadataExtractor;
import org.opensearch.hadoop.serialization.dto.NodeInfo;
import org.opensearch.hadoop.serialization.dto.ShardInfo;
import org.opensearch.hadoop.serialization.dto.mapping.GeoField;
import org.opensearch.hadoop.serialization.dto.mapping.GeoField.GeoType;
import org.opensearch.hadoop.serialization.dto.mapping.Mapping;
import org.opensearch.hadoop.serialization.dto.mapping.MappingSet;
import org.opensearch.hadoop.serialization.dto.mapping.MappingUtils;
import org.opensearch.hadoop.serialization.handler.read.impl.AbortOnlyHandlerLoader;
import org.opensearch.hadoop.util.Assert;
import org.opensearch.hadoop.util.BytesArray;
import org.opensearch.hadoop.util.BytesRef;
import org.opensearch.hadoop.util.SettingsUtils;
import org.opensearch.hadoop.util.StringUtils;
import org.opensearch.hadoop.util.unit.TimeValue;

/**
 * Rest client performing high-level operations using buffers to improve performance. Stateful in that once created, it
 * is used to perform updates against the same index.
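 * <p>
 * A minimal usage sketch for the write path (illustrative only: the concrete {@code Settings}
 * implementation and the configuration property names shown here are assumptions, not part of
 * this class's contract):
 *
 * <pre>{@code
 * Settings settings = new PropertiesSettings();                   // assumed concrete Settings implementation
 * settings.setProperty("opensearch.nodes", "localhost:9200");     // assumed property key
 * settings.setProperty("opensearch.resource.write", "my-index");  // assumed property key
 *
 * RestRepository repository = new RestRepository(settings);
 * try {
 *     repository.writeToIndex(document);   // serialized into a bulk entry and buffered
 *     repository.flush();                  // push any remaining buffered bulk entries
 * } finally {
 *     repository.close();                  // aggregates stats and closes the underlying RestClient
 * }
 * }</pre>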
 */
public class RestRepository implements Closeable, StatsAware {

    private static Log log = LogFactory.getLog(RestRepository.class);

    // wrapper around existing BA (for cases where the serialization already occurred)
    private BytesRef trivialBytesRef;
    private boolean writeInitialized = false;

    private RestClient client;
    // optional extractor passed lazily to BulkCommand
    private MetadataExtractor metaExtractor;

    private BulkEntryWriter bulkEntryWriter;
    private BulkProcessor bulkProcessor;

    // Internal
    private static class Resources {
        private final Settings resourceSettings;
        private Resource resourceRead;
        private Resource resourceWrite;

        public Resources(Settings resourceSettings) {
            this.resourceSettings = resourceSettings;
        }

        public Resource getResourceRead() {
            if (resourceRead == null) {
                if (StringUtils.hasText(resourceSettings.getResourceRead())) {
                    resourceRead = new Resource(resourceSettings, true);
                }
            }
            return resourceRead;
        }

        public Resource getResourceWrite() {
            if (resourceWrite == null) {
                if (StringUtils.hasText(resourceSettings.getResourceWrite())) {
                    resourceWrite = new Resource(resourceSettings, false);
                }
            }
            return resourceWrite;
        }
    }

    private final Settings settings;
    private Resources resources;
    private final Stats stats = new Stats();

    public RestRepository(Settings settings) {
        this.settings = settings;
        this.resources = new Resources(settings);

        // Check if we have a read resource first, and if not, THEN check the write resource.
        // The write resource has more strict parsing rules, and if the process is only reading
        // with a resource that isn't good for writing, then eagerly parsing the resource as a
        // write resource can erroneously throw an error. Instead, we should just get the write
        // resource lazily as needed.
        Assert.isTrue(resources.getResourceRead() != null || resources.getResourceWrite() != null,
                "Invalid configuration - No read or write resource specified");

        this.client = new RestClient(settings);
    }

    /** postpone writing initialization since we can do only reading so there's no need to allocate buffers */
    private void lazyInitWriting() {
        if (!writeInitialized) {
            this.writeInitialized = true;
            this.bulkProcessor = new BulkProcessor(client, resources.getResourceWrite(), settings);
            this.trivialBytesRef = new BytesRef();
            this.bulkEntryWriter = new BulkEntryWriter(settings,
                    BulkCommands.create(settings, metaExtractor, client.clusterInfo.getMajorVersion()));
        }
    }

    ScrollQuery scanAll(String query, BytesArray body, ScrollReader reader) {
        return scanLimit(query, body, -1, reader);
    }

    /**
     * Returns a pageable (scan based) result to the given query.
     *
     * @param query scan query
     * @param body the query body (may be {@code null})
     * @param limit maximum number of results to return; a negative value means no limit
     * @param reader scroll reader
     * @return a scroll query
     */
    ScrollQuery scanLimit(String query, BytesArray body, long limit, ScrollReader reader) {
        return new ScrollQuery(this, query, body, limit, reader);
    }

    public void addRuntimeFieldExtractor(MetadataExtractor metaExtractor) {
        this.metaExtractor = metaExtractor;
    }

    /**
     * Writes the object to the index after serializing it into a bulk entry.
     *
     * @param object object to add to the index
     */
    public void writeToIndex(Object object) {
        Assert.notNull(object, "no object data given");

        lazyInitWriting();
        BytesRef serialized = bulkEntryWriter.writeBulkEntry(object);
        if (serialized != null) {
            doWriteToIndex(serialized);
        }
    }

    /**
     * Writes already-serialized data to the index.
     *
     * @param ba The data as a bytes array
     */
    public void writeProcessedToIndex(BytesArray ba) {
        Assert.notNull(ba, "no data given");
        Assert.isTrue(ba.length() > 0, "no data given");

        lazyInitWriting();
        trivialBytesRef.reset();
        trivialBytesRef.add(ba);
        doWriteToIndex(trivialBytesRef);
    }

    private void doWriteToIndex(BytesRef payload) {
        bulkProcessor.add(payload);
        payload.reset();
    }

    public BulkResponse tryFlush() {
        if (writeInitialized) {
            return bulkProcessor.tryFlush();
        } else {
            log.warn("Attempt to flush before any data had been written");
            return BulkResponse.complete();
        }
    }

    public void flush() {
        if (writeInitialized) {
            bulkProcessor.flush();
        } else {
            log.warn("Attempt to flush before any data had been written");
        }
    }

    @Override
    public void close() {
        if (log.isDebugEnabled()) {
            log.debug("Closing repository and connection to OpenSearch ...");
        }

        // bail out if closed before
        if (client == null) {
            return;
        }

        try {
            if (bulkProcessor != null) {
                bulkProcessor.close();
                // Aggregate stats before discarding them.
                stats.aggregate(bulkProcessor.stats());
                bulkProcessor = null;
            }

            if (bulkEntryWriter != null) {
                bulkEntryWriter.close();
                bulkEntryWriter = null;
            }
        } finally {
            client.close();
            // Aggregate stats before discarding them.
            stats.aggregate(client.stats());
            client = null;
        }
    }

    public RestClient getRestClient() {
        return client;
    }

    public List<List<Map<String, Object>>> getReadTargetShards() {
        for (int retries = 0; retries < 3; retries++) {
            List<List<Map<String, Object>>> result = doGetReadTargetShards();
            if (result != null) {
                return result;
            }
        }
        throw new OpenSearchHadoopIllegalStateException("Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable");
    }

    protected List<List<Map<String, Object>>> doGetReadTargetShards() {
        return client.targetShards(resources.getResourceRead().index(), SettingsUtils.getFixedRouting(settings));
    }

    public Map<ShardInfo, NodeInfo> getWriteTargetPrimaryShards(boolean clientNodesOnly) {
        for (int retries = 0; retries < 3; retries++) {
            Map<ShardInfo, NodeInfo> map = doGetWriteTargetPrimaryShards(clientNodesOnly);
            if (map != null) {
                return map;
            }
        }
        throw new OpenSearchHadoopIllegalStateException("Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable");
    }

    protected Map<ShardInfo, NodeInfo> doGetWriteTargetPrimaryShards(boolean clientNodesOnly) {
        List<List<Map<String, Object>>> info = client.targetShards(resources.getResourceWrite().index(), SettingsUtils.getFixedRouting(settings));
        Map<ShardInfo, NodeInfo> shards = new LinkedHashMap<ShardInfo, NodeInfo>();

        List<NodeInfo> nodes = client.getHttpNodes(clientNodesOnly);
        Map<String, NodeInfo> nodeMap = new HashMap<String, NodeInfo>(nodes.size());
        for (NodeInfo node : nodes) {
            nodeMap.put(node.getId(), node);
        }

        for (List<Map<String, Object>> shardGroup : info) {
            // consider only primary shards
            for (Map<String, Object> shardData : shardGroup) {
                ShardInfo shard = new ShardInfo(shardData);
                if (shard.isPrimary()) {
                    NodeInfo node = nodeMap.get(shard.getNode());
                    if (node == null) {
                        log.warn(String.format("Cannot find node with id [%s] (is HTTP enabled?) from shard [%s] in nodes [%s]; layout [%s]",
                                shard.getNode(), shard, nodes, info));
                        return null;
                    }
                    shards.put(shard, node);
                    break;
                }
            }
        }
        return shards;
    }

    public MappingSet getMappings() {
        return client.getMappings(resources.getResourceRead());
    }

    public Map<String, GeoField> sampleGeoFields(Mapping mapping) {
        Map<String, GeoType> fields = MappingUtils.geoFields(mapping);
        Map<String, Object> geoMapping = client.sampleForFields(resources.getResourceRead(), fields.keySet());

        Map<String, GeoField> geoInfo = new LinkedHashMap<String, GeoField>();
        for (Entry<String, GeoType> geoEntry : fields.entrySet()) {
            String fieldName = geoEntry.getKey();
            geoInfo.put(fieldName, MappingUtils.parseGeoInfo(geoEntry.getValue(), geoMapping.get(fieldName)));
        }

        return geoInfo;
    }

    // used to initialize a scroll (based on a query)
    Scroll scroll(String query, BytesArray body, ScrollReader reader) throws IOException {
        InputStream scroll = client.execute(Request.Method.POST, query, body).body();
        try {
            Scroll scrollResult = reader.read(scroll);
            if (scrollResult == null) {
                log.info(String.format("No scroll for query [%s/%s], likely because the index is frozen", query, body));
            }
            return scrollResult;
        } finally {
            if (scroll != null && scroll instanceof StatsAware) {
                stats.aggregate(((StatsAware) scroll).stats());
            }
        }
    }

    // consume the scroll
    Scroll scroll(String scrollId, ScrollReader reader) throws IOException {
        InputStream scroll = client.scroll(scrollId);
        try {
            return reader.read(scroll);
        } finally {
            if (scroll instanceof StatsAware) {
                stats.aggregate(((StatsAware) scroll).stats());
            }
        }
    }

    public boolean resourceExists(boolean read) {
        Resource res = (read ? resources.getResourceRead() : resources.getResourceWrite());
        // cheap hit - works for exact index names, index patterns, the `_all` resource, and alias names
        boolean exists = client.indexExists(res.index());

        // Do we really care if it's typed?
        // Yes! If the index exists and a type is given, the type should exist on the index as well.
        if (exists && res.isTyped()) {
            exists = client.typeExists(res.index(), res.type());
        }

        // could be a _all or a pattern which is valid for read
        // try again by asking the mapping - could be expensive
        if (!exists && read) {
            try {
                // make sure the mapping is null since the index might exist but the type might be missing
                MappingSet mappings = client.getMappings(res);
                exists = mappings != null && !mappings.isEmpty();
            } catch (OpenSearchHadoopInvalidRequest ex) {
                exists = false;
            }
        }
        return exists;
    }

    public boolean touch() {
        return client.touch(resources.getResourceWrite().index());
    }

    public void delete() {
        // try first a blind delete by query (since the plugin might be installed)
        try {
            if (resources.getResourceWrite().isTyped()) {
                client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type() + "/_query?q=*");
            } else {
                client.delete(resources.getResourceWrite().index() + "/_query?q=*");
            }
        } catch (OpenSearchHadoopInvalidRequest ehir) {
            log.info("Skipping delete by query as the plugin is not installed...");
        }

        // in ES 2.0 and higher this means scrolling and deleting the docs by hand...
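        // The fallback below emits one bulk "delete" action per scrolled hit, e.g. (illustrative):
        //   {"delete":{"_id":"some-id", "routing":"some-routing"}}
        // using the baseFormat/routedFormat templates defined further down.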
        // do a scroll-scan without source

        // as this is a delete, there's not much value in making this configurable so we just go for some sane/safe defaults
        // 10m scroll timeout
        // 500 results per batch
        int batchSize = 500;

        StringBuilder sb = new StringBuilder(resources.getResourceWrite().index());
        if (resources.getResourceWrite().isTyped()) {
            sb.append('/').append(resources.getResourceWrite().type());
        }
        sb.append("/_search?scroll=10m&_source=false&size=");
        sb.append(batchSize);
        sb.append("&sort=_doc");

        String scanQuery = sb.toString();
        ScrollReader scrollReader = new ScrollReader(
                ScrollReaderConfigBuilder.builder(new JdkValueReader(), settings)
                        .setReadMetadata(true)
                        .setMetadataName("_metadata")
                        .setReturnRawJson(false)
                        .setIgnoreUnmappedFields(false)
                        .setIncludeFields(Collections.<String>emptyList())
                        .setExcludeFields(Collections.<String>emptyList())
                        .setIncludeArrayFields(Collections.<String>emptyList())
                        .setErrorHandlerLoader(new AbortOnlyHandlerLoader()) // Only abort since this is internal
        );

        // start iterating
        ScrollQuery sq = scanAll(scanQuery, null, scrollReader);
        try {
            BytesArray entry = new BytesArray(0);

            // delete each retrieved batch, keep routing in mind:
            String baseFormat = "{\"delete\":{\"_id\":\"%s\"}}\n";
            String routedFormat = "{\"delete\":{\"_id\":\"%s\", \"routing\":\"%s\"}}\n";

            boolean hasData = false;
            while (sq.hasNext()) {
                hasData = true;
                entry.reset();
                Object[] kv = sq.next();
                @SuppressWarnings("unchecked")
                Map<String, Object> value = (Map<String, Object>) kv[1];
                @SuppressWarnings("unchecked")
                Map<String, Object> metadata = (Map<String, Object>) value.get("_metadata");
                String routing = (String) metadata.get("_routing");
                String encodedId = StringUtils.jsonEncoding((String) kv[0]);
                if (StringUtils.hasText(routing)) {
                    String encodedRouting = StringUtils.jsonEncoding(routing);
                    entry.add(StringUtils.toUTF(String.format(routedFormat, encodedId, encodedRouting)));
                } else {
                    entry.add(StringUtils.toUTF(String.format(baseFormat, encodedId)));
                }
                writeProcessedToIndex(entry);
            }

            if (hasData) {
                flush();
                // once done force a refresh
                client.refresh(resources.getResourceWrite());
            }
        } finally {
            stats.aggregate(sq.stats());
            sq.close();
        }
    }

    public boolean isEmpty(boolean read) {
        Resource res = (read ? resources.getResourceRead() : resources.getResourceWrite());
        boolean exists = client.indexExists(res.index());
        return (exists ? count(read) <= 0 : true);
    }

    public long count(boolean read) {
        Resource res = (read ? resources.getResourceRead() : resources.getResourceWrite());
        if (res.isTyped()) {
            return client.count(res.index(), res.type(), QueryUtils.parseQuery(settings));
        } else {
            return client.count(res.index(), QueryUtils.parseQuery(settings));
        }
    }

    public boolean waitForYellow() {
        return client.waitForHealth(resources.getResourceWrite().index(), RestClient.Health.YELLOW, TimeValue.timeValueSeconds(10));
    }

    @Override
    public Stats stats() {
        Stats copy = new Stats(stats);
        if (client != null) {
            // Aggregate stats if it's not already discarded
            copy.aggregate(client.stats());
        }
        if (bulkProcessor != null) {
            // Aggregate stats if it's not already discarded
            copy.aggregate(bulkProcessor.stats());
        }
        return copy;
    }

    public Settings getSettings() {
        return settings;
    }
}