/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.index.seqno; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RetentionLeaseSyncActionTests extends OpenSearchTestCase { private ThreadPool threadPool; private CapturingTransport transport; private ClusterService clusterService; private TransportService transportService; private ShardStateAction shardStateAction; public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = transport.createTransportService( clusterService.getSettings(), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet() ); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); } public void tearDown() throws Exception { try { IOUtils.close(transportService, clusterService, transport); } finally { terminate(threadPool); } super.tearDown(); } public void testRetentionLeaseSyncActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( Settings.EMPTY, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()) ); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); // we should forward the request containing the current retention leases to the replica assertThat(result.replicaRequest(), sameInstance(request)); // we should start with an empty replication response assertNull(result.finalResponseIfSuccessful.getShardInfo()); })); } public void testRetentionLeaseSyncActionOnReplica() throws Exception { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( Settings.EMPTY, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()) ); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); PlainActionFuture listener = PlainActionFuture.newFuture(); action.dispatchedShardOperationOnReplica(request, indexShard, listener); final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); } public void testBlocks() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( Settings.EMPTY, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()) ); assertNull(action.indexBlockLevel()); } public void testGetReplicationModeWithRemoteTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } private RetentionLeaseSyncAction createAction() { return new RetentionLeaseSyncAction( Settings.EMPTY, transportService, clusterService, mock(IndicesService.class), threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()) ); } }