-
Notifications
You must be signed in to change notification settings - Fork 0
/
Initializer_DB.py
70 lines (64 loc) · 2.4 KB
/
Initializer_DB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import os

import mysql.connector
from mysql.connector import Error
# Logging setup: the root logger emits INFO and above, with each record
# rendered as "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger()
# Database configuration.
# Every setting may be overridden through an environment variable so that real
# credentials do not have to be committed to source control. When a variable
# is unset, the original hard-coded development value is used, so existing
# setups keep working unchanged.
db_config = {
    "host": os.environ.get("SIGMA_DB_HOST", "localhost"),
    "user": os.environ.get("SIGMA_DB_USER", "sigma"),
    "password": os.environ.get("SIGMA_DB_PASSWORD", "sigma"),
    "database": os.environ.get("SIGMA_DB_NAME", "sigma_db"),
}
# Initialize SQL tables
def initialize_sql_tables():
    """Create the sigma_alerts and dbscan_outlier tables if they don't exist.

    Opens a connection with the module-level ``db_config``, executes one
    ``CREATE TABLE IF NOT EXISTS`` statement per table (both tables share the
    same column layout), and commits. Errors raised by the MySQL connector are
    logged instead of propagated, and the connection is closed before the
    function returns.

    Returns:
        None.
    """
    # Both tables have an identical schema, so the DDL is written once and
    # instantiated per table. The names are constants defined right here
    # (never external input), which is why plain string formatting of the
    # identifier is acceptable.
    table_names = ("sigma_alerts", "dbscan_outlier")
    create_table_template = """
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255),
            tags TEXT,
            description TEXT,
            system_time DATETIME,
            computer_name VARCHAR(100),
            user_id VARCHAR(100),
            event_id VARCHAR(50),
            provider_name VARCHAR(100),
            dbscan_cluster INT,
            raw TEXT
        );
    """

    connection = None
    try:
        connection = mysql.connector.connect(**db_config)
        if not connection.is_connected():
            # Previously this case fell through without any trace in the log.
            logger.error("Error initializing SQL tables: database connection is not available.")
            return

        # Close the cursor explicitly rather than via `with`, so the function
        # does not depend on the installed connector's cursor objects
        # supporting the context-manager protocol.
        cursor = connection.cursor()
        try:
            for table_name in table_names:
                cursor.execute(create_table_template.format(table_name=table_name))
        finally:
            cursor.close()

        connection.commit()
        logger.info("Initialized SQL tables 'sigma_alerts' and 'dbscan_outlier'.")
    except Error as e:
        logger.error("Error initializing SQL tables: %s", e)
    finally:
        # Runs on every path, including the early return above.
        if connection and connection.is_connected():
            connection.close()
if __name__ == "__main__":
    # Create the tables only when this file is run as a script; importing the
    # module must not touch the database.
    initialize_sql_tables()