import time
import sys
from config import AUTO_LEARNING_DURATION, ANOMALY_THRESHOLD
from sniffer import PacketSniffer
from ml_engine import MLEngine
from logger import log_packet, log_anomaly
from notifier import send_telegram_alert
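def _sniffer_failed(sniffer):
    """Report a sniffer error to the operator.

    Returns True when ``sniffer.error`` is set (the caller must abort), False otherwise.
    """
    if not sniffer.error:
        return False
    print(f"\n[!] Aborting: Sniffer encountered an error: {sniffer.error}")
    print("[!] Please ensure Npcap is installed and you are running as Administrator.")
    return True


def _auto_learning_phase(sniffer):
    """Collect baseline traffic for AUTO_LEARNING_DURATION seconds.

    Every captured packet is appended to the training set and written to the DB as
    normal traffic (``is_anomaly=0``).

    Returns the list of captured packets, or None when the sniffer reported an error
    and the run must abort.
    """
    print(f"\n[*] Starting Auto-Learning Phase for {AUTO_LEARNING_DURATION} seconds...")
    learning_end_time = time.time() + AUTO_LEARNING_DURATION
    training_data = []
    while time.time() < learning_end_time:
        if _sniffer_failed(sniffer):
            return None
        packet = sniffer.get_packet(timeout=0.5)
        if packet:
            training_data.append(packet)
            # Baseline traffic is persisted as non-anomalous so the DB holds a
            # record of what the model was trained on.
            log_packet(
                packet['src_ip'], packet['dst_ip'],
                packet['src_port'], packet['dst_port'],
                packet['payload_size'], is_anomaly=0
            )
        # Simple progress indicator, refreshed on every poll (also when no packet arrived).
        remaining = int(learning_end_time - time.time())
        sys.stdout.write(f"\r[*] Auto-learning time remaining: {remaining}s | Packets collected: {len(training_data)}")
        sys.stdout.flush()
    print("\n[*] Auto-Learning Phase completed.")
    return training_data


def _handle_packet(ml_engine, packet):
    """Score one live packet and alert/log it when it is judged anomalous.

    ``ml_engine.predict`` yields a boolean verdict and a numeric score; the packet
    is treated as anomalous when the verdict is True OR the score is below the
    configured ANOMALY_THRESHOLD (this code treats lower scores as more anomalous).
    """
    is_anomaly, score = ml_engine.predict(packet)
    if not (is_anomaly or score < ANOMALY_THRESHOLD):
        # Normal packets are deliberately not persisted during monitoring to save DB space.
        return
    print(f"\n[!] ANOMALY DETECTED! IP: {packet['src_ip']} -> {packet['dst_ip']} | Score: {score:.4f}")
    # The notifier's return value is used as the geo-location label for the DB row;
    # a falsy result is recorded as "Unknown".
    geo_loc = send_telegram_alert(
        packet['src_ip'], packet['dst_ip'],
        packet['src_port'], packet['dst_port'],
        packet['payload_size'], score
    )
    log_anomaly(
        packet['src_ip'], packet['dst_ip'],
        packet['src_port'], packet['dst_port'],
        packet['payload_size'], score, geo_loc or "Unknown"
    )


def _monitoring_phase(sniffer, ml_engine):
    """Score live packets until interrupted or until the sniffer reports an error."""
    print("\n[*] Entering Monitoring Phase (Real-Time Threat Detection)...")
    while True:
        if _sniffer_failed(sniffer):
            return
        packet = sniffer.get_packet(timeout=1.0)
        if packet:
            _handle_packet(ml_engine, packet)


def main():
    """Run SentinelX-Hunter: capture, learn a baseline, then monitor in real time.

    Sequence: start the PacketSniffer, collect baseline traffic for
    AUTO_LEARNING_DURATION seconds, train the MLEngine on it (or warn and keep
    default settings when nothing was captured), then score packets until the
    operator presses Ctrl-C or the sniffer fails. The sniffer is always stopped
    on exit.
    """
    print("=======================================")
    print(" SentinelX-Hunter Started ")
    print("=======================================")
    sniffer = PacketSniffer()
    ml_engine = MLEngine()
    # 1. Start packet sniffer
    sniffer.start()
    try:
        # 2. Auto-Learning Phase (None means the sniffer failed and we already told the user)
        training_data = _auto_learning_phase(sniffer)
        if training_data is None:
            return
        # 3. Train the ML Model
        if training_data:
            ml_engine.train(training_data)
        else:
            print("[-] Warning: No network traffic captured during learning phase. Model will use default settings.")
            # Depending on strictness, we might want to exit here, but we'll continue for now.
        # 4. Monitoring Phase
        _monitoring_phase(sniffer, ml_engine)
    except KeyboardInterrupt:
        print("\n[*] Shutting down SentinelX-Hunter...")
    except Exception as exc:
        # Keep the traceback (and the exception message, which may carry
        # packet-derived data) off the console, but do not swallow the failure
        # silently: the exception class alone is enough to tell an operator that
        # monitoring stopped for a reason other than Ctrl-C, and why.
        print("\n[-] A critical execution error occurred. Shutting down securely.")
        print(f"[-] Error type: {type(exc).__name__}", file=sys.stderr)
    finally:
        sniffer.stop()
        print("[+] Exit complete.")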
# Run the capture/learn/monitor loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()