Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Added support for LocalUriInput monitors #380

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ea60c94
cleanup: removed compile warnings, moved log msg, added missing copyr…
skkosuri-amzn Mar 24, 2021
dc0c449
Implemented LocalUriInput and extension class
AWSHurneyt Mar 30, 2021
f33f9a4
Implemented SupportedApiSettings and extensions. Implemented logic fo…
AWSHurneyt Mar 30, 2021
546b384
Implemented SupportedApiSettings and extensions. Implemented logic fo…
AWSHurneyt Mar 30, 2021
ab030dc
Merge branch 'local-uri-input-monitor-support' of github.com:AWSHurne…
AWSHurneyt Apr 2, 2021
c9a73fd
Implemented unit tests and integration tests for LocalUriInput
AWSHurneyt Apr 6, 2021
708d00b
Refactored SupportedApiSettings and SupportedApiSettingsExtensions ba…
AWSHurneyt Apr 14, 2021
3ec3d71
Implemented unit tests and integration tests for LocalUriInput
AWSHurneyt Apr 6, 2021
aa789dd
Refactored LocalUriInput and tests to confirm host is always localhos…
AWSHurneyt Apr 13, 2021
e819bfb
Refactored SupportedApiSettings and SupportedApiSettingsExtensions ba…
AWSHurneyt Apr 14, 2021
c50ab98
Merge branch 'local-uri-input-monitor-support' of github.com:AWSHurne…
AWSHurneyt Apr 14, 2021
6111184
Refactored LocalUriInput to support receiving only a path as input, a…
AWSHurneyt Apr 15, 2021
28d3fba
Implemented support for configuring SupportedApiSettings::supportedAp…
AWSHurneyt Apr 15, 2021
c05884a
Refactored resolveToActionRequest to remove redundant call to validat…
AWSHurneyt Apr 22, 2021
1d03c61
Refactored an erroneous value assignment in the SupportedApiSettingsE…
AWSHurneyt May 3, 2021
c9e42bd
Add groupByField to Monitor (#361)
qreshi Mar 29, 2021
b03d1ed
Refactor MonitorRunner (#363)
qreshi Mar 29, 2021
7e6780a
Added support for LocalUriInput monitors.
AWSHurneyt May 20, 2021
324c23f
Merge branch 'doc-level-alerting-dev' of https://github.com/opendistr…
AWSHurneyt May 22, 2021
7e1c031
Resolving ktlint errors.
AWSHurneyt May 22, 2021
c5dd627
Merge branch 'doc-level-alerting-dev' into doc-level-alerting-dev-loc…
AWSHurneyt Jun 1, 2021
b412300
Merged LocalUriInput code with Doc-level alerting backend. Refactored…
AWSHurneyt Jun 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -124,7 +125,7 @@ import java.util.function.Supplier
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [TraditionalTrigger.XCONTENT_REGISTRY],
* [AggregationTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
* [AggregationTrigger.XCONTENT_REGISTRY], [LocalUriInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {
override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
Expand Down Expand Up @@ -210,7 +211,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
TraditionalTrigger.XCONTENT_REGISTRY,
AggregationTrigger.XCONTENT_REGISTRY)
AggregationTrigger.XCONTENT_REGISTRY,
LocalUriInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@

package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.alerting.model.InputRunResults
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.util.AggregationQueryRewriter
import com.amazon.opendistroforelasticsearch.alerting.util.addUserBackendRolesFilter
import com.amazon.opendistroforelasticsearch.alerting.util.executeTransportAction
import com.amazon.opendistroforelasticsearch.alerting.util.toMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
Expand Down Expand Up @@ -75,6 +80,13 @@ class InputService(
aggTriggerAfterKeys += AggregationQueryRewriter.getAfterKeysFromSearchResponse(searchResponse, monitor.triggers)
results += searchResponse.convertToMap()
}
is LocalUriInput -> {
logger.debug("LocalUriInput path: ${input.toConstructedUri().path}")
val response = executeTransportAction(input, client)
results += withContext(Dispatchers.IO) {
response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,7 @@ data class Alert(
.optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime)
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
if (aggregationResultBucket == null) {
builder.nullField(AggregationResultBucket.CONFIG_NAME)
} else {
aggregationResultBucket.innerXContent(builder, params)
}
aggregationResultBucket?.innerXContent(builder)
builder.endObject()
return builder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.core.model.CronSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand All @@ -25,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeFie
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalUserField
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
Expand Down Expand Up @@ -238,7 +240,11 @@ data class Monitor(
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
val input = Input.parse(xcp)
if (input is LocalUriInput) {
SupportedApiSettings.validatePath(input.toConstructedUri().path)
}
inputs.add(input)
}
}
TRIGGERS_FIELD -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.net.InetAddress
import java.time.Instant
import java.util.Locale

Expand Down Expand Up @@ -318,6 +319,8 @@ data class Destination(

private fun validateDestinationUri(destinationMessage: BaseMessage, denyHostRanges: List<String>) {
if (destinationMessage.isHostInDenylist(denyHostRanges)) {
logger.error("Host: {} resolves to: {} which is in denylist: {}.", destinationMessage.uri.host,
InetAddress.getByName(destinationMessage.uri.host), denyHostRanges)
throw IOException("The destination address is invalid.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.amazon.opendistroforelasticsearch.alerting.settings

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.json.JsonXContent

/**
* A class that supports storing a unique set of API paths that can be accessed by general users.
*/
class SupportedApiSettings {
companion object {
const val CLUSTER_HEALTH_PATH = "/_cluster/health"
const val CLUSTER_STATS_PATH = "/_cluster/stats"

private const val RESOURCE_FILE = "supported_json_payloads.json"

/**
* The key in this map represents the path to call an API.
*
* NOTE: Paths should conform to the following pattern:
* "/_cluster/stats"
*
* The value in these maps represents a path root mapped to a list of paths to field values.
* If the value mapped to an API is an empty map, no fields will be redacted from the API response.
*
* NOTE: Keys in this map should consist of root components of the response body; e.g.,:
* "indices"
*
* Values in these maps should consist of the remaining fields in the path
* to the supported value separated by periods; e.g.,:
* "shards.total",
* "shards.index.shards.min"
*
* In this example for ClusterStats, the response will only include
* the values at the end of these two paths:
* "/_cluster/stats": {
* "indices": [
* "shards.total",
* "shards.index.shards.min"
* ]
* }
*/
private var supportedApiList = HashMap<String, Map<String, ArrayList<String>>>()

init {
val supportedJsonPayloads = SupportedApiSettings::class.java.getResource(RESOURCE_FILE)
@Suppress("UNCHECKED_CAST")
if (supportedJsonPayloads != null) supportedApiList =
XContentHelper.convertToMap(
JsonXContent.jsonXContent, supportedJsonPayloads.readText(), false) as HashMap<String, Map<String, ArrayList<String>>>
}

/**
* Returns the map of all supported json payload associated with the provided path from supportedApiList.
* @param path The path for the requested API.
* @return The map of all supported json payload for the requested API.
* @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key.
*/
fun getSupportedJsonPayload(path: String): Map<String, ArrayList<String>> {
return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList: $path")
}

/**
* Will then return an [ActionRequest] for the API associated with that path.
* Will otherwise throw an exception.
* @param localUriInput The [LocalUriInput] to resolve.
* @throws IllegalArgumentException when the requested API is not supported.
* @return The [ActionRequest] for the API associated with the provided [LocalUriInput].
*/
fun resolveToActionRequest(localUriInput: LocalUriInput): ActionRequest {
return when (val path = localUriInput.toConstructedUri().path) {
CLUSTER_HEALTH_PATH -> ClusterHealthRequest()
CLUSTER_STATS_PATH -> ClusterStatsRequest()
else -> throw IllegalArgumentException("Unsupported API: $path")
}
}

/**
* Confirms whether the provided path is in [supportedApiList].
* Throws an exception if the provided path is not on the list; otherwise performs no action.
* @param path The path to validate.
* @throws IllegalArgumentException when supportedApiList does not contain the provided path.
*/
fun validatePath(path: String) {
if (!supportedApiList.contains(path)) throw IllegalArgumentException("API path not in supportedApiList: $path")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class TransportGetAlertsAction @Inject constructor(
} else {
// security is enabled and filterby is enabled.
try {
log.info("Filtering result by: ${user?.backendRoles}")
addFilter(user as User, searchSourceBuilder, "monitor_user.backend_roles.keyword")
log.info("Filtering result by: ${user.backendRoles}")
addFilter(user, searchSourceBuilder, "monitor_user.backend_roles.keyword")
search(searchSourceBuilder, actionListener)
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class TransportGetDestinationsAction @Inject constructor(
} else {
// security is enabled and filterby is enabled.
try {
log.info("Filtering result by: ${user?.backendRoles}")
addFilter(user as User, searchSourceBuilder, "destination.user.backend_roles.keyword")
log.info("Filtering result by: ${user.backendRoles}")
addFilter(user, searchSourceBuilder, "destination.user.backend_roles.keyword")
search(searchSourceBuilder, actionListener)
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ class TransportSearchMonitorAction @Inject constructor(
search(searchMonitorRequest.searchRequest, actionListener)
} else {
// security is enabled and filterby is enabled.
log.info("Filtering result by: ${user?.backendRoles}")
addFilter(user as User, searchMonitorRequest.searchRequest.source(), "monitor.user.backend_roles.keyword")
log.info("Filtering result by: ${user.backendRoles}")
addFilter(user, searchMonitorRequest.searchRequest.source(), "monitor.user.backend_roles.keyword")
search(searchMonitorRequest.searchRequest, actionListener)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.rest.RestStatus
import inet.ipaddr.IPAddressString
import java.net.InetAddress
import org.elasticsearch.transport.TransportChannel.logger

/**
* RFC 5322 compliant pattern matching: https://www.ietf.org/rfc/rfc5322.txt
Expand All @@ -55,7 +53,6 @@ fun BaseMessage.isHostInDenylist(networks: List<String>): Boolean {
for (network in networks) {
val netStr = IPAddressString(network)
if (netStr.contains(ipStr)) {
logger.error("Host: {} resolves to: {} which is in denylist: {}.", uri.getHost(), InetAddress.getByName(uri.getHost()), netStr)
return true
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.amazon.opendistroforelasticsearch.alerting.util

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings.Companion.resolveToActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse
import org.elasticsearch.client.Client
import org.elasticsearch.common.xcontent.support.XContentMapValues

/**
* Calls the appropriate transport action for the API requested in the [localUriInput].
* @param localUriInput The [LocalUriInput] to resolve.
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(localUriInput: LocalUriInput, client: Client): ActionResponse {
return when (val request = resolveToActionRequest(localUriInput)) {
is ClusterHealthRequest -> client.admin().cluster().health(request).get()
is ClusterStatsRequest -> client.admin().cluster().clusterStats(request).get()
else -> throw IllegalArgumentException("Unsupported API request: ${request.javaClass.name}")
}
}

/**
* Populates a [HashMap] with the values in the [ActionResponse].
* @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature.
*/
fun ActionResponse.toMap(): Map<String, Any> {
return when (this) {
is ClusterHealthResponse -> redactFieldsFromResponse(this.convertToMap(),
SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_HEALTH_PATH))
is ClusterStatsResponse -> redactFieldsFromResponse(this.convertToMap(),
SupportedApiSettings.getSupportedJsonPayload(SupportedApiSettings.CLUSTER_STATS_PATH))
else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}")
}
}

/**
* Populates a [HashMap] with only the values that support being exposed to users.
*/
@Suppress("UNCHECKED_CAST")
fun redactFieldsFromResponse(mappedActionResponse: Map<String, Any>, supportedJsonPayload: Map<String, ArrayList<String>>): Map<String, Any> {
return when {
supportedJsonPayload.isEmpty() -> mappedActionResponse
else -> {
val output = hashMapOf<String, Any>()
for ((key, value) in supportedJsonPayload) {
when (val mappedValue = mappedActionResponse[key]) {
is Map<*, *> -> output[key] = XContentMapValues.filter(
mappedActionResponse[key] as MutableMap<String, *>?,
value.toTypedArray(), arrayOf()
)
else -> output[key] = mappedValue ?: hashMapOf<String, Any>()
}
}
output
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"/_cluster/health": {},
"/_cluster/stats": {}
}
Loading