Skip to content

Commit

Permalink
Support parsing of new snort3 rule types
Browse files Browse the repository at this point in the history
snort3 introduces three new rule types: service rules, file rules
and file identification rules. The options of these type have the
same syntax, they only differ in the header:

* Service rules have a two word header: _action_ _service_ where
  service is the name of an application level protocol, e.g. http,
  imap, ...
* File rules have a two word header: _action_ `file` and action is
  one of the normal snort actions.
* File identification rules have the fixed one word header `file_id`.

The patch also adds support for the "rewrite" option that had
already been introduced in snort2

The part of the `parse` function that deals with the header was
moved into a separate function: `parse_header`. We note that the
first two words of a snort rule can not contain spaces, so these
can be split off immediately and allow us to handle all new cases.
The rest of the header analysis starts with the 3rd word and is
mostly unchanged from before.

The new parse_header function returns the dict with the header
keys or None if the header does not look like a valid snort rule.
  • Loading branch information
nilsrennebarth committed Dec 13, 2024
1 parent 51aebc0 commit 232d6b7
Showing 1 changed file with 77 additions and 67 deletions.
144 changes: 77 additions & 67 deletions idstools/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,16 @@

# Rule actions we expect to see.
actions = (
"alert", "config", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop")
"activate", "alert", "config", "drop", "dynamic", "file_id", "log", "pass",
"reject", "rewrite", "sdrop"
)

# Services
services = (
"dce_http_proxy", "dce_http_server", "dcerpc", "dnp3", "file", "ftp",
"http", "http2", "icmp", "imap", "ip", "modbus", "mms", "netflow",
"netbios-ssn", "pop3", "rpc", "s7commplus", "sip", "smtp", "ssh",
"ssl", "tcp", "telnet", "udp")

class Rule(dict):
"""Class representing a rule.
Expand Down Expand Up @@ -199,6 +208,69 @@ def find_opt_end(options):
else:
return offset + i

def parse_header(header):
"""Parse the rule reader
:param header: A string containing the header of a rule
:returns: A dict with keys action, proto, source_addr, source_port,
direction, dest_addr, dest_port.
"""
info = dict(
action=None,
proto=None,
source_addr=None,
source_port=None,
direction=None,
dest_addr=None,
dest_port=None
)
words = header.split(" ", maxsplit=2)
info["action"] = words[0]
if info["action"] not in actions:
return None
if len(words) == 1:
return info
info["proto"] = words[1]
if info["proto"] not in services:
return None
if len(words) == 2:
return info

states = [
"source_addr",
"source_port",
"direction",
"dest_addr",
"dest_port",
]
state = 0

rem = words[2]
while state < len(states):
if not rem:
return None
if rem[0] == "[":
end = rem.find("]")
if end < 0:
return None
end += 1
token = rem[:end].strip()
rem = rem[end:].strip()
else:
end = rem.find(" ")
if end < 0:
token = rem
rem = ""
else:
token = rem[:end].strip()
rem = rem[end:].strip()

info[states[state]] = token
state += 1

return info

def parse(buf, group=None):
""" Parse a single rule for a string buffer.
Expand All @@ -221,75 +293,13 @@ def parse(buf, group=None):
enabled = True

header = m.group("header").strip()

# If a decoder rule, the header will be one word.
if len(header.split(" ")) == 1:
action = header
proto = None
source_addr = None
source_port = None
direction = None
dest_addr = None
dest_port = None
else:
states = ["action",
"proto",
"source_addr",
"source_port",
"direction",
"dest_addr",
"dest_port",
]
state = 0

rem = header
while state < len(states):
if not rem:
return None
if rem[0] == "[":
end = rem.find("]")
if end < 0:
return
end += 1
token = rem[:end].strip()
rem = rem[end:].strip()
else:
end = rem.find(" ")
if end < 0:
token = rem
rem = ""
else:
token = rem[:end].strip()
rem = rem[end:].strip()

if states[state] == "action":
action = token
elif states[state] == "proto":
proto = token
elif states[state] == "source_addr":
source_addr = token
elif states[state] == "source_port":
source_port = token
elif states[state] == "direction":
direction = token
elif states[state] == "dest_addr":
dest_addr = token
elif states[state] == "dest_port":
dest_port = token

state += 1

if action not in actions:
info = parse_header(header)
if info is None:
return None

rule = Rule(enabled=enabled, action=action, group=group)
rule = Rule(enabled=enabled, action=info["action"], group=group)
rule["header"] = header
rule["proto"] = proto
rule["source_addr"] = source_addr
rule["source_port"] = source_port
rule["direction"] = direction
rule["dest_addr"] = dest_addr
rule["dest_port"] = dest_port
rule.update(info)

options = m.group("options")

Expand Down

0 comments on commit 232d6b7

Please sign in to comment.