/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.ad.caching;
import java.time.Clock;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.timeseries.annotation.Generated;
/**
* A priority tracker for entities. Read docs/entity-priority.pdf for details.
*
* HC detectors use a 1-pass algorithm for estimating heavy hitters in a stream.
* Our method maintains a time-decayed count for each entity, which allows us to
* compare the frequencies/priorities of entities from different detectors in the
* stream.
* This class contains the heavy-hitter tracking logic. When an entity is hit,
* a user calls PriorityTracker.updatePriority to update the entity's priority.
* The user can find the most frequently occurring entities in the stream using
* PriorityTracker.getTopNEntities. A typical usage is listed below:
*
*
* PriorityTracker tracker = ...
*
* // at time t1
* tracker.updatePriority(entity1);
* tracker.updatePriority(entity3);
*
* // at time t2
* tracker.updatePriority(entity1);
* tracker.updatePriority(entity2);
*
* // we should have entity 1, 2, 3 in order. 2 comes before 3 because it happens later
* List<String> top3 = tracker.getTopNEntities(3);
*
*
*/
public class PriorityTracker {
private static final Logger LOG = LogManager.getLogger(PriorityTracker.class);
// data structure for an entity and its priority
static class PriorityNode {
// entity key
private String key;
// time-decayed priority
private float priority;
PriorityNode(String key, float priority) {
this.priority = priority;
this.key = key;
}
@Generated
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
if (obj instanceof PriorityNode) {
PriorityNode other = (PriorityNode) obj;
EqualsBuilder equalsBuilder = new EqualsBuilder();
equalsBuilder.append(key, other.key);
return equalsBuilder.isEquals();
}
return false;
}
@Generated
@Override
public int hashCode() {
return new HashCodeBuilder().append(key).toHashCode();
}
@Generated
@Override
public String toString() {
ToStringBuilder builder = new ToStringBuilder(this);
builder.append("key", key);
builder.append("priority", priority);
return builder.toString();
}
}
// Comparator between two entities. Used to sort entities in a priority queue
static class PriorityNodeComparator implements Comparator {
@Override
public int compare(PriorityNode priority, PriorityNode priority2) {
int equality = priority.key.compareTo(priority2.key);
if (equality == 0) {
// this is consistent with PriorityNode's equals method
return 0;
}
// if not equal, first check priority
int cmp = Float.compare(priority.priority, priority2.priority);
if (cmp == 0) {
// if priority is equal, use lexicographical order of key
cmp = equality;
}
return cmp;
}
}
// key -> Priority node
private final ConcurrentHashMap key2Priority;
// when detector is created. Can be reset. Unit: seconds
private long landmarkEpoch;
// a list of priority nodes
private final ConcurrentSkipListSet priorityList;
// Used to get current time.
private final Clock clock;
// length of seconds in one interval. Used to compute elapsed periods
// since the detector has been enabled.
private final long intervalSecs;
// determines how fast the decay is
// We use the decay constant 0.125. The half life (https://en.wikipedia.org/wiki/Exponential_decay)
// is 8* ln(2). This means the old value falls to one half with roughly 5.6 intervals.
// We chose 0.125 because multiplying 0.125 can be implemented efficiently using 3 right
// shift and the half life is not too fast or slow .
private final int DECAY_CONSTANT;
// the max number of entities to track
private final int maxEntities;
/**
* Create a priority tracker for a detector. Detector and priority tracker
* have 1:1 mapping.
*
* @param clock Used to get current time.
* @param intervalSecs Detector interval seconds.
* @param landmarkEpoch The epoch time when the priority tracking starts.
* @param maxEntities the max number of entities to track
*/
public PriorityTracker(Clock clock, long intervalSecs, long landmarkEpoch, int maxEntities) {
this.key2Priority = new ConcurrentHashMap<>();
this.clock = clock;
this.intervalSecs = intervalSecs;
this.landmarkEpoch = landmarkEpoch;
this.priorityList = new ConcurrentSkipListSet<>(new PriorityNodeComparator());
this.DECAY_CONSTANT = 3;
this.maxEntities = maxEntities;
}
/**
* Get the minimum priority entity and compute its scaled priority.
* Used to compare entity priorities among detectors.
* @return the minimum priority entity's ID and scaled priority or Optional.empty
* if the priority list is empty
*/
public Optional> getMinimumScaledPriority() {
if (priorityList.isEmpty()) {
return Optional.empty();
}
PriorityNode smallest = priorityList.first();
return Optional.of(new SimpleImmutableEntry<>(smallest.key, getScaledPriority(smallest.priority)));
}
/**
* Get the minimum priority entity and compute its scaled priority.
* Used to compare entity priorities within the same detector.
* @return the minimum priority entity's ID and scaled priority or Optional.empty
* if the priority list is empty
*/
public Optional> getMinimumPriority() {
if (priorityList.isEmpty()) {
return Optional.empty();
}
PriorityNode smallest = priorityList.first();
return Optional.of(new SimpleImmutableEntry<>(smallest.key, smallest.priority));
}
/**
*
* @return the minimum priority entity's Id or Optional.empty
* if the priority list is empty
*/
public Optional getMinimumPriorityEntityId() {
if (priorityList.isEmpty()) {
return Optional.empty();
}
return Optional.of(priorityList).map(list -> list.first()).map(node -> node.key);
}
/**
*
* @return Get maximum priority entity's Id
*/
public Optional getHighestPriorityEntityId() {
if (priorityList.isEmpty()) {
return Optional.empty();
}
return Optional.of(priorityList).map(list -> list.last()).map(node -> node.key);
}
/**
* Update an entity's priority with count increment
* @param entityId Entity Id
*/
public void updatePriority(String entityId) {
PriorityNode node = key2Priority.computeIfAbsent(entityId, k -> new PriorityNode(entityId, 0f));
// reposition this node
this.priorityList.remove(node);
node.priority = getUpdatedPriority(node.priority);
this.priorityList.add(node);
adjustSizeIfRequired();
}
/**
* Associate the specified priority with the entity Id
* @param entityId Entity Id
* @param priority priority
*/
protected void addPriority(String entityId, float priority) {
PriorityNode node = new PriorityNode(entityId, priority);
key2Priority.put(entityId, node);
priorityList.add(node);
adjustSizeIfRequired();
}
/**
* Adjust tracking list if the size exceeded the limit
*/
private void adjustSizeIfRequired() {
if (key2Priority.size() > maxEntities) {
Optional minPriorityId = getMinimumPriorityEntityId();
if (minPriorityId.isPresent()) {
removePriority(minPriorityId.get());
}
}
}
/**
* Remove an entity in the tracker
* @param entityId Entity Id
*/
protected void removePriority(String entityId) {
// remove if the key matches; priority does not matter
priorityList.remove(new PriorityNode(entityId, 0));
key2Priority.remove(entityId);
}
/**
* Remove all of entities
*/
protected void clearPriority() {
key2Priority.clear();
priorityList.clear();
}
/**
* Return the updated priority with new priority increment. Used when comparing
* entities' priorities within the same detector.
*
* Each detector maintains an ordered map, filled by entities's accumulated sum of g(i−L),
* which is what this function computes.
*
* g(n) = e^{0.125n}. i is current period. L is the landmark: period 0 when the
* detector is enabled. i - L measures the elapsed periods since detector starts.
* 0.125 is the decay constant.
*
* Since g(i−L) is changing and they are the same for all entities of the same detector,
* we can compare entities' priorities by considering the accumulated sum of g(i−L).
*
* @param oldPriority Existing priority
*
* @return new priority
*/
float getUpdatedPriority(float oldPriority) {
long increment = computeWeightedPriorityIncrement();
oldPriority += Math.log(1 + Math.exp(increment - oldPriority));
// if overflow happens, using the most recent decayed count instead.
if (oldPriority == Float.POSITIVE_INFINITY) {
oldPriority = increment;
}
return oldPriority;
}
/**
* Return the scaled priority. Used when comparing entities' priorities among
* different detectors.
*
* Updated priority = current priority - log(g(t - L)), where g(n) = e^{0.125n},
* t is current time, and L is the landmark. t - L measures the number of elapsed
* periods relative to the landmark.
*
* When replacing an entity, we query the minimum from each ordered map and
* compute w(i,p) for each minimum entity by scaling the sum by g(p−L). Notice g(p−L)
* can be different if detectors start at different timestamps. The minimum of the minimum
* is selected to be replaced. The number of multi-entity detectors is limited (we consider
* to support ten currently), so the computation is cheap.
*
* @param currentPriority Current priority
* @return the scaled priority
*/
float getScaledPriority(float currentPriority) {
return currentPriority - computeWeightedPriorityIncrement();
}
/**
* Compute the weighted priority increment using 0.125n, where n is the number of
* periods relative to the landmark.
* Each detector has its own landmark L: period 0 when the detector is enabled.
*
* @return the weighted priority increment used in the priority update step.
*/
long computeWeightedPriorityIncrement() {
long periods = (clock.instant().getEpochSecond() - landmarkEpoch) / intervalSecs;
return periods >> DECAY_CONSTANT;
}
/**
*
* @param n the number of entities to return. Can be less than n if there are not enough entities stored.
* @return top entities in the descending order of priority
*/
public List getTopNEntities(int n) {
List entities = new ArrayList<>();
Iterator entityIterator = priorityList.descendingIterator();
for (int i = 0; i < n && entityIterator.hasNext(); i++) {
entities.add(entityIterator.next().key);
}
return entities;
}
/**
*
* @return the number of tracked entities
*/
public int size() {
return key2Priority.size();
}
}