Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Local uri input monitor support #369

Open
wants to merge 14 commits into
base: local-uri-input-monitor-support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -200,7 +201,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, LocalUriInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.InjectorContextElement
Expand Down Expand Up @@ -56,8 +57,10 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettin
import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import com.amazon.opendistroforelasticsearch.alerting.util.addUserBackendRolesFilter
import com.amazon.opendistroforelasticsearch.alerting.util.executeTransportAction
import com.amazon.opendistroforelasticsearch.alerting.util.isADMonitor
import com.amazon.opendistroforelasticsearch.alerting.util.isAllowed
import com.amazon.opendistroforelasticsearch.alerting.util.toMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -336,6 +339,13 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
is LocalUriInput -> {
logger.debug("LocalUriInput path: ${input.toConstructedUri().path}")
val response = executeTransportAction(input, client)
results += withContext(Dispatchers.IO) {
response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.core.model.CronSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand All @@ -25,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeFie
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalUserField
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
Expand Down Expand Up @@ -197,7 +199,11 @@ data class Monitor(
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
val input = Input.parse(xcp)
if (input is LocalUriInput) {
SupportedApiSettings.validatePath(input.toConstructedUri().path)
}
inputs.add(input)
}
}
TRIGGERS_FIELD -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.amazon.opendistroforelasticsearch.alerting.settings

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.json.JsonXContent

/**
* A class that supports storing a unique set of API paths that can be accessed by general users.
*/
class SupportedApiSettings {
companion object {
const val CLUSTER_HEALTH_PATH = "/_cluster/health"
const val CLUSTER_STATS_PATH = "/_cluster/stats"

private const val RESOURCE_FILE = "supported_json_payloads.json"

/**
* The key in this map represents the path to call an API.
* NOTE: Paths should conform to the following pattern:
* "/_cluster/stats"
*
* The value in these maps represents a path root mapped to a list of paths to field values.
* NOTE: Keys in this map should consist of root components of the response body; e.g.,:
* "indices"
*
* Values in these maps should consist of the remaining fields in the path
* to the supported value separated by periods; e.g.,:
* "shards.total",
* "shards.index.shards.min"
*
* In this example for ClusterStats, the response will only include
* the values at the end of these two paths:
* "/_cluster/stats": {
* "indices": [
* "shards.total",
* "shards.index.shards.min"
* ]
* }
*/
private var supportedApiList = HashMap<String, Map<String, ArrayList<String>>>()

init {
val supportedJsonPayloads = SupportedApiSettings::class.java.getResource(RESOURCE_FILE)
@Suppress("UNCHECKED_CAST")
if (supportedJsonPayloads != null) supportedApiList =
XContentHelper.convertToMap(
JsonXContent.jsonXContent, supportedJsonPayloads.readText(), false) as HashMap<String, Map<String, ArrayList<String>>>
Comment on lines +49 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want to make this a configurable setting for the customers? What is in the supported_json_payloads.json can be the default values.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For P0, the supportedApiList is configured using the JSON; users will need to make changes to the resource file in order to update that list. As the plan was to only support ClusterHealth and ClusterStats for P0, an API to configure the list was not a high priority for P0.

}

/**
* Returns the map of all supported json payload associated with the provided path from supportedApiList.
* @param path The path for the requested API.
* @return The map of all supported json payload for the requested API.
* @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key.
*/
fun getSupportedJsonPayload(path: String): Map<String, ArrayList<String>> {
return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList: $path")
}

/**
* Calls [validatePath] to confirm whether the provided [LocalUriInput]'s path is in [supportedApiList].
* Will then return an [ActionRequest] for the API associated with that path.
* Will otherwise throw an exception.
* @param localUriInput The [LocalUriInput] to resolve.
* @throws IllegalArgumentException when the requested API is not supported.
* @return The [ActionRequest] for the API associated with the provided [LocalUriInput].
*/
fun resolveToActionRequest(localUriInput: LocalUriInput): ActionRequest {
val path = localUriInput.toConstructedUri().path
validatePath(path)
return when (path) {
CLUSTER_HEALTH_PATH -> ClusterHealthRequest()
CLUSTER_STATS_PATH -> ClusterStatsRequest()
else -> throw IllegalArgumentException("Unsupported API: $path")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need to have validatePath() if we have the when validation?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's a really good point. Thank you for pointing that out! I removed the call to validatePath.

}

/**
* Confirms whether the provided path is in [supportedApiList].
* Throws an exception if the provided path is not on the list; otherwise performs no action.
* @param path The path to validate.
* @throws IllegalArgumentException when supportedApiList does not contain the provided path.
*/
fun validatePath(path: String) {
if (!supportedApiList.contains(path)) throw IllegalArgumentException("API path not in supportedApiList: $path")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.amazon.opendistroforelasticsearch.alerting.util

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings.Companion.resolveToActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse
import org.elasticsearch.client.Client
import org.elasticsearch.common.xcontent.support.XContentMapValues

/**
* Calls the appropriate transport action for the API requested in the [localUriInput].
* @param localUriInput The [LocalUriInput] to resolve.
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(localUriInput: LocalUriInput, client: Client): ActionResponse {
return when (val request = resolveToActionRequest(localUriInput)) {
is ClusterHealthRequest -> client.admin().cluster().health(request).get()
is ClusterStatsRequest -> client.admin().cluster().clusterStats(request).get()
else -> throw IllegalArgumentException("Unsupported API request: ${request.javaClass.name}")
}
}

/**
* Populates a [HashMap] with the values in the [ActionResponse].
* @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature.
*/
fun ActionResponse.toMap(): Map<String, Any> {
return when (this) {
is ClusterHealthResponse -> redactFieldsFromResponse(this.convertToMap(),
SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_HEALTH_PATH))
is ClusterStatsResponse -> redactFieldsFromResponse(this.convertToMap(),
SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_STATS_PATH))
else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}")
}
}

/**
* Populates a [HashMap] with only the values that support being exposed to users.
*/
@Suppress("UNCHECKED_CAST")
fun redactFieldsFromResponse(mappedActionResponse: Map<String, Any>, supportedJsonPayload: Map<String, ArrayList<String>>): Map<String, Any> {
return when {
supportedJsonPayload.isEmpty() -> mappedActionResponse
else -> {
val output = hashMapOf<String, Any>()
for ((key, value) in supportedJsonPayload) {
when (mappedActionResponse[key]) {
is Map<*, *> -> output[key] = XContentMapValues.filter(
mappedActionResponse[key] as MutableMap<String, *>?,
value.toTypedArray(), arrayOf()
)
else -> output[key] = value
}
}
output
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"/_cluster/health": {},
"/_cluster/stats": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,125 @@ class MonitorRunnerIT : AlertingRestTestCase() {
}
}

fun `test LocalUriInput monitor with ClusterHealth API`() {
// GIVEN
val path = "/_cluster/health"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
val inputResults = output.stringMap("input_results")
val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0]
val errorMessage = inputResults["error"]

assertEquals(monitor.name, output["monitor_name"])
assertTrue("Monitor results should contain cluster_name, but found: $resultsContent",
resultsContent.toString().contains("cluster_name"))
assertNull("There should not be an error message, but found: $errorMessage", errorMessage)
}

fun `test LocalUriInput monitor with ClusterStats API`() {
// GIVEN
val path = "/_cluster/stats"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
val inputResults = output.stringMap("input_results")
val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0]
val errorMessage = inputResults["error"]

assertEquals(monitor.name, output["monitor_name"])
assertTrue("Monitor results should contain cluster_name, but found: $resultsContent",
resultsContent.toString().contains("memory_size_in_bytes"))
assertNull("There should not be an error message, but found: $errorMessage", errorMessage)
}

fun `test LocalUriInput monitor with alert triggered`() {
// GIVEN
putAlertMappings()
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].number_of_pending_tasks < 1
""".trimIndent()), destinationId = createDestination().id)
val path = "/_cluster/health"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

val triggerResults = output.objectMap("trigger_results").values
for (triggerResult in triggerResults) {
assertTrue("This triggerResult should be triggered: $triggerResult",
triggerResult.objectMap("action_results").isNotEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert not saved, $output", 1, alerts.size)
verifyAlert(alerts.single(), monitor, ACTIVE)
}

fun `test LocalUriInput monitor with no alert triggered`() {
// GIVEN
putAlertMappings()
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].status.equals("red")
""".trimIndent()))
val path = "/_cluster/stats"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

val triggerResults = output.objectMap("trigger_results").values
for (triggerResult in triggerResults) {
assertTrue("This triggerResult should not be triggered: $triggerResult",
triggerResult.objectMap("action_results").isEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size)
}

// TODO: Once an API is implemented that supports adding/removing entries on the
// SupportedApiSettings::supportedApiList, create an test that simulates executing
// a preexisting LocalUriInput monitor for an API that has been removed from the supportedApiList.
// This will likely involve adding an API to the list before creating the monitor, and then removing
// the API from the list before executing the monitor.

private fun prepareTestAnomalyResult(detectorId: String, user: User) {
val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17"
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string
Expand Down Expand Up @@ -229,6 +230,19 @@ fun randomUserEmpty(): User {
return User("", listOf(), listOf(), listOf())
}

fun randomLocalUriInput(
scheme: String = if (randomInt(3) >= 2) "http" else "https",
host: String = LocalUriInput.SUPPORTED_HOST,
port: Int = LocalUriInput.SUPPORTED_PORT,
path: String,
queryParams: Map<String, String> = hashMapOf(),
url: String = "",
connectionTimeout: Int = 1 + randomInt(LocalUriInput.MAX_CONNECTION_TIMEOUT - 1),
socketTimeout: Int = 1 + randomInt(LocalUriInput.MAX_SOCKET_TIMEOUT - 1)
): LocalUriInput {
return LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout)
}

fun EmailAccount.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder).string()
Expand Down
Loading