/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.transport;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.Booleans;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.common.settings.Setting.intSetting;

/**
 * Sniff for initial seed nodes.
 * <p>
 * Starting from a configured list of seed addresses, this strategy connects to one seed, asks it for the
 * node list of the remote cluster state, and then opens managed connections to eligible nodes until
 * {@code maxNumRemoteConnections} connections are open.
 *
 * @opensearch.internal
 */
public class SniffConnectionStrategy extends RemoteConnectionStrategy {

    /**
     * A list of initial seed nodes to discover eligible nodes from the remote cluster.
     * Every configured entry is passed through {@code parsePort} for validation.
     */
    public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting(
        "cluster.remote.",
        "seeds",
        (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> {
            // validate seed address
            parsePort(s);
            return s;
        }, new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)
    );

    /**
     * A proxy address for the remote cluster. By default this is not set, meaning that OpenSearch will connect directly to the nodes in
     * the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then OpenSearch will connect
     * to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
     * undocumented as it does not work well with all proxies.
     */
    public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
        "cluster.remote.",
        "proxy",
        (ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> {
            // an empty value means "no proxy"; only a non-empty value is validated
            if (Strings.hasLength(s)) {
                parsePort(s);
            }
        }), Setting.Property.Dynamic, Setting.Property.NodeScope),
        () -> REMOTE_CLUSTER_SEEDS
    );

    /**
     * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
     * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
     */
    public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = intSetting(
        "cluster.remote.connections_per_cluster",
        3,
        1,
        Setting.Property.NodeScope
    );

    /**
     * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single
     * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
     * This is the per-cluster (affix) variant; {@link #REMOTE_CONNECTIONS_PER_CLUSTER} is passed to it as the fallback setting.
     */
    public static final Setting.AffixSetting<Integer> REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting(
        "cluster.remote.",
        "node_connections",
        (ns, key) -> intSetting(
            key,
            REMOTE_CONNECTIONS_PER_CLUSTER,
            1,
            new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
            Setting.Property.Dynamic,
            Setting.Property.NodeScope
        )
    );

    // NOTE(review): not referenced anywhere in this file; presumably consumed by connection-profile code elsewhere - confirm
    static final int CHANNELS_PER_CONNECTION = 6;

    /**
     * Accepts nodes whose version is compatible with {@link Version#CURRENT} and that are not dedicated
     * cluster-manager nodes (a cluster-manager node is still accepted if it is also a data or ingest node).
     */
    private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
        && (node.isClusterManagerNode() == false || node.isDataNode() || node.isIngestNode());

    /** The seed addresses exactly as configured; used for change detection and for reporting. */
    private final List<String> configuredSeedNodes;
    /** Lazily resolved seed nodes, one supplier per configured seed address. */
    private final List<Supplier<DiscoveryNode>> seedNodes;
    private final int maxNumRemoteConnections;
    private final Predicate<DiscoveryNode> nodePredicate;
    /** Set once from the first successful handshake; later handshakes must report the same cluster name. */
    private final SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
    private final String proxyAddress;

    /**
     * Creates a strategy whose proxy address, connection limit, node predicate and seed list are all read
     * from {@code settings} for the given cluster alias.
     */
    SniffConnectionStrategy(
        String clusterAlias,
        TransportService transportService,
        RemoteConnectionManager connectionManager,
        Settings settings
    ) {
        this(
            clusterAlias,
            transportService,
            connectionManager,
            REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
            settings,
            REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
            getNodePredicate(settings),
            REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings)
        );
    }

    /**
     * Creates a strategy from explicit values. Each configured seed address is wrapped in a supplier so that
     * it is only resolved to a {@link DiscoveryNode} when that seed is actually tried.
     */
    SniffConnectionStrategy(
        String clusterAlias,
        TransportService transportService,
        RemoteConnectionManager connectionManager,
        String proxyAddress,
        Settings settings,
        int maxNumRemoteConnections,
        Predicate<DiscoveryNode> nodePredicate,
        List<String> configuredSeedNodes
    ) {
        this(
            clusterAlias,
            transportService,
            connectionManager,
            proxyAddress,
            settings,
            maxNumRemoteConnections,
            nodePredicate,
            configuredSeedNodes,
            configuredSeedNodes.stream()
                .map(seedAddress -> (Supplier<DiscoveryNode>) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress))
                .collect(Collectors.toList())
        );
    }

    /**
     * Creates a strategy from explicit values, including the suppliers used to obtain the seed nodes.
     */
    SniffConnectionStrategy(
        String clusterAlias,
        TransportService transportService,
        RemoteConnectionManager connectionManager,
        String proxyAddress,
        Settings settings,
        int maxNumRemoteConnections,
        Predicate<DiscoveryNode> nodePredicate,
        List<String> configuredSeedNodes,
        List<Supplier<DiscoveryNode>> seedNodes
    ) {
        super(clusterAlias, transportService, connectionManager, settings);
        this.proxyAddress = proxyAddress;
        this.maxNumRemoteConnections = maxNumRemoteConnections;
        this.nodePredicate = nodePredicate;
        this.configuredSeedNodes = configuredSeedNodes;
        this.seedNodes = seedNodes;
    }

    /**
     * @return a stream containing {@link #REMOTE_CLUSTER_SEEDS}
     */
    static Stream<Setting.AffixSetting<?>> enablementSettings() {
        return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
    }

    /**
     * @return a reader that deserializes a {@link SniffModeInfo} from a stream
     */
    static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
        return SniffModeInfo::new;
    }

    /**
     * @return {@code true} while fewer than {@code maxNumRemoteConnections} connections are held by the connection manager
     */
    @Override
    protected boolean shouldOpenMoreConnections() {
        return connectionManager.size() < maxNumRemoteConnections;
    }

    /**
     * @return {@code true} if the connection limit, the seed list or the proxy address in {@code newSettings}
     *         differs from the values this strategy was built with
     */
    @Override
    protected boolean strategyMustBeRebuilt(Settings newSettings) {
        String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
        List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
        int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
        return nodeConnections != maxNumRemoteConnections
            || seedsChanged(configuredSeedNodes, addresses)
            || proxyChanged(proxyAddress, proxy);
    }

    @Override
    protected ConnectionStrategy strategyType() {
        return ConnectionStrategy.SNIFF;
    }

    /**
     * Starts node collection from the first configured seed node.
     */
    @Override
    protected void connectImpl(ActionListener<Void> listener) {
        collectRemoteNodes(seedNodes.iterator(), listener);
    }

    @Override
    protected RemoteConnectionInfo.ModeInfo getModeInfo() {
        return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size());
    }

    /**
     * Tries the next seed node of the iterator. For that seed the following steps are chained:
     * <ol>
     *   <li>open a transient connection to the seed,</li>
     *   <li>handshake over it, checking the remote cluster name once one has been recorded,</li>
     *   <li>open a managed connection to the seed if it passes the node predicate and more connections are wanted,</li>
     *   <li>request the remote cluster state (nodes only) and hand the response to
     *       {@link SniffClusterStateResponseHandler}.</li>
     * </ol>
     * A {@link ConnectTransportException}, {@link IOException} or {@link IllegalStateException} moves on to the
     * next seed if one is left; any other failure, or running out of seeds, fails the listener.
     *
     * @param seedNodes remaining seed nodes to try; advanced by this method
     * @param listener  notified of the overall outcome
     */
    private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> listener) {
        if (Thread.currentThread().isInterrupted()) {
            listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
            return;
        }

        if (seedNodes.hasNext()) {
            final Consumer<Exception> onFailure = e -> {
                if (e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) {
                    // ISE if we fail the handshake with a version incompatible node
                    if (seedNodes.hasNext()) {
                        logger.debug(
                            () -> new ParameterizedMessage(
                                "fetching nodes from external cluster [{}] failed moving to next seed node",
                                clusterAlias
                            ),
                            e
                        );
                        collectRemoteNodes(seedNodes, listener);
                        return;
                    }
                }
                logger.warn(new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e);
                listener.onFailure(e);
            };

            final DiscoveryNode seedNode = seedNodes.next().get();
            logger.trace("[{}] opening transient connection to seed node: [{}]", clusterAlias, seedNode);
            final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
            try {
                connectionManager.openConnection(seedNode, null, openConnectionStep);
            } catch (Exception e) {
                onFailure.accept(e);
            }

            final StepListener<TransportService.HandshakeResponse> handshakeStep = new StepListener<>();
            openConnectionStep.whenComplete(connection -> {
                ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
                transportService.handshake(
                    connection,
                    connectionProfile.getHandshakeTimeout().millis(),
                    getRemoteClusterNamePredicate(),
                    handshakeStep
                );
            }, onFailure);

            final StepListener<Void> fullConnectionStep = new StepListener<>();
            handshakeStep.whenComplete(handshakeResponse -> {
                final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();

                if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) {
                    logger.trace(
                        "[{}] opening managed connection to seed node: [{}] proxy address: [{}]",
                        clusterAlias,
                        handshakeNode,
                        proxyAddress
                    );
                    final DiscoveryNode handshakeNodeWithProxy = maybeAddProxyAddress(proxyAddress, handshakeNode);
                    connectionManager.connectToNode(
                        handshakeNodeWithProxy,
                        null,
                        transportService.connectionValidator(handshakeNodeWithProxy),
                        fullConnectionStep
                    );
                } else {
                    // the seed itself is not eligible (or the limit is reached); continue straight to the cluster state request
                    fullConnectionStep.onResponse(null);
                }
            }, e -> {
                final Transport.Connection connection = openConnectionStep.result();
                final DiscoveryNode node = connection.getNode();
                logger.debug(() -> new ParameterizedMessage("[{}] failed to handshake with seed node: [{}]", clusterAlias, node), e);
                IOUtils.closeWhileHandlingException(connection);
                onFailure.accept(e);
            });

            fullConnectionStep.whenComplete(aVoid -> {
                if (remoteClusterName.get() == null) {
                    TransportService.HandshakeResponse handshakeResponse = handshakeStep.result();
                    assert handshakeResponse.getClusterName().value() != null;
                    remoteClusterName.set(handshakeResponse.getClusterName());
                }
                final Transport.Connection connection = openConnectionStep.result();

                ClusterStateRequest request = new ClusterStateRequest();
                request.clear();
                request.nodes(true);
                // here we pass on the connection since we can only close it once the sendRequest returns otherwise
                // due to the async nature (it will return before it's actually sent) this can cause the request to fail
                // due to an already closed connection.
                ThreadPool threadPool = transportService.getThreadPool();
                ThreadContext threadContext = threadPool.getThreadContext();
                TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler =
                    new TransportService.ContextRestoreResponseHandler<>(
                        threadContext.newRestorableContext(false),
                        new SniffClusterStateResponseHandler(connection, listener, seedNodes)
                    );
                try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                    // we stash any context here since this is an internal execution and should not leak any
                    // existing context information.
                    threadContext.markAsSystemContext();
                    transportService.sendRequest(
                        connection,
                        ClusterStateAction.NAME,
                        request,
                        TransportRequestOptions.EMPTY,
                        responseHandler
                    );
                }
            }, e -> {
                final Transport.Connection connection = openConnectionStep.result();
                final DiscoveryNode node = connection.getNode();
                logger.debug(
                    () -> new ParameterizedMessage("[{}] failed to open managed connection to seed node: [{}]", clusterAlias, node),
                    e
                );
                IOUtils.closeWhileHandlingException(connection);
                onFailure.accept(e);
            });
        } else {
            listener.onFailure(new NoSeedNodeLeftException(clusterAlias));
        }
    }

    /**
     * Handles the cluster state response from the remote cluster when sniffing nodes to connect to.
     * Owns the transient seed connection and closes it before the listener is notified or the next seed is tried.
     */
    private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

        private final Transport.Connection connection;
        private final ActionListener<Void> listener;
        private final Iterator<Supplier<DiscoveryNode>> seedNodes;

        /**
         * @param connection transient connection to the seed node the request was sent on
         * @param listener   notified once sniffing has finished or definitively failed
         * @param seedNodes  remaining seed nodes, used to retry with the next seed on failure
         */
        SniffClusterStateResponseHandler(
            Transport.Connection connection,
            ActionListener<Void> listener,
            Iterator<Supplier<DiscoveryNode>> seedNodes
        ) {
            this.connection = connection;
            this.listener = listener;
            this.seedNodes = seedNodes;
        }

        @Override
        public ClusterStateResponse read(StreamInput in) throws IOException {
            return new ClusterStateResponse(in);
        }

        @Override
        public void handleResponse(ClusterStateResponse response) {
            handleNodes(response.getState().nodes().getNodes().values().iterator());
        }

        /**
         * Connects to eligible nodes one at a time. For each node that passes the predicate while more connections
         * are wanted, a managed connection is requested and this method returns; the connection callback re-enters
         * this method with the same iterator. Once the iterator is exhausted the seed connection is closed and the
         * listener is completed: successfully if at least one connection is open, with a failure otherwise.
         */
        private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
            while (nodesIter.hasNext()) {
                final DiscoveryNode node = nodesIter.next();
                if (nodePredicate.test(node) && shouldOpenMoreConnections()) {
                    logger.trace("[{}] opening managed connection to node: [{}] proxy address: [{}]", clusterAlias, node, proxyAddress);
                    final DiscoveryNode nodeWithProxy = maybeAddProxyAddress(proxyAddress, node);
                    connectionManager.connectToNode(
                        nodeWithProxy,
                        null,
                        transportService.connectionValidator(node),
                        new ActionListener<Void>() {
                            @Override
                            public void onResponse(Void aVoid) {
                                handleNodes(nodesIter);
                            }

                            @Override
                            public void onFailure(Exception e) {
                                if (e instanceof ConnectTransportException || e instanceof IllegalStateException) {
                                    // ISE if we fail the handshake with a version incompatible node
                                    // fair enough we can't connect just move on
                                    logger.debug(
                                        () -> new ParameterizedMessage(
                                            "[{}] failed to open managed connection to node [{}]",
                                            clusterAlias,
                                            node
                                        ),
                                        e
                                    );
                                    handleNodes(nodesIter);
                                } else {
                                    logger.warn(
                                        new ParameterizedMessage("[{}] failed to open managed connection to node [{}]", clusterAlias, node),
                                        e
                                    );
                                    IOUtils.closeWhileHandlingException(connection);
                                    collectRemoteNodes(seedNodes, listener);
                                }
                            }
                        }
                    );
                    // the callback above continues the iteration
                    return;
                }
            }
            // We have to close this connection before we notify listeners - this is mainly needed for test correctness
            // since if we do it afterwards we might fail assertions that check if all high level connections are closed.
            // from a code correctness perspective we could also close it afterwards.
            IOUtils.closeWhileHandlingException(connection);
            int openConnections = connectionManager.size();
            if (openConnections == 0) {
                listener.onFailure(new IllegalStateException("Unable to open any connections to remote cluster [" + clusterAlias + "]"));
            } else {
                listener.onResponse(null);
            }
        }

        @Override
        public void handleException(TransportException exp) {
            logger.warn(new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp);
            try {
                IOUtils.closeWhileHandlingException(connection);
            } finally {
                // once the connection is closed lets try the next node
                collectRemoteNodes(seedNodes, listener);
            }
        }

        @Override
        public String executor() {
            return ThreadPool.Names.MANAGEMENT;
        }
    }

    /**
     * @return a predicate that accepts any cluster name until {@code remoteClusterName} has been set, and
     *         afterwards only a name equal to the recorded one
     */
    private Predicate<ClusterName> getRemoteClusterNamePredicate() {
        return new Predicate<ClusterName>() {
            @Override
            public boolean test(ClusterName c) {
                return remoteClusterName.get() == null || c.equals(remoteClusterName.get());
            }

            @Override
            public String toString() {
                return remoteClusterName.get() == null
                    ? "any cluster name"
                    : "expected remote cluster name [" + remoteClusterName.get().value() + "]";
            }
        };
    }

    /**
     * Builds the {@link DiscoveryNode} used to reach a seed.
     * <p>
     * Without a proxy the node's transport address is the parsed seed address. With a proxy the transport address
     * is the parsed proxy address, the proxy's host is used as the host name and as the {@code server_name}
     * attribute, and the seed address is kept as part of the node id.
     *
     * @param clusterAlias alias of the remote cluster, used as the prefix of the node id
     * @param address      configured seed address
     * @param proxyAddress configured proxy address; {@code null} or empty means no proxy
     */
    private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(address));
            return new DiscoveryNode(
                clusterAlias + "#" + transportAddress.toString(),
                transportAddress,
                Version.CURRENT.minimumCompatibilityVersion()
            );
        } else {
            TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress));
            String hostName = RemoteConnectionStrategy.parseHost(proxyAddress);
            return new DiscoveryNode(
                "",
                clusterAlias + "#" + address,
                UUIDs.randomBase64UUID(),
                hostName,
                address,
                transportAddress,
                Collections.singletonMap("server_name", hostName),
                DiscoveryNodeRole.BUILT_IN_ROLES,
                Version.CURRENT.minimumCompatibilityVersion()
            );
        }
    }

    /**
     * Returns the predicate deciding which remote nodes are eligible for a connection. When
     * {@code RemoteClusterService.REMOTE_NODE_ATTRIBUTE} is present in the settings, a node must additionally
     * carry that attribute with a value that parses as boolean {@code true} (a missing attribute counts as
     * {@code "false"}).
     */
    // Default visibility for tests
    static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
        if (RemoteClusterService.REMOTE_NODE_ATTRIBUTE.exists(settings)) {
            // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search
            String attribute = RemoteClusterService.REMOTE_NODE_ATTRIBUTE.get(settings);
            return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
        }
        return DEFAULT_NODE_PREDICATE;
    }

    /**
     * Returns {@code node} unchanged when no proxy is configured; otherwise returns a copy of it whose transport
     * address is replaced by the parsed proxy address while all other properties are carried over.
     */
    private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            return node;
        } else {
            // resolve proxy address lazy here
            InetSocketAddress proxyInetAddress = parseConfiguredAddress(proxyAddress);
            return new DiscoveryNode(
                node.getName(),
                node.getId(),
                node.getEphemeralId(),
                node.getHostName(),
                node.getHostAddress(),
                new TransportAddress(proxyInetAddress),
                node.getAttributes(),
                node.getRoles(),
                node.getVersion()
            );
        }
    }

    /**
     * @return {@code true} if the two seed lists differ in size or, ignoring order, in the set of addresses they contain
     */
    private boolean seedsChanged(final List<String> oldSeedNodes, final List<String> newSeedNodes) {
        if (oldSeedNodes.size() != newSeedNodes.size()) {
            return true;
        }
        Set<String> oldSeeds = new HashSet<>(oldSeedNodes);
        Set<String> newSeeds = new HashSet<>(newSeedNodes);
        return oldSeeds.equals(newSeeds) == false;
    }

    /**
     * @return {@code true} if the proxy address changed; {@code null} and the empty string are both treated as "no proxy"
     */
    private boolean proxyChanged(String oldProxy, String newProxy) {
        if (oldProxy == null || oldProxy.isEmpty()) {
            return (newProxy == null || newProxy.isEmpty()) == false;
        }

        return Objects.equals(oldProxy, newProxy) == false;
    }

    /**
     * Information about the sniff mode
     *
     * @opensearch.internal
     */
    public static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {

        final List<String> seedNodes;
        final int maxConnectionsPerCluster;
        final int numNodesConnected;

        public SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
            this.seedNodes = seedNodes;
            this.maxConnectionsPerCluster = maxConnectionsPerCluster;
            this.numNodesConnected = numNodesConnected;
        }

        /**
         * Reads the fields in the same order in which {@link #writeTo(StreamOutput)} writes them.
         */
        private SniffModeInfo(StreamInput input) throws IOException {
            seedNodes = Arrays.asList(input.readStringArray());
            maxConnectionsPerCluster = input.readVInt();
            numNodesConnected = input.readVInt();
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startArray("seeds");
            for (String address : seedNodes) {
                builder.value(address);
            }
            builder.endArray();
            builder.field("num_nodes_connected", numNodesConnected);
            builder.field("max_connections_per_cluster", maxConnectionsPerCluster);
            return builder;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeStringArray(seedNodes.toArray(new String[0]));
            out.writeVInt(maxConnectionsPerCluster);
            out.writeVInt(numNodesConnected);
        }

        /**
         * @return {@code true} if at least one node is connected
         */
        @Override
        public boolean isConnected() {
            return numNodesConnected > 0;
        }

        @Override
        public String modeName() {
            return "sniff";
        }

        public List<String> getSeedNodes() {
            return seedNodes;
        }

        public int getMaxConnectionsPerCluster() {
            return maxConnectionsPerCluster;
        }

        public int getNumNodesConnected() {
            return numNodesConnected;
        }

        @Override
        public RemoteConnectionStrategy.ConnectionStrategy modeType() {
            return RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            SniffModeInfo sniff = (SniffModeInfo) o;
            return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster
                && numNodesConnected == sniff.numNodesConnected
                && Objects.equals(seedNodes, sniff.seedNodes);
        }

        @Override
        public int hashCode() {
            return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected);
        }
    }
}