/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ package org.opensearch.index.translog.transfer; import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; private ShardId shardId; private BlobPath remoteBaseTransferPath; private ThreadPool threadPool; private long primaryTerm; private long generation; private long minTranslogGeneration; @Override public void setUp() throws Exception { super.setUp(); primaryTerm = randomNonNegativeLong(); generation = randomNonNegativeLong(); shardId = mock(ShardId.class); when(shardId.getIndex()).thenReturn(new Index("index", "indexUUid")); minTranslogGeneration = randomLongBetween(0, generation); remoteBaseTransferPath = new BlobPath().add("base_path"); transferService = mock(TransferService.class); threadPool = new TestThreadPool(getClass().getName()); } @Override public void tearDown() throws Exception { super.tearDown(); terminate(threadPool); } @SuppressWarnings("unchecked") public void testTransferSnapshot() throws Exception { AtomicInteger fileTransferSucceeded = new AtomicInteger(); AtomicInteger fileTransferFailed = new AtomicInteger(); AtomicInteger translogTransferSucceeded = new AtomicInteger(); AtomicInteger translogTransferFailed = new AtomicInteger(); doNothing().when(transferService) .uploadBlob( any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), any(WritePriority.class) ); doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; transferFileSnapshots.forEach(listener::onResponse); return null; }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override public void onSuccess(TransferFileSnapshot fileSnapshot) { fileTransferSucceeded.incrementAndGet(); super.onSuccess(fileSnapshot); } @Override public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { fileTransferFailed.incrementAndGet(); super.onFailure(fileSnapshot, e); } }; TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, fileTransferTracker ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @Override public void onUploadComplete(TransferSnapshot transferSnapshot) { translogTransferSucceeded.incrementAndGet(); } @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { translogTransferFailed.incrementAndGet(); } })); assertEquals(4, fileTransferSucceeded.get()); assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); assertEquals(4, fileTransferTracker.allUploaded().size()); } private TransferSnapshot createTransferSnapshot() { return new TransferSnapshot() { @Override public Set getCheckpointFileSnapshots() { try { return Set.of( new CheckpointFileSnapshot( primaryTerm, generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), null ), new CheckpointFileSnapshot( primaryTerm, generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), null ) ); } catch (IOException e) { throw new AssertionError("Failed to create temp file", e); } } @Override public Set getTranslogFileSnapshots() { try { return Set.of( new TranslogFileSnapshot( primaryTerm, generation, createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), null ), new TranslogFileSnapshot( primaryTerm, generation - 1, createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), null ) ); } catch (IOException e) { throw new AssertionError("Failed to create temp file", e); } } @Override public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); } }; } public void testReadMetadataNoFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); latchedActionListener.onResponse(bmList); return null; }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); assertNull(translogTransferManager.readMetadata()); } // This should happen most of the time - Just a single metadata file public void testReadMetadataSingleFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn( new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); } public void testReadMetadataReadException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } public void testMetadataFileNameOrder() throws IOException { // asserting that new primary followed new generation are lexicographically smallest String mdFilenameGen1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); String mdFilenameGen2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); String mdFilenamePrimary2 = new TranslogTransferMetadata(2, 1, 1, 2).getFileName(); List metadataFiles = Arrays.asList(mdFilenameGen1, mdFilenameGen2, mdFilenamePrimary2); Collections.sort(metadataFiles); assertEquals(Arrays.asList(mdFilenamePrimary2, mdFilenameGen2, mdFilenameGen1), metadataFiles); } public void testReadMetadataListException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onFailure(new IOException("Issue while listing")); return null; }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } public void testDownloadTranslog() throws IOException { Path location = createTempDir(); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, new FileTransferTracker(new ShardId("index", "indexUuid", 0)) ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn( new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) ); assertFalse(Files.exists(location.resolve("translog-23.tlog"))); assertFalse(Files.exists(location.resolve("translog-23.ckp"))); translogTransferManager.downloadTranslog("12", "23", location); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } public void testDownloadTranslogAlreadyExists() throws IOException { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); Files.createFile(location.resolve("translog-23.tlog")); Files.createFile(location.resolve("translog-23.ckp")); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, tracker ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn( new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) ); translogTransferManager.downloadTranslog("12", "23", location); verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } public void testDownloadTranslogWithTrackerUpdated() throws IOException { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); String translogFile = "translog-23.tlog", checkpointFile = "translog-23.ckp"; Files.createFile(location.resolve(translogFile)); Files.createFile(location.resolve(checkpointFile)); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, tracker ); when(transferService.downloadBlob(any(BlobPath.class), eq(translogFile))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) ); when(transferService.downloadBlob(any(BlobPath.class), eq(checkpointFile))).thenReturn( new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) ); translogTransferManager.downloadTranslog("12", "23", location); verify(transferService).downloadBlob(any(BlobPath.class), eq(translogFile)); verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile)); assertTrue(Files.exists(location.resolve(translogFile))); assertTrue(Files.exists(location.resolve(checkpointFile))); // Since the tracker already holds the files with success state, adding them with failed state would throw exception assertThrows(IllegalStateException.class, () -> tracker.add(translogFile, false)); assertThrows(IllegalStateException.class, () -> tracker.add(checkpointFile, false)); // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); } public void testDeleteTranslogSuccess() throws Exception { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); BlobStore blobStore = mock(BlobStore.class); BlobContainer blobContainer = mock(BlobContainer.class); when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, remoteBaseTransferPath, tracker ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); tracker.add(checkpointFile, true); assertEquals(2, tracker.allUploaded().size()); List files = List.of(checkpointFile, translogFile); translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); } public void testDeleteStaleTranslogMetadata() { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName(); doAnswer(invocation -> { ActionListener> actionListener = invocation.getArgument(4); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(tm1, 1)); bmList.add(new PlainBlobMetadata(tm2, 1)); bmList.add(new PlainBlobMetadata(tm3, 1)); actionListener.onResponse(bmList); return null; }).when(transferService) .listAllInSortedOrderAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class) ); List files = List.of(tm2, tm3); translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { verify(transferService).listAllInSortedOrderAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), eq(Integer.MAX_VALUE), any() ); verify(transferService).deleteBlobsAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), eq(files), any(ActionListener.class) ); }); } public void testDeleteTranslogFailure() throws Exception { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); BlobStore blobStore = mock(BlobStore.class); BlobContainer blobContainer = mock(BlobContainer.class); doAnswer(invocation -> { throw new IOException("test exception"); }).when(blobStore).blobContainer(any(BlobPath.class)); // when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, remoteBaseTransferPath, tracker ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); tracker.add(checkpointFile, true); assertEquals(2, tracker.allUploaded().size()); translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); assertEquals(2, tracker.allUploaded().size()); } }