From 90ac0918aae1cc3c748a29f329317e7a64b97830 Mon Sep 17 00:00:00 2001 From: "avi@robusta.dev" Date: Wed, 17 Jul 2024 17:25:14 +0300 Subject: [PATCH 1/4] added avg_cpu and memory for the past hour for nodes --- .../core/sinks/robusta/dal/supabase_dal.py | 4 ++- ...ecker.py => prometheus_discovery_utils.py} | 34 +++++++++++++++---- .../core/sinks/robusta/robusta_sink.py | 11 +++--- 3 files changed, 36 insertions(+), 13 deletions(-) rename src/robusta/core/sinks/robusta/{prometheus_health_checker.py => prometheus_discovery_utils.py} (73%) diff --git a/src/robusta/core/sinks/robusta/dal/supabase_dal.py b/src/robusta/core/sinks/robusta/dal/supabase_dal.py index 654e63504..e10ecdc26 100644 --- a/src/robusta/core/sinks/robusta/dal/supabase_dal.py +++ b/src/robusta/core/sinks/robusta/dal/supabase_dal.py @@ -556,12 +556,14 @@ def publish_namespaces(self, namespaces: List[NamespaceInfo]): logging.error(f"Failed to persist namespaces {namespaces} error: {e}") raise - def publish_cluster_nodes(self, node_count: int, pod_count: int): + def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: float, avg_mem: float): data = { "_account_id": self.account_id, "_cluster_id": self.cluster, "_node_count": node_count, "_pod_count": pod_count, + "_avg_cpu": avg_cpu, + "_avg_mem": avg_mem } try: self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute() diff --git a/src/robusta/core/sinks/robusta/prometheus_health_checker.py b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py similarity index 73% rename from src/robusta/core/sinks/robusta/prometheus_health_checker.py rename to src/robusta/core/sinks/robusta/prometheus_discovery_utils.py index d11102932..c474811fc 100644 --- a/src/robusta/core/sinks/robusta/prometheus_health_checker.py +++ b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py @@ -1,17 +1,15 @@ import logging import threading import time +from typing import Optional -from prometrix import PrometheusNotFound, VictoriaMetricsNotFound, PrometheusFlagsConnectionError +from prometrix import PrometheusFlagsConnectionError, PrometheusNotFound, VictoriaMetricsNotFound from pydantic import BaseModel -from robusta.core.exceptions import ( - AlertsManagerNotFound, - NoAlertManagerUrlFound, - NoPrometheusUrlFound, -) +from robusta.core.exceptions import AlertsManagerNotFound, NoAlertManagerUrlFound, NoPrometheusUrlFound from robusta.core.model.base_params import PrometheusParams from robusta.core.model.env_vars import PROMETHEUS_ERROR_LOG_PERIOD_SEC +from robusta.core.playbooks.prometheus_enrichment_utils import run_prometheus_query from robusta.integrations.prometheus.utils import get_prometheus_connect, get_prometheus_flags from robusta.utils.silence_utils import AlertManagerParams, get_alertmanager_silences_connection @@ -22,7 +20,7 @@ class PrometheusHealthStatus(BaseModel): alertmanager: bool = True -class PrometheusHealthChecker: +class PrometheusDiscoveryUtils: def __init__(self, discovery_period_sec: int, global_config: dict): self.status: PrometheusHealthStatus = PrometheusHealthStatus() self.__discovery_period_sec = discovery_period_sec @@ -39,6 +37,28 @@ def __init__(self, discovery_period_sec: int, global_config: dict): def get_status(self) -> PrometheusHealthStatus: return self.status + def get_cluster_avg_cpu(self, hours_back=1) -> Optional[float]: + cpu_query = f'avg_over_time(sum(irate(container_cpu_usage_seconds_total{{}}[5m]))[{hours_back}h:])' + return self._get_query_prometheus_value(query=cpu_query) + + def get_cluster_avg_memory(self, hours_back=1) 
-> Optional[float]: + memory_query = f'avg_over_time(sum(container_memory_usage_bytes{{}})[{hours_back}h:])' + return self._get_query_prometheus_value(query=memory_query) + + def _get_query_prometheus_value(self, query: str) -> Optional[float]: + global_config = self.__global_config + prometheus_params = PrometheusParams(**global_config) + query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query) + try: + if query_result.result_type == "error" or query_result.vector_result is None: + logging.error(f"PrometheusDiscoveryUtils failed to get prometheus results.") + return + value = query_result.vector_result[0].value.value + return float('%.2f' % float(value)) + except: + logging.exception(f"PrometheusDiscoveryUtils failed to get prometheus results.") + return + def __run_checks(self): while True: try: diff --git a/src/robusta/core/sinks/robusta/robusta_sink.py b/src/robusta/core/sinks/robusta/robusta_sink.py index 8a9a9dcbf..1d4ff4f1f 100644 --- a/src/robusta/core/sinks/robusta/robusta_sink.py +++ b/src/robusta/core/sinks/robusta/robusta_sink.py @@ -32,7 +32,7 @@ from robusta.core.reporting.base import Finding from robusta.core.reporting.consts import ScanState, ScanType from robusta.core.sinks.robusta.discovery_metrics import DiscoveryMetrics -from robusta.core.sinks.robusta.prometheus_health_checker import PrometheusHealthChecker +from robusta.core.sinks.robusta.prometheus_discovery_utils import PrometheusDiscoveryUtils from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaToken from robusta.core.sinks.robusta.rrm.rrm import RRM from robusta.core.sinks.sink_base import SinkBase @@ -51,7 +51,6 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry): self.token = sink_config.robusta_sink.token self.ttl_hours = sink_config.robusta_sink.ttl_hours self.persist_events = sink_config.robusta_sink.persist_events - robusta_token = RobustaToken(**json.loads(base64.b64decode(self.token))) if self.account_id != robusta_token.account_id: logging.error( @@ -79,7 +78,7 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry): self.__discovery_period_sec = DISCOVERY_PERIOD_SEC global_config = self.get_global_config() - self.__prometheus_health_checker = PrometheusHealthChecker( + self.__prometheus_discovery_util = PrometheusDiscoveryUtils( discovery_period_sec=self.__discovery_period_sec, global_config=global_config ) self.__rrm_checker = RRM(dal=self.dal, cluster=self.cluster_name, account_id=self.account_id) @@ -495,7 +494,7 @@ def __publish_new_helm_releases(self, active_helm_releases: List[HelmRelease]): def __update_cluster_status(self): self.last_send_time = time.time() - prometheus_health_checker_status = self.__prometheus_health_checker.get_status() + prometheus_health_checker_status = self.__prometheus_discovery_util.get_status() activity_stats = ActivityStats( relayConnection=False, alertManagerConnection=prometheus_health_checker_status.alertmanager, @@ -526,7 +525,9 @@ def __update_cluster_status(self): ) self.dal.publish_cluster_status(cluster_status) - self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count) + avg_cpu = self.__prometheus_discovery_util.get_cluster_avg_cpu() + avg_mem = self.__prometheus_discovery_util.get_cluster_avg_memory() + self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count, avg_cpu, avg_mem) except Exception: logging.exception( f"Failed to run periodic update cluster status for {self.sink_name}", From 
98d26b57f8bd40a6f79323bf8cf91df78214ebbe Mon Sep 17 00:00:00 2001 From: "avi@robusta.dev" Date: Thu, 18 Jul 2024 10:22:44 +0300 Subject: [PATCH 2/4] db changes --- src/robusta/core/sinks/robusta/dal/supabase_dal.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/robusta/core/sinks/robusta/dal/supabase_dal.py b/src/robusta/core/sinks/robusta/dal/supabase_dal.py index e10ecdc26..e6ab65e66 100644 --- a/src/robusta/core/sinks/robusta/dal/supabase_dal.py +++ b/src/robusta/core/sinks/robusta/dal/supabase_dal.py @@ -41,7 +41,7 @@ JOBS_TABLE = "Jobs" HELM_RELEASES_TABLE = "HelmReleases" NAMESPACES_TABLE = "Namespaces" -UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count" +UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count_v2" SCANS_RESULT_TABLE = "ScansResults" SCANS_META_TABLE = "ScansMeta" RESOURCE_EVENTS = "ResourceEvents" @@ -561,9 +561,9 @@ def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: float, "_account_id": self.account_id, "_cluster_id": self.cluster, "_node_count": node_count, - "_pod_count": pod_count, - "_avg_cpu": avg_cpu, - "_avg_mem": avg_mem + "_cpu_utilization": avg_cpu, + "_memory_utilization": int(avg_mem), + "_pod_count": pod_count } try: self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute() From 360acce909725bac88d3723ead5feeb18d3e1e1f Mon Sep 17 00:00:00 2001 From: "avi@robusta.dev" Date: Thu, 18 Jul 2024 16:11:03 +0300 Subject: [PATCH 3/4] made queries overridable, some accounts might need to configure this --- .../core/sinks/robusta/prometheus_discovery_utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py index c474811fc..a49722a2e 100644 --- a/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py +++ b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py @@ -1,4 +1,5 @@ import logging +import os import threading import time from typing import Optional @@ -37,12 +38,14 @@ def __init__(self, discovery_period_sec: int, global_config: dict): def get_status(self) -> PrometheusHealthStatus: return self.status - def get_cluster_avg_cpu(self, hours_back=1) -> Optional[float]: - cpu_query = f'avg_over_time(sum(irate(container_cpu_usage_seconds_total{{}}[5m]))[{hours_back}h:])' + def get_cluster_avg_cpu(self) -> Optional[float]: + cpu_query = os.getenv("OVERRIDE_CLUSTER_CPU_AVG_QUERY", + f'avg_over_time(sum(irate(container_cpu_usage_seconds_total{{}}[5m]))[1h:])') return self._get_query_prometheus_value(query=cpu_query) - def get_cluster_avg_memory(self, hours_back=1) -> Optional[float]: - memory_query = f'avg_over_time(sum(container_memory_usage_bytes{{}})[{hours_back}h:])' + def get_cluster_avg_memory(self) -> Optional[float]: + memory_query = os.getenv("OVERRIDE_CLUSTER_MEM_AVG_QUERY", + f'avg_over_time(sum(container_memory_usage_bytes{{}})[1h:])') return self._get_query_prometheus_value(query=memory_query) def _get_query_prometheus_value(self, query: str) -> Optional[float]: From e302a5ce554b0afca0e9a7f5a6ddb4db7967fc3f Mon Sep 17 00:00:00 2001 From: "avi@robusta.dev" Date: Tue, 23 Jul 2024 13:29:09 +0300 Subject: [PATCH 4/4] updated queries --- src/robusta/core/sinks/robusta/dal/supabase_dal.py | 4 ++-- .../core/sinks/robusta/prometheus_discovery_utils.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/robusta/core/sinks/robusta/dal/supabase_dal.py b/src/robusta/core/sinks/robusta/dal/supabase_dal.py index 
e6ab65e66..0d2be61d3 100644 --- a/src/robusta/core/sinks/robusta/dal/supabase_dal.py +++ b/src/robusta/core/sinks/robusta/dal/supabase_dal.py @@ -556,13 +556,13 @@ def publish_namespaces(self, namespaces: List[NamespaceInfo]): logging.error(f"Failed to persist namespaces {namespaces} error: {e}") raise - def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: float, avg_mem: float): + def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: Optional[float] = None, avg_mem: Optional[float] = None): data = { "_account_id": self.account_id, "_cluster_id": self.cluster, "_node_count": node_count, "_cpu_utilization": avg_cpu, - "_memory_utilization": int(avg_mem), + "_memory_utilization": avg_mem, "_pod_count": pod_count } try: diff --git a/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py index a49722a2e..1d3c9930d 100644 --- a/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py +++ b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py @@ -40,19 +40,19 @@ def get_status(self) -> PrometheusHealthStatus: def get_cluster_avg_cpu(self) -> Optional[float]: cpu_query = os.getenv("OVERRIDE_CLUSTER_CPU_AVG_QUERY", - f'avg_over_time(sum(irate(container_cpu_usage_seconds_total{{}}[5m]))[1h:])') + f'100 * sum(rate(node_cpu_seconds_total{{mode!="idle"}}[1h])) / sum(machine_cpu_cores{{}})') return self._get_query_prometheus_value(query=cpu_query) def get_cluster_avg_memory(self) -> Optional[float]: memory_query = os.getenv("OVERRIDE_CLUSTER_MEM_AVG_QUERY", - f'avg_over_time(sum(container_memory_usage_bytes{{}})[1h:])') + f'100 * (1 - sum(avg_over_time(node_memory_MemAvailable_bytes{{}}[1h])) / sum(machine_memory_bytes{{}}))') return self._get_query_prometheus_value(query=memory_query) def _get_query_prometheus_value(self, query: str) -> Optional[float]: - global_config = self.__global_config - prometheus_params = PrometheusParams(**global_config) - query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query) try: + global_config = self.__global_config + prometheus_params = PrometheusParams(**global_config) + query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query) if query_result.result_type == "error" or query_result.vector_result is None: logging.error(f"PrometheusDiscoveryUtils failed to get prometheus results.") return
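
Note for reviewers (not part of the patches): the final queries report cluster-wide CPU and memory utilization as percentages averaged over the last hour. Below is a minimal standalone sketch for sanity-checking the same PromQL against a Prometheus endpoint before rollout; the PROMETHEUS_URL default and the use of the requests library are illustrative assumptions and do not come from this series.

    import os
    import requests

    # Assumption for illustration: point this at your own Prometheus server.
    PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")

    CPU_QUERY = '100 * sum(rate(node_cpu_seconds_total{mode!="idle"}[1h])) / sum(machine_cpu_cores{})'
    MEM_QUERY = '100 * (1 - sum(avg_over_time(node_memory_MemAvailable_bytes{}[1h])) / sum(machine_memory_bytes{}))'

    def instant_query(query: str):
        # Prometheus HTTP API: GET /api/v1/query evaluates an instant vector.
        resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query}, timeout=10)
        resp.raise_for_status()
        body = resp.json()
        result = body.get("data", {}).get("result", [])
        if body.get("status") != "success" or not result:
            return None
        # Each vector sample is [timestamp, "value"]; round to two decimals
        # like the runner does before publishing.
        return round(float(result[0]["value"][1]), 2)

    if __name__ == "__main__":
        print("avg cpu utilization (%):", instant_query(CPU_QUERY))
        print("avg memory utilization (%):", instant_query(MEM_QUERY))

If the default queries do not match a cluster's metric names, patch 3 allows replacing them through the OVERRIDE_CLUSTER_CPU_AVG_QUERY and OVERRIDE_CLUSTER_MEM_AVG_QUERY environment variables on the runner.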