From 232d6b75a42d58a442d6594364db9107161d3525 Mon Sep 17 00:00:00 2001 From: Nils Rennebarth Date: Fri, 13 Dec 2024 17:05:08 +0100 Subject: [PATCH] Support parsing of new snort3 rule types snort3 introduces three new rule types: service rules, file rules and file identification rules. The options of these type have the same syntax, they only differ in the header: * Service rules have a two word header: _action_ _service_ where service is the name of an application level protocol, e.g. http, imap, ... * File rules have a two word header: _action_ `file` and action is one of the normal snort actions. * File identification rules have the fixed one word header `file_id`. The patch also adds support for the "rewrite" option that had already been introduced in snort2 The part of the `parse` function that deals with the header was moved into a separate function: `parse_header`. We note that the first two words of a snort rule can not contain spaces, so these can be split off immediately and allow us to handle all new cases. The rest of the header analysis starts with the 3rd word and is mostly unchanged from before. The new parse_header function returns the dict with the header keys or None if the header does not look like a valid snort rule. --- idstools/rule.py | 144 +++++++++++++++++++++++++---------------------- 1 file changed, 77 insertions(+), 67 deletions(-) diff --git a/idstools/rule.py b/idstools/rule.py index d0af8da..90c20a1 100644 --- a/idstools/rule.py +++ b/idstools/rule.py @@ -53,7 +53,16 @@ # Rule actions we expect to see. actions = ( - "alert", "config", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop") + "activate", "alert", "config", "drop", "dynamic", "file_id", "log", "pass", + "reject", "rewrite", "sdrop" +) + +# Services +services = ( + "dce_http_proxy", "dce_http_server", "dcerpc", "dnp3", "file", "ftp", + "http", "http2", "icmp", "imap", "ip", "modbus", "mms", "netflow", + "netbios-ssn", "pop3", "rpc", "s7commplus", "sip", "smtp", "ssh", + "ssl", "tcp", "telnet", "udp") class Rule(dict): """Class representing a rule. @@ -199,6 +208,69 @@ def find_opt_end(options): else: return offset + i +def parse_header(header): + """Parse the rule reader + + :param header: A string containing the header of a rule + + :returns: A dict with keys action, proto, source_addr, source_port, + direction, dest_addr, dest_port. + """ + info = dict( + action=None, + proto=None, + source_addr=None, + source_port=None, + direction=None, + dest_addr=None, + dest_port=None + ) + words = header.split(" ", maxsplit=2) + info["action"] = words[0] + if info["action"] not in actions: + return None + if len(words) == 1: + return info + info["proto"] = words[1] + if info["proto"] not in services: + return None + if len(words) == 2: + return info + + states = [ + "source_addr", + "source_port", + "direction", + "dest_addr", + "dest_port", + ] + state = 0 + + rem = words[2] + while state < len(states): + if not rem: + return None + if rem[0] == "[": + end = rem.find("]") + if end < 0: + return None + end += 1 + token = rem[:end].strip() + rem = rem[end:].strip() + else: + end = rem.find(" ") + if end < 0: + token = rem + rem = "" + else: + token = rem[:end].strip() + rem = rem[end:].strip() + + info[states[state]] = token + state += 1 + + return info + def parse(buf, group=None): """ Parse a single rule for a string buffer. @@ -221,75 +293,13 @@ def parse(buf, group=None): enabled = True header = m.group("header").strip() - - # If a decoder rule, the header will be one word. - if len(header.split(" ")) == 1: - action = header - proto = None - source_addr = None - source_port = None - direction = None - dest_addr = None - dest_port = None - else: - states = ["action", - "proto", - "source_addr", - "source_port", - "direction", - "dest_addr", - "dest_port", - ] - state = 0 - - rem = header - while state < len(states): - if not rem: - return None - if rem[0] == "[": - end = rem.find("]") - if end < 0: - return - end += 1 - token = rem[:end].strip() - rem = rem[end:].strip() - else: - end = rem.find(" ") - if end < 0: - token = rem - rem = "" - else: - token = rem[:end].strip() - rem = rem[end:].strip() - - if states[state] == "action": - action = token - elif states[state] == "proto": - proto = token - elif states[state] == "source_addr": - source_addr = token - elif states[state] == "source_port": - source_port = token - elif states[state] == "direction": - direction = token - elif states[state] == "dest_addr": - dest_addr = token - elif states[state] == "dest_port": - dest_port = token - - state += 1 - - if action not in actions: + info = parse_header(header) + if info is None: return None - rule = Rule(enabled=enabled, action=action, group=group) + rule = Rule(enabled=enabled, action=info["action"], group=group) rule["header"] = header - rule["proto"] = proto - rule["source_addr"] = source_addr - rule["source_port"] = source_port - rule["direction"] = direction - rule["dest_addr"] = dest_addr - rule["dest_port"] = dest_port + rule.update(info) options = m.group("options")