/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.indexmanagement.rollup

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
import org.opensearch.common.Rounding
import org.opensearch.common.time.DateFormatter
import org.opensearch.common.time.DateFormatters
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.index.query.MatchAllQueryBuilder
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.RemoteTransportException
import java.time.Instant

// TODO: Wrap client calls in retry for transient failures
/**
 * Service that handles CRUD operations for rollup metadata.
 *
 * Metadata documents are read from and written to [IndexManagementPlugin.INDEX_MANAGEMENT_INDEX],
 * routed by the rollup ID.
 */
@Suppress("TooManyFunctions")
class RollupMetadataService(val client: Client, val xContentRegistry: NamedXContentRegistry) {

    private val logger = LogManager.getLogger(javaClass)

    /**
     * Resolves the metadata a rollup job should run with.
     *
     * - If the job has no metadataID, the first metadata document is created and indexed.
     * - If the job has a metadataID and the document exists, it is returned as-is, unless it is in
     *   RETRY status, in which case it is recovered to STARTED and re-indexed.
     * - If the job has a metadataID but the document cannot be found, a new metadata in FAILED
     *   status is created.
     *
     * @return [MetadataResult.Success] with the resolved metadata, [MetadataResult.NoMetadata] when a
     * continuous rollup has no source documents to derive a start time from, or
     * [MetadataResult.Failure] when a lookup or write failed.
     */
    @Suppress("ReturnCount", "ComplexMethod", "NestedBlockDepth")
    suspend fun init(rollup: Rollup): MetadataResult {
        if (rollup.metadataID == null) {
            // No metadata has ever been created for this job, build the first one
            val createdMetadataResult = if (rollup.continuous) {
                createContinuousMetadata(rollup)
            } else {
                createNonContinuousMetadata(rollup)
            }
            return when (createdMetadataResult) {
                is MetadataResult.Success -> submitMetadataUpdate(createdMetadataResult.metadata, false)
                // Hitting this case means that there were no documents when initializing start time for a continuous rollup
                is MetadataResult.NoMetadata -> createdMetadataResult
                is MetadataResult.Failure -> createdMetadataResult
            }
        }

        val existingMetadata = when (val getMetadataResult = getExistingMetadata(rollup)) {
            is MetadataResult.Success -> getMetadataResult.metadata
            is MetadataResult.NoMetadata -> null
            is MetadataResult.Failure -> return getMetadataResult
        }

        if (existingMetadata == null) {
            // The existing metadata was not found, create a new metadata in FAILED status
            return submitMetadataUpdate(
                RollupMetadata(
                    rollupID = rollup.id,
                    lastUpdatedTime = Instant.now(),
                    status = RollupMetadata.Status.FAILED,
                    failureReason = "Not able to get the rollup metadata [${rollup.metadataID}]",
                    stats = RollupStats(0, 0, 0, 0, 0)
                ),
                false
            )
        }

        if (existingMetadata.status != RollupMetadata.Status.RETRY) {
            // If metadata exists and was not in RETRY status, return the existing metadata
            return MetadataResult.Success(existingMetadata)
        }

        return when (val recoverMetadataResult = recoverRetryMetadata(rollup, existingMetadata)) {
            // Update to the recovered metadata if recovery was successful
            is MetadataResult.Success -> submitMetadataUpdate(recoverMetadataResult.metadata, true)
            // NoMetadata here means that there were no documents when initializing start time
            // for a continuous rollup so we will propagate the response to no-op in the runner
            is MetadataResult.NoMetadata -> recoverMetadataResult
            // In case of failure, return early with the result
            is MetadataResult.Failure -> recoverMetadataResult
        }
    }

    /**
     * Builds the metadata a job in RETRY status should resume with: the given [metadata] moved to
     * STARTED status. For a continuous rollup whose metadata has no continuous window yet, the
     * window is initialized from the source index first.
     */
    @Suppress("ReturnCount")
    private suspend fun recoverRetryMetadata(rollup: Rollup, metadata: RollupMetadata): MetadataResult {
        var continuousMetadata = metadata.continuous
        if (rollup.continuous && metadata.continuous == null) {
            val nextWindowStartTime = when (val initStartTimeResult = getInitialStartTime(rollup)) {
                is StartingTimeResult.Success -> initStartTimeResult.startingTime
                is StartingTimeResult.NoDocumentsFound -> return MetadataResult.NoMetadata
                is StartingTimeResult.Failure ->
                    return MetadataResult.Failure(
                        "Failed to initialize start time for retried rollup job [${rollup.id}]",
                        initStartTimeResult.e
                    )
            }
            val nextWindowEndTime = getShiftedTime(nextWindowStartTime, rollup)
            continuousMetadata = ContinuousMetadata(nextWindowStartTime, nextWindowEndTime)
        }

        return MetadataResult.Success(
            metadata.copy(
                continuous = continuousMetadata,
                status = RollupMetadata.Status.STARTED
            )
        )
    }

    /** Returns the first instantiation of a [RollupMetadata] for a non-continuous rollup (INIT status, zeroed stats). */
    private fun createNonContinuousMetadata(rollup: Rollup): MetadataResult =
        MetadataResult.Success(
            RollupMetadata(
                rollupID = rollup.id,
                lastUpdatedTime = Instant.now(),
                status = RollupMetadata.Status.INIT,
                stats = RollupStats(0, 0, 0, 0, 0)
            )
        )

    /**
     * Updates the metadata for a non-continuous rollup after an execution of the composite search
     * and ingestion of rollup data. A null after key marks the job as FINISHED, otherwise it stays STARTED.
     */
    private fun getUpdatedNonContinuousMetadata(
        metadata: RollupMetadata,
        internalComposite: InternalComposite
    ): RollupMetadata {
        val afterKey = internalComposite.afterKey()
        val nextStatus = if (afterKey == null) RollupMetadata.Status.FINISHED else RollupMetadata.Status.STARTED
        return metadata.copy(
            afterKey = afterKey,
            lastUpdatedTime = Instant.now(),
            status = nextStatus
        )
    }

    /**
     * Returns the first instantiation of a [RollupMetadata] for a continuous rollup, with its first
     * window starting at the rounded timestamp of the earliest source document.
     */
    @Suppress("ReturnCount")
    private suspend fun createContinuousMetadata(rollup: Rollup): MetadataResult {
        val nextWindowStartTime = when (val initStartTimeResult = getInitialStartTime(rollup)) {
            is StartingTimeResult.Success -> initStartTimeResult.startingTime
            is StartingTimeResult.NoDocumentsFound -> return MetadataResult.NoMetadata
            is StartingTimeResult.Failure ->
                return MetadataResult.Failure(
                    "Failed to initialize start time for rollup [${rollup.id}]",
                    initStartTimeResult.e
                )
        }
        // The first end time is just the next window start time
        val nextWindowEndTime = getShiftedTime(nextWindowStartTime, rollup)
        return MetadataResult.Success(
            RollupMetadata(
                rollupID = rollup.id,
                afterKey = null,
                lastUpdatedTime = Instant.now(),
                continuous = ContinuousMetadata(nextWindowStartTime, nextWindowEndTime),
                status = RollupMetadata.Status.INIT,
                failureReason = null,
                stats = RollupStats(0, 0, 0, 0, 0)
            )
        )
    }

    /**
     * Finds the starting time for a continuous rollup by searching the source index for the document
     * with the smallest value of the date histogram's source field and rounding that timestamp down
     * to its date histogram bucket.
     *
     * @return [StartingTimeResult.NoDocumentsFound] when the search has no hits or the first hit has
     * no value for the date field, [StartingTimeResult.Failure] when the search or parsing throws.
     */
    // TODO: Let User specify their own filter query that is applied to the composite agg search
    @Suppress("ReturnCount")
    @Throws(Exception::class)
    private suspend fun getInitialStartTime(rollup: Rollup): StartingTimeResult {
        try {
            // Rollup requires the first dimension to be the date histogram
            val dateHistogram = rollup.dimensions.first() as DateHistogram
            val searchSourceBuilder = SearchSourceBuilder()
                .size(1)
                .query(MatchAllQueryBuilder())
                .sort(dateHistogram.sourceField, SortOrder.ASC) // TODO: figure out where nulls are sorted
                .trackTotalHits(false)
                .fetchSource(false)
                .docValueField(dateHistogram.sourceField, DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
            val searchRequest = SearchRequest(rollup.sourceIndex)
                .source(searchSourceBuilder)
                .allowPartialSearchResults(false)
            val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }

            // Empty doc hits will result in a no-op from the runner
            val firstHit = response.hits.hits.firstOrNull() ?: return StartingTimeResult.NoDocumentsFound

            // Get the doc value field of the dateHistogram.sourceField for the first search hit.
            // If the field is absent from the hit, or its doc value is null or empty, it is treated
            // the same as empty doc hits. The safe call matters: dereferencing an absent field
            // would throw and be reported as a Failure instead.
            val firstHitTimestampAsString: String = firstHit.field(dateHistogram.sourceField)
                ?.getValue<String>()
                ?: return StartingTimeResult.NoDocumentsFound

            // Parse date and extract epochMillis
            val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
            val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale())
                .toInstant()
                .toEpochMilli()
            return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
        } catch (e: CancellationException) {
            // Coroutine cancellation is not a job failure, let it propagate
            throw e
        } catch (e: RemoteTransportException) {
            val unwrappedException = unwrapRemoteCause(e)
            logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException")
            return StartingTimeResult.Failure(unwrappedException)
        } catch (e: Exception) {
            // TODO: Catching general exceptions for now, can make more granular
            logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $e")
            return StartingTimeResult.Failure(e)
        }
    }

    /**
     * Return time rounded down to the nearest unit of time the interval is based on.
     * This should map to the equivalent bucket a document with the given timestamp would fall into for the date histogram.
     */
    private fun getRoundedTime(timestamp: Long, dateHistogram: DateHistogram): Instant {
        val preparedRounding = getRoundingStrategy(dateHistogram).prepare(timestamp, timestamp)
        return Instant.ofEpochMilli(preparedRounding.round(timestamp))
    }

    /** Takes an existing start or end time and returns the value for the next window based on the rollup interval */
    private fun getShiftedTime(time: Instant, rollup: Rollup): Instant {
        val dateHistogram = rollup.dimensions.first() as DateHistogram
        val timeInMillis = time.toEpochMilli()
        val preparedRounding = getRoundingStrategy(dateHistogram).prepare(timeInMillis, timeInMillis)
        return Instant.ofEpochMilli(preparedRounding.nextRoundingValue(timeInMillis))
    }

    /**
     * Get the rounding strategy for the given time interval in the DateHistogram.
     * This is used to calculate time windows by rounding the given time based on the interval.
     */
    // TODO: Could make this an extension function of DateHistogram and add to some utility file
    private fun getRoundingStrategy(dateHistogram: DateHistogram): Rounding {
        val intervalString = (dateHistogram.calendarInterval ?: dateHistogram.fixedInterval) as String
        // TODO: Make sure the interval string is validated before getting here so we don't get errors
        // A single lookup decides the branch: known calendar units are handled as calendar intervals,
        // anything else is parsed as a fixed interval.
        val calendarUnit: Rounding.DateTimeUnit? = DateHistogramAggregationBuilder.DATE_FIELD_UNITS[intervalString]
        val roundingBuilder = if (calendarUnit != null) {
            Rounding.builder(calendarUnit)
        } else {
            Rounding.builder(TimeValue.parseTimeValue(intervalString, "RollupMetadataService#getRoundingStrategy"))
        }
        return roundingBuilder
            .timeZone(dateHistogram.timezone)
            .build()
    }

    /**
     * Updates the metadata for a continuous rollup after an execution of the composite search and
     * ingestion of rollup data.
     *
     * When the composite result has no after key, both window bounds are shifted to the next
     * interval; otherwise the window is kept unchanged. The returned status is always STARTED.
     * Throws a NullPointerException if [metadata] has no continuous window.
     */
    fun getUpdatedContinuousMetadata(
        rollup: Rollup,
        metadata: RollupMetadata,
        internalComposite: InternalComposite
    ): RollupMetadata {
        val afterKey = internalComposite.afterKey()
        // TODO: get rid of !!
        val currentWindow = metadata.continuous!!
        val windowFinished = afterKey == null
        val nextStart = if (windowFinished) {
            getShiftedTime(currentWindow.nextWindowStartTime, rollup)
        } else {
            currentWindow.nextWindowStartTime
        }
        val nextEnd = if (windowFinished) {
            getShiftedTime(currentWindow.nextWindowEndTime, rollup)
        } else {
            currentWindow.nextWindowEndTime
        }
        return metadata.copy(
            afterKey = afterKey,
            lastUpdatedTime = Instant.now(),
            continuous = ContinuousMetadata(nextStart, nextEnd),
            status = RollupMetadata.Status.STARTED
        )
    }

    /**
     * Fetches and parses the metadata document referenced by [Rollup.metadataID].
     *
     * @return [MetadataResult.Success] with the parsed metadata, [MetadataResult.NoMetadata] when the
     * document does not exist or has no source, or [MetadataResult.Failure] when the get or parse throws.
     */
    @Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
    suspend fun getExistingMetadata(rollup: Rollup): MetadataResult {
        val errorMessage = "Error when getting rollup metadata [${rollup.metadataID}]"
        try {
            val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, rollup.metadataID).routing(rollup.id)
            val response: GetResponse = client.suspendUntil { get(getRequest, it) }

            if (!response.isExists) return MetadataResult.NoMetadata

            val metadataSource = response.sourceAsBytesRef ?: return MetadataResult.NoMetadata
            val rollupMetadata: RollupMetadata? = withContext(Dispatchers.IO) {
                val xcp = XContentHelper.createParser(
                    xContentRegistry,
                    LoggingDeprecationHandler.INSTANCE,
                    metadataSource,
                    XContentType.JSON
                )
                xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, RollupMetadata.Companion::parse)
            }

            return if (rollupMetadata != null) {
                MetadataResult.Success(rollupMetadata)
            } else {
                MetadataResult.NoMetadata
            }
        } catch (e: CancellationException) {
            // Coroutine cancellation is not a job failure, let it propagate
            throw e
        } catch (e: RemoteTransportException) {
            val unwrappedException = unwrapRemoteCause(e)
            logger.debug("$errorMessage: $unwrappedException")
            return MetadataResult.Failure(errorMessage, unwrappedException)
        } catch (e: Exception) {
            // TODO: Catching general exceptions for now, can make more granular
            logger.debug("$errorMessage: $e")
            return MetadataResult.Failure(errorMessage, e)
        }
    }

    /**
     * Computes the next metadata from the composite search result (continuous or non-continuous,
     * depending on the rollup) and persists it.
     *
     * @throws RollupMetadataException if the metadata could not be persisted
     */
    suspend fun updateMetadata(rollup: Rollup, metadata: RollupMetadata, internalComposite: InternalComposite): RollupMetadata {
        val updatedMetadata = if (rollup.continuous) {
            getUpdatedContinuousMetadata(rollup, metadata, internalComposite)
        } else {
            getUpdatedNonContinuousMetadata(metadata, internalComposite)
        }
        return updateMetadata(updatedMetadata)
    }

    /**
     * Persists the given metadata, as an update when it already has an ID and as a create otherwise.
     *
     * @throws RollupMetadataException if the metadata could not be persisted
     */
    suspend fun updateMetadata(metadata: RollupMetadata): RollupMetadata {
        return when (val metadataUpdateResult = submitMetadataUpdate(metadata, metadata.id != NO_ID)) {
            is MetadataResult.Success -> metadataUpdateResult.metadata
            is MetadataResult.Failure ->
                throw RollupMetadataException("Failed to update rollup metadata [${metadata.id}]", metadataUpdateResult.cause)
            // NoMetadata is not expected from submitMetadataUpdate here
            is MetadataResult.NoMetadata ->
                throw RollupMetadataException("Unexpected state when updating rollup metadata [${metadata.id}]", null)
        }
    }

    /**
     * Sets a failure metadata for the rollup job with the given reason.
     * Can provide an existing metadata to update, if none are provided a new metadata is created
     * to replace the current one for the job.
     */
    suspend fun setFailedMetadata(job: Rollup, reason: String, existingMetadata: RollupMetadata? = null): MetadataResult {
        // Update the given existing metadata, or create a new one when none was provided
        val updatedMetadata = existingMetadata?.copy(
            status = RollupMetadata.Status.FAILED,
            failureReason = reason,
            lastUpdatedTime = Instant.now()
        ) ?: RollupMetadata(
            rollupID = job.id,
            status = RollupMetadata.Status.FAILED,
            failureReason = reason,
            lastUpdatedTime = Instant.now(),
            stats = RollupStats(0, 0, 0, 0, 0)
        )

        return submitMetadataUpdate(updatedMetadata, updatedMetadata.id != NO_ID)
    }

    /**
     * Indexes the given metadata document.
     *
     * When [updating] is true the write targets the existing document ID and is conditional on the
     * metadata's seqNo and primaryTerm; otherwise the document is written with the CREATE op type.
     *
     * @return [MetadataResult.Success] with the metadata carrying the ID, seqNo and primaryTerm from
     * the index response. If the response result is anything other than CREATED or UPDATED, the
     * returned metadata is put in FAILED status with a failure reason. [MetadataResult.Failure] is
     * returned when the index call throws.
     */
    @Suppress("ComplexMethod", "ReturnCount")
    private suspend fun submitMetadataUpdate(metadata: RollupMetadata, updating: Boolean): MetadataResult {
        val errorMessage = "An error occurred when ${if (updating) "updating" else "creating"} rollup metadata"
        try {
            @Suppress("BlockingMethodInNonBlockingContext")
            val builder = XContentFactory.jsonBuilder().startObject()
                .field(RollupMetadata.ROLLUP_METADATA_TYPE, metadata)
                .endObject()
            val indexRequest = IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
                .source(builder)
                .routing(metadata.rollupID)
            if (updating) {
                indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
            } else {
                indexRequest.opType(DocWriteRequest.OpType.CREATE)
            }

            val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
            var status: RollupMetadata.Status = metadata.status
            var failureReason: String? = metadata.failureReason
            when (response.result) {
                DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
                    // noop
                }
                DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
                    status = RollupMetadata.Status.FAILED
                    // Name the operation that was actually attempted
                    val operation = if (updating) "update" else "create"
                    failureReason = "The $operation metadata call failed with a ${response.result?.lowercase} result"
                }
            }
            // TODO: Is seqno/prim and id returned for all?
            return MetadataResult.Success(
                metadata.copy(
                    id = response.id,
                    seqNo = response.seqNo,
                    primaryTerm = response.primaryTerm,
                    status = status,
                    failureReason = failureReason
                )
            )
        } catch (e: CancellationException) {
            // Coroutine cancellation is not a job failure, let it propagate
            throw e
        } catch (e: RemoteTransportException) {
            return MetadataResult.Failure(errorMessage, unwrapRemoteCause(e))
        } catch (e: Exception) {
            // TODO: Catching general exceptions for now, can make more granular
            return MetadataResult.Failure(errorMessage, e)
        }
    }

    /**
     * Unwraps the cause of a [RemoteTransportException]. Falls back to the exception itself when
     * the unwrapped cause is not an [Exception], so that handling it inside a catch block can
     * never throw a ClassCastException.
     */
    private fun unwrapRemoteCause(e: RemoteTransportException): Exception =
        ExceptionsHelper.unwrapCause(e) as? Exception ?: e
}

/** Outcome of reading, creating or updating a rollup metadata document. */
sealed class MetadataResult {
    // A successful MetadataResult just means a metadata was returned,
    // it can still have a FAILED status
    data class Success(val metadata: RollupMetadata) : MetadataResult()
    data class Failure(val message: String = "An error occurred for rollup metadata", val cause: Exception) : MetadataResult()
    object NoMetadata : MetadataResult()
}

/** Outcome of determining the initial window start time for a continuous rollup. */
sealed class StartingTimeResult {
    data class Success(val startingTime: Instant) : StartingTimeResult()
    data class Failure(val e: Exception) : StartingTimeResult()
    object NoDocumentsFound : StartingTimeResult()
}

/** Thrown when rollup metadata could not be persisted. */
class RollupMetadataException(message: String, cause: Throwable?) : Exception(message, cause)