/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.gateway;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.core.index.Index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
/**
* The dangling indices state is responsible for finding new dangling indices (indices that have
* their state written on disk, but don't exists in the metadata of the cluster), and importing
* them into the cluster.
*
* @opensearch.internal
*/
public class DanglingIndicesState implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(DanglingIndicesState.class);
/**
* Controls whether dangling indices should be automatically detected and imported into the cluster
* state upon discovery. This setting is deprecated - use the _dangling
API instead.
* If disabled, dangling indices will not be automatically detected.
*
* @see org.opensearch.action.admin.indices.dangling
*/
public static final Setting AUTO_IMPORT_DANGLING_INDICES_SETTING = Setting.boolSetting(
"gateway.auto_import_dangling_indices",
false,
Setting.Property.NodeScope,
Setting.Property.Deprecated
);
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final LocalAllocateDangledIndices danglingIndicesAllocator;
private final boolean isAutoImportDanglingIndicesEnabled;
private final ClusterService clusterService;
private final Map danglingIndices = ConcurrentCollections.newConcurrentMap();
@Inject
public DanglingIndicesState(
NodeEnvironment nodeEnv,
MetaStateService metaStateService,
LocalAllocateDangledIndices danglingIndicesAllocator,
ClusterService clusterService
) {
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.danglingIndicesAllocator = danglingIndicesAllocator;
this.clusterService = clusterService;
this.isAutoImportDanglingIndicesEnabled = AUTO_IMPORT_DANGLING_INDICES_SETTING.get(clusterService.getSettings());
if (this.isAutoImportDanglingIndicesEnabled) {
clusterService.addListener(this);
} else {
logger.warn(
AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey()
+ " is disabled, dangling indices will not be automatically detected or imported and must be managed manually"
);
}
}
boolean isAutoImportDanglingIndicesEnabled() {
return this.isAutoImportDanglingIndicesEnabled;
}
/**
* Process dangling indices based on the provided meta data, handling cleanup, finding
* new dangling indices, and allocating outstanding ones.
*/
public void processDanglingIndices(final Metadata metadata) {
assert this.isAutoImportDanglingIndicesEnabled;
if (nodeEnv.hasNodeFile() == false) {
return;
}
cleanupAllocatedDangledIndices(metadata);
findNewAndAddDanglingIndices(metadata);
allocateDanglingIndices(metadata);
}
/**
* Either return the current set of dangling indices, if auto-import is enabled, otherwise
* scan for dangling indices right away.
* @return a map of currently-known dangling indices
*/
public Map getDanglingIndices() {
if (this.isAutoImportDanglingIndicesEnabled) {
// This might be a good use case for CopyOnWriteHashMap
return unmodifiableMap(new HashMap<>(danglingIndices));
} else {
return findNewDanglingIndices(emptyMap(), this.clusterService.state().metadata());
}
}
/**
* Cleans dangling indices if they are already allocated on the provided meta data.
*/
void cleanupAllocatedDangledIndices(Metadata metadata) {
for (Index index : danglingIndices.keySet()) {
final IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata != null && indexMetadata.getIndex().getName().equals(index.getName())) {
if (indexMetadata.getIndex().getUUID().equals(index.getUUID()) == false) {
logger.warn(
"[{}] can not be imported as a dangling index, as there is already another index "
+ "with the same name but a different uuid. local index will be ignored (but not deleted)",
index
);
} else {
logger.debug("[{}] no longer dangling (created), removing from dangling list", index);
}
danglingIndices.remove(index);
}
}
}
/**
* Finds (@{link #findNewAndAddDanglingIndices}) and adds the new dangling indices
* to the currently tracked dangling indices.
*/
void findNewAndAddDanglingIndices(final Metadata metadata) {
final IndexGraveyard graveyard = metadata.indexGraveyard();
// If a tombstone is created for a dangling index, we need to make sure that the
// index is no longer considered dangling.
danglingIndices.keySet().removeIf(graveyard::containsIndex);
danglingIndices.putAll(findNewDanglingIndices(danglingIndices, metadata));
}
/**
* Finds new dangling indices by iterating over the indices and trying to find indices
* that have state on disk, but are not part of the provided metadata, or not detected
* as dangled already.
*/
public Map findNewDanglingIndices(Map existingDanglingIndices, final Metadata metadata) {
final Set excludeIndexPathIds = new HashSet<>(metadata.indices().size() + danglingIndices.size());
for (final IndexMetadata indexMetadata : metadata.indices().values()) {
excludeIndexPathIds.add(indexMetadata.getIndex().getUUID());
}
for (Index index : existingDanglingIndices.keySet()) {
excludeIndexPathIds.add(index.getUUID());
}
try {
final List indexMetadataList = metaStateService.loadIndicesStates(excludeIndexPathIds::contains);
Map newIndices = new HashMap<>(indexMetadataList.size());
final IndexGraveyard graveyard = metadata.indexGraveyard();
for (IndexMetadata indexMetadata : indexMetadataList) {
Index index = indexMetadata.getIndex();
if (graveyard.containsIndex(index) == false) {
newIndices.put(index, stripAliases(indexMetadata));
}
}
return newIndices;
} catch (IOException e) {
logger.warn("failed to list dangling indices", e);
return emptyMap();
}
}
/**
* Filters out dangling indices that cannot be automatically imported into the cluster state.
* @param metadata the current cluster metadata
* @param allIndices all currently known dangling indices
* @return a filtered list of dangling index metadata
*/
List filterDanglingIndices(Metadata metadata, Map allIndices) {
List filteredIndices = new ArrayList<>(allIndices.size());
allIndices.forEach((index, indexMetadata) -> {
if (metadata.hasIndex(indexMetadata.getIndex().getName())) {
logger.warn(
"[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata",
indexMetadata.getIndex()
);
} else {
logger.info(
"[{}] dangling index exists on local file system, but not in cluster metadata, auto import to cluster state",
indexMetadata.getIndex()
);
filteredIndices.add(stripAliases(indexMetadata));
}
});
return filteredIndices;
}
/**
* Removes all aliases from the supplied index metadata.
*
* Dangling importing indices with aliases is dangerous, it could for instance result in inability to write to an existing alias if it
* previously had only one index with any is_write_index indication.
*/
private IndexMetadata stripAliases(IndexMetadata indexMetadata) {
if (indexMetadata.getAliases().isEmpty()) {
return indexMetadata;
} else {
logger.info(
"[{}] stripping aliases: {} from index before importing",
indexMetadata.getIndex(),
indexMetadata.getAliases().keySet()
);
return IndexMetadata.builder(indexMetadata).removeAllAliases().build();
}
}
/**
* Allocates the detected list of dangling indices by sending them to the cluster-manager node
* for allocation, provided auto-import is enabled via the
* {@link #AUTO_IMPORT_DANGLING_INDICES_SETTING} setting.
* @param metadata the current cluster metadata, used to filter out dangling indices that cannot be allocated
* for some reason.
*/
void allocateDanglingIndices(Metadata metadata) {
if (danglingIndices.isEmpty()) {
return;
}
final List filteredIndices = filterDanglingIndices(metadata, danglingIndices);
if (filteredIndices.isEmpty()) {
return;
}
try {
danglingIndicesAllocator.allocateDangled(
filteredIndices,
new ActionListener() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
logger.trace("allocated dangled");
}
@Override
public void onFailure(Exception e) {
logger.info("failed to send allocated dangled", e);
}
}
);
} catch (Exception e) {
logger.warn("failed to send allocate dangled", e);
}
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence() == false) {
processDanglingIndices(event.state().metadata());
}
}
}