/*
 * ExpiringTrafficStreamMapSequentialTest
 *
 * JUnit 5 (org.junit.jupiter) tests that exercise ExpiringTrafficStreamMap through one shared
 * helper, testLinearExpirations.
 *
 * testLinearExpirations(connectionGenerator, window, granularity, expectedExpirationCounts):
 *   - Builds an ExpiringTrafficStreamMap from Duration.ofSeconds(window),
 *     Duration.ofSeconds(granularity) and an anonymous BehavioralPolicy whose
 *     onExpireAccumulation callback appends each expired Accumulation to the local list
 *     expiredAccumulations.
 *   - Allocates two more lists, createdAccumulations and expiredCountsPerLoop, before a loop.
 *   - The visible tail of the method joins the decimal form of every element of
 *     expiredCountsPerLoop into one string and passes it as the last argument of a call whose
 *     beginning is not visible (presumably Assertions.assertEquals against the joined
 *     expectedExpirationCounts -- confirm).
 *
 * NOTE(review): the text of this file is damaged. Everything between "for (int i=0; i" and
 * the following "+i).collect(" is missing, so the loop condition, the loop body and the start of
 * the final assertion cannot be read here. The generic type arguments also appear to have been
 * stripped (raw "Function" and "ArrayList"), which is consistent with text between angle
 * brackets having been removed during extraction. As shown, the method does not compile;
 * restore it from version control rather than reconstructing it by hand.
 *
 * NOTE(review): Accumulation is used without an import; it is presumably declared in this same
 * package (org.opensearch.migrations.replay) -- confirm.
 *
 * Test cases (arguments exactly as passed to the helper):
 *   - testLinearConnectionsAreExpired: id "connectionId_" + i (a different id for every index),
 *     window 5, granularity 1, expected {0, 0, 0, 0, 0, 0, 1, 2, 3}.
 *   - testLinearConnectionsWithGreaterGranulatityAreExpired: same id scheme, window 3,
 *     granularity 2, expected {0, 0, 0, 0, 1, 1, 2, 2, 3}.
 *     NOTE(review): "Granulatity" in the method name looks like a typo for "Granularity".
 *   - testLinearActivityWillPersist: the constant id "connectionId" for every index, window 5,
 *     granularity 1, expected new int[10] (ten zeros).
 *
 * Each expected array presumably holds the running count of expired accumulations observed after
 * each loop iteration -- TODO confirm once the loop body is restored.
 */
package org.opensearch.migrations.replay; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.opensearch.migrations.replay.traffic.expiration.BehavioralPolicy; import org.opensearch.migrations.replay.traffic.expiration.ExpiringTrafficStreamMap; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.function.Function; import java.util.stream.Collectors; class ExpiringTrafficStreamMapSequentialTest { public static final String TEST_NODE_ID_STRING = "test_node_id"; public static void testLinearExpirations(Function connectionGenerator, int window, int granularity, int expectedExpirationCounts[]) { var expiredAccumulations = new ArrayList(); var expiringMap = new ExpiringTrafficStreamMap(Duration.ofSeconds(window), Duration.ofSeconds(granularity), new BehavioralPolicy() { @Override public void onExpireAccumulation(String partitionId, Accumulation accumulation) { expiredAccumulations.add(accumulation); } }); var createdAccumulations = new ArrayList(); var expiredCountsPerLoop = new ArrayList(); for (int i=0; i""+i).collect(Collectors.joining()), expiredCountsPerLoop.stream().map(i->""+i).collect(Collectors.joining())); } @Test public void testLinearConnectionsAreExpired() { testLinearExpirations(i->"connectionId_"+i, 5,1, new int[] {0, 0, 0, 0, 0, 0, 1, 2, 3}); } @Test public void testLinearConnectionsWithGreaterGranulatityAreExpired() { testLinearExpirations(i->"connectionId_"+i, 3,2, new int[] {0, 0, 0, 0, 1, 1, 2, 2, 3}); } @Test public void testLinearActivityWillPersist() { var zeroArray = new int[10]; testLinearExpirations(i -> "connectionId", 5, 1, zeroArray); } }