package org.opensearch.migrations.replay;
import com.google.protobuf.ByteString;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
public class Utils {
public static final int MAX_BYTES_SHOWN_FOR_TO_STRING = 128;
/**
* See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
*/
public static Collector
foldLeft(final B seedValue, final BiFunction super B, ? super A, ? extends B> f) {
return Collectors.collectingAndThen(
Collectors.reducing(
Function.identity(),
a -> b -> f.apply(b, a),
Function::andThen),
finisherArg -> finisherArg.apply(seedValue)
);
}
public static String packetsToStringTruncated(List packetStream) {
return packetsToStringTruncated(Optional.ofNullable(packetStream).map(p->p.stream()).orElse(null),
MAX_BYTES_SHOWN_FOR_TO_STRING);
}
public static String packetsToStringTruncated(Stream packetStream, int maxBytesToShow) {
if (packetStream == null) { return "null"; }
return packetStream.map(bArr-> {
var str = IntStream.range(0, bArr.length).map(idx -> bArr[idx])
.limit(maxBytesToShow)
.mapToObj(b -> "" + (char) b)
.collect(Collectors.joining());
return "[" + (bArr.length > maxBytesToShow ? str + "..." : str) + "]";
})
.collect(Collectors.joining(","));
}
public static String packetsToCompressedTrafficStream(Stream packetStream) {
var tsb = TrafficStream.newBuilder()
.setNumberOfThisLastChunk(1);
var trafficStreamOfReads =
packetStream.map(bArr->ReadObservation.newBuilder().setData(ByteString.copyFrom(bArr)).build())
.map(r->TrafficObservation.newBuilder().setRead(r))
.collect(foldLeft(tsb, (existing,newObs)->tsb.addSubStream(newObs)))
.build();
try (var baos = new ByteArrayOutputStream()) {
try (var gzStream = new GZIPOutputStream(baos)) {
trafficStreamOfReads.writeTo(gzStream);
}
baos.flush();
var binaryContents = baos.toByteArray();
return Base64.getEncoder().encodeToString(binaryContents);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}