-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmalware_classifier.py
More file actions
151 lines (135 loc) · 6.09 KB
/
Copy pathmalware_classifier.py
File metadata and controls
151 lines (135 loc) · 6.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import joblib
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, f1_score
import lightgbm as lgb
# from lstm import LSTMClassifier
from preprocess import Preprocessor
class MalwareClassifierEvaluator:
    """
    Evaluate several classifiers for malware classification using API traces.

    Each configured classifier is fitted on the training split and scored on
    the test split with the macro-averaged F1 score. A confusion-matrix
    heatmap is written for every classifier, and the best-scoring fitted
    estimator is persisted with joblib.

    Attributes:
        X_train: Training data
        X_test: Testing data
        y_train: Training labels
        y_test: Testing labels
        classifiers: Mapping of display name -> unfitted estimator
        scaler: StandardScaler fitted by the last ``scale_features(fit=True)``
            call, or None if no scaling has happened yet
        best_model: Best-scoring fitted estimator seen so far (or None)
        best_score: Macro F1 score of ``best_model`` (0 until one is recorded)
    """

    # Names of classifiers whose features are standardized before fitting.
    _SCALED_CLASSIFIERS = ('Logistic Regression',)

    def __init__(self, X_train, X_test, y_train, y_test):
        """
        Initializes the MalwareClassifierEvaluator with the training and testing data.

        :param X_train: Training data
        :param X_test: Testing data
        :param y_train: Training labels
        :param y_test: Testing labels
        """
        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test
        self.classifiers = {
            'Logistic Regression': LogisticRegression(
                max_iter=500,
                C=0.01,
                solver='newton-cg',
                class_weight='balanced',
            ),
            'K-Nearest Neighbors': KNeighborsClassifier(),
            # 'Decision Tree': DecisionTreeClassifier(),
            'Random Forest': RandomForestClassifier(
                n_estimators=1000,        # Number of trees
                max_depth=30,             # Maximum depth of each tree
                max_features='sqrt',      # Number of features to consider for the best split
                class_weight='balanced',  # Handle class imbalance
            ),
            'LightGBM': lgb.LGBMClassifier(
                boosting_type='gbdt',
                n_estimators=1000,
                learning_rate=0.01,
                max_depth=20,
                num_leaves=64,
                class_weight='balanced',
                random_state=42,
            ),
            # 'LSTM': LSTMClassifier(vocab_size=self.X_train.shape[1])
        }
        self.scaler = None
        self.best_model = None
        self.best_score = 0

    def _core_train(self, X_train, y_train, X_test, clf, clf_name):
        """
        Fit one classifier, predict on the test features and evaluate it.

        :param X_train: Training features (already scaled if required)
        :param y_train: Training labels
        :param X_test: Test features (scaled consistently with ``X_train``)
        :param clf: Estimator exposing ``fit`` and ``predict``
        :param clf_name: Display name of the classifier
        """
        print(f"Training {clf_name}...")
        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        self.evaluate_classifier(clf, clf_name, y_pred)

    def train_and_evaluate(self):
        """Train each classifier, evaluate its performance and save the best one."""
        for clf_name, clf in self.classifiers.items():
            if clf_name == 'LSTM':
                # The LSTM consumes its own sequence-oriented preprocessing
                # rather than the feature matrices held by this evaluator.
                # NOTE(review): 'apt_trace_labels.txt' may be a typo for
                # 'api_trace_labels.txt' -- confirm against the data files.
                preprocessor = Preprocessor('api_trace.csv', 'apt_trace_labels.txt')
                print(f"Training {clf_name}...")
                X_train, X_test, y_train, y_test = preprocessor.preprocess_for_lstm()
                clf.fit(X_train, y_train, X_test, y_test)
                f1 = clf.evaluate(X_test, y_test)
                # Record the model itself (not its name) so that
                # save_best_model() persists something usable.
                self.update_best_model(f1, clf)
                continue

            if clf_name in self._SCALED_CLASSIFIERS:
                print(f"Scaling features for {clf_name}...")
                # Fit the scaler on the training split only and reuse it for
                # the test split, so both share the same mean/variance and no
                # test-set statistics leak into preprocessing.
                X_train = self.scale_features(self.X_train)
                X_test = self.scale_features(self.X_test, fit=False)
            else:
                X_train, X_test = self.X_train, self.X_test

            self._core_train(X_train, self.y_train, X_test, clf, clf_name)

        # Save the best model to a file
        print("Saving the best model...")
        self.save_best_model()

    def scale_features(self, X, fit=True):
        """
        Standardize the feature vectors to zero mean and unit standard deviation.

        :param X: Feature matrix to scale
        :param fit: When True (default), fit a fresh StandardScaler on ``X``
            and store it in ``self.scaler``. When False, reuse the previously
            fitted ``self.scaler`` (use this for test / inference data).
        :return: The scaled feature matrix
        :raises RuntimeError: If ``fit`` is False and no scaler has been fitted
        """
        if fit:
            self.scaler = StandardScaler()
            return self.scaler.fit_transform(X)
        if self.scaler is None:
            raise RuntimeError(
                "scale_features(fit=False) called before a scaler was fitted"
            )
        return self.scaler.transform(X)

    def evaluate_classifier(self, clf, clf_name, y_pred):
        """
        Evaluates the classifier using F1 score and confusion matrix.

        :param clf: The fitted classifier that produced ``y_pred``
        :param clf_name: Name of the classifier
        :param y_pred: Predicted labels for the test set
        """
        # Use the union of true and predicted labels for both the matrix and
        # its tick labels so the heatmap axes always match the matrix shape.
        labels = sorted(set(self.y_test) | set(y_pred))
        cm = confusion_matrix(self.y_test, y_pred, labels=labels)
        self.plot_confusion_matrix(cm, clf_name, class_labels=labels)
        f1_macro = f1_score(self.y_test, y_pred, average='macro')
        print(f"Average (Macro) F1 Score for {clf_name}: {f1_macro:.3f}")
        self.update_best_model(f1_macro, clf)

    def update_best_model(self, f1_score, clf):
        """
        Update the best model if the current one has a higher F1 score.

        :param f1_score: Score achieved by ``clf`` (the parameter name shadows
            the imported sklearn function inside this method only)
        :param clf: The fitted model that achieved the score
        """
        if f1_score > self.best_score:
            self.best_score = f1_score
            self.best_model = clf

    def save_best_model(self):
        """
        Save the best model to 'best_model.joblib' using joblib.

        Nothing is written when no model has been recorded.
        """
        # Compare against None explicitly: estimator truthiness is unreliable
        # (ensemble estimators define __len__).
        if self.best_model is None:
            print("No best model to save.")
            return
        # NOTE: a model trained on scaled features needs the same scaling at
        # inference time; the fitted scaler (self.scaler) is not persisted here.
        joblib.dump(self.best_model, 'best_model.joblib')
        print(f"Best model saved with F1-score: {self.best_score}")

    @staticmethod
    def _save_plot(path):
        """
        Private helper function to save the current plot to the specified path.

        :param path: The file path where the plot will be saved
        """
        save_dir = os.path.dirname(path)
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        plt.savefig(path)
        plt.show()
        # Release the figure so repeated evaluations do not accumulate figures.
        plt.close()

    def plot_confusion_matrix(self, cm, clf_name, class_labels=None):
        """
        Generate a heatmap for the confusion matrix and save it under resources/.

        :param cm: Confusion matrix
        :param clf_name: Name of the classifier
        :param class_labels: Tick labels in the row/column order of ``cm``.
            Defaults to the sorted unique labels of ``self.y_test``.
        """
        if class_labels is None:
            class_labels = sorted(set(self.y_test))
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False,
                    xticklabels=class_labels, yticklabels=class_labels)
        plt.title(f'Confusion Matrix for {clf_name}')
        plt.xlabel('Predicted Labels')
        plt.ylabel('True Labels')
        path = f"resources/cm_{clf_name.lower().replace(' ', '_')}.png"
        self._save_plot(path)