package com.amazonaws.services.kinesisanalytics.domain; import com.amazonaws.services.kinesisanalytics.avro.RoomTemperature; import com.amazonaws.services.kinesisanalytics.avro.TemperatureSample; import org.apache.flink.api.common.functions.AggregateFunction; import java.time.Instant; /** * AggregateFunction aggregating the room average temperature * (provided only for completeness, but not relevant for the AVRO-Glue Schema Registry example) */ public class RoomAverageTemperatureCalculator implements AggregateFunction { private Instant maxInstant(Instant a, Instant b) { return Instant.ofEpochMilli( Math.max(a.toEpochMilli(), b.toEpochMilli())); } @Override public RoomTemperature createAccumulator() { return RoomTemperature.newBuilder() .setRoom("").setSampleCount(0).setLastSampleTime(Instant.EPOCH).setTemperature(0).build(); } @Override public RoomTemperature add(TemperatureSample sample, RoomTemperature accumulator) { final int sampleCount = accumulator.getSampleCount(); final float roomAvgTemp = accumulator.getTemperature(); final Instant lastSampleTime = maxInstant(accumulator.getLastSampleTime(), sample.getSampleTime()); final float newRoomAvgTemp = (roomAvgTemp * sampleCount + sample.getTemperature()) / (float) (sampleCount + 1); accumulator.setRoom(sample.getRoom()); accumulator.setSampleCount(sampleCount + 1); accumulator.setTemperature(newRoomAvgTemp); accumulator.setLastSampleTime(lastSampleTime); return accumulator; } @Override public RoomTemperature getResult(RoomTemperature accumulator) { return accumulator; } @Override public RoomTemperature merge(RoomTemperature a, RoomTemperature b) { final int totalSampleCount = a.getSampleCount() + b.getSampleCount(); final float avgTemperature = (a.getTemperature() * a.getSampleCount() + b.getTemperature() * b.getSampleCount()) / (float) (totalSampleCount); return RoomTemperature.newBuilder() .setRoom(a.getRoom()) .setSampleCount(totalSampleCount) .setTemperature(avgTemperature) .setLastSampleTime(maxInstant(a.getLastSampleTime(), b.getLastSampleTime())) .build(); } }