diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt index 23f2ea6f..8c770bbc 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt @@ -37,6 +37,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler @@ -200,7 +201,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R } override fun getNamedXContent(): List { - return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY) + return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, LocalUriInput.XCONTENT_REGISTRY) } override fun createComponents( diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index 872589d1..9b3812fc 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.elasticapi.InjectorContextElement @@ -56,8 +57,10 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettin import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils import com.amazon.opendistroforelasticsearch.alerting.util.addUserBackendRolesFilter +import com.amazon.opendistroforelasticsearch.alerting.util.executeTransportAction import com.amazon.opendistroforelasticsearch.alerting.util.isADMonitor import com.amazon.opendistroforelasticsearch.alerting.util.isAllowed +import com.amazon.opendistroforelasticsearch.alerting.util.toMap import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -336,6 +339,13 @@ class MonitorRunner( val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } results += searchResponse.convertToMap() } + is LocalUriInput -> { + logger.debug("LocalUriInput path: ${input.toConstructedUri().path}") + val response = executeTransportAction(input, client) + results += withContext(Dispatchers.IO) { + response.toMap() + } + } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Monitor.kt index 6aa9d711..690c612a 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Monitor.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model import com.amazon.opendistroforelasticsearch.alerting.core.model.CronSchedule import com.amazon.opendistroforelasticsearch.alerting.core.model.Input +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput @@ -25,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeFie import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalUserField import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS +import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import com.amazon.opendistroforelasticsearch.alerting.util._ID import com.amazon.opendistroforelasticsearch.alerting.util._VERSION @@ -197,7 +199,11 @@ data class Monitor( INPUTS_FIELD -> { ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_ARRAY) { - inputs.add(Input.parse(xcp)) + val input = Input.parse(xcp) + if (input is LocalUriInput) { + SupportedApiSettings.validatePath(input.toConstructedUri().path) + } + inputs.add(input) } } TRIGGERS_FIELD -> { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/settings/SupportedApiSettings.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/settings/SupportedApiSettings.kt new file mode 100644 index 00000000..a2891888 --- /dev/null +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/settings/SupportedApiSettings.kt @@ -0,0 +1,91 @@ +package com.amazon.opendistroforelasticsearch.alerting.settings + +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput +import org.elasticsearch.action.ActionRequest +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest +import org.elasticsearch.common.xcontent.XContentHelper +import org.elasticsearch.common.xcontent.json.JsonXContent + +/** + * A class that supports storing a unique set of API paths that can be accessed by general users. + */ +class SupportedApiSettings { + companion object { + const val CLUSTER_HEALTH_PATH = "/_cluster/health" + const val CLUSTER_STATS_PATH = "/_cluster/stats" + + private const val RESOURCE_FILE = "supported_json_payloads.json" + + /** + * The key in this map represents the path to call an API. + * + * NOTE: Paths should conform to the following pattern: + * "/_cluster/stats" + * + * The value in these maps represents a path root mapped to a list of paths to field values. + * If the value mapped to an API is an empty map, no fields will be redacted from the API response. + * + * NOTE: Keys in this map should consist of root components of the response body; e.g.,: + * "indices" + * + * Values in these maps should consist of the remaining fields in the path + * to the supported value separated by periods; e.g.,: + * "shards.total", + * "shards.index.shards.min" + * + * In this example for ClusterStats, the response will only include + * the values at the end of these two paths: + * "/_cluster/stats": { + * "indices": [ + * "shards.total", + * "shards.index.shards.min" + * ] + * } + */ + private var supportedApiList = HashMap>>() + + init { + val supportedJsonPayloads = SupportedApiSettings::class.java.getResource(RESOURCE_FILE) + @Suppress("UNCHECKED_CAST") + if (supportedJsonPayloads != null) supportedApiList = + XContentHelper.convertToMap( + JsonXContent.jsonXContent, supportedJsonPayloads.readText(), false) as HashMap>> + } + + /** + * Returns the map of all supported json payload associated with the provided path from supportedApiList. + * @param path The path for the requested API. + * @return The map of all supported json payload for the requested API. + * @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key. + */ + fun getSupportedJsonPayload(path: String): Map> { + return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList: $path") + } + + /** + * Will then return an [ActionRequest] for the API associated with that path. + * Will otherwise throw an exception. + * @param localUriInput The [LocalUriInput] to resolve. + * @throws IllegalArgumentException when the requested API is not supported. + * @return The [ActionRequest] for the API associated with the provided [LocalUriInput]. + */ + fun resolveToActionRequest(localUriInput: LocalUriInput): ActionRequest { + return when (val path = localUriInput.toConstructedUri().path) { + CLUSTER_HEALTH_PATH -> ClusterHealthRequest() + CLUSTER_STATS_PATH -> ClusterStatsRequest() + else -> throw IllegalArgumentException("Unsupported API: $path") + } + } + + /** + * Confirms whether the provided path is in [supportedApiList]. + * Throws an exception if the provided path is not on the list; otherwise performs no action. + * @param path The path to validate. + * @throws IllegalArgumentException when supportedApiList does not contain the provided path. + */ + fun validatePath(path: String) { + if (!supportedApiList.contains(path)) throw IllegalArgumentException("API path not in supportedApiList: $path") + } + } +} diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensions.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensions.kt new file mode 100644 index 00000000..a6cb53b6 --- /dev/null +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensions.kt @@ -0,0 +1,64 @@ +package com.amazon.opendistroforelasticsearch.alerting.util + +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap +import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings +import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings.Companion.resolveToActionRequest +import org.elasticsearch.action.ActionResponse +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse +import org.elasticsearch.client.Client +import org.elasticsearch.common.xcontent.support.XContentMapValues + +/** + * Calls the appropriate transport action for the API requested in the [localUriInput]. + * @param localUriInput The [LocalUriInput] to resolve. + * @param client The [Client] used to call the respective transport action. + * @throws IllegalArgumentException When the requested API is not supported by this feature. + */ +fun executeTransportAction(localUriInput: LocalUriInput, client: Client): ActionResponse { + return when (val request = resolveToActionRequest(localUriInput)) { + is ClusterHealthRequest -> client.admin().cluster().health(request).get() + is ClusterStatsRequest -> client.admin().cluster().clusterStats(request).get() + else -> throw IllegalArgumentException("Unsupported API request: ${request.javaClass.name}") + } +} + +/** + * Populates a [HashMap] with the values in the [ActionResponse]. + * @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature. + */ +fun ActionResponse.toMap(): Map { + return when (this) { + is ClusterHealthResponse -> redactFieldsFromResponse(this.convertToMap(), + SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_HEALTH_PATH)) + is ClusterStatsResponse -> redactFieldsFromResponse(this.convertToMap(), + SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_STATS_PATH)) + else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}") + } +} + +/** + * Populates a [HashMap] with only the values that support being exposed to users. + */ +@Suppress("UNCHECKED_CAST") +fun redactFieldsFromResponse(mappedActionResponse: Map, supportedJsonPayload: Map>): Map { + return when { + supportedJsonPayload.isEmpty() -> mappedActionResponse + else -> { + val output = hashMapOf() + for ((key, value) in supportedJsonPayload) { + when (val mappedValue = mappedActionResponse[key]) { + is Map<*, *> -> output[key] = XContentMapValues.filter( + mappedActionResponse[key] as MutableMap?, + value.toTypedArray(), arrayOf() + ) + else -> output[key] = mappedValue ?: hashMapOf() + } + } + output + } + } +} diff --git a/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/settings/supported_json_payloads.json b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/settings/supported_json_payloads.json new file mode 100644 index 00000000..b2b63f15 --- /dev/null +++ b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/settings/supported_json_payloads.json @@ -0,0 +1,4 @@ +{ + "/_cluster/health": {}, + "/_cluster/stats": {} +} \ No newline at end of file diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt index 5e8c1261..a0725f11 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt @@ -807,6 +807,125 @@ class MonitorRunnerIT : AlertingRestTestCase() { } } + fun `test LocalUriInput monitor with ClusterHealth API`() { + // GIVEN + val path = "/_cluster/health" + val clusterIndex = randomInt(clusterHosts.size - 1) + val input = randomLocalUriInput( + scheme = clusterHosts[clusterIndex].schemeName, + path = path + ) + val monitor = createMonitor(randomMonitor(inputs = listOf(input))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0] + val errorMessage = inputResults["error"] + + assertEquals(monitor.name, output["monitor_name"]) + assertTrue("Monitor results should contain cluster_name, but found: $resultsContent", + resultsContent.toString().contains("cluster_name")) + assertNull("There should not be an error message, but found: $errorMessage", errorMessage) + } + + fun `test LocalUriInput monitor with ClusterStats API`() { + // GIVEN + val path = "/_cluster/stats" + val clusterIndex = randomInt(clusterHosts.size - 1) + val input = randomLocalUriInput( + scheme = clusterHosts[clusterIndex].schemeName, + path = path + ) + val monitor = createMonitor(randomMonitor(inputs = listOf(input))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0] + val errorMessage = inputResults["error"] + + assertEquals(monitor.name, output["monitor_name"]) + assertTrue("Monitor results should contain cluster_name, but found: $resultsContent", + resultsContent.toString().contains("memory_size_in_bytes")) + assertNull("There should not be an error message, but found: $errorMessage", errorMessage) + } + + fun `test LocalUriInput monitor with alert triggered`() { + // GIVEN + putAlertMappings() + val trigger = randomTrigger(condition = Script(""" + return ctx.results[0].number_of_pending_tasks < 1 + """.trimIndent()), destinationId = createDestination().id) + val path = "/_cluster/health" + val clusterIndex = randomInt(clusterHosts.size - 1) + val input = randomLocalUriInput( + scheme = clusterHosts[clusterIndex].schemeName, + path = path + ) + val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results").values + for (triggerResult in triggerResults) { + assertTrue("This triggerResult should be triggered: $triggerResult", + triggerResult.objectMap("action_results").isNotEmpty()) + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert not saved, $output", 1, alerts.size) + verifyAlert(alerts.single(), monitor, ACTIVE) + } + + fun `test LocalUriInput monitor with no alert triggered`() { + // GIVEN + putAlertMappings() + val trigger = randomTrigger(condition = Script(""" + return ctx.results[0].status.equals("red") + """.trimIndent())) + val path = "/_cluster/stats" + val clusterIndex = randomInt(clusterHosts.size - 1) + val input = randomLocalUriInput( + scheme = clusterHosts[clusterIndex].schemeName, + path = path + ) + val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results").values + for (triggerResult in triggerResults) { + assertTrue("This triggerResult should not be triggered: $triggerResult", + triggerResult.objectMap("action_results").isEmpty()) + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size) + } + + // TODO: Once an API is implemented that supports adding/removing entries on the + // SupportedApiSettings::supportedApiList, create an test that simulates executing + // a preexisting LocalUriInput monitor for an API that has been removed from the supportedApiList. + // This will likely involve adding an API to the list before creating the monitor, and then removing + // the API from the list before executing the monitor. + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt index 235e5c46..1c69e1ad 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.alerting import com.amazon.opendistroforelasticsearch.alerting.core.model.Input import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule +import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string @@ -229,6 +230,19 @@ fun randomUserEmpty(): User { return User("", listOf(), listOf(), listOf()) } +fun randomLocalUriInput( + scheme: String = if (randomInt(3) >= 2) "http" else "https", + host: String = LocalUriInput.SUPPORTED_HOST, + port: Int = LocalUriInput.SUPPORTED_PORT, + path: String, + queryParams: Map = hashMapOf(), + url: String = "", + connectionTimeout: Int = 1 + randomInt(LocalUriInput.MAX_CONNECTION_TIMEOUT - 1), + socketTimeout: Int = 1 + randomInt(LocalUriInput.MAX_SOCKET_TIMEOUT - 1) +): LocalUriInput { + return LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) +} + fun EmailAccount.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder).string() diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensionsTests.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensionsTests.kt new file mode 100644 index 00000000..a01bd7aa --- /dev/null +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/SupportedApiSettingsExtensionsTests.kt @@ -0,0 +1,105 @@ +package com.amazon.opendistroforelasticsearch.alerting.util + +import org.elasticsearch.test.ESTestCase + +class SupportedApiSettingsExtensionsTests : ESTestCase() { + private var expectedResponse = hashMapOf() + private var mappedResponse = hashMapOf() + private var supportedJsonPayload = hashMapOf>() + + fun `test redactFieldsFromResponse with non-empty supportedJsonPayload`() { + // GIVEN + mappedResponse = hashMapOf( + ("pathRoot1" to hashMapOf( + ("pathRoot1_subPath1" to 11), + ("pathRoot1_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath1" to 121), + ("pathRoot1_subPath2_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath2_subPath1" to 1221) + )) + )) + )), + ("pathRoot2" to hashMapOf( + ("pathRoot2_subPath1" to 21), + ("pathRoot2_subPath2" to setOf(221, 222, "223string")) + )), + ("pathRoot3" to hashMapOf( + ("pathRoot3_subPath1" to 31), + ("pathRoot3_subPath2" to setOf(321, 322, "323string")) + ))) + + supportedJsonPayload = hashMapOf( + ("pathRoot1" to arrayListOf( + "pathRoot1_subPath1", + "pathRoot1_subPath2.pathRoot1_subPath2_subPath2.pathRoot1_subPath2_subPath2_subPath1" + )), + ("pathRoot2" to arrayListOf( + "pathRoot2_subPath2" + )), + ("pathRoot3" to arrayListOf())) + + expectedResponse = hashMapOf( + ("pathRoot1" to hashMapOf( + ("pathRoot1_subPath1" to 11), + ("pathRoot1_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath2_subPath1" to 1221) + )) + )) + )), + ("pathRoot2" to hashMapOf( + ("pathRoot2_subPath2" to setOf(221, 222, "223string")) + )), + ("pathRoot3" to hashMapOf( + ("pathRoot3_subPath1" to 31), + ("pathRoot3_subPath2" to setOf(321, 322, "323string")) + ))) + + // WHEN + val result = redactFieldsFromResponse(mappedResponse, supportedJsonPayload) + + // THEN + assertEquals(expectedResponse, result) + } + + fun `test redactFieldsFromResponse with empty supportedJsonPayload`() { + // GIVEN + mappedResponse = hashMapOf( + ("pathRoot1" to hashMapOf( + ("pathRoot1_subPath1" to 11), + ("pathRoot1_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath1" to 121), + ("pathRoot1_subPath2_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath2_subPath1" to 1221) + )) + )) + )), + ("pathRoot2" to hashMapOf( + ("pathRoot2_subPath1" to 21), + ("pathRoot2_subPath2" to setOf(221, 222, "223string")) + )), + ("pathRoot3" to 3)) + + expectedResponse = hashMapOf( + ("pathRoot1" to hashMapOf( + ("pathRoot1_subPath1" to 11), + ("pathRoot1_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath1" to 121), + ("pathRoot1_subPath2_subPath2" to hashMapOf( + ("pathRoot1_subPath2_subPath2_subPath1" to 1221) + )) + )) + )), + ("pathRoot2" to hashMapOf( + ("pathRoot2_subPath1" to 21), + ("pathRoot2_subPath2" to setOf(221, 222, "223string")) + )), + ("pathRoot3" to 3)) + + // WHEN + val result = redactFieldsFromResponse(mappedResponse, supportedJsonPayload) + + // THEN + assertEquals(expectedResponse, result) + } +} \ No newline at end of file diff --git a/core/build.gradle b/core/build.gradle index 066d4098..d79c6c5c 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -26,6 +26,7 @@ dependencies { compile "org.elasticsearch.client:elasticsearch-rest-client:${es_version}" compile 'com.google.googlejavaformat:google-java-format:1.3' compile "com.amazon.opendistroforelasticsearch:common-utils:1.13.0.0" + compile 'commons-validator:commons-validator:1.7' testImplementation "org.elasticsearch.test:framework:${es_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInput.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInput.kt new file mode 100644 index 00000000..dda3dbee --- /dev/null +++ b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInput.kt @@ -0,0 +1,178 @@ +package com.amazon.opendistroforelasticsearch.alerting.core.model + +import org.apache.commons.validator.routines.UrlValidator +import org.apache.http.client.utils.URIBuilder +import org.elasticsearch.common.CheckedFunction +import org.elasticsearch.common.ParseField +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import java.io.IOException +import java.net.URI + +/** + * This is a data class for a URI type of input for Monitors. + */ +data class LocalUriInput( + val scheme: String, + val host: String, + val port: Int, + val path: String, + val query_params: Map, + val url: String, + val connection_timeout: Int, + val socket_timeout: Int +) : Input { + + // Verify parameters are valid during creation + init { + require(validateFields()) { + "Either the url field, or scheme + host + port + path + params can be set." + } + require(host == "" || host.toLowerCase() == SUPPORTED_HOST) { + "Only host '$SUPPORTED_HOST' is supported. Host: $host" + } + require(port == -1 || port == SUPPORTED_PORT) { + "Only port '$SUPPORTED_PORT' is supported. Port: $port" + } + require(connection_timeout in MIN_CONNECTION_TIMEOUT..MAX_CONNECTION_TIMEOUT) { + "Connection timeout: $connection_timeout is not in the range of $MIN_CONNECTION_TIMEOUT - $MAX_CONNECTION_TIMEOUT" + } + require(socket_timeout in MIN_SOCKET_TIMEOUT..MAX_SOCKET_TIMEOUT) { + "Socket timeout: $socket_timeout is not in the range of $MIN_SOCKET_TIMEOUT - $MAX_SOCKET_TIMEOUT" + } + + // Create an UrlValidator that only accepts "http" and "https" as valid scheme and allows local URLs. + val urlValidator = UrlValidator(arrayOf("http", "https"), UrlValidator.ALLOW_LOCAL_URLS) + + // Build url field by field if not provided as whole. + val constructedUrl = toConstructedUri() + + require(urlValidator.isValid(constructedUrl.toString())) { + "Invalid url: $constructedUrl" + } + require(constructedUrl.host.toLowerCase() == SUPPORTED_HOST) { + "Only host '$SUPPORTED_HOST' is supported. Host: $host" + } + require(constructedUrl.port == SUPPORTED_PORT) { + "Only port '$SUPPORTED_PORT' is supported. Port: $port" + } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(URI_FIELD) + .field(SCHEME_FIELD, scheme) + .field(HOST_FIELD, host) + .field(PORT_FIELD, port) + .field(PATH_FIELD, path) + .field(PARAMS_FIELD, this.query_params) + .field(URL_FIELD, url) + .field(CONNECTION_TIMEOUT_FIELD, connection_timeout) + .field(SOCKET_TIMEOUT_FIELD, socket_timeout) + .endObject() + .endObject() + } + + override fun name(): String { + return URI_FIELD + } + + override fun writeTo(out: StreamOutput) { + out.writeString(scheme) + out.writeString(host) + out.writeInt(port) + out.writeString(path) + out.writeMap(query_params) + out.writeString(url) + out.writeInt(connection_timeout) + out.writeInt(socket_timeout) + } + + companion object { + const val MIN_CONNECTION_TIMEOUT = 1 + const val MAX_CONNECTION_TIMEOUT = 5 + const val MIN_PORT = 1 + const val MAX_PORT = 65535 + const val MIN_SOCKET_TIMEOUT = 1 + const val MAX_SOCKET_TIMEOUT = 60 + + const val SUPPORTED_HOST = "localhost" + const val SUPPORTED_PORT = 9200 + + const val SCHEME_FIELD = "scheme" + const val HOST_FIELD = "host" + const val PORT_FIELD = "port" + const val PATH_FIELD = "path" + const val PARAMS_FIELD = "params" + const val URL_FIELD = "url" + const val CONNECTION_TIMEOUT_FIELD = "connection_timeout" + const val SOCKET_TIMEOUT_FIELD = "socket_timeout" + const val URI_FIELD = "uri" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField("uri"), CheckedFunction { parseInner(it) }) + + /** + * This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [LocalUriInput] object + */ + @JvmStatic @Throws(IOException::class) + private fun parseInner(xcp: XContentParser): LocalUriInput { + var scheme = "http" + var host = "" + var port: Int = -1 + var path = "" + var params: Map = mutableMapOf() + var url = "" + var connectionTimeout = MAX_CONNECTION_TIMEOUT + var socketTimeout = MAX_SOCKET_TIMEOUT + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + SCHEME_FIELD -> scheme = xcp.text() + HOST_FIELD -> host = xcp.text() + PORT_FIELD -> port = xcp.intValue() + PATH_FIELD -> path = xcp.text() + PARAMS_FIELD -> params = xcp.mapStrings() + URL_FIELD -> url = xcp.text() + CONNECTION_TIMEOUT_FIELD -> connectionTimeout = xcp.intValue() + SOCKET_TIMEOUT_FIELD -> socketTimeout = xcp.intValue() + } + } + return LocalUriInput(scheme, host, port, path, params, url, connectionTimeout, socketTimeout) + } + } + + /** + * Constructs the [URI] either using [url] or using [scheme]+[host]+[port]+[path]+[query_params]. + */ + fun toConstructedUri(): URI { + return if (url.isEmpty()) { + val uriBuilder = URIBuilder() + .setScheme(if (scheme.isNotEmpty()) scheme else "http") + .setHost(if (host.isNotEmpty()) host else SUPPORTED_HOST) + .setPort(if (port != -1) port else SUPPORTED_PORT) + .setPath(path) + for (e in query_params.entries) + uriBuilder.addParameter(e.key, e.value) + uriBuilder.build() + } else { + URIBuilder(url).build() + } + } + + /** + * Helper function to confirm at least [url], or [scheme]+[host]+[port]+[path]+[query_params] is defined. + * The ELSE statement currently only checks [path] as it's the only field without a default value. + */ + private fun validateFields(): Boolean { + return if (url.isNotEmpty()) host.isEmpty() && (port == -1) && path.isEmpty() && query_params.isEmpty() + else path.isNotEmpty() + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInputTests.kt b/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInputTests.kt new file mode 100644 index 00000000..0bbcba78 --- /dev/null +++ b/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/LocalUriInputTests.kt @@ -0,0 +1,254 @@ +package com.amazon.opendistroforelasticsearch.alerting.core.model + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class LocalUriInputTests { + private var scheme = "http" + private var host = "localhost" + private var port = 9200 + private var path = "/_cluster/health" + private var queryParams = hashMapOf() + private var url = "" + private var connectionTimeout = 5 + private var socketTimeout = 5 + + @Test + fun `test valid LocalUriInput creation using HTTP URI component fields`() { + // GIVEN + WHEN + val localUriInput = LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) + + // THEN + assertEquals(scheme, localUriInput.scheme) + assertEquals(host, localUriInput.host) + assertEquals(port, localUriInput.port) + assertEquals(path, localUriInput.path) + assertEquals(queryParams, localUriInput.query_params) + assertEquals(url, localUriInput.url) + assertEquals(connectionTimeout, localUriInput.connection_timeout) + assertEquals(socketTimeout, localUriInput.socket_timeout) + } + + @Test + fun `test valid LocalUriInput creation using HTTP url field`() { + // GIVEN + scheme = "" + host = "" + port = -1 + path = "" + url = "http://localhost:9200/_cluster/health" + + // WHEN + val localUriInput = LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) + + // THEN + assertEquals(url, localUriInput.url) + } + + @Test + fun `test valid LocalUriInput creation using HTTPS URI component fields`() { + // GIVEN + scheme = "https" + + // WHEN + val localUriInput = LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) + + // THEN + assertEquals(scheme, localUriInput.scheme) + assertEquals(host, localUriInput.host) + assertEquals(port, localUriInput.port) + assertEquals(path, localUriInput.path) + assertEquals(queryParams, localUriInput.query_params) + assertEquals(url, localUriInput.url) + assertEquals(connectionTimeout, localUriInput.connection_timeout) + assertEquals(socketTimeout, localUriInput.socket_timeout) + } + + @Test + fun `test valid LocalUriInput creation using HTTPS url field`() { + // GIVEN + scheme = "" + host = "" + port = -1 + + // WHEN + val localUriInput = LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) + + // THEN + assertEquals(url, localUriInput.url) + } + + @Test + fun `test valid LocalUriInput creation with path, but empty scheme, host, and port fields`() { + // GIVEN + scheme = "" + host = "" + port = -1 + + // WHEN + val localUriInput = LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) + + // THEN + assertEquals(path, localUriInput.path) + assertEquals(localUriInput.toConstructedUri().toString(), "http://localhost:9200/_cluster/health") + } + + @Test + fun `test invalid scheme`() { + // GIVEN + scheme = "invalidScheme" + + // WHEN + THEN + assertFailsWith("Invalid url: $scheme://$host:$port$path") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid host`() { + // GIVEN + host = "loco//host" + + // WHEN + THEN + assertFailsWith("Invalid url: $scheme://$host:$port$path") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid host is not localhost`() { + // GIVEN + host = "127.0.0.1" + + // WHEN + THEN + assertFailsWith( + "Only host '${LocalUriInput.SUPPORTED_HOST}' is supported. Host: $host") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid path`() { + // GIVEN + path = "///" + + // WHEN + THEN + assertFailsWith("Invalid url: $scheme://$host:$port$path") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid port`() { + // GIVEN + port = LocalUriInput.SUPPORTED_PORT + 1 + + // WHEN + THEN + assertFailsWith( + "Only port '${LocalUriInput.SUPPORTED_PORT}' is supported. Port: $port") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid connection timeout that's too low`() { + // GIVEN + connectionTimeout = LocalUriInput.MIN_CONNECTION_TIMEOUT - 1 + + // WHEN + THEN + assertFailsWith( + "Connection timeout: $connectionTimeout is not in the range of ${LocalUriInput.MIN_CONNECTION_TIMEOUT} - ${LocalUriInput.MIN_CONNECTION_TIMEOUT}") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid connection timeout that's too high`() { + // GIVEN + connectionTimeout = LocalUriInput.MAX_CONNECTION_TIMEOUT + 1 + + // WHEN + THEN + assertFailsWith( + "Connection timeout: $connectionTimeout is not in the range of ${LocalUriInput.MIN_CONNECTION_TIMEOUT} - ${LocalUriInput.MIN_CONNECTION_TIMEOUT}") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid socket timeout that's too low`() { + // GIVEN + socketTimeout = LocalUriInput.MIN_SOCKET_TIMEOUT - 1 + + // WHEN + THEN + assertFailsWith( + "Socket timeout: $socketTimeout is not in the range of ${LocalUriInput.MIN_SOCKET_TIMEOUT} - ${LocalUriInput.MAX_SOCKET_TIMEOUT}") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid socket timeout that's too high`() { + // GIVEN + socketTimeout = LocalUriInput.MAX_SOCKET_TIMEOUT + 1 + + // WHEN + THEN + assertFailsWith( + "Socket timeout: $socketTimeout is not in the range of ${LocalUriInput.MIN_SOCKET_TIMEOUT} - ${LocalUriInput.MAX_SOCKET_TIMEOUT}") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid url`() { + // GIVEN + url = "///" + + // WHEN + THEN + assertFailsWith("Invalid url: $scheme://$host:$port$path") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test setting other fields in addition to url field`() { + // GIVEN + url = "http://localhost:9200/_cluster/health" + + // WHEN + THEN + assertFailsWith("Either the url field, or scheme + host + port + path + params can be set.") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test LocalUriInput creation when all inputs are empty`() { + // GIVEN + scheme = "" + host = "" + port = -1 + path = "" + url = "" + + // WHEN + THEN + assertFailsWith("Either the url field, or scheme + host + port + path + params can be set.") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid host in url field`() { + // GIVEN + scheme = "" + host = "" + port = -1 + path = "" + url = "http://127.0.0.1:9200/_cluster/health" + + // WHEN + THEN + assertFailsWith("Only host '${LocalUriInput.SUPPORTED_HOST}' is supported. Host: $host") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } + + @Test + fun `test invalid port in url field`() { + // GIVEN + scheme = "" + host = "" + port = -1 + path = "" + url = "http://localhost:${LocalUriInput.SUPPORTED_PORT + 1}/_cluster/health" + + // WHEN + THEN + assertFailsWith("Only port '${LocalUriInput.SUPPORTED_PORT}' is supported. Port: $port") { + LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout) } + } +} \ No newline at end of file