/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.cluster.coordination; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.unit.TimeValue; import org.opensearch.transport.TransportException; import org.opensearch.core.transport.TransportResponse; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.LongSupplier; import java.util.stream.Collectors; /** * Publication task * * @opensearch.internal */ public abstract class Publication { protected final Logger logger = LogManager.getLogger(getClass()); private final List publicationTargets; private final PublishRequest publishRequest; private final AckListener ackListener; private final LongSupplier currentTimeSupplier; private final long startTime; private Optional applyCommitRequest; // set when state is committed private boolean isCompleted; // set when publication is completed private boolean cancelled; // set when publication is cancelled public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { this.publishRequest = publishRequest; this.ackListener = ackListener; this.currentTimeSupplier = currentTimeSupplier; startTime = currentTimeSupplier.getAsLong(); applyCommitRequest = Optional.empty(); publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); publishRequest.getAcceptedState() .getNodes() .clusterManagersFirstStream() .forEach(n -> publicationTargets.add(new PublicationTarget(n))); } public void start(Set faultyNodes) { logger.trace("publishing {} to {}", publishRequest, publicationTargets); for (final DiscoveryNode faultyNode : faultyNodes) { onFaultyNode(faultyNode); } onPossibleCommitFailure(); publicationTargets.forEach(PublicationTarget::sendPublishRequest); } public void cancel(String reason) { if (isCompleted) { return; } assert cancelled == false; cancelled = true; if (applyCommitRequest.isPresent() == false) { logger.debug("cancel: [{}] cancelled before committing (reason: {})", this, reason); // fail all current publications final Exception e = new OpenSearchException("publication cancelled before committing: " + reason); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); } onPossibleCompletion(); } public void onFaultyNode(DiscoveryNode faultyNode) { publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); onPossibleCompletion(); } public List completedNodes() { return publicationTargets.stream() .filter(PublicationTarget::isSuccessfullyCompleted) .map(PublicationTarget::getDiscoveryNode) .collect(Collectors.toList()); } public boolean isCommitted() { return applyCommitRequest.isPresent(); } private void onPossibleCompletion() { if (isCompleted) { return; } if (cancelled == false) { for (final PublicationTarget target : publicationTargets) { if (target.isActive()) { return; } } } if (applyCommitRequest.isPresent() == false) { logger.debug("onPossibleCompletion: [{}] commit failed", this); assert isCompleted == false; isCompleted = true; onCompletion(false); return; } assert isCompleted == false; isCompleted = true; onCompletion(true); assert applyCommitRequest.isPresent(); logger.trace("onPossibleCompletion: [{}] was successful", this); } // For assertions only: verify that this invariant holds private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() { if (cancelled == false) { for (final PublicationTarget target : publicationTargets) { if (target.isActive()) { return isCompleted == false; } } } return isCompleted; } // For assertions ClusterState publishedState() { return publishRequest.getAcceptedState(); } private void onPossibleCommitFailure() { if (applyCommitRequest.isPresent()) { onPossibleCompletion(); return; } final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); for (PublicationTarget publicationTarget : publicationTargets) { if (publicationTarget.mayCommitInFuture()) { possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); } else { assert publicationTarget.isFailed() : publicationTarget; } } if (isPublishQuorum(possiblySuccessfulNodes) == false) { logger.debug( "onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed", possiblySuccessfulNodes, this ); Exception e = new FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); onPossibleCompletion(); } } protected abstract void onCompletion(boolean committed); protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); protected abstract Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); protected abstract void onJoin(Join join); protected abstract void onMissingJoin(DiscoveryNode discoveryNode); protected abstract void sendPublishRequest( DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener ); protected abstract void sendApplyCommit( DiscoveryNode destination, ApplyCommitRequest applyCommit, ActionListener responseActionListener ); @Override public String toString() { return "Publication{term=" + publishRequest.getAcceptedState().term() + ", version=" + publishRequest.getAcceptedState().version() + '}'; } void logIncompleteNodes(Level level) { final String message = publicationTargets.stream() .filter(PublicationTarget::isActive) .map(publicationTarget -> publicationTarget.getDiscoveryNode() + " [" + publicationTarget.getState() + "]") .collect(Collectors.joining(", ")); if (message.isEmpty() == false) { final TimeValue elapsedTime = TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime); logger.log( level, "after [{}] publication of cluster state version [{}] is still waiting for {}", elapsedTime, publishRequest.getAcceptedState().version(), message ); } } enum PublicationTargetState { NOT_STARTED, FAILED, SENT_PUBLISH_REQUEST, WAITING_FOR_QUORUM, SENT_APPLY_COMMIT, APPLIED_COMMIT, } /** * A publication target. * * @opensearch.internal */ class PublicationTarget { private final DiscoveryNode discoveryNode; private boolean ackIsPending = true; private PublicationTargetState state = PublicationTargetState.NOT_STARTED; PublicationTarget(DiscoveryNode discoveryNode) { this.discoveryNode = discoveryNode; } PublicationTargetState getState() { return state; } @Override public String toString() { return "PublicationTarget{" + "discoveryNode=" + discoveryNode + ", state=" + state + ", ackIsPending=" + ackIsPending + '}'; } void sendPublishRequest() { if (isFailed()) { return; } assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST; state = PublicationTargetState.SENT_PUBLISH_REQUEST; Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } void handlePublishResponse(PublishResponse publishResponse) { assert isWaitingForQuorum() : this; logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); if (applyCommitRequest.isPresent()) { sendApplyCommit(); } else { try { Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { assert applyCommitRequest.isPresent() == false; applyCommitRequest = Optional.of(applyCommit); ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); publicationTargets.stream() .filter(PublicationTarget::isWaitingForQuorum) .forEach(PublicationTarget::sendApplyCommit); }); } catch (Exception e) { setFailed(e); onPossibleCommitFailure(); } } } void sendApplyCommit() { assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT; state = PublicationTargetState.SENT_APPLY_COMMIT; assert applyCommitRequest.isPresent(); Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } void setAppliedCommit() { assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT; state = PublicationTargetState.APPLIED_COMMIT; ackOnce(null); } void setFailed(Exception e) { assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED; state = PublicationTargetState.FAILED; ackOnce(e); } void onFaultyNode(DiscoveryNode faultyNode) { if (isActive() && discoveryNode.equals(faultyNode)) { logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this); setFailed(new OpenSearchException("faulty node")); onPossibleCommitFailure(); } } DiscoveryNode getDiscoveryNode() { return discoveryNode; } private void ackOnce(Exception e) { if (ackIsPending) { ackIsPending = false; ackListener.onNodeAck(discoveryNode, e); } } boolean isActive() { return state != PublicationTargetState.FAILED && state != PublicationTargetState.APPLIED_COMMIT; } boolean isSuccessfullyCompleted() { return state == PublicationTargetState.APPLIED_COMMIT; } boolean isWaitingForQuorum() { return state == PublicationTargetState.WAITING_FOR_QUORUM; } boolean mayCommitInFuture() { return (state == PublicationTargetState.NOT_STARTED || state == PublicationTargetState.SENT_PUBLISH_REQUEST || state == PublicationTargetState.WAITING_FOR_QUORUM); } boolean isFailed() { return state == PublicationTargetState.FAILED; } /** * A handler for a publish response. * * @opensearch.internal */ private class PublishResponseHandler implements ActionListener { @Override public void onResponse(PublishWithJoinResponse response) { if (isFailed()) { logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); return; } if (response.getJoin().isPresent()) { final Join join = response.getJoin().get(); assert discoveryNode.equals(join.getSourceNode()); assert join.getTerm() == response.getPublishResponse().getTerm() : response; logger.trace("handling join within publish response: {}", join); onJoin(join); } else { logger.trace("publish response from {} contained no join", discoveryNode); onMissingJoin(discoveryNode); } assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; state = PublicationTargetState.WAITING_FOR_QUORUM; handlePublishResponse(response.getPublishResponse()); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } @Override public void onFailure(Exception e) { assert e instanceof TransportException; final TransportException exp = (TransportException) e; logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); assert ((TransportException) e).getRootCause() instanceof Exception; setFailed((Exception) exp.getRootCause()); onPossibleCommitFailure(); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } } /** * An apply commit response handler. * * @opensearch.internal */ private class ApplyCommitResponseHandler implements ActionListener { @Override public void onResponse(TransportResponse.Empty ignored) { if (isFailed()) { logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); return; } setAppliedCommit(); onPossibleCompletion(); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } @Override public void onFailure(Exception e) { assert e instanceof TransportException; final TransportException exp = (TransportException) e; logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); assert ((TransportException) e).getRootCause() instanceof Exception; setFailed((Exception) exp.getRootCause()); onPossibleCompletion(); assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } } } }