From 5ce38074b0b83e64bd06440c0010d9de7c7597da Mon Sep 17 00:00:00 2001
From: ugmom <ugmom@student.kit.edu>
Date: Sun, 23 Mar 2025 21:15:34 +0100
Subject: [PATCH] final code clean up

---
 code/machine_learning_models/decision_tree.py |  12 +-
 code/machine_learning_models/knn.py           |   7 +-
 .../logistic_regression.py                    |   1 -
 code/machine_learning_models/random_forest.py |  10 +-
 code/machine_learning_models/utilities.py     | 113 +++++++++++-------
 .../test/test_packet_capturing.py             |   2 +-
 6 files changed, 89 insertions(+), 56 deletions(-)

diff --git a/code/machine_learning_models/decision_tree.py b/code/machine_learning_models/decision_tree.py
index 2438a15..aa61493 100644
--- a/code/machine_learning_models/decision_tree.py
+++ b/code/machine_learning_models/decision_tree.py
@@ -1,6 +1,5 @@
 import numpy as np
 import pandas as pd
-import seaborn as sns
 import warnings
 import os
 
@@ -13,7 +12,6 @@ from utilities import plot_features, ordinal_encode, normalize, plot_confusion_m
 
 warnings.filterwarnings("ignore")
 
-
 # Constants
 y_data = 'class'
 y_columns = ['normal', 'anomaly']
@@ -21,6 +19,8 @@ df_train, df_test, model_name = import_data(
     train_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTrain+.arff"),
     test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
     model_name = "Decision Tree")
+
+# Define the scaler and the label encoder
 sc = StandardScaler()
 enc = LabelEncoder()
 
@@ -62,7 +62,6 @@ y_test = df_test[[y_data]]
 
 # What is overfitting?
 # Machine learning model "memorizes" the training data, rather than understanding the underlying pattern
-# Training model
 dtc = DecisionTreeClassifier()
 
 def predict(prediction_input):
@@ -76,6 +75,7 @@
     print("Training complete.")
 
 def graphs():
+    # Classification report
     y_prediction = dtc.predict(X_test)
     print("Classification report: \n", classification_report(y_test, y_prediction))
 
@@ -85,11 +85,15 @@
                              columns=['Importance']).sort_values(by='Importance', ascending=False)
     plot_features(features, model_name=model_name)
 
+    # Plot confusion matrix
     plot_confusion_matrix(confusion_matrix=confusion_matrix(y_test, y_prediction),
                           accuracy=dtc.score(X_test, y_test), model_name=model_name)
 
 
+    # Print samples for which the model is 90% confident
     print_high_confidence_samples(model=dtc, x=X_train)
+
+    # Plot the number of samples in each class
     plot_counts(y_data, df_train)
 
-    print("Graphs complete.")
\ No newline at end of file
+    print("Graphs complete.")
diff --git a/code/machine_learning_models/knn.py b/code/machine_learning_models/knn.py
index 37d9795..2b9ef34 100644
--- a/code/machine_learning_models/knn.py
+++ b/code/machine_learning_models/knn.py
@@ -1,7 +1,6 @@
 import warnings
 import numpy as np
 import pandas as pd
-import seaborn as sns
 import os
 
 from sklearn.metrics import classification_report, confusion_matrix
@@ -20,6 +19,7 @@ df_train, df_test, model_name = import_data(
     test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
     model_name="KNN")
 
+# Define the scaler and the label encoder
 sc = StandardScaler()
 enc = LabelEncoder()
 
@@ -49,6 +49,7 @@ def train():
     print("Training complete.")
 
 def graphs():
+    # Classification report
     y_prediction = model.predict(X_test)
     print("Classification report: \n", classification_report(y_test, y_prediction))
     plot_confusion_matrix(confusion_matrix=confusion_matrix(y_test, y_prediction),
@@ -58,6 +59,8 @@
     # Calculate prediction probabilities for ROC curve
     y_score = model.predict_proba(X_test)[:, 1]
     plot_roc_curve(y_test, y_score, 
                    model_name=model_name)
+
+    # Plot the number of samples in each class
     plot_counts(y_data, df_train)
-    print("Graphs complete.")
\ No newline at end of file
+    print("Graphs complete.")
diff --git a/code/machine_learning_models/logistic_regression.py b/code/machine_learning_models/logistic_regression.py
index 1b56321..832fc4a 100644
--- a/code/machine_learning_models/logistic_regression.py
+++ b/code/machine_learning_models/logistic_regression.py
@@ -72,7 +72,6 @@ y_test = df_test[[y_data]]
 
 model = LogisticRegression()
 
-
 def predict(prediction_input):
     if len(prediction_input) == 0:
         return
diff --git a/code/machine_learning_models/random_forest.py b/code/machine_learning_models/random_forest.py
index 7fa6abb..9490865 100644
--- a/code/machine_learning_models/random_forest.py
+++ b/code/machine_learning_models/random_forest.py
@@ -23,6 +23,8 @@ df_train, df_test, model_name = import_data(
     train_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTrain+.arff"),
     test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
     model_name = "Random Forest")
+
+# Define the scaler and the label encoder
 sc = StandardScaler()
 enc = LabelEncoder()
 
@@ -41,9 +43,8 @@ X_test = df_test.select_dtypes(include=[np.number]).drop(columns = [y_data])
 y_train = df_train[[y_data]]
 y_test = df_test[[y_data]]
 
-# Train Random Forest Model
 model = RandomForestClassifier(n_estimators=100, max_depth=None, random_state=42)
-# Prediction function
+
 def predict(prediction_input):
     if len(prediction_input) == 0:
         return
@@ -95,10 +96,7 @@ def graphs():
     # Plot ROC curve using the function from utilities
     plot_roc_curve(y_test, y_score, model_name=model_name)
 
+    # Plot the number of samples in each class
     plot_counts(y_data, df_train)
 
     print("Graphs complete.")
-
-if __name__ == "__main__":
-    train()
-    graphs()
\ No newline at end of file
diff --git a/code/machine_learning_models/utilities.py b/code/machine_learning_models/utilities.py
index 17268dc..9f978dd 100644
--- a/code/machine_learning_models/utilities.py
+++ b/code/machine_learning_models/utilities.py
@@ -11,18 +11,13 @@ from sklearn.preprocessing import OrdinalEncoder
 
 show_plots = False
 
-y_data = ['normal', 'anomaly']
 
 # Plots
-def heat_map(df, model_name=None):
+def heat_map(df, model_name=None) -> None:
     """
     Generates a heatmap of the correlation matrix for numerical features in the DataFrame.
-
-    Parameters:
-    df (pd.DataFrame): The input DataFrame.
-
-    Modifies:
-    Displays a heatmap visualization of the feature correlations.
+    :param df: The given dataframe.
+    :param model_name: The name of the model.
     """
     # Drop all NaN
     df.dropna(axis='columns')
@@ -35,17 +30,13 @@
     if show_plots:
         plt.show()
 
-def plot_xy(df, x, y, model_name=None):
+def plot_xy(df, x, y, model_name=None) -> None:
     """
     Creates a scatter plot for two numerical columns.
-
-    Parameters:
-    df (pd.DataFrame): The input DataFrame.
-    x (str): The column name for the x-axis.
-    y (str): The column name for the y-axis.
-
-    Modifies:
-    Displays a scatter plot of the two selected features.
+    :param df: The given dataframe.
+    :param x: The feature plotted on the x-axis.
+    :param y: The feature plotted on the y-axis.
+    :param model_name: The name of the model.
     """
     plt.scatter(df[x], df[y])
     plt.xlabel(x)
@@ -55,13 +46,12 @@
     if show_plots:
         plt.show()
 
-def plot_features(features, info_text: str = None, model_name=None):
+def plot_features(features, info_text: str = None, model_name=None) -> None:
     """
-    Parameters:
-    columns (list): The list of feature names used in the model.
-
-    Modifies:
-    Displays a horizontal bar chart representing feature importance.
+    Displays a bar graph with the importance of each feature.
+    :param features: The given dataframe with all feature importances.
+    :param info_text: Optional text in the legend.
+    :param model_name: The name of the model.
     """
 
     plt.figure(figsize=(10, 10))
@@ -88,6 +78,13 @@
         plt.show()
 
 def plot_confusion_matrix(confusion_matrix: List[List[int]], accuracy: float, model_name=None) -> None:
+    """
+    Plots the confusion matrix as a heatmap.
+    :param confusion_matrix: The given confusion matrix.
+    :param accuracy: Accuracy score of the given confusion matrix.
+    :param model_name: The name of the model.
+    :raises ValueError: If the confusion matrix is not 2x2.
+    """
     if len(confusion_matrix) != 2 or any(len(row) != 2 for row in confusion_matrix):
         raise ValueError("Confusion matrices must be 2x2")
 
@@ -106,8 +103,10 @@
     A Precision-Recall curve shows the trade-off between precision (how many predicted
     positives are actually correct) and recall (how many actual positives were
     correctly identified).
-
-    A good curve is mostly at the top and right.
+    :param precision: Precision values of the model.
+    :param recall: Recall values of the model.
+    :param model_name: The name of the model.
+    :return: None
     """
     plt.figure(figsize=(8, 6))
     plt.plot(recall, precision, marker='.', label="Precision-Recall Curve")
@@ -125,6 +124,10 @@ def plot_learning_curve(train_sizes, train_scores, test_scores, model_name=None)
     """
     A learning curve helps diagnose overfitting or underfitting by plotting training
     and validation performance as training size increases.
+    :param train_sizes: The training set sizes.
+    :param train_scores: The scores on the training sets.
+    :param test_scores: The scores on the validation sets.
+    :param model_name: The name of the model.
     """
     train_mean = np.mean(train_scores, axis=1)
     train_std = np.std(train_scores, axis=1)
@@ -148,16 +151,25 @@
     if show_plots:
         plt.show()
 
-def plot_counts(target, df):
+def plot_counts(target, df) -> None:
+    """
+    Displays a bar graph with the value counts of the target variable.
+    :param target: The target variable in the dataframe.
+    :param df: The given dataframe.
+    """
     plt.clf()
     sns.countplot(x = target, data = df)
     save_plot("Count")
     if show_plots:
         plt.show()
 
-def plot_roc_curve(y_true, y_score, model_name=None):
+def plot_roc_curve(y_true, y_score, model_name=None) -> None:
     """
     Plots the ROC curve for a binary classification model.
+    :param y_true: The true labels.
+    :param y_score: The predicted probabilities.
+    :param model_name: The name of the model.
+    :return: None
     """
     fpr, tpr, _ = roc_curve(y_true, y_score)
     roc_auc = auc(fpr, tpr)
@@ -177,13 +189,23 @@
     if show_plots:
         plt.show()
 
-def save_plot(name):
+def save_plot(name) -> None:
+    """
+    Saves the current plot.
+    :param name: The file name under which the plot is saved.
+    """
     os.makedirs("resulting_figures", exist_ok=True)
     plt.savefig("resulting_figures/" + name, dpi=300, bbox_inches='tight')
 
 
 # Data processing
 def import_data(train_file_path, test_file_path, model_name):
+    """
+    Imports the training and test data from the given file paths.
+    :param train_file_path: The path to the training data file.
+    :param test_file_path: The path to the test data file.
+    :param model_name: The name of the model.
+    """
     data, meta = arff.loadarff(train_file_path)
     df_train = pd.DataFrame(data)
     df_train = df_train.map(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x)
@@ -194,26 +216,28 @@
     return df_train, df_test, model_name
 
 
-def ordinal_encode(df, categories, target):
+def ordinal_encode(df, categories, target) -> None:
     """
     Applies ordinal encoding to a specified categorical column in a DataFrame.
-
-    Parameters:
-    df (pd.DataFrame): The input DataFrame.
-    categories (list): A list containing the ordered categories for encoding.
-    target (str): The column name to be encoded.
-
-    Raises:
-    TypeError: If categories are not provided or do not contain exactly two elements.
-
-    Modifies:
-    The function modifies the input DataFrame by replacing the target column with its encoded values.
+    :param df: The given dataframe.
+    :param categories: The possible values of target.
+    :param target: The column to be encoded.
+    :raises TypeError: Raised when categories is missing or does not contain exactly two values.
     """
     if categories is None or len(categories) != 2:
         raise TypeError("Categories must be provided")
     df[target] = OrdinalEncoder(categories = [categories]).fit_transform(df[[target]])
 
-def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler):
+def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler) -> None:
+    """
+    Normalizes the data in the given dataframes.
+    :param df_train: The training dataframe.
+    :param df_test: The test dataframe.
+    :param exclude: Columns to be excluded from normalization.
+    :param numerical_scaler: A scaler to normalize numerical values.
+    :param label_scaler: A scaler to normalize categorical values.
+    :return: None
+    """
     df_temp = pd.concat([df_train, df_test])
     scale_targets = df_temp.select_dtypes(include=np.number).drop(columns=exclude).columns
     numerical_scaler.fit_transform(df_temp[scale_targets])
@@ -229,7 +253,12 @@
 
 
 # Additional metrics
-def print_high_confidence_samples(model, x: pd.DataFrame):
+def print_high_confidence_samples(model, x: pd.DataFrame) -> None:
+    """
+    Prints the samples for which the model is 90% confident.
+    :param model: The given model.
+    :param x: The features of the dataframe.
+    """
     # Get predicted probabilities
     predicted_probabilities = pd.DataFrame(model.predict_proba(x)[:, 1],
                                            columns=['confidence level'])  # Probability of being class 1
diff --git a/code/package_capture/test/test_packet_capturing.py b/code/package_capture/test/test_packet_capturing.py
index 6d918fc..3303b56 100644
--- a/code/package_capture/test/test_packet_capturing.py
+++ b/code/package_capture/test/test_packet_capturing.py
@@ -102,7 +102,7 @@ class TestPacketCapturing(unittest.TestCase):
             time.sleep(1)
             ip_rate_based_anomaly_detection(packet)
             mock_print.assert_called()
-        self.assertEqual({}, get_dicts()[0], "Expected the packet's IP")
+        self.assertEqual({"100.84.6.141": 50}, get_dicts()[0], "Expected the packet's IP to be recorded with its count")
 
     def test_icmp_flood_detection(self):
         repetitions = 150
-- 
GitLab