/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.persistent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.opensearch.tasks.TaskId; import java.util.Map; import java.util.function.Predicate; /** * An executor of tasks that can survive restart of requesting or executing node. * These tasks are using cluster state rather than only transport service to send requests and responses. * * @opensearch.internal */ public abstract class PersistentTasksExecutor { private final String executor; private final String taskName; protected PersistentTasksExecutor(String taskName, String executor) { this.taskName = taskName; this.executor = executor; } public String getTaskName() { return taskName; } public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment"); /** * Returns the node id where the params has to be executed, *

* The default implementation returns the least loaded data node */ public Assignment getAssignment(Params params, ClusterState clusterState) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode); if (discoveryNode == null) { return NO_NODE_FOUND; } else { return new Assignment(discoveryNode.getId(), ""); } } /** * Finds the least loaded node that satisfies the selector criteria */ protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate selector) { long minLoad = Long.MAX_VALUE; DiscoveryNode minLoadedNode = null; PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { if (selector.test(node)) { if (persistentTasks == null) { // We don't have any task running yet, pick the first available node return node; } long numberOfTasks = persistentTasks.getNumberOfTasksOnNode(node.getId(), taskName); if (minLoad > numberOfTasks) { minLoad = numberOfTasks; minLoadedNode = node; } } } return minLoadedNode; } /** * Checks the current cluster state for compatibility with the params *

* Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ public void validate(Params params, ClusterState clusterState) {} /** * Creates a AllocatedPersistentTask for communicating with task manager */ protected AllocatedPersistentTask createTask( long id, String type, String action, TaskId parentTaskId, PersistentTask taskInProgress, Map headers ) { return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); } /** * Returns task description that will be available via task manager */ protected String getDescription(PersistentTask taskInProgress) { return "id=" + taskInProgress.getId(); } /** * This operation will be executed on the executor node. *

* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable PersistentTaskState state); public String getExecutor() { return executor; } }