/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ package org.opensearch.search.backpressure.trackers; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import java.io.IOException; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; /** * CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed. * * @opensearch.internal */ public class CpuUsageTracker extends TaskResourceUsageTracker { private final LongSupplier thresholdSupplier; public CpuUsageTracker(LongSupplier thresholdSupplier) { this.thresholdSupplier = thresholdSupplier; } @Override public String name() { return CPU_USAGE_TRACKER.getName(); } @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); long threshold = thresholdSupplier.getAsLong(); if (usage < threshold) { return Optional.empty(); } return Optional.of( new TaskCancellation.Reason( "cpu usage exceeded [" + new TimeValue(usage, TimeUnit.NANOSECONDS) + " >= " + new TimeValue(threshold, TimeUnit.NANOSECONDS) + "]", 1 // TODO: fine-tune the cancellation score/weight ) ); } @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).average().orElse(0); return new Stats(getCancellations(), currentMax, currentAvg); } /** * Stats related to CpuUsageTracker. */ public static class Stats implements TaskResourceUsageTracker.Stats { private final long cancellationCount; private final long currentMax; private final long currentAvg; public Stats(long cancellationCount, long currentMax, long currentAvg) { this.cancellationCount = cancellationCount; this.currentMax = currentMax; this.currentAvg = currentAvg; } public Stats(StreamInput in) throws IOException { this(in.readVLong(), in.readVLong(), in.readVLong()); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.startObject() .field("cancellation_count", cancellationCount) .humanReadableField("current_max_millis", "current_max", new TimeValue(currentMax, TimeUnit.NANOSECONDS)) .humanReadableField("current_avg_millis", "current_avg", new TimeValue(currentAvg, TimeUnit.NANOSECONDS)) .endObject(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(cancellationCount); out.writeVLong(currentMax); out.writeVLong(currentAvg); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Stats stats = (Stats) o; return cancellationCount == stats.cancellationCount && currentMax == stats.currentMax && currentAvg == stats.currentAvg; } @Override public int hashCode() { return Objects.hash(cancellationCount, currentMax, currentAvg); } } }