/* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.knn.index; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import org.opensearch.knn.index.memory.NativeMemoryCacheManagerDto; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.monitor.os.OsProbe; import java.security.InvalidParameterException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.IndexScope; import static org.opensearch.common.settings.Setting.Property.NodeScope; import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue; import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio; /** * This class defines * 1. KNN settings to hold the HNSW algorithm parameters. * 2. KNN settings to enable/disable plugin, circuit breaker settings * 3. KNN settings to manage graphs loaded in native memory */ public class KNNSettings { private static final Logger logger = LogManager.getLogger(KNNSettings.class); private static KNNSettings INSTANCE; private static final OsProbe osProbe = OsProbe.getInstance(); private static final int INDEX_THREAD_QTY_MAX = 32; /** * Settings name */ public static final String KNN_SPACE_TYPE = "index.knn.space_type"; public static final String KNN_ALGO_PARAM_M = "index.knn.algo_param.m"; public static final String KNN_ALGO_PARAM_EF_CONSTRUCTION = "index.knn.algo_param.ef_construction"; public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search"; public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty"; public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled"; public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit"; public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered"; public static final String KNN_CACHE_ITEM_EXPIRY_ENABLED = "knn.cache.item.expiry.enabled"; public static final String KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES = "knn.cache.item.expiry.minutes"; public static final String KNN_PLUGIN_ENABLED = "knn.plugin.enabled"; public static final String KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE = "knn.circuit_breaker.unset.percentage"; public static final String KNN_INDEX = "index.knn"; public static final String MODEL_INDEX_NUMBER_OF_SHARDS = "knn.model.index.number_of_shards"; public static final String MODEL_INDEX_NUMBER_OF_REPLICAS = "knn.model.index.number_of_replicas"; public static final String MODEL_CACHE_SIZE_LIMIT = "knn.model.cache.size.limit"; /** * Default setting values */ public static final String INDEX_KNN_DEFAULT_SPACE_TYPE = "l2"; public static final Integer INDEX_KNN_DEFAULT_ALGO_PARAM_M = 16; public static final Integer INDEX_KNN_DEFAULT_ALGO_PARAM_EF_SEARCH = 512; public static final Integer INDEX_KNN_DEFAULT_ALGO_PARAM_EF_CONSTRUCTION = 512; public static final Integer KNN_DEFAULT_ALGO_PARAM_INDEX_THREAD_QTY = 1; public static final Integer KNN_DEFAULT_CIRCUIT_BREAKER_UNSET_PERCENTAGE = 75; public static final Integer KNN_DEFAULT_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE = 10; // By default, set aside 10% of the JVM for the limit public static final Integer KNN_MAX_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE = 25; // Model cache limit cannot exceed 25% of the JVM heap public static final String KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT = "50%"; /** * Settings Definition */ public static final Setting INDEX_KNN_SPACE_TYPE = Setting.simpleString( KNN_SPACE_TYPE, INDEX_KNN_DEFAULT_SPACE_TYPE, new SpaceTypeValidator(), IndexScope, Setting.Property.Deprecated ); /** * M - the number of bi-directional links created for every new element during construction. * Reasonable range for M is 2-100. Higher M work better on datasets with high intrinsic * dimensionality and/or high recall, while low M work better for datasets with low intrinsic dimensionality and/or low recalls. * The parameter also determines the algorithm's memory consumption, which is roughly M * 8-10 bytes per stored element. */ public static final Setting INDEX_KNN_ALGO_PARAM_M_SETTING = Setting.intSetting( KNN_ALGO_PARAM_M, INDEX_KNN_DEFAULT_ALGO_PARAM_M, 2, IndexScope, Setting.Property.Deprecated ); /** * ef or efSearch - the size of the dynamic list for the nearest neighbors (used during the search). * Higher ef leads to more accurate but slower search. ef cannot be set lower than the number of queried nearest neighbors k. * The value ef can be anything between k and the size of the dataset. */ public static final Setting INDEX_KNN_ALGO_PARAM_EF_SEARCH_SETTING = Setting.intSetting( KNN_ALGO_PARAM_EF_SEARCH, INDEX_KNN_DEFAULT_ALGO_PARAM_EF_SEARCH, 2, IndexScope, Dynamic ); /** * ef_constrution - the parameter has the same meaning as ef, but controls the index_time/index_accuracy. * Bigger ef_construction leads to longer construction(more indexing time), but better index quality. */ public static final Setting INDEX_KNN_ALGO_PARAM_EF_CONSTRUCTION_SETTING = Setting.intSetting( KNN_ALGO_PARAM_EF_CONSTRUCTION, INDEX_KNN_DEFAULT_ALGO_PARAM_EF_CONSTRUCTION, 2, IndexScope, Setting.Property.Deprecated ); public static final Setting MODEL_INDEX_NUMBER_OF_SHARDS_SETTING = Setting.intSetting( MODEL_INDEX_NUMBER_OF_SHARDS, 1, 1, Setting.Property.NodeScope, Setting.Property.Dynamic ); public static final Setting MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING = Setting.intSetting( MODEL_INDEX_NUMBER_OF_REPLICAS, 1, 0, Setting.Property.NodeScope, Setting.Property.Dynamic ); public static final Setting MODEL_CACHE_SIZE_LIMIT_SETTING = new Setting<>( MODEL_CACHE_SIZE_LIMIT, percentageAsString(KNN_DEFAULT_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE), (s) -> { ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, MODEL_CACHE_SIZE_LIMIT); // parseBytesSizeValueOrHeapRatio will make sure that the value entered falls between 0 and 100% of the // JVM heap. However, we want the maximum percentage of the heap to be much smaller. So, we add // some additional validation here before returning ByteSizeValue jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax(); if ((userDefinedLimit.getKbFrac() / jvmHeapSize.getKbFrac()) > percentageAsFraction( KNN_MAX_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE )) { throw new OpenSearchParseException( "{} ({} KB) cannot exceed {}% of the heap ({} KB).", MODEL_CACHE_SIZE_LIMIT, userDefinedLimit.getKb(), KNN_MAX_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE, jvmHeapSize.getKb() ); } return userDefinedLimit; }, Setting.Property.NodeScope, Setting.Property.Dynamic ); /** * This setting identifies KNN index. */ public static final Setting IS_KNN_INDEX_SETTING = Setting.boolSetting(KNN_INDEX, false, IndexScope); /** * index_thread_quantity - the parameter specifies how many threads the nms library should use to create the graph. * By default, the nms library sets this value to NUM_CORES. However, because ES can spawn NUM_CORES threads for * indexing, and each indexing thread calls the NMS library to build the graph, which can also spawn NUM_CORES threads, * this could lead to NUM_CORES^2 threads running and could lead to 100% CPU utilization. This setting allows users to * configure number of threads for graph construction. */ public static final Setting KNN_ALGO_PARAM_INDEX_THREAD_QTY_SETTING = Setting.intSetting( KNN_ALGO_PARAM_INDEX_THREAD_QTY, KNN_DEFAULT_ALGO_PARAM_INDEX_THREAD_QTY, 1, INDEX_THREAD_QTY_MAX, NodeScope, Dynamic ); public static final Setting KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING = Setting.boolSetting( KNN_CIRCUIT_BREAKER_TRIGGERED, false, NodeScope, Dynamic ); public static final Setting KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING = Setting.doubleSetting( KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE, KNN_DEFAULT_CIRCUIT_BREAKER_UNSET_PERCENTAGE, 0, 100, NodeScope, Dynamic ); /** * Dynamic settings */ public static Map> dynamicCacheSettings = new HashMap>() { { /** * KNN plugin enable/disable setting */ put(KNN_PLUGIN_ENABLED, Setting.boolSetting(KNN_PLUGIN_ENABLED, true, NodeScope, Dynamic)); /** * Weight circuit breaker settings */ put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic)); put( KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, new Setting<>( KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT, (s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT), NodeScope, Dynamic ) ); /** * Cache expiry time settings */ put(KNN_CACHE_ITEM_EXPIRY_ENABLED, Setting.boolSetting(KNN_CACHE_ITEM_EXPIRY_ENABLED, false, NodeScope, Dynamic)); put( KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES, Setting.positiveTimeSetting(KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES, TimeValue.timeValueHours(3), NodeScope, Dynamic) ); } }; private ClusterService clusterService; private Client client; private KNNSettings() {} public static synchronized KNNSettings state() { if (INSTANCE == null) { INSTANCE = new KNNSettings(); } return INSTANCE; } private void setSettingsUpdateConsumers() { clusterService.getClusterSettings().addSettingsUpdateConsumer(updatedSettings -> { // When any of the dynamic settings are updated, rebuild the cache with the updated values. Use the current // cluster settings values as defaults. NativeMemoryCacheManagerDto.NativeMemoryCacheManagerDtoBuilder builder = NativeMemoryCacheManagerDto.builder(); builder.isWeightLimited( updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) ); builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb()); if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) { builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb()); } builder.isExpirationLimited( updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED)) ); builder.expiryTimeInMin( updatedSettings.getAsTime(KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES, getSettingValue(KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) .getMinutes() ); NativeMemoryCacheManager.getInstance().rebuildCache(builder.build()); }, new ArrayList<>(dynamicCacheSettings.values())); } /** * Get setting value by key. Return default value if not configured explicitly. * * @param key setting key. * @param Setting type * @return T setting value or default */ @SuppressWarnings("unchecked") public T getSettingValue(String key) { return (T) clusterService.getClusterSettings().get(getSetting(key)); } private Setting getSetting(String key) { if (dynamicCacheSettings.containsKey(key)) { return dynamicCacheSettings.get(key); } if (KNN_CIRCUIT_BREAKER_TRIGGERED.equals(key)) { return KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING; } if (KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE.equals(key)) { return KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING; } if (KNN_ALGO_PARAM_INDEX_THREAD_QTY.equals(key)) { return KNN_ALGO_PARAM_INDEX_THREAD_QTY_SETTING; } throw new IllegalArgumentException("Cannot find setting by key [" + key + "]"); } public List> getSettings() { List> settings = Arrays.asList( INDEX_KNN_SPACE_TYPE, INDEX_KNN_ALGO_PARAM_M_SETTING, INDEX_KNN_ALGO_PARAM_EF_CONSTRUCTION_SETTING, INDEX_KNN_ALGO_PARAM_EF_SEARCH_SETTING, KNN_ALGO_PARAM_INDEX_THREAD_QTY_SETTING, KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING, KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING, IS_KNN_INDEX_SETTING, MODEL_INDEX_NUMBER_OF_SHARDS_SETTING, MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING, MODEL_CACHE_SIZE_LIMIT_SETTING ); return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList()); } public static boolean isKNNPluginEnabled() { return KNNSettings.state().getSettingValue(KNNSettings.KNN_PLUGIN_ENABLED); } public static boolean isCircuitBreakerTriggered() { return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED); } public static ByteSizeValue getCircuitBreakerLimit() { return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT); } public static double getCircuitBreakerUnsetPercentage() { return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE); } public void initialize(Client client, ClusterService clusterService) { this.client = client; this.clusterService = clusterService; setSettingsUpdateConsumers(); } public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) { settingName = Objects.requireNonNull(settingName); if (sValue != null && sValue.endsWith("%")) { final String percentAsString = sValue.substring(0, sValue.length() - 1); try { final double percent = Double.parseDouble(percentAsString); if (percent < 0 || percent > 100) { throw new OpenSearchParseException("percentage should be in [0-100], got [{}]", percentAsString); } long physicalMemoryInBytes = osProbe.getTotalPhysicalMemorySize(); if (physicalMemoryInBytes <= 0) { throw new IllegalStateException("Physical memory size could not be determined"); } long esJvmSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); long eligibleMemoryInBytes = physicalMemoryInBytes - esJvmSizeInBytes; return new ByteSizeValue((long) ((percent / 100) * eligibleMemoryInBytes), ByteSizeUnit.BYTES); } catch (NumberFormatException e) { throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString); } } else { return parseBytesSizeValue(sValue, settingName); } } /** * Updates knn.circuit_breaker.triggered setting to true/false * @param flag true/false */ public synchronized void updateCircuitBreakerSettings(boolean flag) { ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest(); Settings circuitBreakerSettings = Settings.builder().put(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED, flag).build(); clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings); client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener() { @Override public void onResponse(ClusterUpdateSettingsResponse clusterUpdateSettingsResponse) { logger.debug( "Cluster setting {}, acknowledged: {} ", clusterUpdateSettingsRequest.persistentSettings(), clusterUpdateSettingsResponse.isAcknowledged() ); } @Override public void onFailure(Exception e) { logger.info( "Exception while updating circuit breaker setting {} to {}", clusterUpdateSettingsRequest.persistentSettings(), e.getMessage() ); } }); } /** * * @param index Name of the index * @return efSearch value */ public static int getEfSearchParam(String index) { return KNNSettings.state().clusterService.state() .getMetadata() .index(index) .getSettings() .getAsInt(KNNSettings.KNN_ALGO_PARAM_EF_SEARCH, 512); } public void setClusterService(ClusterService clusterService) { this.clusterService = clusterService; } static class SpaceTypeValidator implements Setting.Validator { @Override public void validate(String value) { try { SpaceType.getSpace(value); } catch (IllegalArgumentException ex) { throw new InvalidParameterException(ex.getMessage()); } } } public void onIndexModule(IndexModule module) { module.addSettingsUpdateConsumer(INDEX_KNN_ALGO_PARAM_EF_SEARCH_SETTING, newVal -> { logger.debug("The value of [KNN] setting [{}] changed to [{}]", KNN_ALGO_PARAM_EF_SEARCH, newVal); // TODO: replace cache-rebuild with index reload into the cache NativeMemoryCacheManager.getInstance().rebuildCache(); }); } private static String percentageAsString(Integer percentage) { return percentage + "%"; } private static Double percentageAsFraction(Integer percentage) { return percentage / 100.0; } }