/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.transport; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; import java.util.function.Function; /** * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second * node that acts as a request proxy to the target node. This is useful if a node is not directly connected to a target node but is * connected to an intermediate node that establishes a transitive connection. * * @opensearch.internal */ public final class TransportActionProxy { private TransportActionProxy() {} // no instance /** * Handler for proxy requests * * @opensearch.internal */ private static class ProxyRequestHandler implements TransportRequestHandler { private final TransportService service; private final String action; private final Function> responseFunction; ProxyRequestHandler( TransportService service, String action, Function> responseFunction ) { this.service = service; this.action = action; this.responseFunction = responseFunction; } @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { DiscoveryNode targetNode = request.targetNode; TransportRequest wrappedRequest = request.wrapped; service.sendRequest( targetNode, action, wrappedRequest, new ProxyResponseHandler<>(channel, responseFunction.apply(wrappedRequest)) ); } } /** * Handler for the proxy response * * @opensearch.internal */ private static class ProxyResponseHandler implements TransportResponseHandler { private final Writeable.Reader reader; private final TransportChannel channel; ProxyResponseHandler(TransportChannel channel, Writeable.Reader reader) { this.reader = reader; this.channel = channel; } @Override public T read(StreamInput in) throws IOException { return reader.read(in); } @Override public void handleResponse(T response) { try { channel.sendResponse(response); } catch (IOException e) { throw new UncheckedIOException(e); } } @Override public void handleException(TransportException exp) { try { channel.sendResponse(exp); } catch (IOException e) { throw new UncheckedIOException(e); } } @Override public String executor() { return ThreadPool.Names.SAME; } } /** * The proxy request * * @opensearch.internal */ static class ProxyRequest extends TransportRequest { final T wrapped; final DiscoveryNode targetNode; ProxyRequest(T wrapped, DiscoveryNode targetNode) { this.wrapped = wrapped; this.targetNode = targetNode; } ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); targetNode.writeTo(out); wrapped.writeTo(out); } } /** * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type changes based on the upcoming request (quite rare) */ public static void registerProxyActionWithDynamicResponseType( TransportService service, String action, Function> responseFunction ) { RequestHandlerRegistry requestHandler = service.getRequestHandler(action); service.registerRequestHandler( getProxyAction(action), ThreadPool.Names.SAME, true, false, in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, responseFunction) ); } /** * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type is always the same (most of the cases). */ public static void registerProxyAction(TransportService service, String action, Writeable.Reader reader) { RequestHandlerRegistry requestHandler = service.getRequestHandler(action); service.registerRequestHandler( getProxyAction(action), ThreadPool.Names.SAME, true, false, in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, request -> reader) ); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; /** * Returns the corresponding proxy action for the given action */ public static String getProxyAction(String action) { return PROXY_ACTION_PREFIX + action; } /** * Wraps the actual request in a proxy request object that encodes the target node. */ public static TransportRequest wrapRequest(DiscoveryNode node, TransportRequest request) { return new ProxyRequest<>(request, node); } /** * Unwraps a proxy request and returns the original request */ public static TransportRequest unwrapRequest(TransportRequest request) { if (request instanceof ProxyRequest) { return ((ProxyRequest) request).wrapped; } return request; } /** * Unwraps a proxy action and returns the underlying action */ public static String unwrapAction(String action) { assert isProxyAction(action) : "Attempted to unwrap non-proxy action: " + action; return action.substring(PROXY_ACTION_PREFIX.length()); } /** * Returns true iff the given action is a proxy action */ public static boolean isProxyAction(String action) { return action.startsWith(PROXY_ACTION_PREFIX); } /** * Returns true iff the given request is a proxy request */ public static boolean isProxyRequest(TransportRequest request) { return request instanceof ProxyRequest; } }