/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.opensearch.Version;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * A basic read-only engine that allows switching a shard to be true read-only temporarily or permanently.
 * Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write
 * engine.
 *
 * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean)
 *
 * @opensearch.internal
 */
public class ReadOnlyEngine extends Engine {

    /** Segment infos of the commit this engine was opened on; read once in the constructor and never replaced. */
    private final SegmentInfos lastCommittedSegmentInfos;
    /** Sequence number stats, either supplied by the caller or built from the commit's user data. */
    private final SeqNoStats seqNoStats;
    /** Reference manager over the single reader opened on {@link #indexCommit}. */
    private final OpenSearchReaderManager readerManager;
    /** The index commit derived from {@link #lastCommittedSegmentInfos}. */
    private final IndexCommit indexCommit;
    /** The index writer lock, or {@code null} when the engine was created with {@code obtainLock == false}. */
    private final Lock indexWriterLock;
    private final SafeCommitInfo safeCommitInfo;
    private final CompletionStatsCache completionStatsCache;
    /** When {@code false}, {@link #ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats)} is a no-op. */
    private final boolean requireCompleteHistory;
    private final TranslogManager translogManager;
    /** Extended-compatibility snapshot version taken from the index settings. */
    private final Version minimumSupportedVersion;

    protected volatile TranslogStats translogStats;

    /**
     * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
     * read-write engine. It allows to optionally obtain the writer locks for the shard which would time-out if another
     * engine is still open.
     *
     * @param config the engine configuration
     * @param seqNoStats sequence number statistics for this engine or null if not provided; when null, the stats are
     *                   built from the last commit and verified via
     *                   {@link #ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats)}
     * @param translogStats translog stats for this engine or null if not provided; when null, the translog is opened
     *                      briefly to read its stats
     * @param obtainLock if true this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise
     *                   the lock won't be obtained
     * @param readerWrapperFunction allows to wrap the index-reader for this engine.
     * @param requireCompleteHistory indicates whether this engine permits an incomplete history (i.e. LCP &lt; MSN)
     * @throws UncheckedIOException if an {@link IOException} occurs while opening the engine
     */
    public ReadOnlyEngine(
        EngineConfig config,
        SeqNoStats seqNoStats,
        TranslogStats translogStats,
        boolean obtainLock,
        Function<DirectoryReader, DirectoryReader> readerWrapperFunction,
        boolean requireCompleteHistory
    ) {
        super(config);
        this.requireCompleteHistory = requireCompleteHistory;
        // fetch the minimum Version for extended backward compatibility use-cases
        this.minimumSupportedVersion = config.getIndexSettings().getExtendedCompatibilitySnapshotVersion();
        try {
            Store store = config.getStore();
            store.incRef();
            // Tracked separately from the wrapped reader so that it can still be closed if wrapping fails.
            DirectoryReader openedReader = null;
            OpenSearchDirectoryReader reader = null;
            Directory directory = store.directory();
            Lock indexWriterLock = null;
            boolean success = false;
            try {
                // we obtain the IW lock even though we never modify the index.
                // yet this makes sure nobody else does. including some testing tools that try to be messy
                indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
                if (isExtendedCompatibility()) {
                    this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory, this.minimumSupportedVersion);
                } else {
                    this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
                }
                if (seqNoStats == null) {
                    seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
                    ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
                }
                this.seqNoStats = seqNoStats;
                this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
                openedReader = open(indexCommit);
                reader = wrapReader(openedReader, readerWrapperFunction);
                readerManager = new OpenSearchReaderManager(reader);
                assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
                this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
                this.indexWriterLock = indexWriterLock;
                this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());

                completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));

                translogManager = new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, newEmptySnapshot());

                success = true;
            } finally {
                if (success == false) {
                    // Prefer the outermost reader; fall back to the raw one when wrapping never completed.
                    final Closeable readerToClose = reader != null ? reader : openedReader;
                    IOUtils.close(readerToClose, indexWriterLock, store::decRef);
                }
            }
        } catch (IOException e) {
            // constructors of engines are expected to fail with unchecked exceptions
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Verifies that the maximum sequence number equals the global checkpoint of the given stats.
     * The check is skipped entirely when this engine was created with {@code requireCompleteHistory == false}.
     * <p>
     * Note: an older comment at this location described additional gating on the global checkpoint being assigned and
     * on the index creation version; this method evaluates neither condition.
     *
     * @param seqNoStats the stats to verify
     * @throws IllegalStateException if the maximum sequence number differs from the global checkpoint
     */
    protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
        if (requireCompleteHistory == false) {
            return;
        }
        // The assertion hook runs first (only with -ea) so that subclasses can override how the mismatch is asserted.
        assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
        if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) {
            throw new IllegalStateException(
                "Maximum sequence number ["
                    + seqNoStats.getMaxSeqNo()
                    + "] from last commit does not match global checkpoint ["
                    + seqNoStats.getGlobalCheckpoint()
                    + "]"
            );
        }
    }

    /**
     * Assertion hook used by {@link #ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats)}.
     *
     * @return always {@code true}, so that the call can be used inside an {@code assert} statement
     */
    protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
        assert maxSeqNo == globalCheckpoint : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
        return true;
    }

    @Override
    public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
        // the value of the global checkpoint is verified when the read-only engine is opened,
        // and it is not expected to change during the lifecycle of the engine. We could also
        // check this value before closing the read-only engine but if something went wrong
        // and the global checkpoint is not in-sync with the max. sequence number anymore,
        // checking the value here again would prevent the read-only engine to be closed and
        // reopened as an internal engine, which would be the path to fix the issue.
    }

    /**
     * Applies the caller-supplied wrapper function to the reader and then wraps the result into an
     * {@link OpenSearchDirectoryReader} bound to this engine's shard id.
     */
    protected final OpenSearchDirectoryReader wrapReader(
        DirectoryReader reader,
        Function<DirectoryReader, DirectoryReader> readerWrapperFunction
    ) throws IOException {
        reader = readerWrapperFunction.apply(reader);
        return OpenSearchDirectoryReader.wrap(reader, engineConfig.getShardId());
    }

    /**
     * Opens a soft-deletes-aware reader on the given commit. When extended compatibility applies, the reader is opened
     * with the Lucene major version of the minimum supported version.
     */
    protected DirectoryReader open(IndexCommit commit) throws IOException {
        assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
        final DirectoryReader reader;
        if (isExtendedCompatibility()) {
            reader = DirectoryReader.open(commit, this.minimumSupportedVersion.luceneVersion.major, null);
        } else {
            reader = DirectoryReader.open(commit);
        }
        return wrapSoftDeletes(reader);
    }

    /**
     * Wraps the reader in a {@link SoftDeletesDirectoryReaderWrapper}, closing the given reader if wrapping fails so
     * that it is not leaked.
     */
    private static DirectoryReader wrapSoftDeletes(DirectoryReader reader) throws IOException {
        try {
            return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
        } catch (IOException | RuntimeException e) {
            try {
                reader.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    private boolean isExtendedCompatibility() {
        return Version.CURRENT.minimumIndexCompatibilityVersion().onOrAfter(this.minimumSupportedVersion);
    }

    /**
     * Closes the reader manager, releases the index writer lock (if held) and decrements the store reference. Only the
     * first call has an effect; failures are logged rather than propagated, and the latch is always counted down.
     */
    @Override
    protected void closeNoLock(String reason, CountDownLatch closedLatch) {
        if (isClosed.compareAndSet(false, true)) {
            try {
                IOUtils.close(readerManager, indexWriterLock, store::decRef);
            } catch (Exception ex) {
                logger.warn("failed to close reader", ex);
            } finally {
                closedLatch.countDown();
            }
        }
    }

    /**
     * Builds sequence number stats from the commit's user data, using the config's supplier for the global checkpoint.
     */
    private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos infos) {
        final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
        return new SeqNoStats(commitInfo.maxSeqNo, commitInfo.localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
    }

    /**
     * Opens the translog referenced by the commit just long enough to read its stats, then closes it again.
     *
     * @throws IllegalStateException if the commit user data has no translog UUID
     */
    private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
        final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
        if (translogUuid == null) {
            throw new IllegalStateException("commit doesn't contain translog unique id");
        }
        final TranslogConfig translogConfig = config.getTranslogConfig();
        final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(
            config.getIndexSettings().getTranslogRetentionSize().getBytes(),
            config.getIndexSettings().getTranslogRetentionAge().getMillis(),
            config.getIndexSettings().getTranslogRetentionTotalFiles()
        );
        final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
        translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
        try (
            Translog translog = config.getTranslogFactory()
                .newTranslog(
                    translogConfig,
                    translogUuid,
                    translogDeletionPolicy,
                    config.getGlobalCheckpointSupplier(),
                    config.getPrimaryTermSupplier(),
                    seqNo -> {},
                    config.getPrimaryModeSupplier()
                )
        ) {
            return translog.stats();
        }
    }

    @Override
    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
        return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
    }

    /** Returns the same reader manager regardless of the requested scope. */
    @Override
    protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
        return readerManager;
    }

    @Override
    public TranslogManager translogManager() {
        return translogManager;
    }

    @Override
    protected SegmentInfos getLastCommittedSegmentInfos() {
        return lastCommittedSegmentInfos;
    }

    /** The engine never changes its segments, so the latest infos are the last committed ones. */
    @Override
    protected SegmentInfos getLatestSegmentInfos() {
        return lastCommittedSegmentInfos;
    }

    @Override
    public String getHistoryUUID() {
        return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);
    }

    @Override
    public long getWritingBytes() {
        return 0;
    }

    @Override
    public long getIndexThrottleTimeInMillis() {
        return 0;
    }

    @Override
    public boolean isThrottled() {
        return false;
    }

    @Override
    public IndexResult index(Index index) {
        assert false : "this should not be called";
        throw new UnsupportedOperationException("indexing is not supported on a read-only engine");
    }

    @Override
    public DeleteResult delete(Delete delete) {
        assert false : "this should not be called";
        throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
    }

    @Override
    public NoOpResult noOp(NoOp noOp) {
        assert false : "this should not be called";
        throw new UnsupportedOperationException("no-ops are not supported on a read-only engine");
    }

    /** Returns a no-op lock; closing it does nothing. */
    @Override
    public Closeable acquireHistoryRetentionLock() {
        return () -> {};
    }

    /** Always returns an empty snapshot, whatever range is requested. */
    @Override
    public Translog.Snapshot newChangesSnapshot(
        String source,
        long fromSeqNo,
        long toSeqNo,
        boolean requiredFullRange,
        boolean accurateCount
    ) {
        return newEmptySnapshot();
    }

    @Override
    public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
        try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
            return snapshot.totalOperations();
        }
    }

    public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
        // we can do operation-based recovery if we don't have to replay any operation.
        return startingSeqNo > seqNoStats.getMaxSeqNo();
    }

    @Override
    public long getMinRetainedSeqNo() {
        throw new UnsupportedOperationException();
    }

    @Override
    public long getPersistedLocalCheckpoint() {
        return seqNoStats.getLocalCheckpoint();
    }

    @Override
    public long getProcessedLocalCheckpoint() {
        // the read-only engine does not process checkpoints, so its
        // processed checkpoint is identical to its persisted one.
        return getPersistedLocalCheckpoint();
    }

    /** Returns this engine's max seq no and local checkpoint combined with the given global checkpoint. */
    @Override
    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
        return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
    }

    @Override
    public long getLastSyncedGlobalCheckpoint() {
        return seqNoStats.getGlobalCheckpoint();
    }

    @Override
    public long getIndexBufferRAMBytesUsed() {
        return 0;
    }

    @Override
    public List<Segment> segments(boolean verbose) {
        return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose));
    }

    @Override
    public void refresh(String source) {
        // we could allow refreshes if we want down the road the reader manager will then reflect changes to a rw-engine
        // opened side-by-side
    }

    @Override
    public boolean maybeRefresh(String source) throws EngineException {
        return false;
    }

    @Override
    public void writeIndexingBuffer() throws EngineException {}

    @Override
    public boolean shouldPeriodicallyFlush() {
        return false;
    }

    @Override
    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}

    @Override
    public void forceMerge(
        boolean flush,
        int maxNumSegments,
        boolean onlyExpungeDeletes,
        boolean upgrade,
        boolean upgradeOnlyAncientSegments,
        String forceMergeUUID
    ) {}

    /**
     * Returns the commit this engine was opened on. The store reference is incremented here and decremented when the
     * returned {@link GatedCloseable} is closed. {@code flushFirst} is ignored.
     */
    @Override
    public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) {
        store.incRef();
        return new GatedCloseable<>(indexCommit, store::decRef);
    }

    @Override
    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() {
        return acquireLastIndexCommit(false);
    }

    @Override
    public SafeCommitInfo getSafeCommitInfo() {
        return safeCommitInfo;
    }

    @Override
    public void activateThrottling() {}

    @Override
    public void deactivateThrottling() {}

    @Override
    public int fillSeqNoGaps(long primaryTerm) {
        return 0;
    }

    @Override
    public void maybePruneDeletes() {}

    @Override
    public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

    }

    @Override
    public boolean refreshNeeded() {
        return false;
    }

    /** Creates a snapshot that reports zero operations, yields no operation, and has nothing to close. */
    private static Translog.Snapshot newEmptySnapshot() {
        return new Translog.Snapshot() {
            @Override
            public void close() {}

            @Override
            public int totalOperations() {
                return 0;
            }

            @Override
            public Translog.Operation next() {
                return null;
            }
        };
    }

    @Override
    public long getMaxSeqNoOfUpdatesOrDeletes() {
        return seqNoStats.getMaxSeqNo();
    }

    /** Nothing is advanced; with assertions enabled, verifies the given value does not exceed the engine's max seq no. */
    @Override
    public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
        assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() : maxSeqNoOfUpdatesOnPrimary
            + ">"
            + getMaxSeqNoOfUpdatesOrDeletes();
    }

    /**
     * Opens a reader on the latest commit of the given directory.
     *
     * @param directory       the directory to open
     * @param wrapSoftDeletes whether to wrap the reader in a {@link SoftDeletesDirectoryReaderWrapper}
     */
    protected static DirectoryReader openDirectory(Directory directory, boolean wrapSoftDeletes) throws IOException {
        assert Transports.assertNotTransportThread("opening directory reader of a read-only engine");
        final DirectoryReader reader = DirectoryReader.open(directory);
        return wrapSoftDeletes ? wrapSoftDeletes(reader) : reader;
    }

    @Override
    public CompletionStats completionStats(String... fieldNamePatterns) {
        return completionStatsCache.get(fieldNamePatterns);
    }
}