/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.update; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateRequestBuilder; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.transport.NoNodeAvailableException; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.plugins.Plugin; import org.opensearch.core.rest.RestStatus; import org.opensearch.script.MockScriptPlugin; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.InternalSettingsPlugin; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFutureThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class UpdateIT extends OpenSearchIntegTestCase { private static final String UPDATE_SCRIPTS = "update_scripts"; private static final String PUT_VALUES_SCRIPT = "put_values"; private static final String FIELD_INC_SCRIPT = "field_inc"; private static final String UPSERT_SCRIPT = "scripted_upsert"; private static final String EXTRACT_CTX_SCRIPT = "extract_ctx"; public static class UpdateScriptsPlugin extends MockScriptPlugin { @Override public String pluginScriptLang() { return UPDATE_SCRIPTS; } @Override protected Map, Object>> pluginScripts() { Map, Object>> scripts = new HashMap<>(); scripts.put(PUT_VALUES_SCRIPT, vars -> { Map ctx = (Map) vars.get("ctx"); assertNotNull(ctx); Map params = new HashMap<>((Map) vars.get("params")); Map newCtx = (Map) params.remove("_ctx"); if (newCtx != null) { assertFalse(newCtx.containsKey("_source")); ctx.putAll(newCtx); } Map source = (Map) ctx.get("_source"); params.remove("ctx"); source.putAll(params); return ctx; }); scripts.put(FIELD_INC_SCRIPT, vars -> { Map params = (Map) vars.get("params"); String fieldname = (String) vars.get("field"); Map ctx = (Map) vars.get("ctx"); assertNotNull(ctx); Map source = (Map) ctx.get("_source"); Number currentValue = (Number) source.get(fieldname); Number inc = (Number) params.getOrDefault("inc", 1); source.put(fieldname, currentValue.longValue() + inc.longValue()); return ctx; }); scripts.put(UPSERT_SCRIPT, vars -> { Map ctx = (Map) vars.get("ctx"); assertNotNull(ctx); Map source = (Map) ctx.get("_source"); Number payment = (Number) vars.get("payment"); Number oldBalance = (Number) source.get("balance"); int deduction = "create".equals(ctx.get("op")) ? payment.intValue() / 2 : payment.intValue(); source.put("balance", oldBalance.intValue() - deduction); return ctx; }); scripts.put(EXTRACT_CTX_SCRIPT, vars -> { Map ctx = (Map) vars.get("ctx"); assertNotNull(ctx); Map source = (Map) ctx.get("_source"); Map ctxWithoutSource = new HashMap<>(ctx); ctxWithoutSource.remove("_source"); source.put("update_context", ctxWithoutSource); return ctx; }); return scripts; } } @Override protected Collection> nodePlugins() { return Arrays.asList(UpdateScriptsPlugin.class, InternalSettingsPlugin.class); } private void createTestIndex() throws Exception { logger.info("--> creating index test"); assertAcked(prepareCreate("test").addAlias(new Alias("alias").writeIndex(randomFrom(true, null)))); } public void testUpsert() throws Exception { createTestIndex(); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setScript(fieldIncScript) .execute() .actionGet(); assertEquals(DocWriteResponse.Result.CREATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("1")); } updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setScript(fieldIncScript) .execute() .actionGet(); assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); } } public void testScriptedUpsert() throws Exception { createTestIndex(); ensureGreen(); // Script logic is // 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50% // 2) Existing accounts subtract full payment from balance stored in opensearch int openingBalance = 10; Map params = new HashMap<>(); params.put("payment", 2); // Pay money from what will be a new account and opening balance comes from upsert doc // provided by client UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) .setScriptedUpsert(true) .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, UPSERT_SCRIPT, params)) .execute() .actionGet(); assertEquals(DocWriteResponse.Result.CREATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9")); } // Now pay money for an existing account where balance is stored in es updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) .setScriptedUpsert(true) .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, UPSERT_SCRIPT, params)) .execute() .actionGet(); assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7")); } } public void testUpsertDoc() throws Exception { createTestIndex(); ensureGreen(); UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setDocAsUpsert(true) .setFetchSource(true) .execute() .actionGet(); assertThat(updateResponse.getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); } // Issue #3265 public void testNotUpsertDoc() throws Exception { createTestIndex(); ensureGreen(); assertFutureThrows( client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setDocAsUpsert(false) .setFetchSource(true) .execute(), DocumentMissingException.class ); } public void testUpsertFields() throws Exception { createTestIndex(); ensureGreen(); UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, PUT_VALUES_SCRIPT, Collections.singletonMap("extra", "foo"))) .setFetchSource(true) .execute() .actionGet(); assertThat(updateResponse.getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); assertThat(updateResponse.getGetResult().sourceAsMap().get("extra"), nullValue()); updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, PUT_VALUES_SCRIPT, Collections.singletonMap("extra", "foo"))) .setFetchSource(true) .execute() .actionGet(); assertThat(updateResponse.getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); assertThat(updateResponse.getGetResult().sourceAsMap().get("extra").toString(), equalTo("foo")); } public void testIndexAutoCreation() throws Exception { UpdateResponse updateResponse = client().prepareUpdate("test", "1") .setUpsert(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, PUT_VALUES_SCRIPT, Collections.singletonMap("extra", "foo"))) .setFetchSource(true) .execute() .actionGet(); assertThat(updateResponse.getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); assertThat(updateResponse.getGetResult().sourceAsMap().get("extra"), nullValue()); } public void testUpdate() throws Exception { assertAcked(prepareCreate("test").addAlias(new Alias("alias").writeIndex(true))); assertAcked(prepareCreate("test2").addAlias(new Alias("alias"))); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); DocumentMissingException ex = expectThrows( DocumentMissingException.class, () -> client().prepareUpdate(indexOrAlias(), "1").setScript(fieldIncScript).execute().actionGet() ); assertEquals("[1]: document missing", ex.getMessage()); client().prepareIndex("test").setId("1").setSource("field", 1).execute().actionGet(); UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1").setScript(fieldIncScript).execute().actionGet(); assertThat(updateResponse.getVersion(), equalTo(2L)); assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); } Map params = new HashMap<>(); params.put("inc", 3); params.put("field", "field"); updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, params)) .execute() .actionGet(); assertThat(updateResponse.getVersion(), equalTo(3L)); assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("5")); } // check noop updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setScript( new Script( ScriptType.INLINE, UPDATE_SCRIPTS, PUT_VALUES_SCRIPT, Collections.singletonMap("_ctx", Collections.singletonMap("op", "none")) ) ) .execute() .actionGet(); assertThat(updateResponse.getVersion(), equalTo(3L)); assertEquals(DocWriteResponse.Result.NOOP, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("5")); } // check delete updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setScript( new Script( ScriptType.INLINE, UPDATE_SCRIPTS, PUT_VALUES_SCRIPT, Collections.singletonMap("_ctx", Collections.singletonMap("op", "delete")) ) ) .execute() .actionGet(); assertThat(updateResponse.getVersion(), equalTo(4L)); assertEquals(DocWriteResponse.Result.DELETED, updateResponse.getResult()); assertThat(updateResponse.getIndex(), equalTo("test")); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.isExists(), equalTo(false)); } // check _source parameter client().prepareIndex("test").setId("1").setSource("field1", 1, "field2", 2).execute().actionGet(); updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field1"))) .setFetchSource("field1", "field2") .get(); assertThat(updateResponse.getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().getIndex(), equalTo("test")); assertThat(updateResponse.getGetResult().sourceRef(), notNullValue()); assertThat(updateResponse.getGetResult().field("field1"), nullValue()); assertThat(updateResponse.getGetResult().sourceAsMap().size(), equalTo(1)); assertThat(updateResponse.getGetResult().sourceAsMap().get("field1"), equalTo(2)); // check updates without script // add new field client().prepareIndex("test").setId("1").setSource("field", 1).execute().actionGet(); client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field2", 2).endObject()) .execute() .actionGet(); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("1")); assertThat(getResponse.getSourceAsMap().get("field2").toString(), equalTo("2")); } // change existing field client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 3).endObject()) .execute() .actionGet(); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("3")); assertThat(getResponse.getSourceAsMap().get("field2").toString(), equalTo("2")); } // recursive map Map testMap = new HashMap<>(); Map testMap2 = new HashMap<>(); Map testMap3 = new HashMap<>(); testMap3.put("commonkey", testMap); testMap3.put("map3", 5); testMap2.put("map2", 6); testMap.put("commonkey", testMap2); testMap.put("map1", 8); client().prepareIndex("test").setId("1").setSource("map", testMap).execute().actionGet(); client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("map", testMap3).endObject()) .execute() .actionGet(); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "1").execute().actionGet(); Map map1 = (Map) getResponse.getSourceAsMap().get("map"); assertThat(map1.size(), equalTo(3)); assertThat(map1.containsKey("map1"), equalTo(true)); assertThat(map1.containsKey("map3"), equalTo(true)); assertThat(map1.containsKey("commonkey"), equalTo(true)); Map map2 = (Map) map1.get("commonkey"); assertThat(map2.size(), equalTo(3)); assertThat(map2.containsKey("map1"), equalTo(true)); assertThat(map2.containsKey("map2"), equalTo(true)); assertThat(map2.containsKey("commonkey"), equalTo(true)); } } public void testUpdateWithIfSeqNo() throws Exception { createTestIndex(); ensureGreen(); IndexResponse result = client().prepareIndex("test").setId("1").setSource("field", 1).get(); expectThrows( VersionConflictEngineException.class, () -> client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) .setIfSeqNo(result.getSeqNo() + 1) .setIfPrimaryTerm(result.getPrimaryTerm()) .get() ); expectThrows( VersionConflictEngineException.class, () -> client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) .setIfSeqNo(result.getSeqNo()) .setIfPrimaryTerm(result.getPrimaryTerm() + 1) .get() ); expectThrows( VersionConflictEngineException.class, () -> client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) .setIfSeqNo(result.getSeqNo() + 1) .setIfPrimaryTerm(result.getPrimaryTerm() + 1) .get() ); UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) .setIfSeqNo(result.getSeqNo()) .setIfPrimaryTerm(result.getPrimaryTerm()) .get(); assertThat(updateResponse.status(), equalTo(RestStatus.OK)); assertThat(updateResponse.getSeqNo(), equalTo(result.getSeqNo() + 1)); } public void testUpdateRequestWithBothScriptAndDoc() throws Exception { createTestIndex(); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); try { client().prepareUpdate(indexOrAlias(), "1") .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setScript(fieldIncScript) .execute() .actionGet(); fail("Should have thrown ActionRequestValidationException"); } catch (ActionRequestValidationException e) { assertThat(e.validationErrors().size(), equalTo(1)); assertThat(e.validationErrors().get(0), containsString("can't provide both script and doc")); assertThat(e.getMessage(), containsString("can't provide both script and doc")); } } public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception { createTestIndex(); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); try { client().prepareUpdate(indexOrAlias(), "1").setScript(fieldIncScript).setDocAsUpsert(true).execute().actionGet(); fail("Should have thrown ActionRequestValidationException"); } catch (ActionRequestValidationException e) { assertThat(e.validationErrors().size(), equalTo(1)); assertThat(e.validationErrors().get(0), containsString("doc must be specified if doc_as_upsert is enabled")); assertThat(e.getMessage(), containsString("doc must be specified if doc_as_upsert is enabled")); } } public void testContextVariables() throws Exception { assertAcked(prepareCreate("test").addAlias(new Alias("alias"))); ensureGreen(); // Index some documents client().prepareIndex() .setIndex("test") .setId("id1") .setRouting("routing1") .setSource("field1", 1, "content", "foo") .execute() .actionGet(); client().prepareIndex().setIndex("test").setId("id2").setSource("field1", 0, "content", "bar").execute().actionGet(); // Update the first object and note context variables values UpdateResponse updateResponse = client().prepareUpdate("test", "id1") .setRouting("routing1") .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, EXTRACT_CTX_SCRIPT, Collections.emptyMap())) .execute() .actionGet(); assertEquals(2, updateResponse.getVersion()); GetResponse getResponse = client().prepareGet("test", "id1").setRouting("routing1").execute().actionGet(); Map updateContext = (Map) getResponse.getSourceAsMap().get("update_context"); assertEquals("test", updateContext.get("_index")); assertEquals("id1", updateContext.get("_id")); assertEquals(1, updateContext.get("_version")); assertEquals("routing1", updateContext.get("_routing")); // Idem with the second object updateResponse = client().prepareUpdate("test", "id2") .setScript(new Script(ScriptType.INLINE, UPDATE_SCRIPTS, EXTRACT_CTX_SCRIPT, Collections.emptyMap())) .execute() .actionGet(); assertEquals(2, updateResponse.getVersion()); getResponse = client().prepareGet("test", "id2").execute().actionGet(); updateContext = (Map) getResponse.getSourceAsMap().get("update_context"); assertEquals("test", updateContext.get("_index")); assertEquals("id2", updateContext.get("_id")); assertEquals(1, updateContext.get("_version")); assertNull(updateContext.get("_routing")); assertNull(updateContext.get("_ttl")); } public void testConcurrentUpdateWithRetryOnConflict() throws Exception { final boolean useBulkApi = randomBoolean(); createTestIndex(); ensureGreen(); int numberOfThreads = scaledRandomIntBetween(2, 5); final CountDownLatch latch = new CountDownLatch(numberOfThreads); final CountDownLatch startLatch = new CountDownLatch(1); final int numberOfUpdatesPerThread = scaledRandomIntBetween(100, 500); final List failures = new CopyOnWriteArrayList<>(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); for (int i = 0; i < numberOfThreads; i++) { Runnable r = new Runnable() { @Override public void run() { try { startLatch.await(); for (int i = 0; i < numberOfUpdatesPerThread; i++) { if (i % 100 == 0) { logger.debug( "Client [{}] issued [{}] of [{}] requests", Thread.currentThread().getName(), i, numberOfUpdatesPerThread ); } if (useBulkApi) { UpdateRequestBuilder updateRequestBuilder = client().prepareUpdate(indexOrAlias(), Integer.toString(i)) .setScript(fieldIncScript) .setRetryOnConflict(Integer.MAX_VALUE) .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()); client().prepareBulk().add(updateRequestBuilder).execute().actionGet(); } else { client().prepareUpdate(indexOrAlias(), Integer.toString(i)) .setScript(fieldIncScript) .setRetryOnConflict(Integer.MAX_VALUE) .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()) .execute() .actionGet(); } } logger.info("Client [{}] issued all [{}] requests.", Thread.currentThread().getName(), numberOfUpdatesPerThread); } catch (InterruptedException e) { // test infrastructure kills long-running tests by interrupting them, thus we handle this case separately logger.warn( "Test was forcefully stopped. Client [{}] may still have outstanding requests.", Thread.currentThread().getName() ); failures.add(e); Thread.currentThread().interrupt(); } catch (Exception e) { failures.add(e); } finally { latch.countDown(); } } }; Thread updater = new Thread(r); updater.setName("UpdateIT-Client-" + i); updater.start(); } startLatch.countDown(); latch.await(); for (Throwable throwable : failures) { logger.info("Captured failure on concurrent update:", throwable); } assertThat(failures.size(), equalTo(0)); for (int i = 0; i < numberOfUpdatesPerThread; i++) { GetResponse response = client().prepareGet("test", Integer.toString(i)).execute().actionGet(); assertThat(response.getId(), equalTo(Integer.toString(i))); assertThat(response.isExists(), equalTo(true)); assertThat(response.getVersion(), equalTo((long) numberOfThreads)); assertThat(response.getSource().get("field"), equalTo(numberOfThreads)); } } public void testStressUpdateDeleteConcurrency() throws Exception { // We create an index with merging disabled so that deletes don't get merged away assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false))); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); final int numberOfThreads = scaledRandomIntBetween(3, 5); final int numberOfIdsPerThread = scaledRandomIntBetween(3, 10); final int numberOfUpdatesPerId = scaledRandomIntBetween(10, 100); final int retryOnConflict = randomIntBetween(0, 1); final CountDownLatch latch = new CountDownLatch(numberOfThreads); final CountDownLatch startLatch = new CountDownLatch(1); final List failures = new CopyOnWriteArrayList<>(); final class UpdateThread extends Thread { final Map failedMap = new HashMap<>(); final int numberOfIds; final int maxUpdateRequests = numberOfIdsPerThread * numberOfUpdatesPerId; final int maxDeleteRequests = numberOfIdsPerThread * numberOfUpdatesPerId; private final Semaphore updateRequestsOutstanding = new Semaphore(maxUpdateRequests); private final Semaphore deleteRequestsOutstanding = new Semaphore(maxDeleteRequests); UpdateThread(int numberOfIds) { this.numberOfIds = numberOfIds; } final class UpdateListener implements ActionListener { int id; UpdateListener(int id) { this.id = id; } @Override public void onResponse(UpdateResponse updateResponse) { updateRequestsOutstanding.release(1); } @Override public void onFailure(Exception e) { synchronized (failedMap) { incrementMapValue(id, failedMap); } updateRequestsOutstanding.release(1); } } final class DeleteListener implements ActionListener { int id; DeleteListener(int id) { this.id = id; } @Override public void onResponse(DeleteResponse deleteResponse) { deleteRequestsOutstanding.release(1); } @Override public void onFailure(Exception e) { synchronized (failedMap) { incrementMapValue(id, failedMap); } deleteRequestsOutstanding.release(1); } } @Override public void run() { try { startLatch.await(); boolean hasWaitedForNoNode = false; for (int j = 0; j < numberOfIds; j++) { for (int k = 0; k < numberOfUpdatesPerId; ++k) { updateRequestsOutstanding.acquire(); try { UpdateRequest ur = client().prepareUpdate("test", Integer.toString(j)) .setScript(fieldIncScript) .setRetryOnConflict(retryOnConflict) .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()) .request(); if (randomBoolean()) { client().update(ur, new UpdateListener(j)); } else { client().prepareBulk().add(ur).execute(ActionListener.map(new UpdateListener(j), br -> { final BulkItemResponse ir = br.getItems()[0]; if (ir.isFailed()) { throw ir.getFailure().getCause(); } else { return ir.getResponse(); } })); } } catch (NoNodeAvailableException nne) { updateRequestsOutstanding.release(); synchronized (failedMap) { incrementMapValue(j, failedMap); } if (hasWaitedForNoNode) { throw nne; } logger.warn("Got NoNodeException waiting for 1 second for things to recover."); hasWaitedForNoNode = true; Thread.sleep(1000); } try { deleteRequestsOutstanding.acquire(); DeleteRequest dr = client().prepareDelete("test", Integer.toString(j)).request(); client().delete(dr, new DeleteListener(j)); } catch (NoNodeAvailableException nne) { deleteRequestsOutstanding.release(); synchronized (failedMap) { incrementMapValue(j, failedMap); } if (hasWaitedForNoNode) { throw nne; } logger.warn("Got NoNodeException waiting for 1 second for things to recover."); hasWaitedForNoNode = true; Thread.sleep(1000); // Wait for no-node to clear } } } } catch (Exception e) { logger.error("Something went wrong", e); failures.add(e); } finally { try { waitForOutstandingRequests(TimeValue.timeValueSeconds(60), updateRequestsOutstanding, maxUpdateRequests, "Update"); waitForOutstandingRequests(TimeValue.timeValueSeconds(60), deleteRequestsOutstanding, maxDeleteRequests, "Delete"); } catch (OpenSearchTimeoutException ete) { failures.add(ete); } latch.countDown(); } } private void incrementMapValue(int j, Map map) { if (!map.containsKey(j)) { map.put(j, 0); } map.put(j, map.get(j) + 1); } private void waitForOutstandingRequests(TimeValue timeOut, Semaphore requestsOutstanding, int maxRequests, String name) { long start = System.currentTimeMillis(); do { long msRemaining = timeOut.getMillis() - (System.currentTimeMillis() - start); logger.info( "[{}] going to try and acquire [{}] in [{}]ms [{}] available to acquire right now", name, maxRequests, msRemaining, requestsOutstanding.availablePermits() ); try { requestsOutstanding.tryAcquire(maxRequests, msRemaining, TimeUnit.MILLISECONDS); return; } catch (InterruptedException ie) { // Just keep swimming } } while ((System.currentTimeMillis() - start) < timeOut.getMillis()); throw new OpenSearchTimeoutException( "Requests were still outstanding after the timeout [" + timeOut + "] for type [" + name + "]" ); } } final List threads = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { UpdateThread ut = new UpdateThread(numberOfIdsPerThread); ut.start(); threads.add(ut); } startLatch.countDown(); latch.await(); for (UpdateThread ut : threads) { ut.join(); // Threads should have finished because of the latch.await } // If are no errors every request received a response otherwise the test would have timedout // aquiring the request outstanding semaphores. for (Throwable throwable : failures) { logger.info("Captured failure on concurrent update:", throwable); } assertThat(failures.size(), equalTo(0)); // Upsert all the ids one last time to make sure they are available at get time // This means that we add 1 to the expected versions and attempts // All the previous operations should be complete or failed at this point for (int i = 0; i < numberOfIdsPerThread; ++i) { client().prepareUpdate("test", Integer.toString(i)) .setScript(fieldIncScript) .setRetryOnConflict(Integer.MAX_VALUE) .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()) .execute() .actionGet(); } refresh(); for (int i = 0; i < numberOfIdsPerThread; ++i) { int totalFailures = 0; GetResponse response = client().prepareGet("test", Integer.toString(i)).execute().actionGet(); if (response.isExists()) { assertThat(response.getId(), equalTo(Integer.toString(i))); int expectedVersion = (numberOfThreads * numberOfUpdatesPerId * 2) + 1; for (UpdateThread ut : threads) { if (ut.failedMap.containsKey(i)) { totalFailures += ut.failedMap.get(i); } } expectedVersion -= totalFailures; logger.error( "Actual version [{}] Expected version [{}] Total failures [{}]", response.getVersion(), expectedVersion, totalFailures ); assertThat(response.getVersion(), equalTo((long) expectedVersion)); assertThat(response.getVersion() + totalFailures, equalTo((long) ((numberOfUpdatesPerId * numberOfThreads * 2) + 1))); } } } private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } }