/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. 
*/ package org.opensearch.transport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchServerException; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Streamables; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.core.common.Strings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; import org.opensearch.node.ReportingService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.Arrays; import 
java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * The main OpenSearch transport service.
 * <p>
 * Wraps a {@link Transport} and a {@link ConnectionManager}: it connects to nodes, performs the
 * application-level handshake, sends requests (short-cutting requests addressed to the local node)
 * and notifies response handlers.
 *
 * @opensearch.internal
 */
public class TransportService extends AbstractLifecycleComponent
    implements
        ReportingService,
        TransportMessageListener,
        TransportConnectionListener {

    private static final Logger logger = LogManager.getLogger(TransportService.class);

    public static final String DIRECT_RESPONSE_PROFILE = ".direct";
    /** Name of the action registered in the constructor to answer handshake requests. */
    public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

    /** Flipped to {@code true} by {@code acceptIncomingRequests()}. */
    private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
    private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
    protected final Transport transport;
    protected final ConnectionManager connectionManager;
    protected final ThreadPool threadPool;
    protected final ClusterName clusterName;
    protected final TaskManager taskManager;
    /** Sender obtained by passing {@code sendRequestInternal} through the interceptor. */
    private final TransportInterceptor.AsyncSender asyncSender;
    /** Builds the local node from the bound transport address; applied in {@code doStart()}. */
    private final Function localNodeFactory;
    private final boolean remoteClusterClient;
    private final Transport.ResponseHandlers responseHandlers;
    private final TransportInterceptor interceptor;

    // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
    // do show up, we can print more descriptive information about them.
    // The backing map is access-ordered and drops its eldest entry once it holds more than 100 entries.
    final Map timeoutInfoHandlers = Collections.synchronizedMap(
        new LinkedHashMap(100, .75F, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry eldest) {
                return size() > 100;
            }
        }
    );

    /** An interceptor that overrides nothing. */
    public static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
    };

    // tracer log

    private final Logger tracerLog;

    // include / exclude patterns for the tracer log; replaced wholesale by the setTracerLog* methods
    volatile String[] tracerLogInclude;
    volatile String[] tracerLogExclude;

    private final RemoteClusterService remoteClusterService;

    /** if set will call requests sent to this id to shortcut and executed locally */
    volatile DiscoveryNode localNode = null;

    /**
     * Connection representing the local node: requests are handed straight to {@code sendLocalRequest}.
     * It never reports itself as closed, and close listeners and {@code close()} are no-ops.
     */
    private final Transport.Connection localNodeConnection = new Transport.Connection() {
        @Override
        public DiscoveryNode getNode() {
            return localNode;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws TransportException {
            sendLocalRequest(requestId, action, request, options);
        }

        @Override
        public void addCloseListener(ActionListener listener) {}

        @Override
        public boolean isClosed() {
            return false;
        }

        @Override
        public void close() {}
    };

    static {
        /**
         * Registers server specific types as a streamables for serialization
         * over the {@link StreamOutput} and {@link StreamInput} wire
         */
        Streamables.registerStreamables();
        /** Registers OpenSearch server specific exceptions (exceptions outside of core library) */
        OpenSearchServerException.registerExceptions();
    }

    /** does nothing. easy way to ensure class is loaded so the above static block is called to register the streamables */
    public static void ensureClassloaded() {}

    /**
     * Build the service.
     *
     * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
     *        updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
    public TransportService(
        Settings settings,
        Transport transport,
        ThreadPool threadPool,
        TransportInterceptor transportInterceptor,
        Function localNodeFactory,
        @Nullable ClusterSettings clusterSettings,
        Set taskHeaders
    ) {
        // delegate with a ClusterConnectionManager created from the same settings and transport
        this(
            settings,
            transport,
            threadPool,
            transportInterceptor,
            localNodeFactory,
            clusterSettings,
            taskHeaders,
            new ClusterConnectionManager(settings, transport)
        );
    }

    /**
     * Build the service with an explicit connection manager.
     * <p>
     * Reads the slow-operation threshold and tracer include/exclude settings, creates the task manager and
     * the remote cluster service, and registers the handler for {@link #HANDSHAKE_ACTION_NAME}.
     *
     * @param clusterSettings if non null, settings-update consumers are registered for the tracer include/exclude
     *        settings and the slow-operation threshold, and (when this node is a remote cluster client) the
     *        remote cluster service is asked to listen for updates
     * @param connectionManager the connection manager used for all non-local connections
     */
    public TransportService(
        Settings settings,
        Transport transport,
        ThreadPool threadPool,
        TransportInterceptor transportInterceptor,
        Function localNodeFactory,
        @Nullable ClusterSettings clusterSettings,
        Set taskHeaders,
        ConnectionManager connectionManager
    ) {
        this.transport = transport;
        transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
        this.threadPool = threadPool;
        this.localNodeFactory = localNodeFactory;
        this.connectionManager = connectionManager;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
        setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
        tracerLog = Loggers.getLogger(logger, ".tracer");
        taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders);
        this.interceptor = transportInterceptor;
        // every outgoing request goes through the interceptor-wrapped sender
        this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
        this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
        // NOTE(review): `this` is handed out here before the remaining fields below are assigned
        remoteClusterService = new RemoteClusterService(settings, this);
        responseHandlers = transport.getResponseHandlers();
        if (clusterSettings != null) {
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
            if (remoteClusterClient) {
                remoteClusterService.listenForUpdates(clusterSettings);
            }
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
        }
        // The handshake handler answers with the local node, the cluster name and the local node's version.
        // It dereferences localNode, which stays null until doStart() assigns it.
        registerRequestHandler(
            HANDSHAKE_ACTION_NAME,
            ThreadPool.Names.SAME,
            false,
            false,
            HandshakeRequest::new,
            (request, channel, task) -> channel.sendResponse(new HandshakeResponse(localNode, clusterName, localNode.getVersion()))
        );
    }

    public RemoteClusterService getRemoteClusterService() {
        return remoteClusterService;
    }

    /** Returns the local node; {@code null} until {@code doStart()} has run. */
    public DiscoveryNode getLocalNode() {
        return localNode;
    }

    public TaskManager getTaskManager() {
        return taskManager;
    }

    /**
     * Creates the task manager used by this service.
     *
     * @param clusterSettings when non null the task manager is created through
     *        {@code TaskManager.createTaskManagerWithClusterSettings}; otherwise the plain constructor is used
     */
    protected TaskManager createTaskManager(
        Settings settings,
        ClusterSettings clusterSettings,
        ThreadPool threadPool,
        Set taskHeaders
    ) {
        if (clusterSettings != null) {
            return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders);
        } else {
            return new TaskManager(settings, threadPool, taskHeaders);
        }
    }

    /**
     * The executor service for this transport service.
     *
     * @return the executor service
     */
    private ExecutorService getExecutorService() {
        return threadPool.generic();
    }

    /** Replaces the tracer include patterns with a copy of the given list. */
    void setTracerLogInclude(List tracerLogInclude) {
        this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
    }

    /** Replaces the tracer exclude patterns with a copy of the given list. */
    void setTracerLogExclude(List tracerLogExclude) {
        this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY);
    }

    /**
     * Registers this service as the transport's message listener and as a connection listener, starts the
     * transport, logs the bound addresses, creates the local node from the bound address and, when this node
     * is a remote cluster client, kicks off the remote cluster connections.
     */
    @Override
    protected void doStart() {
        transport.setMessageListener(this);
        connectionManager.addListener(this);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
            for (Map.Entry entry : transport.profileBoundAddresses().entrySet()) {
                logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
            }
        }
        // the local node only exists from here on
        localNode = localNodeFactory.apply(transport.boundAddress());

        if (remoteClusterClient) {
            // here we start to connect to the remote clusters
            remoteClusterService.initializeRemoteClusters(
                ActionListener.wrap(
                    r -> logger.info("Remote clusters initialized successfully."),
                    e -> logger.error("Remote clusters initialization failed partially", e)
                )
            );
        }
    }

    /**
     * Closes the connection manager and the remote cluster service and stops the transport, then fails every
     * response handler that is still registered with a {@link SendRequestTransportException} wrapping a
     * {@link NodeClosedException}.
     */
    @Override
    protected void doStop() {
        try {
            IOUtils.close(connectionManager, remoteClusterService, transport::stop);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // in case the transport is not connected to our local node (thus cleaned on node disconnect)
            // make sure to clean any leftover on going handles
            for (final Transport.ResponseContext holderToNotify : responseHandlers.prune(h -> true)) {
                // callback that an exception happened, but on a different thread since we don't
                // want handlers to worry about stack overflows
                getExecutorService().execute(new AbstractRunnable() {
                    @Override
                    public void onRejection(Exception e) {
                        // if we get rejected during node shutdown we don't wanna bubble it up
                        logger.debug(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on rejection, action: {}",
                                holderToNotify.action()
                            ),
                            e
                        );
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.warn(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on exception, action: {}",
                                holderToNotify.action()
                            ),
                            e
                        );
                    }

                    @Override
                    public void doRun() {
                        TransportException ex = new SendRequestTransportException(
                            holderToNotify.connection().getNode(),
                            holderToNotify.action(),
                            new NodeClosedException(localNode)
                        );
                        holderToNotify.handler().handleException(ex);
                    }
                });
            }
        }
    }

    /** Closes the underlying transport. */
    @Override
    protected void doClose() throws IOException {
        transport.close();
    }

    /**
     * start accepting incoming requests.
* when the transport layer starts up it will block any incoming requests until
     * this method is called
     */
    public final void acceptIncomingRequests() {
        handleIncomingRequests.set(true);
    }

    /**
     * Describes the addresses this transport is bound to.
     *
     * @return a {@link TransportInfo} built from the bound address and the per-profile bound addresses, or
     *         {@code null} when {@link #boundAddress()} returns {@code null}
     */
    @Override
    public TransportInfo info() {
        final BoundTransportAddress bound = boundAddress();
        return bound == null ? null : new TransportInfo(bound, transport.profileBoundAddresses());
    }

    /** Returns the statistics reported by the underlying transport. */
    public TransportStats stats() {
        return transport.getStats();
    }

    /** Returns whatever the underlying transport reports from {@code isSecure()}. */
    public boolean isTransportSecure() {
        return transport.isSecure();
    }

    /** Returns the address the underlying transport is bound to. */
    public BoundTransportAddress boundAddress() {
        return transport.boundAddress();
    }

    /** Returns the default seed addresses of the underlying transport. */
    public List getDefaultSeedAddresses() {
        return transport.getDefaultSeedAddresses();
    }

    /**
     * Tells whether the given node can be reached without connecting first.
     *
     * @return {@code true} if the node is the local node or the connection manager reports it as connected
     */
    public boolean nodeConnected(DiscoveryNode node) {
        if (isLocalNode(node)) {
            return true;
        }
        return connectionManager.nodeConnected(node);
    }

    /**
     * Connects to the given node, passing {@code null} as the connection profile.
     *
     * @param node the node to connect to
     */
    public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
        final ConnectionProfile defaultProfile = null;
        connectToNode(node, defaultProfile);
    }

    /**
     * Connects to the given node as an extension, passing {@code null} as the connection profile.
     *
     * @param node the node to connect to
     * @param extensionUniqueId the id of the extension
     */
    public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException {
        final ConnectionProfile defaultProfile = null;
        connectToNodeAsExtension(node, defaultProfile, extensionUniqueId);
    }

    // Node validation is skipped for extensions because the extension node and the OpenSearch node (local node)
    // have different ephemeral ids.
    public void connectToExtensionNode(final DiscoveryNode node) {
        final ConnectionProfile defaultProfile = null;
        PlainActionFuture.get(future -> connectToExtensionNode(node, defaultProfile, ActionListener.map(future, ignored -> null)));
    }

    /**
     * Connect to the specified node with the given connection profile
     *
     * @param node the node to connect to
     * @param connectionProfile the connection profile to use when connecting to this node
     */
public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
        // run the listener-based overload and hand its outcome to a future that PlainActionFuture.get consumes
        PlainActionFuture.get(future -> {
            connectToNode(node, connectionProfile, ActionListener.map(future, ignored -> null));
        });
    }

    /**
     * Connects to the given node as an extension using the given connection profile.
     * Delegates to the listener-based overload through {@code PlainActionFuture.get}.
     *
     * @param node the node to connect to
     * @param connectionProfile the connection profile to use when connecting to this node
     * @param extensionUniqueIq the id of the extension
     */
    public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) {
        PlainActionFuture.get(future -> {
            connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(future, ignored -> null));
        });
    }

    /**
     * Connects to the given extension node using the given connection profile.
     * Delegates to the listener-based overload through {@code PlainActionFuture.get}.
     *
     * @param node the node to connect to
     * @param connectionProfile the connection profile to use when connecting to this node
     */
    public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
        PlainActionFuture.get(future -> {
            connectToExtensionNode(node, connectionProfile, ActionListener.map(future, ignored -> null));
        });
    }

    /**
     * Connects to the given node, passing {@code null} as the connection profile.
     * The ActionListener will be called on the calling thread or the generic thread pool.
     *
     * @param node the node to connect to
     * @param listener the action listener to notify
     */
    public void connectToNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException {
        connectToNode(node, null, listener);
    }

    /**
     * Connects to the given extension node, passing {@code null} as the connection profile.
     *
     * @param node the node to connect to
     * @param listener the action listener to notify
     */
    public void connectToExtensionNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException {
        connectToExtensionNode(node, null, listener);
    }

    /**
     * Connect to the specified node with the given connection profile.
     * The ActionListener will be called on the calling thread or the generic thread pool.
* * @param node the node to connect to * @param connectionProfile the connection profile to use when connecting to this node * @param listener the action listener to notify */ public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { if (isLocalNode(node)) { listener.onResponse(null); return; } connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener); } /** * Connect to the specified node as an extension with the given connection profile. * The ActionListener will be called on the calling thread or the generic thread pool. * * @param node the node to connect to * @param connectionProfile the connection profile to use when connecting to this node * @param extensionUniqueId the id of the extension * @param listener the action listener to notify */ public void connectToNodeAsExtension( final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueId, ActionListener listener ) { if (isLocalNode(node)) { listener.onResponse(null); return; } connectionManager.connectToNode( node, connectionProfile, connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId), listener ); } public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { if (isLocalNode(node)) { listener.onResponse(null); return; } connectionManager.connectToNode(node, connectionProfile, extensionConnectionValidator(node), listener); } public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode node) { return (newConnection, actualProfile, listener) -> { // We don't validate cluster names to allow for CCS connections. handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { final DiscoveryNode remote = resp.discoveryNode; if (node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. 
unexpected remote node " + remote); } return null; })); }; } public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode( DiscoveryNode node, String extensionUniqueId ) { return (newConnection, actualProfile, listener) -> { // We don't validate cluster names to allow for CCS connections. String currentId = threadPool.getThreadContext().getHeader("extension_unique_id"); if (Strings.isNullOrEmpty(currentId) || !extensionUniqueId.equals(currentId)) { threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId); } handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { final DiscoveryNode remote = resp.discoveryNode; if (node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } return null; })); }; } public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) { return (newConnection, actualProfile, listener) -> { // We don't validate cluster names to allow for CCS connections. handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { final DiscoveryNode remote = resp.discoveryNode; logger.info("Connection validation was skipped"); return null; })); }; } /** * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers * responsibility to close the connection once it goes out of scope. * The ActionListener will be called on the calling thread or the generic thread pool. * @param node the node to connect to * @param connectionProfile the connection profile to use */ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile connectionProfile) { return PlainActionFuture.get(fut -> openConnection(node, connectionProfile, fut)); } /** * Establishes a new connection to the given node. 
The connection is NOT maintained by this service, it's the callers * responsibility to close the connection once it goes out of scope. * The ActionListener will be called on the calling thread or the generic thread pool. * @param node the node to connect to * @param connectionProfile the connection profile to use * @param listener the action listener to notify */ public void openConnection( final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener ) { if (isLocalNode(node)) { listener.onResponse(localNodeConnection); } else { connectionManager.openConnection(node, connectionProfile, listener); } } /** * Executes a high-level handshake using the given connection * and returns the discovery node of the node the connection * was established with. The handshake will fail if the cluster * name on the target node mismatches the local cluster name. * The ActionListener will be called on the calling thread or the generic thread pool. * * @param connection the connection to a specific node * @param handshakeTimeout handshake timeout * @param listener action listener to notify * @throws ConnectTransportException if the connection failed * @throws IllegalStateException if the handshake failed */ public void handshake( final Transport.Connection connection, final long handshakeTimeout, final ActionListener listener ) { handshake( connection, handshakeTimeout, clusterName.getEqualityPredicate(), ActionListener.map(listener, HandshakeResponse::getDiscoveryNode) ); } /** * Executes a high-level handshake using the given connection * and returns the discovery node of the node the connection * was established with. The handshake will fail if the cluster * name on the target node doesn't match the local cluster name. * The ActionListener will be called on the calling thread or the generic thread pool. 
* * @param connection the connection to a specific node * @param handshakeTimeout handshake timeout * @param clusterNamePredicate cluster name validation predicate * @param listener action listener to notify * @throws IllegalStateException if the handshake failed */ public void handshake( final Transport.Connection connection, final long handshakeTimeout, Predicate clusterNamePredicate, final ActionListener listener ) { final DiscoveryNode node = connection.getNode(); sendRequest( connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(), new ActionListenerResponseHandler<>(new ActionListener() { @Override public void onResponse(HandshakeResponse response) { if (clusterNamePredicate.test(response.clusterName) == false) { listener.onFailure( new IllegalStateException( "handshake with [" + node + "] failed: remote cluster name [" + response.clusterName.value() + "] does not match " + clusterNamePredicate ) ); } else if (response.version.isCompatible(localNode.getVersion()) == false) { listener.onFailure( new IllegalStateException( "handshake with [" + node + "] failed: remote node version [" + response.version + "] is incompatible with local node version [" + localNode.getVersion() + "]" ) ); } else { listener.onResponse(response); } } @Override public void onFailure(Exception e) { listener.onFailure(e); } }, HandshakeResponse::new, ThreadPool.Names.GENERIC) ); } public ConnectionManager getConnectionManager() { return connectionManager; } /** * Internal Handshake request * * @opensearch.internal */ static class HandshakeRequest extends TransportRequest { public static final HandshakeRequest INSTANCE = new HandshakeRequest(); HandshakeRequest(StreamInput in) throws IOException { super(in); } private HandshakeRequest() {} } /** * Internal handshake response * * @opensearch.internal */ public static class HandshakeResponse extends TransportResponse { private final DiscoveryNode discoveryNode; 
private final ClusterName clusterName; private final Version version; public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { this.discoveryNode = discoveryNode; this.version = version; this.clusterName = clusterName; } public HandshakeResponse(StreamInput in) throws IOException { super(in); discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); clusterName = new ClusterName(in); version = in.readVersion(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(discoveryNode); clusterName.writeTo(out); out.writeVersion(version); } public DiscoveryNode getDiscoveryNode() { return discoveryNode; } public ClusterName getClusterName() { return clusterName; } } public void disconnectFromNode(DiscoveryNode node) { if (isLocalNode(node)) { return; } connectionManager.disconnectFromNode(node); } public void addMessageListener(TransportMessageListener listener) { messageListener.listeners.add(listener); } public boolean removeMessageListener(TransportMessageListener listener) { return messageListener.listeners.remove(listener); } public void addConnectionListener(TransportConnectionListener listener) { connectionManager.addListener(listener); } public void removeConnectionListener(TransportConnectionListener listener) { connectionManager.removeListener(listener); } public TransportFuture submitRequest( DiscoveryNode node, String action, TransportRequest request, TransportResponseHandler handler ) throws TransportException { return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler); } public TransportFuture submitRequest( DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler ) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture<>(handler); try { Transport.Connection connection = getConnection(node); sendRequest(connection, action, request, options, 
futureHandler); } catch (NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler futureHandler.handleException(ex); } return futureHandler; } public void sendRequest( final DiscoveryNode node, final String action, final TransportRequest request, final TransportResponseHandler handler ) { final Transport.Connection connection; try { connection = getConnection(node); } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); return; } sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); } public final void sendRequest( final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler ) { final Transport.Connection connection; try { connection = getConnection(node); } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); return; } sendRequest(connection, action, request, options, handler); } /** * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked. * * @param connection the connection to send the request on * @param action the name of the action * @param request the request * @param options the options for this request * @param handler the response handler * @param the type of the transport response */ public final void sendRequest( final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, final TransportResponseHandler handler ) { try { logger.debug("Action: " + action); final TransportResponseHandler delegate; if (request.getParentTask().isSet()) { // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. 
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); delegate = new TransportResponseHandler() { @Override public void handleResponse(T response) { unregisterChildNode.close(); handler.handleResponse(response); } @Override public void handleException(TransportException exp) { unregisterChildNode.close(); handler.handleException(exp); } @Override public String executor() { return handler.executor(); } @Override public T read(StreamInput in) throws IOException { return handler.read(in); } @Override public String toString() { return getClass().getName() + "/[" + action + "]:" + handler.toString(); } }; } else { delegate = handler; } asyncSender.sendRequest(connection, action, request, options, delegate); } catch (final Exception ex) { // the caller might not handle this so we invoke the handler final TransportException te; if (ex instanceof TransportException) { te = (TransportException) ex; } else { te = new TransportException("failure to send", ex); } handler.handleException(te); } } /** * Returns either a real transport connection or a local node connection if we are using the local node optimization. 
* @throws NodeNotConnectedException if the given node is not connected */ public Transport.Connection getConnection(DiscoveryNode node) { if (isLocalNode(node)) { return localNodeConnection; } else { return connectionManager.getConnection(node); } } public final void sendChildRequest( final DiscoveryNode node, final String action, final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler ) { final Transport.Connection connection; try { connection = getConnection(node); } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); return; } sendChildRequest(connection, action, request, parentTask, options, handler); } public void sendChildRequest( final Transport.Connection connection, final String action, final TransportRequest request, final Task parentTask, final TransportResponseHandler handler ) { sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler); } public void sendChildRequest( final Transport.Connection connection, final String action, final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler ) { request.setParentTask(localNode.getId(), parentTask.getId()); sendRequest(connection, action, request, options, handler); } private void sendRequestInternal( final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler ) { if (connection == null) { throw new IllegalStateException("can't send request to a null connection"); } DiscoveryNode node = connection.getNode(); Supplier storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true); ContextRestoreResponseHandler responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler); // TODO we can probably 
fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action)); final TimeoutHandler timeoutHandler; if (options.timeout() != null) { timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action); responseHandler.setTimeoutHandler(timeoutHandler); } else { timeoutHandler = null; } try { if (lifecycle.stoppedOrClosed()) { /* * If we are not started the exception handling will remove the request holder again and calls the handler to notify the * caller. It will only notify if toStop hasn't done the work yet. */ throw new NodeClosedException(localNode); } if (timeoutHandler != null) { assert options.timeout() != null; timeoutHandler.scheduleTimeout(options.timeout()); } connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream } catch (final Exception e) { // usually happen either because we failed to connect to the node // or because we failed serializing the message final Transport.ResponseContext contextToNotify = responseHandlers.remove(requestId); // If holderToNotify == null then handler has already been taken care of. if (contextToNotify != null) { if (timeoutHandler != null) { timeoutHandler.cancel(); } // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current // thread on a best effort basis though. final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e); final String executor = lifecycle.stoppedOrClosed() ? 
ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; threadPool.executor(executor).execute(new AbstractRunnable() { @Override public void onRejection(Exception e) { // if we get rejected during node shutdown we don't wanna bubble it up logger.debug( () -> new ParameterizedMessage( "failed to notify response handler on rejection, action: {}", contextToNotify.action() ), e ); } @Override public void onFailure(Exception e) { logger.warn( () -> new ParameterizedMessage( "failed to notify response handler on exception, action: {}", contextToNotify.action() ), e ); } @Override protected void doRun() throws Exception { contextToNotify.handler().handleException(sendRequestException); } }); } else { logger.debug("Exception while sending request, handler likely already notified due to timeout", e); } } } private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool); try { onRequestSent(localNode, requestId, action, request, options); onRequestReceived(requestId, action); final RequestHandlerRegistry reg = getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException("Action [" + action + "] not found"); } final String executor = reg.getExecutor(); if (ThreadPool.Names.SAME.equals(executor)) { // noinspection unchecked reg.processMessageReceived(request, channel); } else { threadPool.executor(executor).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { // noinspection unchecked reg.processMessageReceived(request, channel); } @Override public boolean isForceExecution() { return reg.isForceExecution(); } @Override public void onFailure(Exception e) { try { channel.sendResponse(e); } catch (Exception inner) { inner.addSuppressed(e); logger.warn( () -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", 
action), inner ); } } @Override public String toString() { return "processing of [" + requestId + "][" + action + "]: " + request; } }); } } catch (Exception e) { try { channel.sendResponse(e); } catch (Exception inner) { inner.addSuppressed(e); logger.warn(() -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", action), inner); } } } private boolean shouldTraceAction(String action) { return shouldTraceAction(action, tracerLogInclude, tracerLogExclude); } public static boolean shouldTraceAction(String action, String[] include, String[] exclude) { if (include.length > 0) { if (Regex.simpleMatch(include, action) == false) { return false; } } if (exclude.length > 0) { return !Regex.simpleMatch(exclude, action); } return true; } public TransportAddress[] addressesFromString(String address) throws UnknownHostException { return transport.addressesFromString(address); } /** * A set of all valid action prefixes. */ public static final Set VALID_ACTION_PREFIXES = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( "indices:admin", "indices:monitor", "indices:data/write", "indices:data/read", "indices:internal", "cluster:admin", "cluster:monitor", "cluster:internal", "internal:" ) ) ); private void validateActionName(String actionName) { // TODO we should makes this a hard validation and throw an exception but we need a good way to add backwards layer // for it. Maybe start with a deprecation layer if (isValidActionName(actionName) == false) { logger.warn("invalid action name [" + actionName + "] must start with one of: " + TransportService.VALID_ACTION_PREFIXES); } } /** * Returns true iff the action name starts with a valid prefix. 
* * @see #VALID_ACTION_PREFIXES */ public static boolean isValidActionName(String actionName) { for (String prefix : VALID_ACTION_PREFIXES) { if (actionName.startsWith(prefix)) { return true; } } return false; } /** * Registers a new request handler * * @param action The action the request handler is associated with * @param requestReader a callable to be used construct new instances for streaming * @param executor The executor the request handling will be executed on * @param handler The handler itself that implements the request handling */ public void registerRequestHandler( String action, String executor, Writeable.Reader requestReader, TransportRequestHandler handler ) { validateActionName(action); handler = interceptor.interceptHandler(action, executor, false, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, taskManager, handler, executor, false, true ); transport.registerRequestHandler(reg); } /** * Registers a new request handler * * @param action The action the request handler is associated with * @param requestReader The request class that will be used to construct new instances for streaming * @param executor The executor the request handling will be executed on * @param forceExecution Force execution on the executor queue and never reject it * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. 
* @param handler The handler itself that implements the request handling */ public void registerRequestHandler( String action, String executor, boolean forceExecution, boolean canTripCircuitBreaker, Writeable.Reader requestReader, TransportRequestHandler handler ) { validateActionName(action); handler = interceptor.interceptHandler(action, executor, forceExecution, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker ); transport.registerRequestHandler(reg); } /** * called by the {@link Transport} implementation when an incoming request arrives but before * any parsing of it has happened (with the exception of the requestId and action) */ @Override public void onRequestReceived(long requestId, String action) { if (handleIncomingRequests.get() == false) { throw new IllegalStateException("transport not ready yet to handle incoming requests"); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } messageListener.onRequestReceived(requestId, action); } /** called by the {@link Transport} implementation once a request has been sent */ @Override public void onRequestSent( DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options ) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); } messageListener.onRequestSent(node, requestId, action, request, options); } @Override public void onResponseReceived(long requestId, Transport.ResponseContext holder) { if (holder == null) { checkForTimeout(requestId); } else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) { tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); } 
messageListener.onResponseReceived(requestId, holder); } /** called by the {@link Transport} implementation once a response was sent to calling node */ @Override public void onResponseSent(long requestId, String action, TransportResponse response) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent response", requestId, action); } messageListener.onResponseSent(requestId, action, response); } /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ @Override public void onResponseSent(long requestId, String action, Exception e) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace(() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e); } messageListener.onResponseSent(requestId, action, e); } public RequestHandlerRegistry getRequestHandler(String action) { return transport.getRequestHandlers().getHandler(action); } private void checkForTimeout(long requestId) { // lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished final DiscoveryNode sourceNode; final String action; assert responseHandlers.contains(requestId) == false; TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId); if (timeoutInfoHolder != null) { long time = threadPool.relativeTimeInMillis(); logger.warn( "Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " + "action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId ); action = timeoutInfoHolder.action(); sourceNode = timeoutInfoHolder.node(); } else { logger.warn("Transport response handler not found of id [{}]", requestId); action = null; sourceNode = null; } // call tracer out of lock if (tracerLog.isTraceEnabled() == false) { return; } if (action == 
null) { assert sourceNode == null; tracerLog.trace("[{}] received response but can't resolve it to a request", requestId); } else if (shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode); } } @Override public void onConnectionClosed(Transport.Connection connection) { try { List> pruned = responseHandlers.prune( h -> h.connection().getCacheKey().equals(connection.getCacheKey()) ); // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows getExecutorService().execute(new Runnable() { @Override public void run() { for (Transport.ResponseContext holderToNotify : pruned) { holderToNotify.handler() .handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action())); } } @Override public String toString() { return "onConnectionClosed(" + connection.getNode() + ")"; } }); } catch (OpenSearchRejectedExecutionException ex) { logger.debug("Rejected execution on onConnectionClosed", ex); } } final class TimeoutHandler implements Runnable { private final long requestId; private final long sentTime = threadPool.relativeTimeInMillis(); private final String action; private final DiscoveryNode node; volatile Scheduler.Cancellable cancellable; TimeoutHandler(long requestId, DiscoveryNode node, String action) { this.requestId = requestId; this.node = node; this.action = action; } @Override public void run() { if (responseHandlers.contains(requestId)) { long timeoutTime = threadPool.relativeTimeInMillis(); timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime)); // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id final Transport.ResponseContext holder = responseHandlers.remove(requestId); if (holder != null) { assert holder.action().equals(action); assert holder.connection().getNode().equals(node); holder.handler() .handleException( new 
ReceiveTimeoutTransportException( holder.connection().getNode(), holder.action(), "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]" ) ); } else { // response was processed, remove timeout info. timeoutInfoHandlers.remove(requestId); } } } /** * cancels timeout handling. this is a best effort only to avoid running it. remove the requestId from {@link #responseHandlers} * to make sure this doesn't run. */ public void cancel() { assert responseHandlers.contains(requestId) == false : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; if (cancellable != null) { cancellable.cancel(); } } @Override public String toString() { return "timeout handler for [" + requestId + "][" + action + "]"; } private void scheduleTimeout(TimeValue timeout) { this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC); } } /** * Holder for timeout information * * @opensearch.internal */ static class TimeoutInfoHolder { private final DiscoveryNode node; private final String action; private final long sentTime; private final long timeoutTime; TimeoutInfoHolder(DiscoveryNode node, String action, long sentTime, long timeoutTime) { this.node = node; this.action = action; this.sentTime = sentTime; this.timeoutTime = timeoutTime; } public DiscoveryNode node() { return node; } public String action() { return action; } public long sentTime() { return sentTime; } public long timeoutTime() { return timeoutTime; } } /** * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the handle methods * are invoked we restore the context. 
*/ public static final class ContextRestoreResponseHandler implements TransportResponseHandler { private final TransportResponseHandler delegate; private final Supplier contextSupplier; private volatile TimeoutHandler handler; public ContextRestoreResponseHandler(Supplier contextSupplier, TransportResponseHandler delegate) { this.delegate = delegate; this.contextSupplier = contextSupplier; } @Override public T read(StreamInput in) throws IOException { return delegate.read(in); } @Override public void handleResponse(T response) { if (handler != null) { handler.cancel(); } try (ThreadContext.StoredContext ignore = contextSupplier.get()) { delegate.handleResponse(response); } } @Override public void handleException(TransportException exp) { if (handler != null) { handler.cancel(); } try (ThreadContext.StoredContext ignore = contextSupplier.get()) { delegate.handleException(exp); } } @Override public String executor() { return delegate.executor(); } @Override public String toString() { return getClass().getName() + "/" + delegate.toString(); } void setTimeoutHandler(TimeoutHandler handler) { this.handler = handler; } } /** * A channel for a direct response * * @opensearch.internal */ static class DirectResponseChannel implements TransportChannel { final DiscoveryNode localNode; private final String action; private final long requestId; final TransportService service; final ThreadPool threadPool; DirectResponseChannel(DiscoveryNode localNode, String action, long requestId, TransportService service, ThreadPool threadPool) { this.localNode = localNode; this.action = action; this.requestId = requestId; this.service = service; this.threadPool = threadPool; } @Override public String getProfileName() { return DIRECT_RESPONSE_PROFILE; } @Override public void sendResponse(TransportResponse response) throws IOException { service.onResponseSent(requestId, action, response); final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service); 
// ignore if its null, the service logs it if (handler != null) { final String executor = handler.executor(); if (ThreadPool.Names.SAME.equals(executor)) { processResponse(handler, response); } else { threadPool.executor(executor).execute(new Runnable() { @Override public void run() { processResponse(handler, response); } @Override public String toString() { return "delivery of response to [" + requestId + "][" + action + "]: " + response; } }); } } } @SuppressWarnings("unchecked") protected void processResponse(TransportResponseHandler handler, TransportResponse response) { try { handler.handleResponse(response); } catch (Exception e) { processException(handler, wrapInRemote(new ResponseHandlerFailureTransportException(e))); } } @Override public void sendResponse(Exception exception) throws IOException { service.onResponseSent(requestId, action, exception); final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service); // ignore if its null, the service logs it if (handler != null) { final RemoteTransportException rtx = wrapInRemote(exception); final String executor = handler.executor(); if (ThreadPool.Names.SAME.equals(executor)) { processException(handler, rtx); } else { threadPool.executor(handler.executor()).execute(new Runnable() { @Override public void run() { processException(handler, rtx); } @Override public String toString() { return "delivery of failure response to [" + requestId + "][" + action + "]: " + exception; } }); } } } protected RemoteTransportException wrapInRemote(Exception e) { if (e instanceof RemoteTransportException) { return (RemoteTransportException) e; } return new RemoteTransportException(localNode.getName(), localNode.getAddress(), action, e); } protected void processException(final TransportResponseHandler handler, final RemoteTransportException rtx) { try { handler.handleException(rtx); } catch (Exception e) { logger.error( () -> new ParameterizedMessage("failed to handle exception for 
action [{}], handler [{}]", action, handler), e ); } } @Override public String getChannelType() { return "direct"; } @Override public Version getVersion() { return localNode.getVersion(); } } /** * Returns the internal thread pool */ public ThreadPool getThreadPool() { return threadPool; } private boolean isLocalNode(DiscoveryNode discoveryNode) { return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode); } private static final class DelegatingTransportMessageListener implements TransportMessageListener { private final List listeners = new CopyOnWriteArrayList<>(); @Override public void onRequestReceived(long requestId, String action) { for (TransportMessageListener listener : listeners) { listener.onRequestReceived(requestId, action); } } @Override public void onResponseSent(long requestId, String action, TransportResponse response) { for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, response); } } @Override public void onResponseSent(long requestId, String action, Exception error) { for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, error); } } @Override public void onRequestSent( DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions finalOptions ) { for (TransportMessageListener listener : listeners) { listener.onRequestSent(node, requestId, action, request, finalOptions); } } @Override public void onResponseReceived(long requestId, Transport.ResponseContext holder) { for (TransportMessageListener listener : listeners) { listener.onResponseReceived(requestId, holder); } } } }