/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

@file:Suppress("TooManyFunctions")

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.BoostingQueryBuilder
import org.opensearch.index.query.ConstantScoreQueryBuilder
import org.opensearch.index.query.DisMaxQueryBuilder
import org.opensearch.index.query.MatchAllQueryBuilder
import org.opensearch.index.query.MatchPhraseQueryBuilder
import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryStringQueryBuilder
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Dimension
import org.opensearch.indexmanagement.common.model.dimension.Histogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.rollup.RollupMapperService
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.metric.Average
import org.opensearch.indexmanagement.rollup.model.metric.Max
import org.opensearch.indexmanagement.rollup.model.metric.Min
import org.opensearch.indexmanagement.rollup.model.metric.Sum
import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder
import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder
import org.opensearch.search.aggregations.metrics.MinAggregationBuilder
import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder

const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"

@Suppress("ReturnCount")
fun isRollupIndex(index: String, clusterState: ClusterState): Boolean {
    val idx = clusterState.metadata.index(index)
    if (idx == null) {
        return false
    } else if (RollupSettings.ROLLUP_INDEX.get(idx.settings)) {
        return true
    } else if (LegacyOpenDistroRollupSettings.ROLLUP_INDEX.get(idx.settings)) {
        return true
    }
    return false
}

fun Rollup.isTargetIndexAlias(): Boolean {
    return RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(targetIndex)
}

fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
    val query = if (metadata.continuous != null) {
        RangeQueryBuilder(this.getDateHistogram().sourceField)
            .from(metadata.continuous.nextWindowStartTime.toEpochMilli(), true)
            .to(metadata.continuous.nextWindowEndTime.toEpochMilli(), false)
            .format(DATE_FIELD_EPOCH_MILLIS_FORMAT)
    } else {
        MatchAllQueryBuilder()
    }
    val searchSourceBuilder = SearchSourceBuilder()
        .trackTotalHits(false)
        .size(0)
        .aggregation(this.getCompositeAggregationBuilder(metadata.afterKey))
        .query(query)
    return SearchRequest(this.sourceIndex)
        .source(searchSourceBuilder)
        .allowPartialSearchResults(false)
}

@Suppress("ComplexMethod", "NestedBlockDepth")
fun Rollup.getCompositeAggregationBuilder(afterKey: Map<String, Any>?): CompositeAggregationBuilder {
    val sources = mutableListOf<CompositeValuesSourceBuilder<*>>()
    this.dimensions.forEach { dimension -> sources.add(dimension.toSourceBuilder(appendType = true)) }
    return CompositeAggregationBuilder(this.id, sources).size(this.pageSize).also { compositeAgg ->
        afterKey?.let { compositeAgg.aggregateAfter(it) }
        this.metrics.forEach { metric ->
            val subAggs = metric.metrics.flatMap { agg ->
                when (agg) {
                    is Average -> {
                        listOf(
                            SumAggregationBuilder(metric.targetFieldWithType(agg) + ".sum").field(metric.sourceField),
                            ValueCountAggregationBuilder(metric.targetFieldWithType(agg) + ".value_count").field(metric.sourceField)
                        )
                    }
                    is Sum -> listOf(SumAggregationBuilder(metric.targetFieldWithType(agg)).field(metric.sourceField))
                    is Max -> listOf(MaxAggregationBuilder(metric.targetFieldWithType(agg)).field(metric.sourceField))
                    is Min -> listOf(MinAggregationBuilder(metric.targetFieldWithType(agg)).field(metric.sourceField))
                    is ValueCount -> listOf(ValueCountAggregationBuilder(metric.targetFieldWithType(agg)).field(metric.sourceField))
                    // This shouldn't be possible as rollup will fail to initialize with an unsupported metric
                    else -> throw IllegalArgumentException("Found unsupported metric aggregation ${agg.type.type}")
                }
            }
            subAggs.forEach { compositeAgg.subAggregation(it) }
        }
    }
}

// There can only be one date histogram in Rollup and it should always be in the first position of dimensions
// This is validated in the rollup init itself, but need to redo it here to correctly return date histogram
fun Rollup.getDateHistogram(): DateHistogram {
    val dimension = this.dimensions.first()
    require(dimension is DateHistogram) { "The first dimension in rollup must be a date histogram" }
    return dimension
}

fun Rollup.findMatchingDimension(field: String, type: Dimension.Type): Dimension? =
    this.dimensions.find { dimension -> dimension.sourceField == field && dimension.type == type }

// This method is only to be used after its confirmed the search/aggs is valid and these exist
@Suppress("NestedBlockDepth")
inline fun <reified T> Rollup.findMatchingMetricField(field: String): String {
    for (rollupMetrics in this.metrics) {
        if (rollupMetrics.sourceField == field) {
            for (metric in rollupMetrics.metrics) {
                if (metric is T) {
                    return rollupMetrics.targetFieldWithType(metric)
                }
            }
        }
    }
    error("Did not find matching rollup metric")
}

@Suppress("NestedBlockDepth", "ComplexMethod")
fun IndexMetadata.getRollupJobs(): List<Rollup>? {
    val rollupJobs = mutableListOf<Rollup>()
    val source = this.mapping()?.source() ?: return null
    val xcp = XContentHelper
        .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.compressedReference(), XContentType.JSON)
    ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) // start of block
    ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) // _doc
    ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) // start of _doc block
    while (xcp.nextToken() != Token.END_OBJECT) {
        val fieldName = xcp.currentName()
        xcp.nextToken()

        when (fieldName) {
            IndexUtils._META -> {
                ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
                while (xcp.nextToken() != Token.END_OBJECT) {
                    val metaField = xcp.currentName()
                    xcp.nextToken()

                    when (metaField) {
                        RollupMapperService.ROLLUPS -> {
                            ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
                            while (xcp.nextToken() != Token.END_OBJECT) {
                                val rollupID = xcp.currentName()
                                xcp.nextToken()
                                rollupJobs.add(Rollup.parse(xcp, rollupID))
                            }
                        }
                        else -> xcp.skipChildren()
                    }
                }
            }
            else -> xcp.skipChildren()
        }
    }
    // This method is the authoritative source for getting the rollupJobs for an index and if there are none found this method should return null
    return if (rollupJobs.size > 0) rollupJobs else null
}

// TODO: If we have to set this manually for each aggregation builder then it means we could miss new ones settings in the future
@Suppress("ComplexMethod", "LongMethod")
fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): AggregationBuilder {
    val aggFactory = AggregatorFactories.builder().also { factories ->
        aggregationBuilder.subAggregations.forEach {
            factories.addAggregator(this.rewriteAggregationBuilder(it))
        }
    }

    return when (aggregationBuilder) {
        is TermsAggregationBuilder -> {
            val dim = this.findMatchingDimension(aggregationBuilder.field(), Dimension.Type.TERMS) as Terms
            dim.getRewrittenAggregation(aggregationBuilder, aggFactory)
        }
        is DateHistogramAggregationBuilder -> {
            val dim = this.findMatchingDimension(aggregationBuilder.field(), Dimension.Type.DATE_HISTOGRAM) as DateHistogram
            dim.getRewrittenAggregation(aggregationBuilder, aggFactory)
        }
        is HistogramAggregationBuilder -> {
            val dim = this.findMatchingDimension(aggregationBuilder.field(), Dimension.Type.HISTOGRAM) as Histogram
            dim.getRewrittenAggregation(aggregationBuilder, aggFactory)
        }
        is SumAggregationBuilder -> {
            SumAggregationBuilder(aggregationBuilder.name)
                .field(this.findMatchingMetricField<Sum>(aggregationBuilder.field()))
        }
        is AvgAggregationBuilder -> {
            ScriptedMetricAggregationBuilder(aggregationBuilder.name)
                .initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0D; state.counts = 0L;", emptyMap()))
                .mapScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "state.sums += doc[\"${this.findMatchingMetricField<Average>(aggregationBuilder.field()) + ".sum"}\"].value; " +
                            "state.counts += doc[\"${this.findMatchingMetricField<Average>(aggregationBuilder.field()) + ".value_count"}\"" +
                            "].value",
                        emptyMap()
                    )
                )
                .combineScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "def d = new double[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap()
                    )
                )
                .reduceScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "double sum = 0; double count = 0; for (a in states) { sum += a[0]; count += a[1]; } return sum/count", emptyMap()
                    )
                )
        }
        is MaxAggregationBuilder -> {
            MaxAggregationBuilder(aggregationBuilder.name)
                .field(this.findMatchingMetricField<Max>(aggregationBuilder.field()))
        }
        is MinAggregationBuilder -> {
            MinAggregationBuilder(aggregationBuilder.name)
                .field(this.findMatchingMetricField<Min>(aggregationBuilder.field()))
        }
        is ValueCountAggregationBuilder -> {
            /*
            * A value count aggs of a pre-computed value count is incorrect as it just returns the number of
            * pre-computed value counts instead of their sum. Unfortunately can't just use the sum aggregation
            * because I was not able to find a way to cast the result of that to a long (instead of the returned float)
            * and the 3893 vs 3893.0 was bothering me.. so this is the next best I can think of. Hopefully there is a better
            * way and we can use that in the future.
            * */
            ScriptedMetricAggregationBuilder(aggregationBuilder.name)
                .initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.valueCounts = []", emptyMap()))
                .mapScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "state.valueCounts.add(doc[\"${this.findMatchingMetricField<ValueCount>(aggregationBuilder.field())}\"].value)",
                        emptyMap()
                    )
                )
                .combineScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "long valueCount = 0; for (vc in state.valueCounts) { valueCount += vc } return valueCount", emptyMap()
                    )
                )
                .reduceScript(
                    Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "long valueCount = 0; for (vc in states) { valueCount += vc } return valueCount", emptyMap()
                    )
                )
        }
        // We do nothing otherwise, the validation logic should have already verified so not throwing an exception
        else -> aggregationBuilder
    }
}

@Suppress("ComplexMethod", "LongMethod")
fun Rollup.rewriteQueryBuilder(
    queryBuilder: QueryBuilder,
    fieldNameMappingTypeMap: Map<String, String>,
    concreteIndexName: String = ""
): QueryBuilder {
    return when (queryBuilder) {
        is TermQueryBuilder -> {
            val updatedFieldName = queryBuilder.fieldName() + "." + Dimension.Type.TERMS.type
            val updatedTermQueryBuilder = TermQueryBuilder(updatedFieldName, queryBuilder.value())
            updatedTermQueryBuilder.boost(queryBuilder.boost())
            updatedTermQueryBuilder.queryName(queryBuilder.queryName())
        }
        is TermsQueryBuilder -> {
            val updatedFieldName = queryBuilder.fieldName() + "." + Dimension.Type.TERMS.type
            val updatedTermsQueryBuilder = TermsQueryBuilder(updatedFieldName, queryBuilder.values())
            updatedTermsQueryBuilder.boost(queryBuilder.boost())
            updatedTermsQueryBuilder.queryName(queryBuilder.queryName())
        }
        is RangeQueryBuilder -> {
            val updatedFieldName = queryBuilder.fieldName() + "." + fieldNameMappingTypeMap.getValue(queryBuilder.fieldName())
            val updatedRangeQueryBuilder = RangeQueryBuilder(updatedFieldName)
            updatedRangeQueryBuilder.includeLower(queryBuilder.includeLower())
            updatedRangeQueryBuilder.includeUpper(queryBuilder.includeUpper())
            updatedRangeQueryBuilder.from(queryBuilder.from())
            updatedRangeQueryBuilder.to(queryBuilder.to())
            if (queryBuilder.timeZone() != null) updatedRangeQueryBuilder.timeZone(queryBuilder.timeZone())
            if (queryBuilder.format() != null) updatedRangeQueryBuilder.format(queryBuilder.format())
            if (queryBuilder.relation()?.relationName != null) updatedRangeQueryBuilder.relation(queryBuilder.relation().relationName)
            updatedRangeQueryBuilder.queryName(queryBuilder.queryName())
            updatedRangeQueryBuilder.boost(queryBuilder.boost())
        }
        is BoolQueryBuilder -> {
            val newBoolQueryBuilder = BoolQueryBuilder()
            queryBuilder.must()?.forEach {
                val newMustQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)
                newBoolQueryBuilder.must(newMustQueryBuilder)
            }
            queryBuilder.mustNot()?.forEach {
                val newMustNotQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)
                newBoolQueryBuilder.mustNot(newMustNotQueryBuilder)
            }
            queryBuilder.should()?.forEach {
                val newShouldQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)
                newBoolQueryBuilder.should(newShouldQueryBuilder)
            }
            queryBuilder.filter()?.forEach {
                val newFilterQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)
                newBoolQueryBuilder.filter(newFilterQueryBuilder)
            }
            newBoolQueryBuilder.minimumShouldMatch(queryBuilder.minimumShouldMatch())
            newBoolQueryBuilder.adjustPureNegative(queryBuilder.adjustPureNegative())
            newBoolQueryBuilder.queryName(queryBuilder.queryName())
            newBoolQueryBuilder.boost(queryBuilder.boost())
        }
        is BoostingQueryBuilder -> {
            val newPositiveQueryBuilder = this.rewriteQueryBuilder(queryBuilder.positiveQuery(), fieldNameMappingTypeMap, concreteIndexName)
            val newNegativeQueryBuilder = this.rewriteQueryBuilder(queryBuilder.negativeQuery(), fieldNameMappingTypeMap, concreteIndexName)
            val newBoostingQueryBuilder = BoostingQueryBuilder(newPositiveQueryBuilder, newNegativeQueryBuilder)
            if (queryBuilder.negativeBoost() >= 0) newBoostingQueryBuilder.negativeBoost(queryBuilder.negativeBoost())
            newBoostingQueryBuilder.queryName(queryBuilder.queryName())
            newBoostingQueryBuilder.boost(queryBuilder.boost())
        }
        is ConstantScoreQueryBuilder -> {
            val newInnerQueryBuilder = this.rewriteQueryBuilder(queryBuilder.innerQuery(), fieldNameMappingTypeMap, concreteIndexName)
            val newConstantScoreQueryBuilder = ConstantScoreQueryBuilder(newInnerQueryBuilder)
            newConstantScoreQueryBuilder.boost(queryBuilder.boost())
            newConstantScoreQueryBuilder.queryName(queryBuilder.queryName())
        }
        is DisMaxQueryBuilder -> {
            val newDisMaxQueryBuilder = DisMaxQueryBuilder()
            queryBuilder.innerQueries().forEach {
                newDisMaxQueryBuilder.add(this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName))
            }
            newDisMaxQueryBuilder.tieBreaker(queryBuilder.tieBreaker())
            newDisMaxQueryBuilder.queryName(queryBuilder.queryName())
            newDisMaxQueryBuilder.boost(queryBuilder.boost())
        }
        is MatchPhraseQueryBuilder -> {
            val newFieldName = queryBuilder.fieldName() + "." + Dimension.Type.TERMS.type
            val newMatchPhraseQueryBuilder = MatchPhraseQueryBuilder(newFieldName, queryBuilder.value())
            newMatchPhraseQueryBuilder.queryName(queryBuilder.queryName())
            newMatchPhraseQueryBuilder.boost(queryBuilder.boost())
        }
        is QueryStringQueryBuilder -> {
            QueryStringQueryUtil.rewriteQueryStringQuery(queryBuilder, concreteIndexName)
        }
        // We do nothing otherwise, the validation logic should have already verified so not throwing an exception
        else -> queryBuilder
    }
}

fun Set<Rollup>.buildRollupQuery(fieldNameMappingTypeMap: Map<String, String>, oldQuery: QueryBuilder, targetIndexName: String = ""): QueryBuilder {
    val wrappedQueryBuilder = BoolQueryBuilder()
    wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap, targetIndexName))
    wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id }))
    wrappedQueryBuilder.minimumShouldMatch(1)
    return wrappedQueryBuilder
}

fun Rollup.populateFieldMappings(): Set<RollupFieldMapping> {
    val fieldMappings = mutableSetOf<RollupFieldMapping>()
    this.dimensions.forEach {
        fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.sourceField, it.type.type))
    }
    this.metrics.forEach { rollupMetric ->
        rollupMetric.metrics.forEach { metric ->
            fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.METRIC, rollupMetric.sourceField, metric.type.type))
        }
    }
    return fieldMappings
}

// TODO: Not a fan of this.. but I can't find a way to overwrite the aggregations on the shallow copy or original
//  so we need to instantiate a new one so we can add the rewritten aggregation builders
@Suppress("ComplexMethod")
fun SearchSourceBuilder.rewriteSearchSourceBuilder(
    jobs: Set<Rollup>,
    fieldNameMappingTypeMap: Map<String, String>,
    concreteIndexName: String
): SearchSourceBuilder {
    val ssb = SearchSourceBuilder()
    // can use first() here as all jobs in the set will have a superset of the query's terms
    this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(jobs.first().rewriteAggregationBuilder(it)) }
    if (this.explain() != null) ssb.explain(this.explain())
    if (this.ext() != null) ssb.ext(this.ext())
    ssb.fetchSource(this.fetchSource())
    this.docValueFields()?.forEach { ssb.docValueField(it.field, it.format) }
    ssb.storedFields(this.storedFields())
    if (this.from() >= 0) ssb.from(this.from())
    ssb.highlighter(this.highlighter())
    this.indexBoosts()?.forEach { ssb.indexBoost(it.index, it.boost) }
    if (this.minScore() != null) ssb.minScore(this.minScore())
    if (this.postFilter() != null) ssb.postFilter(this.postFilter())
    ssb.profile(this.profile())
    if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query(), concreteIndexName))
    this.rescores()?.forEach { ssb.addRescorer(it) }
    this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) }
    if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter())
    if (this.slice() != null) ssb.slice(this.slice())
    if (this.size() >= 0) ssb.size(this.size())
    this.sorts()?.forEach { ssb.sort(it) }
    if (this.stats() != null) ssb.stats(this.stats())
    if (this.suggest() != null) ssb.suggest(this.suggest())
    if (this.terminateAfter() >= 0) ssb.terminateAfter(this.terminateAfter())
    if (this.timeout() != null) ssb.timeout(this.timeout())
    ssb.trackScores(this.trackScores())
    this.trackTotalHitsUpTo()?.let { ssb.trackTotalHitsUpTo(it) }
    if (this.version() != null) ssb.version(this.version())
    if (this.seqNoAndPrimaryTerm() != null) ssb.seqNoAndPrimaryTerm(this.seqNoAndPrimaryTerm())
    if (this.collapse() != null) ssb.collapse(this.collapse())
    return ssb
}

fun SearchSourceBuilder.rewriteSearchSourceBuilder(
    job: Rollup,
    fieldNameMappingTypeMap: Map<String, String>,
    concreteIndexName: String
): SearchSourceBuilder {
    return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap, concreteIndexName)
}

fun Rollup.getInitialDocValues(docCount: Long): MutableMap<String, Any?> =
    mutableMapOf(
        Rollup.ROLLUP_DOC_ID_FIELD to this.id,
        Rollup.ROLLUP_DOC_COUNT_FIELD to docCount,
        Rollup.ROLLUP_DOC_SCHEMA_VERSION_FIELD to this.schemaVersion
    )

fun parseRollup(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): Rollup {
    val xcp = XContentHelper.createParser(
        xContentRegistry, LoggingDeprecationHandler.INSTANCE,
        response.sourceAsBytesRef, XContentType.JSON
    )

    return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse)
}