From 52054a99275f65e068bfd91d585df32d8f47c088 Mon Sep 17 00:00:00 2001 From: Daniel Yang <t5wol3yv@duck.com> Date: Mon, 3 Mar 2025 13:10:18 +0100 Subject: [PATCH] code refactoring --- code/machine_learning_models/decision_tree.py | 9 +- .../logistic_regression.py | 18 ++-- code/machine_learning_models/random_forest.py | 8 +- code/machine_learning_models/utilities.py | 89 ++++++++++--------- 4 files changed, 63 insertions(+), 61 deletions(-) diff --git a/code/machine_learning_models/decision_tree.py b/code/machine_learning_models/decision_tree.py index 2d22b76..8b45961 100644 --- a/code/machine_learning_models/decision_tree.py +++ b/code/machine_learning_models/decision_tree.py @@ -16,6 +16,7 @@ warnings.filterwarnings("ignore") # Constants y_data = 'class' +y_columns = ['normal', 'anomaly'] df_train, df_test, model_name = import_data( train_file_path = "nsl-kdd-dataset/" + "KDDTrain+.arff", test_file_path = "nsl-kdd-dataset/" + "KDDTest+.arff", @@ -23,13 +24,9 @@ df_train, df_test, model_name = import_data( sc = StandardScaler() enc = LabelEncoder() -is_threat = df_train[y_data].unique() -if len(is_threat) != 2: - raise Exception("Target must be a binary decision.") - # Normalize data -ordinal_encode(df = df_train, categories = is_threat, target = y_data) -ordinal_encode(df = df_test, categories = is_threat, target = y_data) +ordinal_encode(df = df_train, categories = y_columns, target = y_data) +ordinal_encode(df = df_test, categories = y_columns, target = y_data) normalize(df_train, df_test, y_data, sc, enc) diff --git a/code/machine_learning_models/logistic_regression.py b/code/machine_learning_models/logistic_regression.py index f71f0db..6be4e33 100644 --- a/code/machine_learning_models/logistic_regression.py +++ b/code/machine_learning_models/logistic_regression.py @@ -3,10 +3,12 @@ import pandas as pd import seaborn as sns import warnings + +from sklearn.model_selection import learning_curve from sklearn.preprocessing import StandardScaler, LabelEncoder from utilities import ordinal_encode, heat_map, plot_features, plot_confusion_matrix, normalize, \ - print_high_confidence_samples, plot_counts, import_data, plot_roc_curve + print_high_confidence_samples, plot_counts, import_data, plot_roc_curve, plot_learning_curve warnings.filterwarnings("ignore") @@ -16,6 +18,7 @@ from sklearn.metrics import classification_report, confusion_matrix # Constants y_data = 'class' +y_values = ['normal', 'anomaly'] df_train, df_test, model_name = import_data( train_file_path = "nsl-kdd-dataset/" + "KDDTrain+.arff", test_file_path = "nsl-kdd-dataset/" + "KDDTest+.arff", @@ -24,17 +27,12 @@ sc = StandardScaler() enc = LabelEncoder() # Data inspection and pre-processing - -is_threat = df_train[y_data].unique() -if len(is_threat) != 2: - raise Exception("Logistic Regression only works for binary classification.") - numerical_columns = df_train.select_dtypes(include = np.number).columns label_columns = df_train.select_dtypes(include=object, exclude = np.number).columns # Normalize data -ordinal_encode(df = df_train, categories = is_threat, target = y_data) -ordinal_encode(df = df_test, categories = is_threat, target = y_data) +ordinal_encode(df = df_train, categories = y_values, target = y_data) +ordinal_encode(df = df_test, categories = y_values, target = y_data) normalize(df_train, df_test, y_data, sc, enc) @@ -100,6 +98,10 @@ print_high_confidence_samples(model, X_train) # Calculate prediction probabilities for ROC curve y_score = model.predict_proba(X_test)[:, 1] +# Plot learning curve +train_sizes, train_scores, test_scores = learning_curve(model, X_train, y_train.values.ravel(), cv=5, scoring="accuracy") +plot_learning_curve(train_sizes, train_scores, test_scores, model_name) + # Plot ROC curve using the function from utilities plot_roc_curve(y_test, y_score, model_name=model_name) diff --git a/code/machine_learning_models/random_forest.py b/code/machine_learning_models/random_forest.py index 2e5056d..5bdbbf0 100644 --- a/code/machine_learning_models/random_forest.py +++ b/code/machine_learning_models/random_forest.py @@ -14,6 +14,7 @@ warnings.filterwarnings("ignore") # Constants y_data = 'class' +y_values = ['normal', 'anomaly'] df_train, df_test, model_name = import_data( train_file_path = "nsl-kdd-dataset/" + "KDDTrain+.arff", test_file_path = "nsl-kdd-dataset/" + "KDDTest+.arff", @@ -21,15 +22,12 @@ df_train, df_test, model_name = import_data( sc = StandardScaler() enc = LabelEncoder() -print("Before Processing: \n", df_train) - -is_threat = df_train[y_data].unique() numerical_columns = df_train.select_dtypes(include=np.number).columns label_columns = df_train.select_dtypes(include=object, exclude=np.number).columns # Normalize data -ordinal_encode(df = df_train, categories = is_threat, target = y_data) -ordinal_encode(df = df_test, categories = is_threat, target = y_data) +ordinal_encode(df = df_train, categories = y_values, target = y_data) +ordinal_encode(df = df_test, categories = y_values, target = y_data) normalize(df_train, df_test, y_data, sc, enc) diff --git a/code/machine_learning_models/utilities.py b/code/machine_learning_models/utilities.py index 4406d32..cfbca48 100644 --- a/code/machine_learning_models/utilities.py +++ b/code/machine_learning_models/utilities.py @@ -10,24 +10,7 @@ from sklearn.preprocessing import OrdinalEncoder show_plots = False -def ordinal_encode(df, categories, target): - """ - Applies ordinal encoding to a specified categorical column in a DataFrame. - - Parameters: - df (pd.DataFrame): The input DataFrame. - categories (list): A list containing the ordered categories for encoding. - target (str): The column name to be encoded. - - Raises: - TypeError: If categories are not provided or do not contain exactly two elements. - - Modifies: - The function modifies the input DataFrame by replacing the target column with its encoded values. - """ - if categories is None or len(categories) != 2: - raise TypeError("Categories must be provided") - df[target] = OrdinalEncoder(categories = [categories]).fit_transform(df[[target]]) +# Plots def heat_map(df, model_name=None): """ @@ -108,21 +91,6 @@ def plot_features(features, info_text: str = None, model_name=None): if show_plots: plt.show() -def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler): - - df_temp = pd.concat([df_train, df_test]) - scale_targets = df_temp.select_dtypes(include=np.number).drop(columns=exclude).columns - numerical_scaler.fit_transform(df_temp[scale_targets]) - - df_train[scale_targets] = numerical_scaler.transform(df_train[scale_targets]) - df_test[scale_targets] = numerical_scaler.transform(df_test[scale_targets]) - - labels = df_train.select_dtypes(include=object, exclude=np.number).columns - for label in labels: - label_scaler.fit_transform(df_temp[label]) - df_train[label] = label_scaler.transform(df_train[label]) - df_test[label] = label_scaler.transform(df_test[label]) - def plot_confusion_matrix(confusion_matrix: List[List[int]], accuracy: float, model_name=None) -> None: if len(confusion_matrix) != 2 or any(len(row) != 2 for row in confusion_matrix): raise ValueError("Confusion matrices must be 2x2") @@ -137,14 +105,6 @@ def plot_confusion_matrix(confusion_matrix: List[List[int]], accuracy: float, mo if show_plots: plt.show() -def print_high_confidence_samples(model, x: pd.DataFrame): - # Get predicted probabilities - predicted_probabilities = pd.DataFrame(model.predict_proba(x)[:, 1], - columns=['confidence level']) # Probability of being class 1 - # Filter samples where the model is at least 90% sure - high_confidence_samples = predicted_probabilities[predicted_probabilities['confidence level'] > 0.9] - print(high_confidence_samples.head()) - def plot_precision_recall_curve(precision, recall, model_name=None): """ A Precision-Recall curve shows the trade-off between precision @@ -217,6 +177,8 @@ def plot_roc_curve(y_true, y_score, model_name=None): def save_plot(name): plt.savefig("resulting_figures/" + name, dpi=300, bbox_inches='tight') +# Data processing + def import_data(train_file_path: str, test_file_path: str, model_name: str): data, meta = arff.loadarff(train_file_path) df_train = pd.DataFrame(data) @@ -227,4 +189,47 @@ def import_data(train_file_path: str, test_file_path: str, model_name: str): df_test = pd.DataFrame(data) df_test = df_test.applymap(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x) - return df_train, df_test, model_name \ No newline at end of file + return df_train, df_test, model_name + +def ordinal_encode(df, categories, target): + """ + Applies ordinal encoding to a specified categorical column in a DataFrame. + + Parameters: + df (pd.DataFrame): The input DataFrame. + categories (list): A list containing the ordered categories for encoding. + target (str): The column name to be encoded. + + Raises: + TypeError: If categories are not provided or do not contain exactly two elements. + + Modifies: + The function modifies the input DataFrame by replacing the target column with its encoded values. + """ + if categories is None or len(categories) != 2: + raise TypeError("Categories must be provided") + df[target] = OrdinalEncoder(categories = [categories]).fit_transform(df[[target]]) + +def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler): + df_temp = pd.concat([df_train, df_test]) + scale_targets = df_temp.select_dtypes(include=np.number).drop(columns=exclude).columns + numerical_scaler.fit_transform(df_temp[scale_targets]) + + df_train[scale_targets] = numerical_scaler.transform(df_train[scale_targets]) + df_test[scale_targets] = numerical_scaler.transform(df_test[scale_targets]) + + labels = df_train.select_dtypes(include=object, exclude=np.number).columns + for label in labels: + label_scaler.fit_transform(df_temp[label]) + df_train[label] = label_scaler.transform(df_train[label]) + df_test[label] = label_scaler.transform(df_test[label]) + +# Additional metrics + +def print_high_confidence_samples(model, x: pd.DataFrame): + # Get predicted probabilities + predicted_probabilities = pd.DataFrame(model.predict_proba(x)[:, 1], + columns=['confidence level']) # Probability of being class 1 + # Filter samples where the model is at least 90% sure + high_confidence_samples = predicted_probabilities[predicted_probabilities['confidence level'] > 0.9] + print(high_confidence_samples.head()) \ No newline at end of file -- GitLab