package org.opensearch.commons.alerting.model import org.opensearch.common.CheckedFunction import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_INPUTS import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_TRIGGERS import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION import org.opensearch.commons.alerting.util.IndexUtils.Companion.supportedClusterMetricsSettings import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.commons.alerting.util.optionalUserField import org.opensearch.commons.authuser.User import org.opensearch.core.ParseField import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException import java.time.Instant import java.util.Locale data class Monitor( override val id: String = NO_ID, override val version: Long = NO_VERSION, override val name: String, override val enabled: Boolean, override val schedule: Schedule, override val lastUpdateTime: Instant, override val enabledTime: Instant?, // TODO: Check how this behaves during rolling upgrade/multi-version cluster // Can read/write and parsing break if it's done from an old -> new version of the plugin? val monitorType: MonitorType, val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, val triggers: List, val uiMetadata: Map, val dataSources: DataSources = DataSources(), val owner: String? = "alerting" ) : ScheduledJob { override val type = MONITOR_TYPE init { // Ensure that trigger ids are unique within a monitor val triggerIds = mutableSetOf() triggers.forEach { trigger -> // NoOpTrigger is only used in "Monitor Error Alerts" as a placeholder require(trigger !is NoOpTrigger) require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." } // Verify Trigger type based on Monitor type when (monitorType) { MonitorType.QUERY_LEVEL_MONITOR -> require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.BUCKET_LEVEL_MONITOR -> require(trigger is BucketLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.CLUSTER_METRICS_MONITOR -> require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.DOC_LEVEL_MONITOR -> require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } } } if (enabled) { requireNotNull(enabledTime) } else { require(enabledTime == null) } require(inputs.size <= MONITOR_MAX_INPUTS) { "Monitors can only have $MONITOR_MAX_INPUTS search input." } require(triggers.size <= MONITOR_MAX_TRIGGERS) { "Monitors can only support up to $MONITOR_MAX_TRIGGERS triggers." } if (this.isBucketLevelMonitor()) { inputs.forEach { input -> require(input is SearchInput) { "Unsupported input [$input] for Monitor" } // TODO: Keeping query validation simple for now, only term aggregations have full support for the "group by" on the // initial release. Should either add tests for other aggregation types or add validation to prevent using them. require(input.query.aggregations() != null && !input.query.aggregations().aggregatorFactories.isEmpty()) { "At least one aggregation is required for the input [$input]" } } } } @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), version = sin.readLong(), name = sin.readString(), enabled = sin.readBoolean(), schedule = Schedule.readFrom(sin), lastUpdateTime = sin.readInstant(), enabledTime = sin.readOptionalInstant(), monitorType = sin.readEnum(MonitorType::class.java), user = if (sin.readBoolean()) { User(sin) } else null, schemaVersion = sin.readInt(), inputs = sin.readList((Input)::readFrom), triggers = sin.readList((Trigger)::readFrom), uiMetadata = suppressWarning(sin.readMap()), dataSources = if (sin.readBoolean()) { DataSources(sin) } else { DataSources() }, owner = sin.readOptionalString() ) // This enum classifies different Monitors // This is different from 'type' which denotes the Scheduled Job type enum class MonitorType(val value: String) { QUERY_LEVEL_MONITOR("query_level_monitor"), BUCKET_LEVEL_MONITOR("bucket_level_monitor"), CLUSTER_METRICS_MONITOR("cluster_metrics_monitor"), DOC_LEVEL_MONITOR("doc_level_monitor"); override fun toString(): String { return value } } /** Returns a representation of the monitor suitable for passing into painless and mustache scripts. */ fun asTemplateArg(): Map { return mapOf(_ID to id, _VERSION to version, NAME_FIELD to name, ENABLED_FIELD to enabled) } fun toXContentWithUser(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return createXContentBuilder(builder, params, false) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return createXContentBuilder(builder, params, true) } private fun createXContentBuilder(builder: XContentBuilder, params: ToXContent.Params, secure: Boolean): XContentBuilder { builder.startObject() if (params.paramAsBoolean("with_type", false)) builder.startObject(type) builder.field(TYPE_FIELD, type) .field(SCHEMA_VERSION_FIELD, schemaVersion) .field(NAME_FIELD, name) .field(MONITOR_TYPE_FIELD, monitorType) if (!secure) { builder.optionalUserField(USER_FIELD, user) } builder.field(ENABLED_FIELD, enabled) .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) .field(SCHEDULE_FIELD, schedule) .field(INPUTS_FIELD, inputs.toTypedArray()) .field(TRIGGERS_FIELD, triggers.toTypedArray()) .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata) builder.field(DATA_SOURCES_FIELD, dataSources) builder.field(OWNER_FIELD, owner) if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() } override fun fromDocument(id: String, version: Long): Monitor = copy(id = id, version = version) @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(id) out.writeLong(version) out.writeString(name) out.writeBoolean(enabled) if (schedule is CronSchedule) { out.writeEnum(Schedule.TYPE.CRON) } else { out.writeEnum(Schedule.TYPE.INTERVAL) } schedule.writeTo(out) out.writeInstant(lastUpdateTime) out.writeOptionalInstant(enabledTime) out.writeEnum(monitorType) out.writeBoolean(user != null) user?.writeTo(out) out.writeInt(schemaVersion) // Outputting type with each Input so that the generic Input.readFrom() can read it out.writeVInt(inputs.size) inputs.forEach { if (it is SearchInput) out.writeEnum(Input.Type.SEARCH_INPUT) else out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) it.writeTo(out) } // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeVInt(triggers.size) triggers.forEach { when (it) { is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) } it.writeTo(out) } out.writeMap(uiMetadata) out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field dataSources.writeTo(out) out.writeOptionalString(owner) } companion object { const val MONITOR_TYPE = "monitor" const val TYPE_FIELD = "type" const val MONITOR_TYPE_FIELD = "monitor_type" const val SCHEMA_VERSION_FIELD = "schema_version" const val NAME_FIELD = "name" const val USER_FIELD = "user" const val ENABLED_FIELD = "enabled" const val SCHEDULE_FIELD = "schedule" const val TRIGGERS_FIELD = "triggers" const val NO_ID = "" const val NO_VERSION = 1L const val INPUTS_FIELD = "inputs" const val LAST_UPDATE_TIME_FIELD = "last_update_time" const val UI_METADATA_FIELD = "ui_metadata" const val DATA_SOURCES_FIELD = "data_sources" const val ENABLED_TIME_FIELD = "enabled_time" const val OWNER_FIELD = "owner" // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all // the different subclasses and creating circular dependencies val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( ScheduledJob::class.java, ParseField(MONITOR_TYPE), CheckedFunction { parse(it) } ) @JvmStatic @JvmOverloads @Throws(IOException::class) fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Monitor { var name: String? = null // Default to QUERY_LEVEL_MONITOR to cover Monitors that existed before the addition of MonitorType var monitorType: String = MonitorType.QUERY_LEVEL_MONITOR.toString() var user: User? = null var schedule: Schedule? = null var lastUpdateTime: Instant? = null var enabledTime: Instant? = null var uiMetadata: Map = mapOf() var enabled = true var schemaVersion = NO_SCHEMA_VERSION val triggers: MutableList = mutableListOf() val inputs: MutableList = mutableListOf() var dataSources = DataSources() var owner = "alerting" XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() xcp.nextToken() when (fieldName) { SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() NAME_FIELD -> name = xcp.text() MONITOR_TYPE_FIELD -> { monitorType = xcp.text() val allowedTypes = MonitorType.values().map { it.value } if (!allowedTypes.contains(monitorType)) { throw IllegalStateException("Monitor type should be one of $allowedTypes") } } USER_FIELD -> user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) ENABLED_FIELD -> enabled = xcp.booleanValue() SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) INPUTS_FIELD -> { XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp ) while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { val input = Input.parse(xcp) if (input is ClusterMetricsInput) supportedClusterMetricsSettings?.validateApiType(input) inputs.add(input) } } TRIGGERS_FIELD -> { XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp ) while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { triggers.add(Trigger.parse(xcp)) } } ENABLED_TIME_FIELD -> enabledTime = xcp.instant() LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() UI_METADATA_FIELD -> uiMetadata = xcp.map() DATA_SOURCES_FIELD -> dataSources = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) DataSources() else DataSources.parse(xcp) OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() else -> { xcp.skipChildren() } } } if (enabled && enabledTime == null) { enabledTime = Instant.now() } else if (!enabled) { enabledTime = null } return Monitor( id, version, requireNotNull(name) { "Monitor name is null" }, enabled, requireNotNull(schedule) { "Monitor schedule is null" }, lastUpdateTime ?: Instant.now(), enabledTime, MonitorType.valueOf(monitorType.uppercase(Locale.ROOT)), user, schemaVersion, inputs.toList(), triggers.toList(), uiMetadata, dataSources, owner ) } @JvmStatic @Throws(IOException::class) fun readFrom(sin: StreamInput): Monitor? { return Monitor(sin) } @Suppress("UNCHECKED_CAST") fun suppressWarning(map: MutableMap?): MutableMap { return map as MutableMap } } }