// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package services.kinesisanalytics.operators

import com.amazonaws.services.timestreamwrite.model.MeasureValueType
import com.google.common.reflect.TypeToken
import com.google.gson.Gson
import org.apache.flink.api.common.functions.RichMapFunction
import org.slf4j.LoggerFactory
import services.timestream.TimestreamPoint
import java.util.*

/**
 * Flink map operator that turns one flat JSON object into a list of [TimestreamPoint]s.
 *
 * The JSON object is read as a string-to-string map and each entry is classified by its
 * key, compared case-insensitively:
 *
 * - keys ending in `_measure` become measures (the original key, suffix included, is the measure name);
 * - `time` is parsed as a `Long` and stored as the point's time;
 * - `timeunit` is stored as the point's time unit;
 * - every other key is added to the point as a dimension.
 *
 * One point is emitted per measure. All emitted points share the same time, time unit and
 * dimensions, and every measure is typed as [MeasureValueType.DOUBLE].
 */
class JsonToTimestreamPayloadFn : RichMapFunction<String, List<TimestreamPoint>>() {

    /**
     * Converts a single JSON record into Timestream points.
     *
     * @param jsonString a JSON object whose values are all readable as strings.
     * @return one point per `*_measure` entry; an empty list when the record contains no
     *   measures or when the input holds no JSON content at all (empty string or `null` literal).
     * @throws NumberFormatException if the `time` value is not a valid `Long`.
     * @throws com.google.gson.JsonSyntaxException if [jsonString] is not a JSON object of strings.
     */
    @Throws(Exception::class)
    override fun map(jsonString: String): List<TimestreamPoint> {
        // Gson yields null (not an exception) for empty input or a JSON `null` literal;
        // skip such records instead of failing later with a NullPointerException.
        val fields: Map<String, String> = GSON.fromJson<Map<String, String>?>(jsonString, PAYLOAD_TYPE)
            ?: run {
                LOG.warn("skipping record without JSON content")
                return emptyList()
            }

        val basePoint = TimestreamPoint()
        val measures = HashMap<String, String>(fields.size)

        for ((key, value) in fields) {
            // Normalise once; the original key casing is kept for measure and dimension names.
            val normalizedKey = key.lowercase(Locale.ENGLISH)
            when {
                normalizedKey.endsWith(MEASURE_SUFFIX) -> measures[key] = value
                normalizedKey == "time" -> basePoint.time = value.toLong()
                normalizedKey == "timeunit" -> basePoint.timeUnit = value
                else -> basePoint.addDimension(key, value)
            }
        }
        LOG.trace("mapped to point {}", basePoint)

        // Fan out: each measure gets its own copy of the fully populated base point.
        return measures.map { (name, measureValue) ->
            basePoint.copy(
                measureName = name,
                measureValue = measureValue,
                measureValueType = MeasureValueType.DOUBLE,
            )
        }
    }

    companion object {
        private val LOG = LoggerFactory.getLogger(JsonToTimestreamPayloadFn::class.java)

        /** Key suffix (lower-case) that marks a JSON entry as a measure. */
        private const val MEASURE_SUFFIX = "_measure"

        // Held statically so they are built once rather than per record, and so the
        // non-serializable Gson instance is never part of the serialized Flink function.
        private val GSON = Gson()
        private val PAYLOAD_TYPE = object : TypeToken<Map<String, String>>() {}.type
    }
}