Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAIN-1863] - avg utilization of mem/cpu #1502

Merged
merged 5 commits on Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/robusta/core/sinks/robusta/dal/supabase_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
JOBS_TABLE = "Jobs"
HELM_RELEASES_TABLE = "HelmReleases"
NAMESPACES_TABLE = "Namespaces"
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count"
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count_v2"
SCANS_RESULT_TABLE = "ScansResults"
SCANS_META_TABLE = "ScansMeta"
RESOURCE_EVENTS = "ResourceEvents"
Expand Down Expand Up @@ -556,12 +556,14 @@ def publish_namespaces(self, namespaces: List[NamespaceInfo]):
logging.error(f"Failed to persist namespaces {namespaces} error: {e}")
raise

def publish_cluster_nodes(self, node_count: int, pod_count: int):
def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: Optional[float] = None, avg_mem: Optional[float] = None):
data = {
"_account_id": self.account_id,
"_cluster_id": self.cluster,
"_node_count": node_count,
"_pod_count": pod_count,
"_cpu_utilization": avg_cpu,
Avi-Robusta marked this conversation as resolved.
Show resolved Hide resolved
"_memory_utilization": avg_mem,
"_pod_count": pod_count
}
try:
self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
import os
import threading
import time
from typing import Optional

from prometrix import PrometheusNotFound, VictoriaMetricsNotFound, PrometheusFlagsConnectionError
from prometrix import PrometheusFlagsConnectionError, PrometheusNotFound, VictoriaMetricsNotFound
from pydantic import BaseModel

from robusta.core.exceptions import (
AlertsManagerNotFound,
NoAlertManagerUrlFound,
NoPrometheusUrlFound,
)
from robusta.core.exceptions import AlertsManagerNotFound, NoAlertManagerUrlFound, NoPrometheusUrlFound
from robusta.core.model.base_params import PrometheusParams
from robusta.core.model.env_vars import PROMETHEUS_ERROR_LOG_PERIOD_SEC
from robusta.core.playbooks.prometheus_enrichment_utils import run_prometheus_query
from robusta.integrations.prometheus.utils import get_prometheus_connect, get_prometheus_flags
from robusta.utils.silence_utils import AlertManagerParams, get_alertmanager_silences_connection

Expand All @@ -22,7 +21,7 @@ class PrometheusHealthStatus(BaseModel):
alertmanager: bool = True


class PrometheusHealthChecker:
class PrometheusDiscoveryUtils:
def __init__(self, discovery_period_sec: int, global_config: dict):
self.status: PrometheusHealthStatus = PrometheusHealthStatus()
self.__discovery_period_sec = discovery_period_sec
Expand All @@ -39,6 +38,30 @@ def __init__(self, discovery_period_sec: int, global_config: dict):
def get_status(self) -> PrometheusHealthStatus:
return self.status

def get_cluster_avg_cpu(self) -> Optional[float]:
    """Return the cluster's average CPU utilization over the last hour, as a percentage.

    The PromQL query may be overridden via the OVERRIDE_CLUSTER_CPU_AVG_QUERY
    environment variable. Returns None if the query fails (see
    _get_query_prometheus_value).
    """
    # Plain string, not an f-string: nothing is interpolated, so the PromQL
    # label braces no longer need {{...}} escaping.
    cpu_query = os.getenv(
        "OVERRIDE_CLUSTER_CPU_AVG_QUERY",
        '100 * sum(rate(node_cpu_seconds_total{mode!="idle"}[1h])) / sum(machine_cpu_cores{})',
    )
    return self._get_query_prometheus_value(query=cpu_query)

def get_cluster_avg_memory(self) -> Optional[float]:
    """Return the cluster's average memory utilization over the last hour, as a percentage.

    The PromQL query may be overridden via the OVERRIDE_CLUSTER_MEM_AVG_QUERY
    environment variable. Returns None if the query fails (see
    _get_query_prometheus_value).
    """
    # Plain string, not an f-string: nothing is interpolated, so the PromQL
    # label braces no longer need {{...}} escaping.
    memory_query = os.getenv(
        "OVERRIDE_CLUSTER_MEM_AVG_QUERY",
        '100 * (1 - sum(avg_over_time(node_memory_MemAvailable_bytes{}[1h])) / sum(machine_memory_bytes{}))',
    )
    return self._get_query_prometheus_value(query=memory_query)

def _get_query_prometheus_value(self, query: str) -> Optional[float]:
    """Run a PromQL instant query and return the first sample's value, rounded to 2 decimals.

    Returns None (after logging) when the query reports an error, yields no
    vector samples, or raises while contacting Prometheus.
    """
    try:
        prometheus_params = PrometheusParams(**self.__global_config)
        query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query)
        # Treat an empty vector the same as a missing one — previously an
        # empty list fell through to an IndexError caught below.
        if query_result.result_type == "error" or not query_result.vector_result:
            logging.error("PrometheusDiscoveryUtils failed to get prometheus results.")
            return None
        value = query_result.vector_result[0].value.value
        return round(float(value), 2)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not swallowed; logging.exception records the traceback.
        logging.exception("PrometheusDiscoveryUtils failed to get prometheus results.")
        return None

def __run_checks(self):
while True:
try:
Expand Down
11 changes: 6 additions & 5 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from robusta.core.reporting.base import Finding
from robusta.core.reporting.consts import ScanState, ScanType
from robusta.core.sinks.robusta.discovery_metrics import DiscoveryMetrics
from robusta.core.sinks.robusta.prometheus_health_checker import PrometheusHealthChecker
from robusta.core.sinks.robusta.prometheus_discovery_utils import PrometheusDiscoveryUtils
from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaToken
from robusta.core.sinks.robusta.rrm.rrm import RRM
from robusta.core.sinks.sink_base import SinkBase
Expand All @@ -51,7 +51,6 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.token = sink_config.robusta_sink.token
self.ttl_hours = sink_config.robusta_sink.ttl_hours
self.persist_events = sink_config.robusta_sink.persist_events

robusta_token = RobustaToken(**json.loads(base64.b64decode(self.token)))
if self.account_id != robusta_token.account_id:
logging.error(
Expand Down Expand Up @@ -79,7 +78,7 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.__discovery_period_sec = DISCOVERY_PERIOD_SEC

global_config = self.get_global_config()
self.__prometheus_health_checker = PrometheusHealthChecker(
self.__prometheus_discovery_util = PrometheusDiscoveryUtils(
discovery_period_sec=self.__discovery_period_sec, global_config=global_config
)
self.__rrm_checker = RRM(dal=self.dal, cluster=self.cluster_name, account_id=self.account_id)
Expand Down Expand Up @@ -495,7 +494,7 @@ def __publish_new_helm_releases(self, active_helm_releases: List[HelmRelease]):

def __update_cluster_status(self):
self.last_send_time = time.time()
prometheus_health_checker_status = self.__prometheus_health_checker.get_status()
prometheus_health_checker_status = self.__prometheus_discovery_util.get_status()
activity_stats = ActivityStats(
relayConnection=False,
alertManagerConnection=prometheus_health_checker_status.alertmanager,
Expand Down Expand Up @@ -526,7 +525,9 @@ def __update_cluster_status(self):
)

self.dal.publish_cluster_status(cluster_status)
self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count)
avg_cpu = self.__prometheus_discovery_util.get_cluster_avg_cpu()
Avi-Robusta marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the db function work with None as cpu_utilization or memory_utilization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

avg_mem = self.__prometheus_discovery_util.get_cluster_avg_memory()
self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count, avg_cpu, avg_mem)
except Exception:
logging.exception(
f"Failed to run periodic update cluster status for {self.sink_name}",
Expand Down
Loading