Skip to content
Snippets Groups Projects
Commit 5ce38074 authored by Daniel Yang's avatar Daniel Yang
Browse files

final code clean up

parent ba00f656
No related branches found
No related tags found
No related merge requests found
import numpy as np
import pandas as pd
import seaborn as sns
import warnings
import os
......@@ -13,7 +12,6 @@ from utilities import plot_features, ordinal_encode, normalize, plot_confusion_m
warnings.filterwarnings("ignore")
# Constants
y_data = 'class'
y_columns = ['normal', 'anomaly']
......@@ -21,6 +19,8 @@ df_train, df_test, model_name = import_data(
train_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTrain+.arff"),
test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
model_name = "Decision Tree")
# Defining scalers
sc = StandardScaler()
enc = LabelEncoder()
......@@ -62,7 +62,6 @@ y_test = df_test[[y_data]]
# What is overfitting?
# Machine learning model "memorizes" the training data, rather than understanding the underlying pattern
# Training model
dtc = DecisionTreeClassifier()
def predict(prediction_input):
......@@ -76,6 +75,7 @@ def train():
print("Training complete.")
def graphs():
# Classification report
y_prediction = dtc.predict(X_test)
print("Classification report: \n", classification_report(y_test, y_prediction))
......@@ -85,11 +85,15 @@ def graphs():
columns=['Importance']).sort_values(by='Importance', ascending=False)
plot_features(features, model_name=model_name)
# Plot confusion matrix
plot_confusion_matrix(confusion_matrix=confusion_matrix(y_test, y_prediction),
accuracy=dtc.score(X_test, y_test),
model_name=model_name)
# Print samples for which the model is 90% confident
print_high_confidence_samples(model=dtc, x=X_train)
# Plot quantity of elements in class 0 and class 1
plot_counts(y_data, df_train)
print("Graphs complete.")
\ No newline at end of file
print("Graphs complete.")
import warnings
import numpy as np
import pandas as pd
import seaborn as sns
import os
from sklearn.metrics import classification_report, confusion_matrix
......@@ -20,6 +19,7 @@ df_train, df_test, model_name = import_data(
test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
model_name="KNN")
# Defining scalers
sc = StandardScaler()
enc = LabelEncoder()
......@@ -49,6 +49,7 @@ def train():
print("Training complete.")
def graphs():
# Classification report
y_prediction = model.predict(X_test)
print("Classification report: \n", classification_report(y_test, y_prediction))
plot_confusion_matrix(confusion_matrix=confusion_matrix(y_test, y_prediction),
......@@ -58,6 +59,8 @@ def graphs():
# Calculate prediction probabilities for ROC curve
y_score = model.predict_proba(X_test)[:, 1]
plot_roc_curve(y_test, y_score, model_name=model_name)
# Plot quantity of elements in class 0 and class 1
plot_counts(y_data, df_train)
print("Graphs complete.")
\ No newline at end of file
print("Graphs complete.")
......@@ -72,7 +72,6 @@ y_test = df_test[[y_data]]
model = LogisticRegression()
def predict(prediction_input):
if len(prediction_input) == 0:
return
......
......@@ -23,6 +23,8 @@ df_train, df_test, model_name = import_data(
train_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTrain+.arff"),
test_file_path = os.path.join(os.path.dirname(__file__), "nsl-kdd-dataset/" + "KDDTest+.arff"),
model_name = "Random Forest")
# Defining scalers
sc = StandardScaler()
enc = LabelEncoder()
......@@ -41,9 +43,8 @@ X_test = df_test.select_dtypes(include=[np.number]).drop(columns = [y_data])
y_train = df_train[[y_data]]
y_test = df_test[[y_data]]
# Train Random Forest Model
model = RandomForestClassifier(n_estimators=100, max_depth=None, random_state=42)
# Prediction function
def predict(prediction_input):
if len(prediction_input) == 0:
return
......@@ -95,10 +96,7 @@ def graphs():
# Plot ROC curve using the function from utilities
plot_roc_curve(y_test, y_score, model_name=model_name)
# Plot quantity of elements in class 0 and class 1
plot_counts(y_data, df_train)
print("Graphs complete.")
if __name__ == "__main__":
train()
graphs()
\ No newline at end of file
......@@ -11,18 +11,13 @@ from sklearn.preprocessing import OrdinalEncoder
show_plots = False
y_data = ['normal', 'anomaly']
# Plots
def heat_map(df, model_name=None):
def heat_map(df, model_name=None) -> None:
"""
Generates a heatmap of the correlation matrix for numerical features in the DataFrame.
Parameters:
df (pd.DataFrame): The input DataFrame.
Modifies:
Displays a heatmap visualization of the feature correlations.
:param df: The given dataframe.
:param model_name: The name of the model to use.
"""
# Drop all NaN
df.dropna(axis='columns')
......@@ -35,17 +30,13 @@ def heat_map(df, model_name=None):
if show_plots:
plt.show()
def plot_xy(df, x, y, model_name=None):
def plot_xy(df, x, y, model_name=None) -> None:
"""
Creates a scatter plot for two numerical columns.
Parameters:
df (pd.DataFrame): The input DataFrame.
x (str): The column name for the x-axis.
y (str): The column name for the y-axis.
Modifies:
Displays a scatter plot of the two selected features.
:param df: The given dataframe.
:param x: First feature name to be plotted in the x-axis.
:param y: Second feature name to be plotted in the y-axis.
:param model_name: The name of the model to use.
"""
plt.scatter(df[x], df[y])
plt.xlabel(x)
......@@ -55,13 +46,12 @@ def plot_xy(df, x, y, model_name=None):
if show_plots:
plt.show()
def plot_features(features, info_text: str = None, model_name=None):
def plot_features(features, info_text: str = None, model_name=None) -> None:
"""
Parameters:
columns (list): The list of feature names used in the model.
Modifies:
Displays a horizontal bar chart representing feature importance.
Displays a bar graph with the importance of each feature.
:param features: The given dataframe with all feature importance.
:param info_text: Optional text in the legend.
:param model_name: The name of the model to use.
"""
plt.figure(figsize=(10, 10))
......@@ -88,6 +78,13 @@ def plot_features(features, info_text: str = None, model_name=None):
plt.show()
def plot_confusion_matrix(confusion_matrix: List[List[int]], accuracy: float, model_name=None) -> None:
"""
Plots the confusion matrix as a heatmap.
:param confusion_matrix: The given confusion matrix.
:param accuracy: Accuracy score of the given confusion matrix.
:param model_name: The name of the model to use.
:return:
"""
if len(confusion_matrix) != 2 or any(len(row) != 2 for row in confusion_matrix):
raise ValueError("Confusion matrices must be 2x2")
......@@ -106,8 +103,10 @@ def plot_precision_recall_curve(precision, recall, model_name=None):
A Precision-Recall curve shows the trade-off between precision
(how many predicted positives are actually correct) and recall
(how many actual positives were correctly identified).
A good curve is mostly at the top and right.
:param precision: Precision of the model.
:param recall: Recall of the model.
:param model_name: The name of the model to use.
:return:
"""
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, marker='.', label="Precision-Recall Curve")
......@@ -125,6 +124,10 @@ def plot_learning_curve(train_sizes, train_scores, test_scores, model_name=None)
"""
A learning curve helps diagnose overfitting or underfitting by plotting
training and validation performance as training size increases.
:param train_sizes: The train sizes of the model.
:param train_scores: The train scores of the model.
:param test_scores: The test scores of the model.
:param model_name: The name of the model to use.
"""
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
......@@ -148,16 +151,25 @@ def plot_learning_curve(train_sizes, train_scores, test_scores, model_name=None)
if show_plots:
plt.show()
def plot_counts(target, df) -> None:
    """
    Render a count plot showing how many rows fall into each value of *target*.

    :param target: Column name whose per-value counts are plotted.
    :param df: The dataframe containing the target column.
    """
    # Start from a clean figure so earlier plots don't bleed into this one.
    plt.clf()
    sns.countplot(x=target, data=df)
    # Persist the figure first; displaying is optional and gated globally.
    save_plot("Count")
    if show_plots:
        plt.show()
def plot_roc_curve(y_true, y_score, model_name=None):
def plot_roc_curve(y_true, y_score, model_name=None) -> None:
"""
Plots the ROC curve for a binary classification model.
:param y_true: The true labels.
:param y_score: The predicted probabilities.
:param model_name: The name of the model to use.
:return:
"""
fpr, tpr, _ = roc_curve(y_true, y_score)
roc_auc = auc(fpr, tpr)
......@@ -177,13 +189,23 @@ def plot_roc_curve(y_true, y_score, model_name=None):
if show_plots:
plt.show()
def save_plot(name) -> None:
    """
    Save the current matplotlib figure into the resulting_figures/ directory.

    :param name: File name (without directory) under which the figure is saved.
    """
    # Make sure the output directory exists; exist_ok avoids a race on reruns.
    os.makedirs("resulting_figures", exist_ok=True)
    # os.path.join instead of string concatenation keeps the path portable.
    plt.savefig(os.path.join("resulting_figures", name), dpi=300, bbox_inches='tight')
# Data processing
def import_data(train_file_path, test_file_path, model_name):
"""
Imports data from the given path.
:param train_file_path: The path to the training data file.
:param test_file_path: The path to the test data file.
:param model_name: The name of the model to use.
"""
data, meta = arff.loadarff(train_file_path)
df_train = pd.DataFrame(data)
df_train = df_train.map(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x)
......@@ -194,26 +216,28 @@ def import_data(train_file_path, test_file_path, model_name):
return df_train, df_test, model_name
def ordinal_encode(df, categories, target) -> None:
    """
    Applies ordinal encoding to a specified categorical column in a DataFrame.

    The target column is replaced in place with its encoded values.

    :param df: The given dataframe.
    :param categories: The ordered list of the two possible values of target.
    :param target: The column to be encoded.
    :raises TypeError: If categories is missing or does not contain exactly two elements.
    """
    # Guard clause: the encoding below assumes a binary category ordering.
    if categories is None or len(categories) != 2:
        raise TypeError("Categories must be provided and contain exactly two elements")
    df[target] = OrdinalEncoder(categories=[categories]).fit_transform(df[[target]])
def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler):
def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler) -> None:
"""
Normalize the data in the given dataframe.
:param df_train: The training dataframe.
:param df_test: The test dataframe.
:param exclude: Columns to be excluded from normalisation.
:param numerical_scaler: A scaler to normalize numerical values.
:param label_scaler: A scaler to normalize categorical values.
:return:
"""
df_temp = pd.concat([df_train, df_test])
scale_targets = df_temp.select_dtypes(include=np.number).drop(columns=exclude).columns
numerical_scaler.fit_transform(df_temp[scale_targets])
......@@ -229,7 +253,12 @@ def normalize(df_train, df_test, exclude, numerical_scaler, label_scaler):
# Additional metrics
def print_high_confidence_samples(model, x: pd.DataFrame):
def print_high_confidence_samples(model, x: pd.DataFrame) -> None:
"""
Prints in the output stream samples for which the model is 90% confident about.
:param model: The given model.
:param x: The features of the dataframe.
"""
# Get predicted probabilities
predicted_probabilities = pd.DataFrame(model.predict_proba(x)[:, 1],
columns=['confidence level']) # Probability of being class 1
......
......@@ -102,7 +102,7 @@ class TestPacketCapturing(unittest.TestCase):
time.sleep(1)
ip_rate_based_anomaly_detection(packet)
mock_print.assert_called()
self.assertEqual({}, get_dicts()[0], "Expected the packet's IP")
self.assertEqual({"100.84.6.141": 50}, get_dicts()[0], "Expected the packet's IP")
def test_icmp_flood_detection(self):
repetitions = 150
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment