diff --git a/curl-format.txt b/curl-format.txt new file mode 100644 index 000000000..eba37aee8 --- /dev/null +++ b/curl-format.txt @@ -0,0 +1,8 @@ + time_namelookup: %{time_namelookup}s\n + time_connect: %{time_connect}s\n + time_appconnect: %{time_appconnect}s\n + time_pretransfer: %{time_pretransfer}s\n + time_redirect: %{time_redirect}s\n + time_starttransfer: %{time_starttransfer}s\n + ----------\n + time_total: %{time_total}s\n diff --git a/keep/api/core/db.py b/keep/api/core/db.py index d17817421..f200b3e68 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -1260,6 +1260,7 @@ def get_last_alerts( lower_timestamp=None, with_incidents=False, fingerprints=None, + optimized=False, ) -> list[Alert]: with Session(engine) as session: @@ -1368,8 +1369,18 @@ def get_last_alerts( stmt = stmt.order_by(desc(Alert.timestamp)).limit(limit) # Execute the query - alerts_with_start = session.execute(stmt).all() - + if not optimized: + # ORM + alerts_with_start = session.execute(stmt).all() + else: + # Raw SQL + compiled = stmt.compile( + dialect=engine.dialect, compile_kwargs={"literal_binds": True} + ) + compiled_results = session.execute(text(str(compiled))).all() + alerts_with_start = [ + rebuild_alert_from_raw(row) for row in compiled_results + ] # Process results based on dialect alerts = [] for alert_data in alerts_with_start: @@ -1394,6 +1405,27 @@ def get_last_alerts( return alerts +def rebuild_alert_from_raw(row): + # Create Alert object from the raw tuple + alert = Alert( + id=UUID(row[0]), + tenant_id=row[1], + timestamp=row[2], + provider_type=row[3], + provider_id=row[4], + event=json.loads(row[5]) if isinstance(row[5], str) else row[5], + fingerprint=row[6], + alert_hash=row[7], + ) + + # Get startedAt (first_timestamp) + startedAt = row[8] + incident_id = row[9] + + # Return tuple of (Alert, startedAt, incident_id) to match ORM structure + return (alert, startedAt, incident_id) + + def get_alerts_by_fingerprint( tenant_id: str, fingerprint: str, limit=1, status=None ) -> List[Alert]: diff --git a/keep/api/core/tenant_configuration.py b/keep/api/core/tenant_configuration.py index b948e597f..5d00a95ab 100644 --- a/keep/api/core/tenant_configuration.py +++ b/keep/api/core/tenant_configuration.py @@ -19,9 +19,9 @@ def __init__(self): ) def _load_tenant_configurations(self): - self.logger.debug("Loading tenants configurations") + self.logger.info("Loading tenants configurations") tenants_configuration = get_tenants_configurations() - self.logger.debug( + self.logger.info( "Tenants configurations loaded", extra={ "number_of_tenants": len(tenants_configuration), diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 31f77b09e..90e9abdef 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,6 +1,6 @@ import json import logging -from typing import Optional, List +from typing import List, Optional import celpy import celpy.c7nlib @@ -12,16 +12,25 @@ from keep.api.bl.incidents_bl import IncidentBl from keep.api.core.db import ( assign_alert_to_incident, - get_incident_for_grouping_rule, create_incident_for_grouping_rule, + enrich_incidents_with_alerts, + get_incident_for_grouping_rule, +) +from keep.api.core.db import get_rules as get_rules_db +from keep.api.core.db import ( + is_all_alerts_in_status, is_all_alerts_resolved, is_first_incident_alert_resolved, is_last_incident_alert_resolved, - is_all_alerts_in_status, enrich_incidents_with_alerts, ) -from keep.api.core.db import get_rules as get_rules_db from keep.api.core.dependencies import get_pusher_client -from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus +from keep.api.models.alert import ( + AlertDto, + AlertSeverity, + AlertStatus, + IncidentDto, + IncidentStatus, +) from keep.api.models.db.alert import Incident from keep.api.models.db.rule import ResolveOn, Rule from keep.api.utils.cel_utils import preprocess_cel_expression @@ -49,7 +58,13 @@ class RulesEngine: def __init__(self, tenant_id=None): self.tenant_id = tenant_id self.logger = logging.getLogger(__name__) - self.env = celpy.Environment() + self._env = None + + @property + def env(self): + if not self._env: + self._env = celpy.Environment() + return self._env def run_rules( self, events: list[AlertDto], session: Optional[Session] = None @@ -101,8 +116,13 @@ def run_rules( rule_groups = self._extract_subrules(rule.definition_cel) - if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == len(matched_rules)): - self.logger.info("Single event is enough, so creating incident") + if rule.create_on == "any" or ( + rule.create_on == "all" + and len(rule_groups) == len(matched_rules) + ): + self.logger.info( + "Single event is enough, so creating incident" + ) incident.is_confirmed = True elif rule.create_on == "all": incident = self._process_event_for_history_based_rule( @@ -111,7 +131,9 @@ def run_rules( send_created_event = incident.is_confirmed - incident = self._resolve_incident_if_require(rule, incident, session) + incident = self._resolve_incident_if_require( + rule, incident, session + ) session.add(incident) session.commit() @@ -137,7 +159,6 @@ def run_rules( return list(incidents_dto.values()) - def _get_or_create_incident(self, rule, rule_fingerprint, session): incident = get_incident_for_grouping_rule( self.tenant_id, @@ -155,14 +176,9 @@ def _get_or_create_incident(self, rule, rule_fingerprint, session): return incident def _process_event_for_history_based_rule( - self, - incident: Incident, - rule: Rule, - session: Session + self, incident: Incident, rule: Rule, session: Session ) -> Incident: - self.logger.info( - "Multiple events required for the incident to start" - ) + self.logger.info("Multiple events required for the incident to start") enrich_incidents_with_alerts( tenant_id=self.tenant_id, @@ -178,7 +194,9 @@ def _process_event_for_history_based_rule( matched_sub_rules = set() for alert in incident.alerts: - matched_sub_rules = matched_sub_rules.union(self._check_if_rule_apply(rule, AlertDto(**alert.event))) + matched_sub_rules = matched_sub_rules.union( + self._check_if_rule_apply(rule, AlertDto(**alert.event)) + ) if all_sub_rules == matched_sub_rules: is_all_conditions_met = True break @@ -193,13 +211,14 @@ def _process_event_for_history_based_rule( return incident @staticmethod - def _resolve_incident_if_require(rule: Rule, incident: Incident, session: Session) -> Incident: + def _resolve_incident_if_require( + rule: Rule, incident: Incident, session: Session + ) -> Incident: should_resolve = False - if ( - rule.resolve_on == ResolveOn.ALL.value - and is_all_alerts_resolved(incident=incident, session=session) + if rule.resolve_on == ResolveOn.ALL.value and is_all_alerts_resolved( + incident=incident, session=session ): should_resolve = True @@ -336,18 +355,20 @@ def filter_alerts( Returns: list[AlertDto]: list of alerts that are related to the cel """ - logger = logging.getLogger(__name__) + self.logger.info(f"Filtering alerts with CEL expression {cel}") # if the cel is empty, return all the alerts if cel == "": return alerts # if the cel is empty, return all the alerts if not cel: - logger.debug("No CEL expression provided") + self.logger.info("No CEL expression provided") return alerts # preprocess the cel expression + self.logger.info(f"Preprocessing the CEL expression {cel}") cel = preprocess_cel_expression(cel) ast = self.env.compile(cel) prgm = self.env.program(ast) + self.logger.info(f"CEL expression {cel} compiled successfully") filtered_alerts = [] for i, alert in enumerate(alerts): @@ -363,33 +384,35 @@ def filter_alerts( continue # unknown elif "no such overload" in str(e): - logger.debug( + self.logger.debug( f"Type mismtach between operator and operand in the CEL expression {cel} for alert {alert.id}" ) continue elif "found no matching overload" in str(e): - logger.debug( + self.logger.debug( f"Type mismtach between operator and operand in the CEL expression {cel} for alert {alert.id}" ) continue - logger.warning( + self.logger.warning( f"Failed to evaluate the CEL expression {cel} for alert {alert.id} - {e}" ) continue except Exception: - logger.exception( + self.logger.exception( f"Failed to evaluate the CEL expression {cel} for alert {alert.id}" ) continue if r: filtered_alerts.append(alert) + self.logger.info(f"Filtered {len(filtered_alerts)} alerts") return filtered_alerts - def _send_workflow_event(self, session: Session, incident_dto: IncidentDto, action: str): + def _send_workflow_event( + self, session: Session, incident_dto: IncidentDto, action: str + ): pusher_client = get_pusher_client() incident_bl = IncidentBl(self.tenant_id, session, pusher_client) incident_bl.send_workflow_event(incident_dto, action) incident_bl.update_client_on_incident_change(incident_dto.id) - diff --git a/keep/searchengine/searchengine.py b/keep/searchengine/searchengine.py index c123fdfd5..dfe45a867 100644 --- a/keep/searchengine/searchengine.py +++ b/keep/searchengine/searchengine.py @@ -71,7 +71,9 @@ def _get_last_alerts( lower_timestamp=lower_timestamp, upper_timestamp=upper_timestamp, with_incidents=True, + optimized=True, ) + self.logger.info("Finished querying last alerts, converting to DTO") # convert the alerts to DTO alerts_dto = convert_db_alerts_to_dto_alerts(alerts) self.logger.info(