/* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.ml.indices; import static org.opensearch.ml.common.CommonValue.META; import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX; import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX; import static org.opensearch.ml.common.CommonValue.SCHEMA_VERSION_FIELD; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.ml.common.CommonValue; import org.opensearch.ml.common.exception.MLException; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; import lombok.extern.log4j.Log4j2; @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) @RequiredArgsConstructor @Log4j2 public class MLIndicesHandler { ClusterService clusterService; Client client; private static final Map indexMappingUpdated = new HashMap<>(); static { indexMappingUpdated.put(ML_MODEL_INDEX, new AtomicBoolean(false)); indexMappingUpdated.put(ML_TASK_INDEX, new AtomicBoolean(false)); } public void initModelIndexIfAbsent(ActionListener listener) { initMLIndexIfAbsent(MLIndex.MODEL, listener); } public void initMLTaskIndex(ActionListener listener) { initMLIndexIfAbsent(MLIndex.TASK, listener); } public void initMLIndexIfAbsent(MLIndex index, ActionListener listener) { String indexName = index.getIndexName(); String mapping = index.getMapping(); try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { ActionListener internalListener = ActionListener.runBefore(listener, () -> threadContext.restore()); if (!clusterService.state().metadata().hasIndex(indexName)) { ActionListener actionListener = ActionListener.wrap(r -> { if (r.isAcknowledged()) { log.info("create index:{}", indexName); internalListener.onResponse(true); } else { internalListener.onResponse(false); } }, e -> { log.error("Failed to create index " + indexName, e); internalListener.onFailure(e); }); CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping); client.admin().indices().create(request, actionListener); } else { log.debug("index:{} is already created", indexName); if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { if (r) { // return true if should update index client .admin() .indices() .putMapping( new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), ActionListener.wrap(response -> { if (response.isAcknowledged()) { indexMappingUpdated.get(indexName).set(true); internalListener.onResponse(true); } else { internalListener.onFailure(new MLException("Failed to update index: " + indexName)); } }, exception -> { log.error("Failed to update index " + indexName, exception); internalListener.onFailure(exception); }) ); } else { // no need to update index if it does not exist or the version is already up-to-date. indexMappingUpdated.get(indexName).set(true); internalListener.onResponse(true); } }, e -> { log.error("Failed to update index mapping", e); internalListener.onFailure(e); })); } else { // No need to update index if it's not ML system index or it's already updated. internalListener.onResponse(true); } } } catch (Exception e) { log.error("Failed to init index " + indexName, e); listener.onFailure(e); } } /** * Check if we should update index based on schema version. * @param indexName index name * @param newVersion new index mapping version * @param listener action listener, if should update index, will pass true to its onResponse method */ public void shouldUpdateIndex(String indexName, Integer newVersion, ActionListener listener) { IndexMetadata indexMetaData = clusterService.state().getMetadata().indices().get(indexName); if (indexMetaData == null) { listener.onResponse(Boolean.FALSE); return; } Integer oldVersion = CommonValue.NO_SCHEMA_VERSION; Map indexMapping = indexMetaData.mapping().getSourceAsMap(); Object meta = indexMapping.get(META); if (meta != null && meta instanceof Map) { @SuppressWarnings("unchecked") Map metaMapping = (Map) meta; Object schemaVersion = metaMapping.get(SCHEMA_VERSION_FIELD); if (schemaVersion instanceof Integer) { oldVersion = (Integer) schemaVersion; } } listener.onResponse(newVersion > oldVersion); } }