diff --git a/3_ensembles/sheet_3.ipynb b/3_ensembles/sheet_3.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..52f2306d768814bfaff01d35383613ff3e51fe95 --- /dev/null +++ b/3_ensembles/sheet_3.ipynb @@ -0,0 +1,1290 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "2432be9b", + "metadata": {}, + "source": [ + "# Skalierbare Methoden der Künstlichen Intelligenz\n", + "Dr. Charlotte Debus (<charlotte.debus@kit.edu>) \n", + "Dr. Markus Götz (<markus.goetz@kit.edu>) \n", + "Dr. Marie Weiel (<marie.weiel@kit.edu>) \n", + "Dr. Kaleb Phipps (<kaleb.phipps@kit.edu>) \n", + "\n", + "## Übung 3 am 17.12.24: Paralleles Ensemble Learning mit verteilten Random Forests\n", + "In der dritten Übung beschäftigen wir uns mit parallelem Ensemble Learning am Beispiel von Random Forests (siehe Vorlesung vom 28.11.2024). \n", + "Dazu verwenden wir [Scikit-learn](https://scikit-learn.org/stable/), die meist genutzte Software-Bibliothek für klassisches maschinelles Lernen in Python. Scikit-learn bietet verschiedene Klassifikations-, Regressions- und Clustering-Algorithmen, darunter Support-Vektor-Maschinen, Random Forests, Gradient Boosting (wie XGBoost), k-Means und DBSCAN. Das Package basiert auf den bekannten Python-Bibliotheken [NumPy](https://numpy.org) und [SciPy](https://scipy.org). \n", + "\n", + "Unser Ziel ist es, den [SUSY-Datensatz](https://archive.ics.uci.edu/ml/datasets/SUSY) mithilfe eines verteilten Random Forests zu klassifizieren. Der SUSY-Datensatz besteht aus 5M Monte-Carlo-Samples supersymmetrischer und nicht-supersymmetrischer Teilchenkollisionen. Der Signalprozess ist die Produktion elektrisch geladener, supersymmetrischer Teilchen, die in $W$-Bosonen und ein elektrisch neutrales, supersymmetrisches Teilchen zerfallen, welches für den Detektor unsichtbar ist. Bei diesem Klassifikationsproblem möchten wir anhand von insgesamt 18 Features zwischen diesem Signalprozess, der supersymmetrische Teilchen erzeugt, und einem Hintergrundprozess, der dies nicht tut, unterscheiden.\n", + "Bei den ersten acht Features handelt es sich um kinematische Eigenschaften, die von Detektoren in Teilchenbeschleunigern direkt gemessen werden können. Die verbleibenden zehn Features sind Funktionen der ersten acht und wurden von Physiker*innen abgeleitet, um eine Unterscheidung zwischen den beiden Klassen zu erleichtern. \n", + "Sie finden den kompletten SUSY-Datensatz im CSV-Format im Workspace auf dem Cluster: `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv`. Er enthält insgesamt $n_\\mathrm{samples}$ = 5M Samples. Die erste Spalte enthält die Klassenbezeichnungen (\"Labels\" bzw. \"Targets\"; 1 für Signal, 0 für Hintergrund) gefolgt von den $n_\\text{features}$ = 18 Features.\n", + "\n", + "Zur binären Klassifizierung dieses Datensatzes in die oben genannten Klassen nutzen wir einen verteilten [Random Forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html#sklearn.ensemble.RandomForestClassifier.predict) mit insgesamt $n_\\text{trees}$ Bäumen. Wir betrachten ein Setting mit $p$ Prozessoren. 
Jeder Prozessor $i=1,\\cdots,p$ hält \n", + "- einen Sub Random Forest mit $n_\\text{trees}^{i}=n_\\text{trees}/p$ Bäumen (vorbehaltlich Load Balancing), wobei $\\sum_{i=1}^p n_\\text{trees}^i = n_\\text{trees}$,\n", + "- eine gewisse Anzahl $n_\\text{train}^i = n_\\text{train}/p$ aller Trainings-Samples (vorbehaltlich Load Balancing) als lokalen Trainingsdatensatz sowie \n", + "- den kompletten, global einheitlichen Testdatensatz mit $n_\\text{test}$ Samples. \n", + "\n", + "Das bedeutet, dass es einen global einheitlichen Train-Test-Split $f_\\text{train}$ des gesamten Datensatzes gibt, wobei die Testdaten auf allen Prozessoren gleichermaßen vollständig vorliegen, wohingegen jeder Prozessor eine gewisse Anzahl an Samples aus dem globalen Trainingsdatensatz als lokal unterschiedliche Trainingsdaten erhält. Es gilt also:\n", + "\n", + "$$n_\\text{samples} = n_\\text{train} + n_\\text{test} = f_\\text{train}\\cdot n_\\text{samples}+(1-f_\\text{train})\\cdot n_\\text{samples} = \\sum_{i=1}^p n_\\text{train}^i+n_\\text{test}$$\n", + "\n", + "Jeder Prozessor trainiert seinen Sub Random Forest auf seinem lokalen Trainingsdatensatz. Zum Schluss werden die Teil-Ergebnisse über einen Majority Vote zu einem finalen Ergebnis kombiniert." + ] + }, + { + "cell_type": "markdown", + "id": "de426257", + "metadata": {}, + "source": [ + "### Aufgabe 1: Daten laden\n", + "Um unser Modell zu trainieren und zu testen, müssen wir zunächst die verfügbaren Daten laden. Nach dem Train-Test-Split soll jeder Prozessor eine gewisse Anzahl an Datenpunkten aus dem globalen Trainingsdatensatz *mit Zurücklegen* ziehen. Diese Samples bilden den jeweiligen lokalen Trainingsdatensatz. Dazu gibt es verschiedene Ansätze: \n", + "1. Alle Prozessoren laden einen entsprechenden Teil der Daten aus dem globalen Trainingsdatensatz echt-parallel: Dataloader `load_data_csv_parallel()`\n", + "2. Der Root-Prozess lädt den kompletten globalen Trainingsdatensatz und verteilt diese Samples entsprechend an alle Prozessoren mit `Scatterv`: Dataloader `load_data_csv_root()`\n", + "\n", + "Implementieren und testen Sie die zwei Varianten der Dataloader. Untenstehend finden Sie dazu Code-Gerüste der Funktionen `load_data_csv_parallel()` und `load_data_csv_root()` inklusive einiger Hilfsfunktionen sowie ein Test- und ein beispielhaftes Submitskript. Vollziehen Sie diese nach und vervollständigen Sie den Code an den markierten Stellen. **Normale Kommentare mit '#' beschreiben wie üblich den Code, in Zeilen mit '##' müssen Sie Code ergänzen.** Erstellen Sie entsprechende Python-Skripte mit allen Funktionsdefinitionen bzw. dem Main-Teil. Führen Sie das Test-Skript auf 2 und 4 Knoten des bwUniClusters aus. 
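\n",
+    "\n",
+    "Zur Orientierung für Variante 2 folgt eine kleine, bewusst vereinfachte Skizze (keine Musterlösung; Array-Namen und Größen sind frei gewählt), die lediglich das Grundprinzip von `Scatterv` mit Counts und Displacements an einem synthetischen Array zeigt; das eigentliche Einlesen der CSV-Datei ist hier absichtlich weggelassen:\n",
+    "\n",
+    "```python\n",
+    "from mpi4py import MPI\n",
+    "import numpy as np\n",
+    "\n",
+    "comm = MPI.COMM_WORLD\n",
+    "rank, size = comm.rank, comm.size\n",
+    "n_samples, n_features = 10, 3  # Frei gewählte Beispielgrößen\n",
+    "\n",
+    "# Load-balancierte Counts und Displacements (in Zeilen) bestimmen.\n",
+    "counts = (n_samples // size) * np.ones(size, dtype=int)\n",
+    "counts[: n_samples % size] += 1  # Rest gleichmäßig auf die ersten Ränge verteilen.\n",
+    "displs = np.concatenate((np.zeros(1, dtype=int), np.cumsum(counts)[:-1]))\n",
+    "\n",
+    "data = None\n",
+    "if rank == 0:  # Nur Root hält die vollständigen Daten.\n",
+    "    data = np.arange(n_samples * n_features, dtype=float).reshape(n_samples, n_features)\n",
+    "local = np.empty((counts[rank], n_features), dtype=float)  # Rank-lokaler Empfangspuffer\n",
+    "\n",
+    "# Counts und Displacements werden für Scatterv in Einzelwerten (Zeilen mal Spalten) angegeben.\n",
+    "sendbuf = [data, counts * n_features, displs * n_features, MPI.DOUBLE] if rank == 0 else None\n",
+    "comm.Scatterv(sendbuf, local, root=0)\n",
+    "print(f\"[{rank}/{size}]: Lokaler Block hat Shape {local.shape}.\")\n",
+    "```\n",
+    "\n",
+    "Dasselbe Muster aus Counts und Displacements taucht in den untenstehenden Code-Gerüsten sowohl im root-basierten Dataloader (zeilenweise) als auch im echt-parallelen Dataloader (byteweise) wieder auf.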
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "563f8376", + "metadata": {}, + "outputs": [], + "source": [ + "# IMPORTS\n", + "import pathlib\n", + "from typing import BinaryIO, List, Tuple, Union\n", + "\n", + "from mpi4py import MPI\n", + "from mpi4py.util.dtlib import from_numpy_dtype\n", + "import numpy as np\n", + "from sklearn.model_selection import train_test_split" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eabeeef7", + "metadata": {}, + "outputs": [], + "source": [ + "# HELPER FUNCTIONS FOR TRULY PARALLEL DATALOADER\n", + "# Put together with imports and other dataloaders into file `dataloaders.py`.\n", + "\n", + "def _determine_line_starts(\n", + " f: BinaryIO, comm: MPI.Comm, displs: np.ndarray, counts: np.ndarray\n", + ") -> Tuple[list, bytes, int]:\n", + " \"\"\"\n", + " Determine line starts in bytes from CSV file.\n", + "\n", + " Parameters\n", + " ----------\n", + " f : BinaryIO\n", + " The handle of CSV file to read from\n", + " comm : MPI.Comm\n", + " The communicator to use.\n", + " displs : np.ndarray\n", + " The displacements of byte chunk starts on each rank, length is `comm.size`.\n", + " counts : np.ndarray\n", + " The counts of the byte chunk on each rank, length is `comm.size`.\n", + "\n", + " Returns\n", + " -------\n", + " list\n", + " The line starts in each byte chunk.\n", + " bytes\n", + " The bytes chunk read in on each rank.\n", + " int\n", + " The number of chars used to indicate the line end in the file.\n", + " \"\"\"\n", + " rank = comm.rank # Set up communicator stuff.\n", + "\n", + " # Read bytes chunk and count linebreaks.\n", + " # \\r (Carriage Return) : Move cursor to line start w/o advancing to next line.\n", + " # \\n (Line Feed) : Move cursor down to next line w/o returning to line start.\n", + " # \\r\\n (End Of Line) : Combination of \\r and \\n.\n", + "\n", + " lineter_len = 1 # Set number of line termination chars.\n", + " line_starts = [] # Set up list to save line starts.\n", + " f.seek(displs[rank]) # Jump to pre-assigned position in file.\n", + " r = f.read(counts[rank]) # Read number of bytes from starting position.\n", + "\n", + " for pos, l in enumerate(r): # Determine line breaks in bytes chunk.\n", + " if chr(l) == \"\\n\": # Line terminated by '\\n' only.\n", + " if not chr(r[pos - 1]) == \"\\r\": # No \\r\\n.\n", + " line_starts.append(pos + 1)\n", + " elif chr(l) == \"\\r\":\n", + " if (\n", + " pos + 1 < len(r) and chr(r[pos + 1]) == \"\\n\"\n", + " ): # Line terminated by '\\r\\n'.\n", + " line_starts.append(pos + 2)\n", + " lineter_len = 2\n", + " else: # Line terminated by '\\r' only.\n", + " line_starts.append(pos + 1)\n", + " return line_starts, r, lineter_len\n", + "\n", + "\n", + "def _get_byte_pos_from_line_starts(\n", + " line_starts: np.ndarray, file_size: int, lineter_len: int\n", + ") -> np.ndarray:\n", + " \"\"\"\n", + " Get line starts and counts in byte from line starts to read lines via seek and read.\n", + "\n", + " Parameters\n", + " ----------\n", + " line_starts : np.ndarray\n", + " The absolute positions of line starts in bytes.\n", + " file_size : int\n", + " The absolute file size in bytes.\n", + " lineter_len : int\n", + " The number of line termination characters.\n", + "\n", + " Returns\n", + " -------\n", + " np.ndarray\n", + " An array containing vectors of (start, count) for lines in byte.\n", + " \"\"\"\n", + " lines_byte = [] # list for line starts and counts\n", + " for idx in range(len(line_starts)): # Loop through all line starts.\n", + " 
if idx == len(line_starts) - 1: # Special case for last line.\n", + " temp = [\n", + " line_starts[idx], # Line start in bytes\n", + " file_size\n", + " - line_starts[idx]\n", + " - lineter_len, # Bytes count of line length via difference\n", + " ]\n", + " else: # All other lines\n", + " temp = [\n", + " line_starts[idx], # Line start in bytes\n", + " line_starts[idx + 1]\n", + " - line_starts[idx]\n", + " - lineter_len, # Bytes count of line length via difference\n", + " ]\n", + " lines_byte.append(temp)\n", + " return np.array(lines_byte)\n", + "\n", + "\n", + "def _decode_bytes_array(\n", + " byte_pos: np.ndarray, f: BinaryIO, sep: str = \",\", encoding: str = \"utf-8\"\n", + ") -> List:\n", + " \"\"\"\n", + " Decode lines from byte positions and counts.\n", + "\n", + " Parameters\n", + " ----------\n", + " byte_pos : np.ndarray\n", + " The vectors of line starts and lengths in bytes.\n", + " f : BinaryIO\n", + " The handle of the CSV file to read from.\n", + " sep : char\n", + " The character used in the file to separate entries.\n", + " encoding : str\n", + " The encoding used to decode entries from bytes.\n", + "\n", + " Returns\n", + " -------\n", + " list\n", + " Values read from CSV file as numpy array entries in float format\n", + " \"\"\"\n", + " lines = [] # List for saving decoded lines\n", + " byte_pos = byte_pos[\n", + " byte_pos[:, 0].argsort()\n", + " ] # Sort line starts of data items to read from file in ascending order.\n", + " for item in byte_pos:\n", + " f.seek(item[0]) # Go to line start.\n", + " line = f.read(item[1]).decode(\n", + " encoding\n", + " ) # Read specified number of bytes and decode.\n", + " if len(line) > 0:\n", + " sep_values = [\n", + " float(val) for val in line.split(sep)\n", + " ] # Separate values in each line.\n", + " line = np.array(\n", + " sep_values\n", + " ) # Convert list of separated values to numpy array.\n", + " lines.append(line) # Append numpy data entry to output list.\n", + " return lines\n", + "\n", + "\n", + "def _split_indices_train_test(\n", + " n_samples: int, train_split: float, seed: int\n", + ") -> Tuple[int, int, np.ndarray, np.ndarray]:\n", + " \"\"\"\n", + " Make index-based train test split with shuffle.\n", + "\n", + " Parameters\n", + " ----------\n", + " n_samples : int\n", + " The overall number of samples in the dataset.\n", + " train_split : float\n", + " The fraction of the dataset used for training (0 < `train_split` < 1)\n", + " seed : int\n", + " The seed used for the random number generator.\n", + "\n", + " Returns\n", + " -------\n", + " n_train_samples : int\n", + " The number of train samples.\n", + " n_test_samples : int\n", + " The number of test samples.\n", + " train_indices : np.ndarray\n", + " The indices of the samples in the dataset used for training.\n", + " test_indices : np.ndarray\n", + " The indices of the samples in the dataset used for testing.\n", + " \"\"\"\n", + " ## DETERMINE GLOBAL NUMBER OF TRAIN SAMPLES FROM SPLIT AND OVERALL NUMBER OF SAMPLES.\n", + " ## n_train_samples = int(...)\n", + " ## DETERMINE GLOBAL NUMBER OF TEST SAMPLES FROM OVERALL NUMBER OF SAMPLES AND GLOBAL NUMBER OF TRAIN SAMPLES.\n", + " ## n_test_samples = ...\n", + " \n", + " rng = np.random.default_rng(\n", + " seed=seed\n", + " ) # Set same seed over all ranks for consistent train-test split.\n", + " all_indices = np.arange(0, n_samples) # Construct array of all indices.\n", + " rng.shuffle(all_indices) # Shuffle them.\n", + " \n", + " ## EXTRACT FIRST `n_train_samples` INDICES FROM SHUFFLED INDICES ARRAY 
`all_indices` TO FORM TRAIN SET. \n", + " ## train_indices = all_indices[...]\n", + " ## REMAINING ONES FORM TEST SET.\n", + " ## test_indices = ...\n", + " \n", + " return n_train_samples, n_test_samples, train_indices, test_indices" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c0fbac25", + "metadata": {}, + "outputs": [], + "source": [ + "# TRULY PARALLEL DATALOADER\n", + "# Put together with imports and other dataloaders into file `dataloaders.py`.\n", + "\n", + "def load_data_csv_parallel(\n", + " path_to_data: Union[pathlib.Path, str],\n", + " header_lines: int,\n", + " comm: MPI.Comm = MPI.COMM_WORLD,\n", + " train_split: float = 0.75,\n", + " verbose: bool = True,\n", + ") -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:\n", + " \"\"\"\n", + " Load data from CSV file in truly parallel fashion.\n", + "\n", + " Parameters\n", + " ----------\n", + " path_to_data : str | pathlib.Path\n", + " The path to the CSV file.\n", + " header_lines : int\n", + " The number of header lines.\n", + " comm : MPI.Comm\n", + " The communicator to use.\n", + " train_split : float\n", + " The train-test split fraction.\n", + " verbose : bool\n", + " The verbosity level.\n", + "\n", + " Returns\n", + " -------\n", + " train_samples_local : np.ndarray\n", + " The rank-dependent train samples.\n", + " train_targets_local : np.ndarray\n", + " The rank-dependent train targets.\n", + " test_samples : np.ndarray\n", + " The global test samples.\n", + " test_targets : np.ndarray\n", + " The global test targets.\n", + " \"\"\"\n", + " rank, size = comm.rank, comm.size # Set up communicator stuff.\n", + "\n", + " ## GET FILE SIZE IN BYTES.\n", + " ## file_size = ...\n", + "\n", + " # Determine displs + counts of bytes chunk to read on each rank.\n", + " base = file_size // size # Determine base chunk size for each process.\n", + " remainder = file_size % size # Determine remainder bytes.\n", + " counts = base * np.ones( # Construct array with each rank's chunk counts.\n", + " (size,), dtype=int\n", + " )\n", + " if (\n", + " remainder > 0\n", + " ): # Equally distribute remainder over respective ranks to balance load.\n", + " counts[:remainder] += 1\n", + " displs = np.concatenate( # Determine displs via cumulative sum from counts.\n", + " (np.zeros((1,), dtype=int), np.cumsum(counts, dtype=int)[:-1])\n", + " )\n", + "\n", + " if rank == 0:\n", + " print(f\"File size is {file_size} bytes.\")\n", + "\n", + " if rank == 0 and verbose:\n", + " print(f\"Displs {displs}, counts {counts} for reading bytes chunks from file.\")\n", + "\n", + " with open(path_to_data, \"rb\") as f: # Open csv file to read from.\n", + " # Determine line starts in bytes chunks on each rank.\n", + " line_starts, r, lineter_len = _determine_line_starts(f, comm, displs, counts)\n", + "\n", + " # On rank 0, add very first line.\n", + " if rank == 0:\n", + " line_starts = [0] + line_starts\n", + "\n", + " if verbose:\n", + " print(f\"[{rank}/{size}]: {len(line_starts)} line starts in chunk.\")\n", + "\n", + " # Find correct starting point, considering header lines.\n", + " # All-gather numbers of line starts in each chunk in `total_lines` array.\n", + " total_lines = np.empty(size, dtype=int)\n", + " comm.Allgather(\n", + " [np.array(len(line_starts), dtype=int), MPI.INT], [total_lines, MPI.INT]\n", + " )\n", + " cumsum = list(np.cumsum(total_lines))\n", + " # Determine rank where actual data lines start,\n", + " # i.e. 
remove ranks only containing header lines.\n", + " start = next(i for i in range(size) if cumsum[i] > header_lines)\n", + " if verbose:\n", + " print(\n", + " f\"[{rank}/{size}]: total_lines is {total_lines}.\\n\"\n", + " f\"[{rank}/{size}]: cumsum is {cumsum}.\\n\"\n", + " f\"[{rank}/{size}]: start is {start}.\"\n", + " )\n", + "\n", + " if rank < start: # Ranks containing only header lines.\n", + " line_starts = []\n", + " if rank == start: # Rank containing header + data lines.\n", + " rem = header_lines - (0 if start == 0 else cumsum[start - 1])\n", + " line_starts = line_starts[rem:]\n", + "\n", + " # Share line starts of data samples across all ranks via Allgatherv.\n", + " line_starts += displs[\n", + " rank\n", + " ] # Shift line starts on each rank according to displs.\n", + " if verbose:\n", + " print(\n", + " f\"[{rank}/{size}]: {len(line_starts)} line starts of shape {line_starts.shape} and type \"\n", + " f\"{line_starts.dtype} in local chunk: {line_starts}\"\n", + " )\n", + " count_linestarts = np.array( # Determine local number of line starts.\n", + " len(line_starts), dtype=int\n", + " )\n", + " counts_linestarts = (\n", + " np.empty( # Initialize array to all-gather local numbers of line starts.\n", + " size, dtype=int\n", + " )\n", + " )\n", + " comm.Allgather([count_linestarts, MPI.INT], [counts_linestarts, MPI.INT])\n", + " n_linestarts = np.sum(\n", + " counts_linestarts\n", + " ) # Determine overall number of line starts.\n", + " displs_linestarts = (\n", + " np.concatenate( # Determine displacements of line starts from counts.\n", + " (\n", + " np.zeros((1,), dtype=int),\n", + " np.cumsum(counts_linestarts, dtype=int)[:-1],\n", + " ),\n", + " dtype=int,\n", + " )\n", + " )\n", + " if verbose and rank == 0:\n", + " print(\n", + " f\"Overall {n_linestarts} linestarts.\\n\"\n", + " f\"Number of linestarts in each chunk is {counts_linestarts}.\\n\"\n", + " f\"Displs of linestarts in each chunk is {displs_linestarts}.\"\n", + " )\n", + "\n", + " all_line_starts = (\n", + " np.empty( # Initialize array to all-gatherv line starts from all ranks.\n", + " (n_linestarts,), dtype=line_starts.dtype\n", + " )\n", + " )\n", + " if verbose and rank == 0:\n", + " print(\n", + " f\"Recvbuf {all_line_starts}, {all_line_starts.shape}, {all_line_starts.dtype}.\"\n", + " )\n", + " comm.Allgatherv(\n", + " line_starts,\n", + " [\n", + " all_line_starts,\n", + " counts_linestarts,\n", + " displs_linestarts,\n", + " from_numpy_dtype(line_starts.dtype),\n", + " ],\n", + " )\n", + " # Line starts were determined as those positions following a line end.\n", + " # But: There is no line after last line end in file.\n", + " # Thus, remove last entry from all_line_starts.\n", + " all_line_starts = all_line_starts[:-1]\n", + " if rank == 0:\n", + " print(f\"After Allgatherv: All line starts: {all_line_starts}\")\n", + " \n", + " ## CONSTRUCT ARRAY WITH LINE STARTS AND LENGTHS (OR COUNTS) IN BYTES.\n", + " ## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE.\n", + " ## lines_byte = ...\n", + "\n", + " ## MAKE INDEX-BASED GLOBAL TRAIN-TEST SPLIT.\n", + " ## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE.\n", + " ## TAKE CARE OF CHOOSING THE SEEDS PROPERLY TO ENSURE A GLOBALLY CONSISTENS SPLIT OVER ALL RANKS.\n", + " ## n_train_samples, n_test_samples, train_indices, test_indices = ...\n", + "\n", + " n_train_samples_local = (\n", + " n_train_samples // size\n", + " ) # Determine local train dataset size.\n", + " remainder_train = n_train_samples % size # Balance load.\n", + " if rank 
< remainder_train:\n", + " n_train_samples_local += 1\n", + "\n", + " # Construct global held-out test dataset (same on each rank).\n", + " ## CONSTRUCT ACTUAL TEST SET FROM INDEX-BASED TRAIN-TEST SPLIT.\n", + " ## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE TO READ THE \n", + " ## ACTUAL DATA ENTRIES SELECTIVELY FROM THE FILE (AS SPECIFIED BY THE INDICES).\n", + " ## test_lines = ...\n", + " \n", + " test_samples = np.array(test_lines)[:, 1:]\n", + " test_targets = np.array(test_lines)[:, 0]\n", + " if verbose:\n", + " print(\n", + " f\"[{rank}/{size}]: Test samples: {test_samples[0]}\\n\"\n", + " f\"Test targets: {test_targets[0]}\\n\"\n", + " )\n", + "\n", + " # Construct train dataset (different on each rank!).\n", + " ## DRAW `train_samples_local` SAMPLES FROM ALL TRAIN SAMPLES WITH REPETITION.\n", + " ## THE RESULTING LOCAL TRAIN DATASET SHOULD BE DIFFERENT ON EACH RANK.\n", + " ## DO THIS IN AN INDICES-BASED FASHION.\n", + " ## train_indices_local = ...\n", + " ## USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE TO READ THE ACTUAL DATA ENTRIES\n", + " ## SELECTIVELY FROM THE FILE (AS SPECIFIED BY THE INDICES).\n", + " ## train_lines_local = ...\n", + " \n", + " train_samples_local = np.array(train_lines_local)[:, 1:]\n", + " train_targets_local = np.array(train_lines_local)[:, 0]\n", + "\n", + " # Now, each rank holds a local train set (samples + targets)\n", + " # and the global held-out test set (samples + targets).\n", + " return train_samples_local, train_targets_local, test_samples, test_targets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25acd308", + "metadata": {}, + "outputs": [], + "source": [ + "# ROOT-BASED DATALOADER WITH SCATTERV\n", + "# Put together with imports and other dataloaders into file `dataloaders.py`.\n", + "\n", + "def load_data_csv_root(\n", + " path_to_data: Union[str, pathlib.Path],\n", + " header_lines: int,\n", + " comm: MPI.Comm,\n", + " seed: int,\n", + " train_split: float = 0.75,\n", + " verbose: bool = True,\n", + ") -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:\n", + " \"\"\"\n", + " Load data from CSV file via root process.\n", + "\n", + " Parameters\n", + " ----------\n", + " path_to_data : str | pathlib.Path\n", + " The path to the CSV file.\n", + " header_lines : int\n", + " The number of header lines.\n", + " comm : MPI.Comm\n", + " The communicator to use.\n", + " seed : int\n", + " The seed used for train-test split.\n", + " train_split : float\n", + " The train-test split fraction.\n", + " verbose : bool\n", + " The verbosity level\n", + "\n", + " Returns\n", + " -------\n", + " np.ndarray\n", + " The rank-dependent train samples.\n", + " np.ndarray\n", + " The rank-dependent train targets.\n", + " np.ndarray\n", + " The global test samples.\n", + " np.ndarray\n", + " The global test targets.\n", + " \"\"\"\n", + " rank, size = comm.rank, comm.size # Set up communicator stuff.\n", + "\n", + " if rank == 0:\n", + " \n", + " ## LOAD COMPLETE DATA INTO NUMPY ARRAY.\n", + " ## data = ...\n", + " \n", + " ## DIVIDE DATA INTO SAMPLES AND TARGETS.\n", + " ## First column contains targets, followed by the 18 features forming the samples.\n", + " ## samples = data[...]\n", + " ## targets = ...\n", + " \n", + " ## PERFORM TRAIN-TEST SPLIT USING THE `train_test_split` HELPER FUNCTION IN sklearn.\n", + " ## samples_train, samples_test, targets_train, targets_test = ...\n", + " ##\n", + " ## DETERMINE THE NUMBER OF SAMPLES IN THE TRAIN SET.\n", + " ## n_train_samples = ...\n", + " ## DETERMINE THE 
NUMBER OF SAMPLES IN THE TEST SET.\n", + " ## n_test_samples = ...\n", + " ## DETERMINE THE NUMBER OF FEATURES.\n", + " ## n_features = ...\n", + " \n", + " n_train_samples_local = (\n", + " n_train_samples // size\n", + " ) # Determine rank-local number of train samples.\n", + " remainder_train = n_train_samples % size\n", + " train_counts = n_train_samples_local * np.ones(\n", + " size, dtype=int\n", + " ) # Determine load-balanced counts and displacements.\n", + " for idx in range(remainder_train):\n", + " train_counts[idx] += 1\n", + " train_displs = np.concatenate(\n", + " (np.zeros(1, dtype=int), np.cumsum(train_counts, dtype=int)[:-1]), dtype=int\n", + " )\n", + " print(\n", + " f\"There are {n_train_samples} train and {n_test_samples} test samples.\\n\"\n", + " f\"Local train samples: {train_counts}\"\n", + " )\n", + " \n", + " # Construct train dataset.\n", + " ## SAMPLE `n_train_samples` INDICES FROM TRAIN SET WITH REPLACEMENT.\n", + " ## train_indices = ...\n", + " ## CONSTRUCT ACTUAL TRAIN DATASET FROM DRAWN INDICES (REPETITIONS POSSIBLE!)\n", + " ## samples_train_shuffled = ...\n", + " ## targets_train_shuffled = ...\n", + " \n", + " send_buf_train_samples = [\n", + " samples_train_shuffled,\n", + " train_counts * n_features,\n", + " train_displs * n_features,\n", + " from_numpy_dtype(samples_train_shuffled.dtype),\n", + " ]\n", + " send_buf_train_targets = [\n", + " targets_train_shuffled,\n", + " train_counts,\n", + " train_displs,\n", + " from_numpy_dtype(targets_train_shuffled.dtype),\n", + " ]\n", + "\n", + " else:\n", + " train_counts = None\n", + " n_features = None\n", + " n_test_samples = None\n", + " send_buf_train_samples = None\n", + " send_buf_train_targets = None\n", + "\n", + " ## BROADCAST NUMBER OF FEATURES FROM ROOT TO ALL OTHERS.\n", + " ## n_features = ...\n", + " ## BROADCAST NUMBER OF TEST SAMPLES FROM ROOT TO ALL OTHERS.\n", + " ## n_test_samples = ...\n", + " ## BROADCAST TRAIN COUNTS FROM ROOT TO ALL OTHERS.\n", + " ## train_counts = ...\n", + " \n", + " samples_train_local = np.empty((train_counts[rank], n_features), dtype=float)\n", + " targets_train_local = np.empty((train_counts[rank],), dtype=float)\n", + " recv_buf_train_samples = [\n", + " samples_train_local,\n", + " from_numpy_dtype(samples_train_local.dtype),\n", + " ]\n", + " recv_buf_train_targets = [\n", + " targets_train_local,\n", + " from_numpy_dtype(targets_train_local.dtype),\n", + " ]\n", + " if rank != 0:\n", + " samples_test = np.empty((n_test_samples, n_features), dtype=float)\n", + " targets_test = np.empty((n_test_samples,), dtype=float)\n", + " \n", + " ## SCATTERV LOCAL TRAIN SAMPLES `samples_train_local` FROM ROOT AT RANK 0 TO ALL OTHERS.\n", + " ## TO DO SO, MAKE USE OF THE SEND AND RECEIVE BUFFERS DEFINED ABOVE.\n", + " ## SCATTERV LOCAL TRAIN TARGETS `targets_train_local` FROM ROOT AT RANK 0 TO ALL OTHERS.\n", + " ## TO DO SO, MAKE USE OF THE SEND AND RECEIVE BUFFERS DEFINED ABOVE.\n", + " ## BROADCAST TEST SAMPLES `samples_test` FROM ROOT AT RANK 0 TO ALL OTHERS.\n", + " ## BROADCAST TEST TARGETS `targets_test` FROM ROOT AT RANK 0 TO ALL OTHERS.\n", + " \n", + " if verbose:\n", + " print(\n", + " f\"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\\n\"\n", + " f\"Train targets after Scatterv have shape {targets_train_local.shape}.\\n\"\n", + " f\"Test samples after Bcast have shape {samples_test.shape}.\\n\"\n", + " f\"Test targets after Bcast have shape {targets_test.shape}.\"\n", + " )\n", + " return samples_train_local, 
targets_train_local, samples_test, targets_test\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fe69716a", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"Test dataloaders.\"\"\"\n", + "import argparse\n", + "\n", + "from mpi4py import MPI\n", + "\n", + "from dataloaders import load_data_csv_parallel, load_data_csv_root\n", + "from distributed_forest import MPITimer\n", + "\n", + "if __name__ == \"__main__\":\n", + " # Set up communicator.\n", + " comm = MPI.COMM_WORLD\n", + " rank, size = comm.rank, comm.size\n", + "\n", + " # Data file specifications\n", + " header_lines = 0\n", + " sep = \",\"\n", + " encoding = \"utf-8\"\n", + " path_to_data = \"/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv\"\n", + "\n", + " parser = argparse.ArgumentParser(prog=\"Distributed Random Forest\")\n", + " parser.add_argument(\n", + " \"--train_split\",\n", + " type=float,\n", + " default=0.75,\n", + " help=\"The fraction of the dataset used for training.\",\n", + " )\n", + " parser.add_argument(\n", + " \"--verbose\",\n", + " action=\"store_true\",\n", + " help=\"The verbosity.\",\n", + " )\n", + " args = parser.parse_args()\n", + "\n", + " seed = size\n", + "\n", + " print(f\"[{rank}/{size}]: Loading data using truly parallel dataloader...\")\n", + " with MPITimer(comm, name=\"truly parallel data loading\"):\n", + " (\n", + " train_samples_local_par,\n", + " train_targets_local_par,\n", + " test_samples_par,\n", + " test_targets_par,\n", + " ) = load_data_csv_parallel(\n", + " path_to_data, header_lines, comm, args.train_split, args.verbose\n", + " )\n", + "\n", + " print(f\"[{rank}/{size}]: Loading data using root-based dataloader...\")\n", + " with MPITimer(comm, name=\"root-based data loading\"):\n", + " (\n", + " train_samples_local_root,\n", + " train_targets_local_root,\n", + " test_samples_root,\n", + " test_targets_root,\n", + " ) = load_data_csv_root(\n", + " path_to_data, header_lines, comm, seed, args.train_split, args.verbose\n", + " )\n", + "\n", + " print(\n", + " f\"[{rank}/{size}]: DONE.\\n\"\n", + " f\"Parallel: Local train samples / targets have shapes \"\n", + " f\"{train_samples_local_par.shape} / {train_targets_local_par.shape}.\\n\"\n", + " f\"Parallel: Global test samples / targets have shapes {test_samples_par.shape} / {test_targets_par.shape}.\\n\"\n", + " f\"Root: Local train samples / targets have shapes \"\n", + " f\"{train_samples_local_root.shape} / {train_targets_local_root.shape}.\\n\"\n", + " f\"Root: Global test samples / targets have shapes {test_samples_root.shape} / {test_targets_root.shape}.\"\n", + " )\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0bed7db5", + "metadata": {}, + "outputs": [], + "source": [ + "#!/bin/bash\n", + "\n", + "#SBATCH --job-name=dataloader # job name\n", + "#SBATCH --partition=dev_multiple # queue for the resource allocation\n", + "#SBATCH --nodes=2\n", + "#SBATCH --time=10:00 # wall-clock time limit \n", + "#SBATCH --ntasks-per-node=1 # maximum count of tasks per node\n", + "#SBATCH --cpus-per-task=40\n", + "#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.\n", + "\n", + "export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}\n", + "export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.\n", + "export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.\n", + "\n", + "# Set up modules.\n", + "module purge # Unload all currently loaded modules.\n", + "module load 
compiler/gnu/13.3 # Load required modules.\n",
+    "module load mpi/openmpi/4.1\n",
+    "module load devel/cuda/12.4\n",
+    "module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1\n",
+    "\n",
+    "source ${VENVDIR}/bin/activate # Activate your virtual environment.\n",
+    "\n",
+    "mpirun python ${PYDIR}/test_dataloaders.py"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7c0910e3",
+   "metadata": {},
+   "source": [
+    "### Aufgabe 2\n",
+    "Laden Sie die Daten mit den verschiedenen Dataloadern und trainieren Sie einen verteilten Random Forest auf 1, 2, 4, 8, 16, 32 und 64 rein CPU-basierten Knoten des bwUniClusters. Untenstehend finden Sie ein entsprechendes Code-Gerüst sowie ein beispielhaftes Submit-Bash-Skript für 2 Knoten. Vervollständigen Sie den Code an den markierten Stellen. \n",
+    "\n",
+    "**Normale Kommentare mit '#' beschreiben wie üblich den Code, in Zeilen mit '##' müssen Sie Code ergänzen.**\n",
+    "Erstellen Sie ein vollständiges Python-Skript mit allen Funktionsdefinitionen sowie dem Main-Teil. Messen und vergleichen Sie:\n",
+    "- die Datenladedauer\n",
+    "- die Trainingsdauer\n",
+    "- die lokale Accuracy der Sub Random Forests auf den Testdaten (auf jedem Rank)\n",
+    "- die globale Accuracy des finalen globalen Random Forest auf den Testdaten\n",
+    "\n",
+    "Plotten Sie die oben genannten Größen über die Anzahl der verwendeten Knoten. Was fällt Ihnen auf? Welche Trends können Sie beobachten und wie lassen sich diese erklären?"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "deed7fc1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# IMPORTS\n",
+    "# Put together with helper functions and main (see below) into `distributed_forest.py`.\n",
+    "\n",
+    "import argparse\n",
+    "import string\n",
+    "import time\n",
+    "from typing import Tuple\n",
+    "\n",
+    "import numpy as np\n",
+    "from mpi4py import MPI\n",
+    "from sklearn.ensemble import RandomForestClassifier\n",
+    "\n",
+    "from dataloaders import load_data_csv_parallel, load_data_csv_root"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d6590961-723f-41a3-b364-2488f9dd2694",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# HELPER FUNCTIONS\n",
+    "# Put together with imports (see above) and main (see below) into `distributed_forest.py`.\n",
+    "\n",
+    "class MPITimer:\n",
+    "    \"\"\"A distributed context-manager enabled timer.\"\"\"\n",
+    "\n",
+    "    def __init__(\n",
+    "        self,\n",
+    "        comm: MPI.Comm,\n",
+    "        print_on_exit: bool = True,\n",
+    "        name: str = \"\",\n",
+    "        output_format: str = \"Elapsed time {name}: global average {elapsed_time_average:.2g}s, \"\n",
+    "        \"local {elapsed_time_local:.2g}s\",\n",
+    "    ) -> None:\n",
+    "        \"\"\"\n",
+    "        Create a new distributed context-manager enabled timer.\n",
+    "\n",
+    "        Parameters\n",
+    "        ----------\n",
+    "        comm : MPI.Comm\n",
+    "            The MPI communicator.\n",
+    "        print_on_exit : bool\n",
+    "            Whether to print the measured time in __exit__.\n",
+    "        name : str\n",
+    "            Label describing what this timer measures; can be used for printing the results.\n",
+    "        output_format : str\n",
+    "            Format string template used for printing the output. 
May reference all attributes of the\n", + " \"\"\"\n", + " self.comm = comm\n", + " self.output_format = output_format\n", + " self.print_on_exit = print_on_exit\n", + " self.name = name\n", + "\n", + " self.start_time = None\n", + " self.end_time = None\n", + " self.elapsed_time_local = None\n", + " self.elapsed_time_average = None\n", + "\n", + " def start(self) -> None:\n", + " \"\"\"Start the timer by setting the start time.\"\"\"\n", + " self.start_time = time.perf_counter()\n", + "\n", + " def stop(self) -> None:\n", + " \"\"\"Stop the timer by setting the end time and updating elapsed_time_local.\"\"\"\n", + " self.end_time = time.perf_counter()\n", + " self.elapsed_time_local = self.end_time - self.start_time\n", + "\n", + " def allreduce_for_average_time(self) -> None:\n", + " \"\"\"Compute the global average using allreduce and update elapsed_time_average.\"\"\"\n", + " self.elapsed_time_average = (\n", + " self.comm.allreduce(self.elapsed_time_local, op=MPI.SUM) / self.comm.size\n", + " )\n", + "\n", + " def print(self) -> None:\n", + " \"\"\"Print the elapsed time using the given template.\"\"\"\n", + " template_keywords = {\n", + " key for (_, key, _, _) in string.Formatter().parse(self.output_format)\n", + " }\n", + " template_kwargs = {\n", + " key: value for key, value in vars(self).items() if key in template_keywords\n", + " }\n", + " print(self.output_format.format(**template_kwargs))\n", + "\n", + " def __enter__(self) -> \"MPITimer\":\n", + " \"\"\"\n", + " Called on entering this context (e.g. with a 'with' statement), starts the timer.\n", + "\n", + " Returns\n", + " -------\n", + " MPITimer\n", + " This timer object.\n", + " \"\"\"\n", + " self.start()\n", + " return self\n", + "\n", + " def __exit__(self, *args) -> None:\n", + " \"\"\"\n", + " Called on exiting this context (e.g. 
after a 'with' statement), stops the timer, computes the global average\n", + " and optionally prints the result on rank 0.\n", + "\n", + " Parameters\n", + " ----------\n", + " args :\n", + " Unused, only to fulfill ``__exit__`` interface.\n", + " \"\"\"\n", + " self.stop()\n", + " self.allreduce_for_average_time()\n", + " if self.print_on_exit and self.comm.rank == 0:\n", + " self.print()\n", + "\n", + "\n", + "class DistributedRandomForest:\n", + " \"\"\"\n", + " Distributed random forest class.\n", + "\n", + " Attributes\n", + " ----------\n", + " acc_global : float | None\n", + " The global accuracy.\n", + " acc_local : float | None\n", + " Each local trained model's accuracy.\n", + " clf : RandomForestClassifier | None\n", + " The sklearn classifier.\n", + " comm : MPI.Comm\n", + " The communicator.\n", + " n_trees_base : int\n", + " The base number of rank-local trees.\n", + " n_trees_global : int\n", + " The number of trees in the global forest.\n", + " n_trees_local : int\n", + " The number of local trees.\n", + " n_trees_remainder : int\n", + " The remaining number of trees to distribute.\n", + " random_state : int\n", + " The base random state for sklearn random forest.\n", + "\n", + " Methods\n", + " -------\n", + " _distribute_trees()\n", + " Distribute trees evenly over all processors.\n", + " _train_local_classifier()\n", + " Train local random forest classifier.\n", + " _predict_locally()\n", + " Get predictions of all sub estimators in random forest.\n", + " _get_predicted_class_hist()\n", + " Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.\n", + " _calc_majority_vote_hist()\n", + " Calculate majority vote from sample-wise histograms of predicted classes.\n", + " train()\n", + " Train random forest in parallel.\n", + " test()\n", + " Test trained distributed global random forest.\n", + " \"\"\"\n", + "\n", + " def __init__(\n", + " self,\n", + " n_trees_global: int,\n", + " comm: MPI.Comm,\n", + " random_state: int,\n", + " ) -> None:\n", + " \"\"\"\n", + " Initialize distributed random forest object.\n", + "\n", + " Parameters\n", + " ----------\n", + " n_trees_global : int\n", + " The number of trees in the global forest.\n", + " comm : MPI.Comm\n", + " The communicator to use.\n", + " random_state : int\n", + " The base random state for sklearn RF.\n", + " \"\"\"\n", + " self.n_trees_global = n_trees_global\n", + " self.comm = comm\n", + " (\n", + " self.n_trees_base,\n", + " self.n_trees_remainder,\n", + " self.n_trees_local,\n", + " ) = self._distribute_trees()\n", + " self.random_state = random_state + self.comm.rank\n", + " self.clf = None # Local random forest classifier\n", + " self.acc_global = None # Accuracy of global classifier\n", + " self.acc_local = None # Accuracy of each rank-local classifier\n", + "\n", + " def _distribute_trees(self) -> Tuple[int, int, int]:\n", + " \"\"\"\n", + " Distribute trees evenly over all processors.\n", + "\n", + " Returns\n", + " -------\n", + " int\n", + " The base number of rank-local trees.\n", + " int\n", + " The remaining number of trees to distribute.\n", + " int\n", + " The final number of rank-local trees.\n", + " \"\"\"\n", + " size, rank = self.comm.size, self.comm.rank\n", + " ## DETERMINE BASE NUMBER OF TREES ON EACH RANK FROM GLOBAL NUMBER OF TREES AND COMM SIZE.\n", + " ## n_trees_base = ... # base number of rank-local trees\n", + " ## DETERMINE THE REMAINDER TO BE DISTRIBUTED EVENLY OVER THE FIRST RANKS.\n", + " ## n_trees_remainder = ... 
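# remaining number of trees to distribute\n",
+    "        # Rough example with arbitrarily chosen numbers: for n_trees_global=100 and size=8,\n",
+    "        # n_trees_base=12 and n_trees_remainder=4, so ranks 0-3 would hold 13 trees and\n",
+    "        # ranks 4-7 would hold 12 trees, i.e., 4 * 13 + 4 * 12 = 100 trees in total.\n",
+    "        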
## DETERMINE THE FINAL LOAD-BALANCED LOCAL NUMBER OF TREES ON EACH RANK FROM THE \n",
+    "        ## BASE NUMBER AND THE REMAINDER.\n",
+    "        ## n_trees_local = ... # final number of local trees\n",
+    "        return n_trees_base, n_trees_remainder, n_trees_local\n",
+    "\n",
+    "    def _train_local_classifier(\n",
+    "        self, train_samples: np.ndarray, train_targets: np.ndarray\n",
+    "    ) -> RandomForestClassifier:\n",
+    "        \"\"\"\n",
+    "        Train local random forest classifier.\n",
+    "\n",
+    "        Parameters\n",
+    "        ----------\n",
+    "        train_samples : numpy.ndarray\n",
+    "            The samples of the train dataset.\n",
+    "        train_targets : numpy.ndarray\n",
+    "            The targets of the train dataset.\n",
+    "\n",
+    "        Returns\n",
+    "        -------\n",
+    "        sklearn.ensemble.RandomForestClassifier\n",
+    "            The trained model.\n",
+    "        \"\"\"\n",
+    "        ## SET UP RANDOM FOREST CLASSIFIER IN sklearn WITH RANK-LOCAL NUMBER OF TREES\n",
+    "        ## AND RANDOM SEED.\n",
+    "        ## clf = ...\n",
+    "        ## TRAIN CLASSIFIER ON TRAINING DATASET.\n",
+    "        ## RETURN TRAINED CLASSIFIER.\n",
+    "\n",
+    "    def _predict_locally(self, samples: np.ndarray) -> np.ndarray:\n",
+    "        \"\"\"\n",
+    "        Get predictions of all sub estimators in random forest.\n",
+    "\n",
+    "        Parameters\n",
+    "        ----------\n",
+    "        samples : numpy.ndarray\n",
+    "            The samples whose class to predict.\n",
+    "\n",
+    "        Returns\n",
+    "        -------\n",
+    "        numpy.ndarray\n",
+    "            The predictions of each sub estimator on the samples.\n",
+    "        \"\"\"\n",
+    "        return np.array(\n",
+    "            [tree.predict(samples) for tree in self.clf.estimators_], dtype=\"H\"\n",
+    "        )\n",
+    "\n",
+    "    def _get_predicted_class_hist(\n",
+    "        self, tree_wise_predictions: np.ndarray, n_classes: int\n",
+    "    ) -> np.ndarray:\n",
+    "        \"\"\"\n",
+    "        Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.\n",
+    "\n",
+    "        Parameters\n",
+    "        ----------\n",
+    "        tree_wise_predictions : numpy.ndarray\n",
+    "            The tree-wise predictions.\n",
+    "        n_classes : int\n",
+    "            The number of classes in the dataset.\n",
+    "\n",
+    "        Returns\n",
+    "        -------\n",
+    "        numpy.ndarray\n",
+    "            The sample-wise distributions of the predicted classes.\n",
+    "        \"\"\"\n",
+    "        ## GET SAMPLE-WISE PREDICTIONS FROM TRANSPOSED TREE-WISE PREDICTIONS.\n",
+    "        ## sample_wise_predictions = ...\n",
+    "        predicted_class_hists_local = np.array(\n",
+    "            [\n",
+    "                np.bincount(sample_pred, minlength=n_classes)\n",
+    "                for sample_pred in sample_wise_predictions\n",
+    "            ]\n",
+    "        )\n",
+    "        # The histogram arrays have `n_test_samples` entries with `n_classes` elements each.\n",
+    "        # The local histogram holds the class prediction distribution for each test sample over the local forest.\n",
+    "        # Now we want to sum up those sample-wise distributions to obtain the global histogram over the global forest.\n",
+    "        # From this global histogram, we can determine the global majority vote for each sample.\n",
+    "        ## INITIALIZE RECEIVE BUFFER FOR GLOBAL SAMPLE-WISE DISTRIBUTION OF THE PREDICTED CLASSES.\n",
+    "        ## ALL-REDUCE LOCAL SAMPLE-WISE DISTRIBUTIONS OF PREDICTED CLASSES TO OBTAIN THE CORRESPONDING GLOBAL DISTRIBUTION.\n",
+    "        return predicted_class_hists_global\n",
+    "\n",
+    "    @staticmethod\n",
+    "    def _calc_majority_vote_hist(predicted_class_hists: np.ndarray) -> np.ndarray:\n",
+    "        \"\"\"\n",
+    "        Calculate majority vote from sample-wise histograms of predicted classes.\n",
+    "\n",
+    "        Parameters\n",
+    "        ----------\n",
+    "        predicted_class_hists : numpy.ndarray\n",
+    "            The sample-wise histograms of the predicted classes.\n",
+    "\n",
+    "        Returns\n",
+    "        
-------\n", + " numpy.ndarray\n", + " The majority votes for all samples in histogram input.\n", + " \"\"\"\n", + " return np.array(\n", + " [np.argmax(sample_hist) for sample_hist in predicted_class_hists]\n", + " )\n", + "\n", + " def train(\n", + " self,\n", + " train_samples: np.ndarray,\n", + " train_targets: np.ndarray,\n", + " ) -> None:\n", + " \"\"\"\n", + " Train random forest in parallel.\n", + "\n", + " Parameters\n", + " ----------\n", + " train_samples : numpy.ndarray\n", + " The rank-local train samples.\n", + " train_targets : numpy.ndarray\n", + " The corresponding train targets.\n", + " \"\"\"\n", + " # Set up communicator.\n", + " rank, size = self.comm.rank, self.comm.size\n", + " # Set up and train local forest.\n", + " print(\n", + " f\"[{rank}/{size}]: Set up and train local random forest with \"\n", + " f\"{self.n_trees_local} trees and random state {self.random_state}.\"\n", + " )\n", + " ## TRAIN LOCAL CLASSIFIER ON LOCAL TRAINING DATA USING METHOD DEFINED ABOVE.\n", + "\n", + " def test(\n", + " self,\n", + " test_samples: np.ndarray,\n", + " test_targets: np.ndarray,\n", + " n_classes: int,\n", + " ) -> None:\n", + " \"\"\"\n", + " Test trained distributed global random forest.\n", + "\n", + " Parameters\n", + " ----------\n", + " test_samples : numpy.ndarray\n", + " The rank-local test samples.\n", + " test_targets : numpy.ndarray\n", + " The corresponding test targets.\n", + " n_classes : int\n", + " The number of classes in the dataset.\n", + " \"\"\"\n", + " rank, size = self.comm.rank, self.comm.size\n", + " # Get class predictions of sub estimators in each forest.\n", + " print(f\"[{rank}/{size}]: Get predictions of individual sub estimators.\")\n", + " ## GET THE PREDICTIONS OF THE INDIVIDUAL SUB ESTIMATORS, I.E., TREES FOR THE TEST SAMPLES USING THE METHOD DEFINED ABOVE.\n", + " ## tree_predictions_local = ..\n", + " print(f\"[{rank}/{size}]: Calculate majority vote via histograms.\")\n", + " ## CALCULATE THE HISTOGRAM-BASED MAJORITY VOTE USING THE METHODS DEFINED ABOVE.\n", + " ## GET THE DISTRIBUTIONS OF THE PREDICTED CLASSES FOR EACH TEST SAMPLE.\n", + " ## predicted_class_hists = ...\n", + " ## GET THE MAJORITY VOTES FROM THE PREDICTED HISTOGRAMS.\n", + " ## majority_vote = ...\n", + " # Calculate accuracies.\n", + " self.acc_global = (test_targets == majority_vote).mean()\n", + " ## CALCULATE THE ACCURACY OF EACH RANK-LOCAL CLASSIFIER.\n", + " ## self.acc_local = ...\n", + " print(\n", + " f\"[{rank}/{size}]: Local accuracy is {self.acc_local}, global accuracy is {self.acc_global}.\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5d6694aa-0424-47f1-a546-d6fc12682a1e", + "metadata": {}, + "outputs": [], + "source": [ + "if __name__ == \"__main__\":\n", + " # SETTINGS\n", + " comm = MPI.COMM_WORLD # Set up communicator.\n", + " rank, size = comm.rank, comm.size\n", + "\n", + " if rank == 0:\n", + " print(\n", + " \"######################################################\\n\"\n", + " \"# Distributed Random Forest in Scikit-Learn with MPI #\\n\"\n", + " \"######################################################\\n\"\n", + " )\n", + "\n", + " n_classes = 2\n", + "\n", + " header_lines = 0 # Data file specifications\n", + " sep = \",\"\n", + " encoding = \"utf-8\"\n", + " path_to_data = \"../../SUSY.csv\"\n", + "\n", + " parser = argparse.ArgumentParser(prog=\"Distributed Random Forest\")\n", + " parser.add_argument(\n", + " \"--dataloader\",\n", + " type=str,\n", + " choices=[\"root\", \"parallel\"],\n", + " 
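# Defaults to the root-based dataloader; pass '--dataloader parallel' to use the truly parallel one.\n",
+    "        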
default=\"root\",\n", + " help=\"The dataloader to use.\",\n", + " )\n", + " parser.add_argument(\n", + " \"--num_trees\",\n", + " type=int,\n", + " default=100,\n", + " help=\"The global number of trees.\",\n", + " )\n", + " parser.add_argument(\n", + " \"--train_split\",\n", + " type=float,\n", + " default=0.75,\n", + " help=\"The fraction of the dataset used for training.\",\n", + " )\n", + " parser.add_argument(\n", + " \"--verbose\",\n", + " action=\"store_true\",\n", + " help=\"The verbosity.\",\n", + " )\n", + " args = parser.parse_args()\n", + "\n", + " seed = size\n", + "\n", + " # -------------- Load data. --------------\n", + " print(f\"[{rank}/{size}]: Loading data...\")\n", + " with MPITimer(comm, name=\"data loading\"):\n", + " if args.dataloader == \"parallel\":\n", + " if rank == 0:\n", + " print(\"Using truly parallel dataloader...\")\n", + " (\n", + " train_samples_local,\n", + " train_targets_local,\n", + " test_samples,\n", + " test_targets,\n", + " ) = load_data_csv_parallel(\n", + " path_to_data, header_lines, comm, args.train_split, args.verbose\n", + " )\n", + "\n", + " elif args.dataloader == \"root\":\n", + " if rank == 0:\n", + " print(\"Using root-based dataloader with Scatterv...\")\n", + " (\n", + " train_samples_local,\n", + " train_targets_local,\n", + " test_samples,\n", + " test_targets,\n", + " ) = load_data_csv_root(\n", + " path_to_data, header_lines, comm, seed, args.train_split, args.verbose\n", + " )\n", + " else:\n", + " raise ValueError\n", + "\n", + " print(\n", + " f\"[{rank}/{size}]: DONE.\\n\"\n", + " f\"Local train samples and targets have shapes {train_samples_local.shape} and {train_targets_local.shape}.\\n\"\n", + " f\"Global test samples and targets have shapes {test_samples.shape} and {test_targets.shape}.\\n\"\n", + " f\"Labels are {train_targets_local}\"\n", + " )\n", + "\n", + " # -------------- Set up random forest. --------------\n", + " with MPITimer(comm, name=\"forest creation\"):\n", + " distributed_random_forest = DistributedRandomForest(\n", + " n_trees_global=args.num_trees,\n", + " comm=comm,\n", + " random_state=seed,\n", + " )\n", + "\n", + " # -------------- Train random forest. --------------\n", + " with MPITimer(comm, name=\"training\"):\n", + " distributed_random_forest.train(\n", + " train_samples_local,\n", + " train_targets_local,\n", + " )\n", + "\n", + " # -------------- Evaluate random forest. 
--------------\n", + " print(f\"[{rank}/{size}]: Evaluate random forest.\")\n", + " with MPITimer(comm, name=\"test\"): # Test trained model on test data.\n", + " distributed_random_forest.test(\n", + " test_samples,\n", + " test_targets,\n", + " n_classes,\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dfe87a2f", + "metadata": {}, + "outputs": [], + "source": [ + "#!/bin/bash\n", + "\n", + "#SBATCH --job-name=RF2 # Job name\n", + "#SBATCH --partition=multiple # Queue for the resource allocation\n", + "#SBATCH --nodes=2 # Number of nodes \n", + "#SBATCH --time=70:00 # Wall-clock time limit \n", + "#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node\n", + "#SBATCH --cpus-per-task=40 # CPUs per task\n", + "#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.\n", + "\n", + "export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}\n", + "export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.\n", + "export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.\n", + "\n", + "# Set up modules.\n", + "module purge # Unload all currently loaded modules.\n", + "module load compiler/gnu/10.2 # Load required modules.\n", + "module load devel/python/3.8.6_gnu_10.2\n", + "module load mpi/openmpi/4.1\n", + "\n", + "source ${VENVDIR}/bin/activate # Activate your virtual environment.\n", + "\n", + "mpirun python ${PYDIR}/distributed_forest.py --dataloader parallel # Use truly parallel dataloader.\n", + "mpirun python ${PYDIR}/distributed_forest.py --dataloader root # Use root-based dataloader.\n" + ] + } + ], + "metadata": { + "jupytext": { + "main_language": "python" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}