Skip to content

Commit

Permalink
summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Szefler committed Apr 9, 2024
1 parent 13cdaac commit 440acc7
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 35 deletions.
26 changes: 14 additions & 12 deletions src/robusta/core/sinks/sink_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import threading
from abc import abstractmethod, ABC
from typing import Any, List, Dict, Tuple
from collections import defaultdict
from typing import Any, List, Dict, Tuple, DefaultDict

from robusta.core.model.k8s_operation_type import K8sOperationType
from robusta.core.reporting.base import Finding
Expand All @@ -15,12 +15,12 @@ class SinkBase(ABC):

# The tuples in the types below holds all the attributes we are aggregating on.
finding_group_start_ts: Dict[Tuple, float] # timestamps for message groups
finding_group_n_ignored: Dict[Tuple, int] # number of messages ignored for each group
finding_group_n_received: DefaultDict[Tuple, int]  # number of messages received for each group
finding_group_heads: Dict[Tuple, str] # a mapping from a set of parameters to the head of a thread

# Summary groups
finding_sgroup_header: List # descriptive header for the summary table
finding_sgroup_counts: Dict[Tuple, Dict[Tuple, int]] # rows of the summary table
finding_summary_header: List[str] # descriptive header for the summary table
finding_summary_counts: Dict[Tuple, Dict[Tuple, Tuple[int, int]]] # rows of the summary table

finding_group_lock: threading.Lock = threading.Lock()

Expand All @@ -39,17 +39,13 @@ def __init__(self, sink_params: SinkBaseParams, registry):

if sink_params.grouping:
self.grouping_enabled = True
self.finding_group_start_ts = {}
self.finding_group_n_ignored = {}
self.finding_group_heads = {}
if sink_params.grouping.notification_mode.summary:
self.grouping_summary_mode = True
self.finding_sgroup_header = []
self.finding_sgroup_counts = {}
self.finding_summary_header = []
if sink_params.grouping.notification_mode.summary.by:
for attr in sink_params.grouping.notification_mode.summary.by:
if isinstance(attr, str):
self.finding_sgroup_header.append("event" if attr == "identifier" else attr)
self.finding_summary_header.append("event" if attr == "identifier" else attr)
elif isinstance(attr, dict):
keys = list(attr.keys())
if len(keys) > 1:
Expand All @@ -64,12 +60,18 @@ def __init__(self, sink_params: SinkBaseParams, registry):
"(only labels/attributes allowed)"
)
for label_or_attr_name in attr[key]:
self.finding_sgroup_header.append(f"{key[:-1]}:{label_or_attr_name}")
self.finding_summary_header.append(f"{key[:-1]}:{label_or_attr_name}")
else:
self.grouping_summary_mode = False
else:
self.grouping_enabled = False
self.reset_grouping_data()

def reset_grouping_data(self):
    """Reset all notification-grouping state to an empty baseline.

    Clears the per-group start timestamps, thread heads and summary counts,
    and reinitializes the received-message counter so unseen groups read 0.
    """
    self.finding_group_start_ts = dict()
    self.finding_group_heads = dict()
    # defaultdict(int) lets callers increment a group's counter without
    # first checking whether the group key exists.
    self.finding_group_n_received = defaultdict(int)
    self.finding_summary_counts = dict()

def _build_time_slices_from_params(self, params: ActivityParams):
if params is None:
Expand Down
64 changes: 42 additions & 22 deletions src/robusta/core/sinks/slack/slack_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,59 +22,72 @@ def write_finding(self, finding: Finding, platform_enabled: bool) -> None:
self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled)

def handle_notification_grouping(self, finding: Finding, platform_enabled: bool) -> None:
# TODO support notification_mode.regular.ignore_first
# TODO support firing/resolved
timestamp = time.time()
finding_data = finding.attribute_map
# The following will be e.g. Deployment, Job, etc. Sometimes it's undefined.
finding_data["workload"] = finding.service.resource_type if finding.service else None
summary_classification = self.classify_finding(
finding_data, self.params.grouping.notification_mode.summary.by
)
group_by_classification = self.classify_finding(finding_data, self.params.grouping.group_by)
logging.warning(f"****** {group_by_classification=}")
finding_data["workload"] = finding.service.resource_type if finding.service else "-"
group_by_classification, _ = self.classify_finding(finding_data, self.params.grouping.group_by)
with self.finding_group_lock:
if (
summary_classification not in self.finding_group_start_ts
or self.finding_group_start_ts[group_by_classification] - timestamp > self.params.grouping.interval
):
if self.finding_group_start_ts[group_by_classification] - timestamp > self.params.grouping.interval:
self.reset_grouping_data()
if group_by_classification not in self.finding_group_start_ts:
# Create a new group/thread
self.finding_group_start_ts[group_by_classification] = timestamp
slack_thread_ts = None
else:
slack_thread_ts = self.finding_group_heads[group_by_classification]
slack_thread_ts = self.finding_group_heads.get(group_by_classification)
self.finding_group_n_received[group_by_classification] += 1
if (
self.finding_group_n_received[group_by_classification] <
self.params.grouping.notification_mode.regular.ignore_first
):
return

if self.grouping_summary_mode:
logging.warning(f"****** {summary_classification=}")
slack_thread_ts = None
summary_classification, summary_classification_header = self.classify_finding(
finding_data, self.params.grouping.notification_mode.summary.by
)

if slack_thread_ts is not None:
# Continue emitting findings in an already existing Slack thread
if not self.grouping_summary_mode or self.params.grouping.notification_mode.summary.threaded:
logging.warning(f"Appending to Slack thread {slack_thread_ts}")
logging.info(f"Appending to Slack thread {slack_thread_ts}")
self.slack_sender.send_finding_to_slack(
finding, self.params, platform_enabled, thread_ts=slack_thread_ts
)
if self.grouping_summary_mode:
pass
logging.info(f"Updating summaries in Slack thread {slack_thread_ts}")
# TODO update totals in the summary message
else:
# Create the first Slack message
if self.grouping_summary_mode:
# TODO
pass
self.finding_summary_counts[group_by_classification] = {group_by_classification: (1, 0)}
logging.info("Creating first Slack summarised thread")
slack_thread_ts = self.slack_sender.send_summary_message(
summary_classification_header,
self.finding_summary_header,
self.finding_summary_counts[group_by_classification],
self.params,
platform_enabled
)
if self.params.grouping.notification_mode.summary.threaded:
self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled)
else:
slack_thread_ts = self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled)
self.finding_group_heads[summary_classification] = slack_thread_ts
logging.warning(f"Created new Slack thread {slack_thread_ts}")
self.finding_group_heads[summary_classification] = slack_thread_ts
logging.info(f"Created new Slack thread {slack_thread_ts}")

def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple:
def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple[Tuple[str], List[str]]:
values = ()
descriptions = []
for attr in attributes:
if isinstance(attr, str):
if attr not in finding_data:
logging.warning(f"Notification grouping: tried to group on non-existent attribute {attr}")
continue
values += (finding_data.get(attr), )
descriptions.append(f"{attr}={finding_data.get(attr)}")
elif isinstance(attr, dict):
if list(attr.keys()) not in [["labels"], ["attributes"]]:
logging.warning(f"Notification grouping: tried to group on non-existent attribute(s) {attr}")
Expand All @@ -83,4 +96,11 @@ def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple:
finding_data.get(top_level_attr_name, {}).get(subitem_name)
for subitem_name in attr[top_level_attr_name]
)
return values
descriptions += [
"%s=%s"%(
f"{top_level_attr_name}:{subitem_name}",
finding_data.get(top_level_attr_name, {}).get(subitem_name)
)
for subitem_name in attr[top_level_attr_name]
]
return values, descriptions
30 changes: 29 additions & 1 deletion src/robusta/integrations/slack/sender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import ssl
import tempfile
from typing import Any, Dict, List, Set
from typing import Any, Dict, List, Set, Tuple

import certifi
from slack_sdk import WebClient
Expand Down Expand Up @@ -382,3 +382,31 @@ def send_finding_to_slack(
sink_params.get_slack_channel(self.cluster_name, finding.subject.labels, finding.subject.annotations),
thread_ts
)

def send_summary_message(
    self,
    summary_classification_header: List[str],
    finding_summary_header: List[str],
    summary_table: Dict[tuple, Tuple[int, int]],
    sink_params: SlackSinkParams,
    platform_enabled: bool,
    msg_ts: str = None  # message identifier (for updates)
):
    """Create or update a summary message with tabular information about the amount of events
    firing/resolved and a header describing the event group that this information concerns.

    Returns the Slack message timestamp ("ts") of the created message so callers can
    thread/update it later, or None if posting failed.
    """
    # Lazy %-style args so the formatting cost is only paid when debug is enabled.
    logging.debug(
        "send_summary_message: classification=%s header=%s table=%s",
        summary_classification_header, finding_summary_header, summary_table,
    )

    # Hoisted so the except clause below can reference it even when
    # get_slack_channel itself is what raised.
    channel = None
    # TODO contents
    try:
        # TODO: for the purpose of the summary, we pretend labels and annotations are empty. Is this okay?
        channel = sink_params.get_slack_channel(self.cluster_name, {}, {})
        resp = self.slack_client.chat_postMessage(
            channel=channel,
            text="A summary message",
            # blocks=output_blocks,
            display_as_bot=True,
        )
        return resp["ts"]
    except Exception:
        # NOTE: the previous handler interpolated undefined names (message,
        # output_blocks, attachment_blocks) and raised NameError, masking the
        # real Slack error. logging.exception records the original traceback.
        logging.exception(f"error sending summary message to slack; channel={channel}")
        return None

0 comments on commit 440acc7

Please sign in to comment.