Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimization #2960

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions curl-format.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
time_namelookup: %{time_namelookup}s\n
time_connect: %{time_connect}s\n
time_appconnect: %{time_appconnect}s\n
time_pretransfer: %{time_pretransfer}s\n
time_redirect: %{time_redirect}s\n
time_starttransfer: %{time_starttransfer}s\n
----------\n
time_total: %{time_total}s\n
36 changes: 34 additions & 2 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@
lower_timestamp=None,
with_incidents=False,
fingerprints=None,
optimized=False,
) -> list[Alert]:

with Session(engine) as session:
Expand Down Expand Up @@ -1368,8 +1369,18 @@
stmt = stmt.order_by(desc(Alert.timestamp)).limit(limit)

# Execute the query
alerts_with_start = session.execute(stmt).all()

if not optimized:

Check warning on line 1372 in keep/api/core/db.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/db.py#L1372

Added line #L1372 was not covered by tests
# ORM
alerts_with_start = session.execute(stmt).all()

Check warning on line 1374 in keep/api/core/db.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/db.py#L1374

Added line #L1374 was not covered by tests
else:
# Raw SQL
compiled = stmt.compile(

Check warning on line 1377 in keep/api/core/db.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/db.py#L1377

Added line #L1377 was not covered by tests
dialect=engine.dialect, compile_kwargs={"literal_binds": True}
)
compiled_results = session.execute(text(str(compiled))).all()
alerts_with_start = [

Check warning on line 1381 in keep/api/core/db.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/db.py#L1380-L1381

Added lines #L1380 - L1381 were not covered by tests
rebuild_alert_from_raw(row) for row in compiled_results
]
# Process results based on dialect
alerts = []
for alert_data in alerts_with_start:
Expand All @@ -1394,6 +1405,27 @@
return alerts


def rebuild_alert_from_raw(row):
    """Rebuild an ``Alert`` and its companion values from one raw SQL row.

    The row is read positionally: columns 0-7 populate the ``Alert`` (id,
    tenant_id, timestamp, provider_type, provider_id, event, fingerprint,
    alert_hash), column 8 is the started-at (first) timestamp and column 9
    is the incident id.

    Args:
        row: An indexable result row with at least ten columns.

    Returns:
        tuple: ``(alert, startedAt, incident_id)`` so the raw-SQL path yields
        the same structure as the ORM path.
    """
    # NOTE(review): the coverage annotations on the pull request mark this
    # function as not covered by tests.

    # Some drivers hand back a ready-made uuid.UUID; UUID(<UUID instance>)
    # raises AttributeError, so only parse values that are not UUIDs yet.
    raw_id = row[0]
    alert_id = raw_id if isinstance(raw_id, UUID) else UUID(raw_id)

    # The event column may arrive serialized (text or bytes) or already
    # decoded, depending on the driver; json.loads accepts str and bytes.
    raw_event = row[5]
    if isinstance(raw_event, (str, bytes, bytearray)):
        event = json.loads(raw_event)
    else:
        event = raw_event

    # Create Alert object from the raw tuple
    alert = Alert(
        id=alert_id,
        tenant_id=row[1],
        timestamp=row[2],
        provider_type=row[3],
        provider_id=row[4],
        event=event,
        fingerprint=row[6],
        alert_hash=row[7],
    )

    # startedAt is the first_timestamp column of the query
    started_at = row[8]
    incident_id = row[9]

    # Return tuple of (Alert, startedAt, incident_id) to match ORM structure
    return (alert, started_at, incident_id)

Check warning on line 1426 in keep/api/core/db.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/db.py#L1426

Added line #L1426 was not covered by tests


def get_alerts_by_fingerprint(
tenant_id: str, fingerprint: str, limit=1, status=None
) -> List[Alert]:
Expand Down
4 changes: 2 additions & 2 deletions keep/api/core/tenant_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
)

def _load_tenant_configurations(self):
self.logger.debug("Loading tenants configurations")
self.logger.info("Loading tenants configurations")

Check warning on line 22 in keep/api/core/tenant_configuration.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/tenant_configuration.py#L22

Added line #L22 was not covered by tests
tenants_configuration = get_tenants_configurations()
self.logger.debug(
self.logger.info(

Check warning on line 24 in keep/api/core/tenant_configuration.py

View check run for this annotation

Codecov / codecov/patch

keep/api/core/tenant_configuration.py#L24

Added line #L24 was not covered by tests
"Tenants configurations loaded",
extra={
"number_of_tenants": len(tenants_configuration),
Expand Down
83 changes: 53 additions & 30 deletions keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Optional, List
from typing import List, Optional

import celpy
import celpy.c7nlib
Expand All @@ -12,16 +12,25 @@
from keep.api.bl.incidents_bl import IncidentBl
from keep.api.core.db import (
assign_alert_to_incident,
get_incident_for_grouping_rule,
create_incident_for_grouping_rule,
enrich_incidents_with_alerts,
get_incident_for_grouping_rule,
)
from keep.api.core.db import get_rules as get_rules_db
from keep.api.core.db import (
is_all_alerts_in_status,
is_all_alerts_resolved,
is_first_incident_alert_resolved,
is_last_incident_alert_resolved,
is_all_alerts_in_status, enrich_incidents_with_alerts,
)
from keep.api.core.db import get_rules as get_rules_db
from keep.api.core.dependencies import get_pusher_client
from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus
from keep.api.models.alert import (
AlertDto,
AlertSeverity,
AlertStatus,
IncidentDto,
IncidentStatus,
)
from keep.api.models.db.alert import Incident
from keep.api.models.db.rule import ResolveOn, Rule
from keep.api.utils.cel_utils import preprocess_cel_expression
Expand Down Expand Up @@ -49,7 +58,13 @@
def __init__(self, tenant_id=None):
    """Store the tenant id and a module logger; defer CEL environment setup.

    Args:
        tenant_id: Value stored on the instance as ``self.tenant_id``.
    """
    self.tenant_id = tenant_id
    self.logger = logging.getLogger(__name__)
    # Backing field for the lazy ``env`` property. Nothing is assigned to
    # ``self.env`` here: ``env`` is a getter-only property, so assigning to
    # it would raise AttributeError and would defeat the lazy creation.
    self._env = None

@property
def env(self):
    """Return the ``celpy.Environment``, creating it on first access.

    The environment is built once and cached in ``self._env``; later reads
    return the cached object.
    """
    # Compare against the None sentinel set in __init__ rather than relying
    # on truthiness, so an already-built environment is never rebuilt.
    if self._env is None:
        self._env = celpy.Environment()
    return self._env

Check warning on line 67 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L65-L67

Added lines #L65 - L67 were not covered by tests

def run_rules(
self, events: list[AlertDto], session: Optional[Session] = None
Expand Down Expand Up @@ -101,8 +116,13 @@

rule_groups = self._extract_subrules(rule.definition_cel)

if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == len(matched_rules)):
self.logger.info("Single event is enough, so creating incident")
if rule.create_on == "any" or (

Check warning on line 119 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L119

Added line #L119 was not covered by tests
rule.create_on == "all"
and len(rule_groups) == len(matched_rules)
):
self.logger.info(

Check warning on line 123 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L123

Added line #L123 was not covered by tests
"Single event is enough, so creating incident"
)
incident.is_confirmed = True
elif rule.create_on == "all":
incident = self._process_event_for_history_based_rule(
Expand All @@ -111,7 +131,9 @@

send_created_event = incident.is_confirmed

incident = self._resolve_incident_if_require(rule, incident, session)
incident = self._resolve_incident_if_require(

Check warning on line 134 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L134

Added line #L134 was not covered by tests
rule, incident, session
)
session.add(incident)
session.commit()

Expand All @@ -137,7 +159,6 @@

return list(incidents_dto.values())


def _get_or_create_incident(self, rule, rule_fingerprint, session):
incident = get_incident_for_grouping_rule(
self.tenant_id,
Expand All @@ -155,14 +176,9 @@
return incident

def _process_event_for_history_based_rule(
self,
incident: Incident,
rule: Rule,
session: Session
self, incident: Incident, rule: Rule, session: Session
) -> Incident:
self.logger.info(
"Multiple events required for the incident to start"
)
self.logger.info("Multiple events required for the incident to start")

Check warning on line 181 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L181

Added line #L181 was not covered by tests

enrich_incidents_with_alerts(
tenant_id=self.tenant_id,
Expand All @@ -178,7 +194,9 @@
matched_sub_rules = set()

for alert in incident.alerts:
matched_sub_rules = matched_sub_rules.union(self._check_if_rule_apply(rule, AlertDto(**alert.event)))
matched_sub_rules = matched_sub_rules.union(

Check warning on line 197 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L197

Added line #L197 was not covered by tests
self._check_if_rule_apply(rule, AlertDto(**alert.event))
)
if all_sub_rules == matched_sub_rules:
is_all_conditions_met = True
break
Expand All @@ -193,13 +211,14 @@
return incident

@staticmethod
def _resolve_incident_if_require(rule: Rule, incident: Incident, session: Session) -> Incident:
def _resolve_incident_if_require(
rule: Rule, incident: Incident, session: Session
) -> Incident:

should_resolve = False

if (
rule.resolve_on == ResolveOn.ALL.value
and is_all_alerts_resolved(incident=incident, session=session)
if rule.resolve_on == ResolveOn.ALL.value and is_all_alerts_resolved(

Check warning on line 220 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L220

Added line #L220 was not covered by tests
incident=incident, session=session
):
should_resolve = True

Expand Down Expand Up @@ -336,18 +355,20 @@
Returns:
list[AlertDto]: list of alerts that are related to the cel
"""
logger = logging.getLogger(__name__)
self.logger.info(f"Filtering alerts with CEL expression {cel}")

Check warning on line 358 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L358

Added line #L358 was not covered by tests
# if the cel is empty, return all the alerts
if cel == "":
return alerts
# if the cel is empty, return all the alerts
if not cel:
logger.debug("No CEL expression provided")
self.logger.info("No CEL expression provided")

Check warning on line 364 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L364

Added line #L364 was not covered by tests
return alerts
# preprocess the cel expression
self.logger.info(f"Preprocessing the CEL expression {cel}")

Check warning on line 367 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L367

Added line #L367 was not covered by tests
cel = preprocess_cel_expression(cel)
ast = self.env.compile(cel)
prgm = self.env.program(ast)
self.logger.info(f"CEL expression {cel} compiled successfully")

Check warning on line 371 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L371

Added line #L371 was not covered by tests
filtered_alerts = []

for i, alert in enumerate(alerts):
Expand All @@ -363,33 +384,35 @@
continue
# unknown
elif "no such overload" in str(e):
logger.debug(
self.logger.debug(

Check warning on line 387 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L387

Added line #L387 was not covered by tests
f"Type mismtach between operator and operand in the CEL expression {cel} for alert {alert.id}"
)
continue
elif "found no matching overload" in str(e):
logger.debug(
self.logger.debug(

Check warning on line 392 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L392

Added line #L392 was not covered by tests
f"Type mismtach between operator and operand in the CEL expression {cel} for alert {alert.id}"
)
continue
logger.warning(
self.logger.warning(

Check warning on line 396 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L396

Added line #L396 was not covered by tests
f"Failed to evaluate the CEL expression {cel} for alert {alert.id} - {e}"
)
continue
except Exception:
logger.exception(
self.logger.exception(

Check warning on line 401 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L401

Added line #L401 was not covered by tests
f"Failed to evaluate the CEL expression {cel} for alert {alert.id}"
)
continue
if r:
filtered_alerts.append(alert)

self.logger.info(f"Filtered {len(filtered_alerts)} alerts")

Check warning on line 408 in keep/rulesengine/rulesengine.py

View check run for this annotation

Codecov / codecov/patch

keep/rulesengine/rulesengine.py#L408

Added line #L408 was not covered by tests
return filtered_alerts

def _send_workflow_event(
    self, session: Session, incident_dto: IncidentDto, action: str
):
    """Send a workflow event for an incident and notify the client.

    Builds an ``IncidentBl`` from this engine's tenant id, the given session
    and a pusher client, then uses it to send the workflow event and to push
    an incident-change update for ``incident_dto.id``.

    Args:
        session: Database session handed to ``IncidentBl``.
        incident_dto: Incident the event is about.
        action: Action value forwarded to ``send_workflow_event``.
    """
    pusher_client = get_pusher_client()
    incident_bl = IncidentBl(self.tenant_id, session, pusher_client)

    incident_bl.send_workflow_event(incident_dto, action)
    incident_bl.update_client_on_incident_change(incident_dto.id)

2 changes: 2 additions & 0 deletions keep/searchengine/searchengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def _get_last_alerts(
lower_timestamp=lower_timestamp,
upper_timestamp=upper_timestamp,
with_incidents=True,
optimized=True,
)
self.logger.info("Finished querying last alerts, converting to DTO")
# convert the alerts to DTO
alerts_dto = convert_db_alerts_to_dto_alerts(alerts)
self.logger.info(
Expand Down
Loading