-
Notifications
You must be signed in to change notification settings - Fork 0
/
query_builder.py
71 lines (58 loc) · 2.63 KB
/
query_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import sqlite3
class QueryBuilder:
    """Incrementally assemble ``field:(value)`` query strings.

    Parts are added with :meth:`add_query_part` and rendered with
    :meth:`build_query`.  The operator stored with a part is the one placed
    *between that part and the next one*; the operator of the last part is
    never rendered.
    """

    # Tables scanned by get_field_mapping.  These are fixed identifiers (never
    # user input), so interpolating them into SQL text is safe.
    _FIELD_TABLES = (
        "GeneralDeviceInformation",
        "CloudContainerInformation",
        "EventInformation",
        "ProcessInformation",
        "FileInformationBasic",
        "FileInformationAdvanced",
        "MITREinformation",
        "NetworkInformation",
        "RegistryInformation",
    )

    def __init__(self):
        """Create an empty builder."""
        # List of (rendered_part, operator) tuples in insertion order.
        self.query_parts = []
        # Accepted operators; "NULL" means "no explicit operator" and is
        # rendered as a single space by build_query.
        self.operators = ["AND", "OR", "NOT", "NULL"]

    def get_db_connection(self):
        """Open a new connection to ``FortiLucene.db`` with name-based rows.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect('FortiLucene.db')
        conn.row_factory = sqlite3.Row
        return conn

    def get_field_mapping(self, field_title):
        """Look up *field_title* in the ``BuiltInQuery`` column of every table.

        Args:
            field_title: Value to match exactly against ``BuiltInQuery``.

        Returns:
            The matching ``BuiltInQuery`` value, or ``None`` when no table
            contains it.
        """
        union = " UNION ALL ".join(
            f"SELECT BuiltInQuery FROM {table}" for table in self._FIELD_TABLES
        )
        # The filter must wrap the whole union: a trailing WHERE on a compound
        # SELECT binds only to the last SELECT, which let unfiltered rows from
        # the other tables through.
        sql = (
            f"SELECT BuiltInQuery FROM ({union}) AS all_fields "
            "WHERE BuiltInQuery = ? LIMIT 1"
        )
        conn = self.get_db_connection()
        try:
            row = conn.execute(sql, (field_title,)).fetchone()
        finally:
            # sqlite3's connection context manager only commits/rolls back;
            # it does not close, so close explicitly to avoid leaking handles.
            conn.close()
        # Positional access works for both sqlite3.Row and plain tuples.
        return row[0] if row else None

    def add_query_part(self, field: str, value: str, operator: str = ""):
        """Append ``field:(value)`` to the pending query.

        Args:
            field: Field name placed before the colon.
            value: Search expression placed inside the parentheses.
            operator: Operator joining this part to the *next* one.  Must be
                one of ``self.operators``; the empty default means "no
                explicit operator" and is stored as ``"NULL"``.

        Raises:
            ValueError: If *operator* is neither empty nor in
                ``self.operators``.
        """
        # The default "" used to be rejected, making the default unusable.
        if not operator:
            operator = "NULL"
        if operator not in self.operators:
            raise ValueError(f"Invalid operator: {operator}")
        query_part = f"{field}:({value})"
        self.query_parts.append((query_part, operator))

    def build_query(self) -> str:
        """Render all added parts into a single query string.

        Returns:
            The parts in insertion order, each pair separated by the earlier
            part's operator (or a single space when that operator is
            ``"NULL"``); ``""`` when no parts were added.
        """
        if not self.query_parts:
            return ""

        pieces = [self.query_parts[0][0]]
        # Pair each part with its successor; the earlier part's operator is
        # the joiner between the two.
        for (_, joiner), (part, _) in zip(self.query_parts, self.query_parts[1:]):
            pieces.append(" " if joiner in ("", "NULL") else f" {joiner} ")
            pieces.append(part)
        return "".join(pieces)

    def clear(self):
        """Discard all pending query parts."""
        self.query_parts = []

    def validate_query(self, query: str) -> bool:
        """Run basic syntactic checks on a query string.

        Args:
            query: Query text to check.

        Returns:
            ``True`` when parentheses are balanced and correctly ordered and
            every whitespace-separated token that case-insensitively matches
            an operator is spelled exactly as in ``self.operators``.
        """
        depth = 0
        for ch in query:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                # A closer with no matching opener, e.g. ")(" -- comparing
                # counts alone would wrongly accept this.
                if depth < 0:
                    return False
        if depth != 0:
            return False

        # Operators are only valid in their exact (upper-case) spelling.
        return all(
            token in self.operators
            for token in query.split()
            if token.upper() in self.operators
        )