/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.transport; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.collect.MapBuilder; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.core.transport.TransportResponse; import java.io.Closeable; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; /** * OpenSearch Transport Interface * * @opensearch.internal */ public interface Transport extends LifecycleComponent { /** * Registers a new request handler */ default void registerRequestHandler(RequestHandlerRegistry reg) { getRequestHandlers().registerHandler(reg); } void setMessageListener(TransportMessageListener listener); default void setSlowLogThreshold(TimeValue slowLogThreshold) {} default boolean isSecure() { return false; } /** * The address the transport is bound on. */ BoundTransportAddress boundAddress(); /** * Further profile bound addresses * @return null iff profiles are unsupported, otherwise a map with name of profile and its bound transport address */ Map profileBoundAddresses(); /** * Returns an address from its string representation. */ TransportAddress[] addressesFromString(String address) throws UnknownHostException; /** * Returns a list of all local addresses for this transport */ List getDefaultSeedAddresses(); /** * Opens a new connection to the given node. When the connection is fully connected, the listener is called. * The ActionListener will be called on the calling thread or the generic thread pool. */ void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener); TransportStats getStats(); ResponseHandlers getResponseHandlers(); RequestHandlers getRequestHandlers(); /** * A unidirectional connection to a {@link DiscoveryNode} */ interface Connection extends Closeable { /** * The node this connection is associated with */ DiscoveryNode getNode(); /** * Sends the request to the node this connection is associated with * @param requestId see {@link ResponseHandlers#add(ResponseContext)} for details * @param action the action to execute * @param request the request to send * @param options request options to apply * @throws NodeNotConnectedException if the given node is not connected */ void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; /** * The listener's {@link ActionListener#onResponse(Object)} method will be called when this * connection is closed. No implementations currently throw an exception during close, so * {@link ActionListener#onFailure(Exception)} will not be called. * * @param listener to be called */ void addCloseListener(ActionListener listener); boolean isClosed(); /** * Returns the version of the node this connection was established with. */ default Version getVersion() { return getNode().getVersion(); } /** * Returns a key that this connection can be cached on. Delegating subclasses must delegate method call to * the original connection. */ default Object getCacheKey() { return this; } @Override void close(); } /** * This class represents a response context that encapsulates the actual response handler, the action and the connection it was * executed on. */ final class ResponseContext { private final TransportResponseHandler handler; private final Connection connection; private final String action; ResponseContext(TransportResponseHandler handler, Connection connection, String action) { this.handler = handler; this.connection = connection; this.action = action; } public TransportResponseHandler handler() { return handler; } public Connection connection() { return this.connection; } public String action() { return this.action; } } /** * This class is a registry that allows */ final class ResponseHandlers { private final ConcurrentMapLong> handlers = ConcurrentCollections .newConcurrentMapLongWithAggressiveConcurrency(); private final AtomicLong requestIdGenerator = new AtomicLong(); /** * Returns true if the give request ID has a context associated with it. */ public boolean contains(long requestId) { return handlers.containsKey(requestId); } /** * Removes and return the {@link ResponseContext} for the given request ID or returns * null if no context is associated with this request ID. */ public ResponseContext remove(long requestId) { return handlers.remove(requestId); } /** * Adds a new response context and associates it with a new request ID. * @return the new request ID * @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions) */ public long add(ResponseContext holder) { long requestId = newRequestId(); ResponseContext existing = handlers.put(requestId, holder); assert existing == null : "request ID already in use: " + requestId; return requestId; } /** * Returns a new request ID to use when sending a message via {@link Connection#sendRequest(long, String, * TransportRequest, TransportRequestOptions)} */ long newRequestId() { return requestIdGenerator.incrementAndGet(); } /** * Removes and returns all {@link ResponseContext} instances that match the predicate */ public List> prune(Predicate> predicate) { final List> holders = new ArrayList<>(); for (Map.Entry> entry : handlers.entrySet()) { ResponseContext holder = entry.getValue(); if (predicate.test(holder)) { ResponseContext remove = handlers.remove(entry.getKey()); if (remove != null) { holders.add(holder); } } } return holders; } /** * called by the {@link Transport} implementation when a response or an exception has been received for a previously * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ public TransportResponseHandler onResponseReceived( final long requestId, final TransportMessageListener listener ) { ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { return null; } else { return context.handler(); } } } /** * Request handler implementations * * @opensearch.internal */ final class RequestHandlers { private volatile Map> requestHandlers = Collections.emptyMap(); synchronized void registerHandler(RequestHandlerRegistry reg) { if (requestHandlers.containsKey(reg.getAction())) { throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered"); } requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); } // TODO: Only visible for testing. Perhaps move StubbableTransport from // org.opensearch.test.transport to org.opensearch.transport public synchronized void forceRegister(RequestHandlerRegistry reg) { requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); } @SuppressWarnings("unchecked") public RequestHandlerRegistry getHandler(String action) { return (RequestHandlerRegistry) requestHandlers.get(action); } } }