/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.translog;

import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
 * A {@link Translog} implementation that creates translog files in the local filesystem.
 *
 * @opensearch.internal
 */
public class LocalTranslog extends Translog {

    /**
     * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
     * {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given.
     * If the generation is not {@code null}, this method tries to open the given translog generation. The generation is treated as the
     * last generation referenced from already committed data. This means all operations that have not yet been committed should be in
     * the translog file referenced by this generation. The translog creation will fail if this generation can't be opened.
     *
     * @param config the configuration of this translog
     * @param translogUUID the translog uuid to open, null for a new translog
     * @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely
     *                       deleted
     * @param globalCheckpointSupplier a supplier for the global checkpoint
     * @param primaryTermSupplier a supplier for the latest value of the primary term of the owning index shard. The latest term value is
     *                            examined and stored in the header whenever a new generation is rolled. It's guaranteed from outside
     *                            that a new generation is rolled when the term is increased. This guarantee allows us to validate
     *                            and reject operations whose term is higher than the primary term stored in the translog header.
     * @param persistedSequenceNumberConsumer a callback that's called whenever an operation with a given sequence number is successfully
     *                                        persisted.
     */
    public LocalTranslog(
        final TranslogConfig config,
        final String translogUUID,
        TranslogDeletionPolicy deletionPolicy,
        final LongSupplier globalCheckpointSupplier,
        final LongSupplier primaryTermSupplier,
        final LongConsumer persistedSequenceNumberConsumer
    ) throws IOException {
        super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
        try {
            final Checkpoint checkpoint = readCheckpoint(location);
            final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
            final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
            // This is special handling for an error condition where we created a new writer but failed to bake
            // the newly written file (generation + 1) into the checkpoint. This is still a valid state;
            // we just need to clean up before we continue.
            // We hit this before and then blindly deleted the new generation even though we managed to bake it in; see
            // https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example.
            //
            // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp, so we first check if that
            // file exists. If it does not, we don't even try to clean up and instead wait until we fail creating it.
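            // The assert below tolerates a leftover next-generation file only if it is no larger than a bare translog
            // header, i.e. the writer was created but no operations were ever written to it before the failure.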
            assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(
                translogUUID
            ) : "unexpected translog file: [" + nextTranslogFile + "]";
            if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
                && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
                logger.warn(
                    "deleted previously created, but not yet committed, next generation [{}]. This can happen due to a"
                        + " tragic exception when creating a new generation",
                    nextTranslogFile.getFileName()
                );
            }
            this.readers.addAll(recoverFromFiles(checkpoint));
            if (readers.isEmpty()) {
                throw new IllegalStateException("at least one reader must be recovered");
            }
            boolean success = false;
            current = null;
            try {
                current = createWriter(
                    checkpoint.generation + 1,
                    getMinFileGeneration(),
                    checkpoint.globalCheckpoint,
                    persistedSequenceNumberConsumer
                );
                success = true;
            } finally {
                // we have to close all the recovered readers, otherwise we leak file handles here:
                // for instance, if we have a lot of translog files and we can't create the writer, we would keep
                // holding on to all the uncommitted translog files if we don't close them
                if (success == false) {
                    IOUtils.closeWhileHandlingException(readers);
                }
            }
        } catch (Exception e) {
            // close the opened translog files if we fail to create a new translog...
            IOUtils.closeWhileHandlingException(current);
            IOUtils.closeWhileHandlingException(readers);
            throw e;
        }
    }

    /**
     * Ensures that the given location has been synced / written to the underlying storage.
     *
     * @return Returns true iff this call caused an actual sync operation, otherwise false
     */
    @Override
    public boolean ensureSynced(Location location) throws IOException {
        try (ReleasableLock ignored = readLock.acquire()) {
            if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
                ensureOpen();
                return current.syncUpTo(location.translogLocation + location.size);
            }
        } catch (final Exception ex) {
            closeOnTragicEvent(ex);
            throw ex;
        }
        return false;
    }

    /**
     * Returns translog statistics.
     */
    @Override
    public TranslogStats stats() {
        // acquire lock to make the two numbers roughly consistent (no file change half way)
        try (ReleasableLock lock = readLock.acquire()) {
            long uncommittedGen = getMinGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1).translogFileGeneration;
            return new TranslogStats(
                totalOperations(),
                sizeInBytes(),
                totalOperationsByMinGen(uncommittedGen),
                sizeInBytesByMinGen(uncommittedGen),
                earliestLastModifiedAge()
            );
        }
    }

    @Override
    public void close() throws IOException {
        assert Translog.calledFromOutsideOrViaTragedyClose()
            : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
        if (closed.compareAndSet(false, true)) {
            try (ReleasableLock lock = writeLock.acquire()) {
                try {
                    current.sync();
                } finally {
                    closeFilesIfNoPendingRetentionLocks();
                }
            } finally {
                logger.debug("translog closed");
            }
        }
    }
}
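
// A minimal construction sketch, assuming the caller supplies the collaborators described in the constructor
// javadoc; the names below (translogConfig, deletionPolicy, globalCheckpoint, primaryTerm) are hypothetical
// stand-ins for illustration, not values defined in this file:
//
//   Translog translog = new LocalTranslog(
//       translogConfig,            // TranslogConfig pointing at the shard's translog path
//       translogUUID,              // UUID of an existing translog, or null for a new one
//       deletionPolicy,            // TranslogDeletionPolicy controlling when files may be deleted
//       () -> globalCheckpoint,    // LongSupplier for the global checkpoint
//       () -> primaryTerm,         // LongSupplier for the owning shard's primary term
//       seqNo -> {}                // LongConsumer notified of each persisted sequence number
//   );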