Skip to content
Snippets Groups Projects
Commit 0e4f5c9d authored by Marie Weiel's avatar Marie Weiel :zap:
Browse files

modify communication

parent 95eaea80
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id:2432be9b tags:
# Skalierbare Methoden der Künstlichen Intelligenz
Dr. Charlotte Debus (<charlotte.debus@kit.edu>)
Dr. Markus Götz (<markus.goetz@kit.edu>)
Dr. Marie Weiel (<marie.weiel@kit.edu>)
Dr. Kaleb Phipps (<kaleb.phipps@kit.edu>)
## Übung 3 am 17.12.24: Paralleles Ensemble Learning mit verteilten Random Forests
In der dritten Übung beschäftigen wir uns mit parallelem Ensemble Learning am Beispiel von Random Forests (siehe Vorlesung vom 28.11.2024).
Dazu verwenden wir [Scikit-learn](https://scikit-learn.org/stable/), die meist genutzte Software-Bibliothek für klassisches maschinelles Lernen in Python. Scikit-learn bietet verschiedene Klassifikations-, Regressions- und Clustering-Algorithmen, darunter Support-Vektor-Maschinen, Random Forests, Gradient Boosting (wie XGBoost), k-Means und DBSCAN. Das Package basiert auf den bekannten Python-Bibliotheken [NumPy](https://numpy.org) und [SciPy](https://scipy.org).
Unser Ziel ist es, den [SUSY-Datensatz](https://archive.ics.uci.edu/ml/datasets/SUSY) mithilfe eines verteilten Random Forests zu klassifizieren. Der SUSY-Datensatz besteht aus 5M Monte-Carlo-Samples supersymmetrischer und nicht-supersymmetrischer Teilchenkollisionen. Der Signalprozess ist die Produktion elektrisch geladener, supersymmetrischer Teilchen, die in $W$-Bosonen und ein elektrisch neutrales, supersymmetrisches Teilchen zerfallen, welches für den Detektor unsichtbar ist. Bei diesem Klassifikationsproblem möchten wir anhand von insgesamt 18 Features zwischen diesem Signalprozess, der supersymmetrische Teilchen erzeugt, und einem Hintergrundprozess, der dies nicht tut, unterscheiden.
Bei den ersten acht Features handelt es sich um kinematische Eigenschaften, die von Detektoren in Teilchenbeschleunigern direkt gemessen werden können. Die verbleibenden zehn Features sind Funktionen der ersten acht und wurden von Physiker*innen abgeleitet, um eine Unterscheidung zwischen den beiden Klassen zu erleichtern.
Sie finden den kompletten SUSY-Datensatz im CSV-Format im Workspace auf dem Cluster: `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv`. Er enthält insgesamt $n_\mathrm{samples}$ = 5M Samples. Die erste Spalte enthält die Klassenbezeichnungen ("Labels" bzw. "Targets"; 1 für Signal, 0 für Hintergrund) gefolgt von den $n_\text{features}$ = 18 Features.
Zur binären Klassifizierung dieses Datensatzes in die oben genannten Klassen nutzen wir einen verteilten [Random Forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html#sklearn.ensemble.RandomForestClassifier.predict) mit insgesamt $n_\text{trees}$ Bäumen. Wir betrachten ein Setting mit $p$ Prozessoren. Jeder Prozessor $i=1,\cdots,p$ hält
- einen Sub Random Forest mit $n_\text{trees}^{i}=n_\text{trees}/p$ Bäumen (vorbehaltlich Load Balancing), wobei $\sum_{i=1}^p n_\text{trees}^i = n_\text{trees}$,
- eine gewisse Anzahl $n_\text{train}^i = n_\text{train}/p$ aller Trainings-Samples (vorbehaltlich Load Balancing) als lokalen Trainingsdatensatz sowie
- den kompletten, global einheitlichen Testdatensatz mit $n_\text{test}$ Samples.
Das bedeutet, dass es einen global einheitlichen Train-Test-Split $f_\text{train}$ des gesamten Datensatzes gibt, wobei die Testdaten auf allen Prozessoren gleichermaßen vollständig vorliegen, wohingegen jeder Prozessor eine gewisse Anzahl an Samples aus dem globalen Trainingsdatensatz als lokal unterschiedliche Trainingsdaten erhält. Es gilt also:
$$n_\text{samples} = n_\text{train} + n_\text{test} = f_\text{train}\cdot n_\text{samples}+(1-f_\text{train})\cdot n_\text{samples} = \sum_{i=1}^p n_\text{train}^i+n_\text{test}$$
Jeder Prozessor trainiert seinen Sub Random Forest auf seinem lokalen Trainingsdatensatz. Zum Schluss werden die Teil-Ergebnisse über einen Majority Vote zu einem finalen Ergebnis kombiniert.
%% Cell type:markdown id:de426257 tags:
### Aufgabe 1: Daten laden
Um unser Modell zu trainieren und zu testen, müssen wir zunächst die verfügbaren Daten laden. Nach dem Train-Test-Split soll jeder Prozessor eine gewisse Anzahl an Datenpunkten aus dem globalen Trainingsdatensatz *mit Zurücklegen* ziehen. Diese Samples bilden den jeweiligen lokalen Trainingsdatensatz. Dazu gibt es verschiedene Ansätze:
1. Alle Prozessoren laden einen entsprechenden Teil der Daten aus dem globalen Trainingsdatensatz echt-parallel: Dataloader `load_data_csv_parallel()`
2. Der Root-Prozess lädt den kompletten globalen Trainingsdatensatz und verteilt diese Samples entsprechend an alle Prozessoren mit `Scatterv`: Dataloader `load_data_csv_root()`
Implementieren und testen Sie die zwei Varianten der Dataloader. Untenstehend finden Sie dazu Code-Gerüste der Funktionen `load_data_csv_parallel()` und `load_data_csv_root()` inklusive einiger Hilfsfunktionen sowie ein Test- und ein beispielhaftes Submitskript. Vollziehen Sie diese nach und vervollständigen Sie den Code an den markierten Stellen. **Normale Kommentare mit '#' beschreiben wie üblich den Code, in Zeilen mit '##' müssen Sie Code ergänzen.** Erstellen Sie entsprechende Python-Skripte mit allen Funktionsdefinitionen bzw. dem Main-Teil. Führen Sie das Test-Skript auf 2 und 4 Knoten des bwUniClusters aus.
%% Cell type:code id:563f8376 tags:
``` python
# IMPORTS
import pathlib
from typing import BinaryIO, List, Tuple, Union
from mpi4py import MPI
from mpi4py.util.dtlib import from_numpy_dtype
import numpy as np
from sklearn.model_selection import train_test_split
```
%% Cell type:code id:eabeeef7 tags:
``` python
# HELPER FUNCTIONS FOR TRULY PARALLEL DATALOADER
# Put together with imports and other dataloaders into file `dataloaders.py`.
def _determine_line_starts(
f: BinaryIO, comm: MPI.Comm, displs: np.ndarray, counts: np.ndarray
) -> Tuple[list, bytes, int]:
"""
Determine line starts in bytes from CSV file.
Parameters
----------
f : BinaryIO
The handle of CSV file to read from
comm : MPI.Comm
The communicator to use.
displs : np.ndarray
The displacements of byte chunk starts on each rank, length is `comm.size`.
counts : np.ndarray
The counts of the byte chunk on each rank, length is `comm.size`.
Returns
-------
list
The line starts in each byte chunk.
bytes
The bytes chunk read in on each rank.
int
The number of chars used to indicate the line end in the file.
"""
rank = comm.rank # Set up communicator stuff.
# Read bytes chunk and count linebreaks.
# \r (Carriage Return) : Move cursor to line start w/o advancing to next line.
# \n (Line Feed) : Move cursor down to next line w/o returning to line start.
# \r\n (End Of Line) : Combination of \r and \n.
lineter_len = 1 # Set number of line termination chars.
line_starts = [] # Set up list to save line starts.
f.seek(displs[rank]) # Jump to pre-assigned position in file.
r = f.read(counts[rank]) # Read number of bytes from starting position.
for pos, l in enumerate(r): # Determine line breaks in bytes chunk.
if chr(l) == "\n": # Line terminated by '\n' only.
if not chr(r[pos - 1]) == "\r": # No \r\n.
line_starts.append(pos + 1)
elif chr(l) == "\r":
if (
pos + 1 < len(r) and chr(r[pos + 1]) == "\n"
): # Line terminated by '\r\n'.
line_starts.append(pos + 2)
lineter_len = 2
else: # Line terminated by '\r' only.
line_starts.append(pos + 1)
return line_starts, r, lineter_len
def _get_byte_pos_from_line_starts(
line_starts: np.ndarray, file_size: int, lineter_len: int
) -> np.ndarray:
"""
Get line starts and counts in byte from line starts to read lines via seek and read.
Parameters
----------
line_starts : np.ndarray
The absolute positions of line starts in bytes.
file_size : int
The absolute file size in bytes.
lineter_len : int
The number of line termination characters.
Returns
-------
np.ndarray
An array containing vectors of (start, count) for lines in byte.
"""
lines_byte = [] # list for line starts and counts
for idx in range(len(line_starts)): # Loop through all line starts.
if idx == len(line_starts) - 1: # Special case for last line.
temp = [
line_starts[idx], # Line start in bytes
file_size
- line_starts[idx]
- lineter_len, # Bytes count of line length via difference
]
else: # All other lines
temp = [
line_starts[idx], # Line start in bytes
line_starts[idx + 1]
- line_starts[idx]
- lineter_len, # Bytes count of line length via difference
]
lines_byte.append(temp)
return np.array(lines_byte)
def _decode_bytes_array(
byte_pos: np.ndarray, f: BinaryIO, sep: str = ",", encoding: str = "utf-8"
) -> List:
"""
Decode lines from byte positions and counts.
Parameters
----------
byte_pos : np.ndarray
The vectors of line starts and lengths in bytes.
f : BinaryIO
The handle of the CSV file to read from.
sep : char
The character used in the file to separate entries.
encoding : str
The encoding used to decode entries from bytes.
Returns
-------
list
Values read from CSV file as numpy array entries in float format
"""
lines = [] # List for saving decoded lines
byte_pos = byte_pos[
byte_pos[:, 0].argsort()
] # Sort line starts of data items to read from file in ascending order.
for item in byte_pos:
f.seek(item[0]) # Go to line start.
line = f.read(item[1]).decode(
encoding
) # Read specified number of bytes and decode.
if len(line) > 0:
sep_values = [
float(val) for val in line.split(sep)
] # Separate values in each line.
line = np.array(
sep_values
) # Convert list of separated values to numpy array.
lines.append(line) # Append numpy data entry to output list.
return lines
def _split_indices_train_test(
n_samples: int, train_split: float, seed: int
) -> Tuple[int, int, np.ndarray, np.ndarray]:
"""
Make index-based train test split with shuffle.
Parameters
----------
n_samples : int
The overall number of samples in the dataset.
train_split : float
The fraction of the dataset used for training (0 < `train_split` < 1)
seed : int
The seed used for the random number generator.
Returns
-------
n_train_samples : int
The number of train samples.
n_test_samples : int
The number of test samples.
train_indices : np.ndarray
The indices of the samples in the dataset used for training.
test_indices : np.ndarray
The indices of the samples in the dataset used for testing.
"""
## DETERMINE GLOBAL NUMBER OF TRAIN SAMPLES FROM SPLIT AND OVERALL NUMBER OF SAMPLES.
## n_train_samples = int(...)
## DETERMINE GLOBAL NUMBER OF TEST SAMPLES FROM OVERALL NUMBER OF SAMPLES AND GLOBAL NUMBER OF TRAIN SAMPLES.
## n_test_samples = ...
rng = np.random.default_rng(
seed=seed
) # Set same seed over all ranks for consistent train-test split.
all_indices = np.arange(0, n_samples) # Construct array of all indices.
rng.shuffle(all_indices) # Shuffle them.
## EXTRACT FIRST `n_train_samples` INDICES FROM SHUFFLED INDICES ARRAY `all_indices` TO FORM TRAIN SET.
## train_indices = all_indices[...]
## REMAINING ONES FORM TEST SET.
## test_indices = ...
return n_train_samples, n_test_samples, train_indices, test_indices
```
%% Cell type:code id:c0fbac25 tags:
``` python
# TRULY PARALLEL DATALOADER
# Put together with imports and other dataloaders into file `dataloaders.py`.
def load_data_csv_parallel(
path_to_data: Union[pathlib.Path, str],
header_lines: int,
comm: MPI.Comm = MPI.COMM_WORLD,
train_split: float = 0.75,
verbose: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Load data from CSV file in truly parallel fashion.
Parameters
----------
path_to_data : str | pathlib.Path
The path to the CSV file.
header_lines : int
The number of header lines.
comm : MPI.Comm
The communicator to use.
train_split : float
The train-test split fraction.
verbose : bool
The verbosity level.
Returns
-------
train_samples_local : np.ndarray
The rank-dependent train samples.
train_targets_local : np.ndarray
The rank-dependent train targets.
test_samples : np.ndarray
The global test samples.
test_targets : np.ndarray
The global test targets.
"""
rank, size = comm.rank, comm.size # Set up communicator stuff.
## GET FILE SIZE IN BYTES.
## file_size = ...
# Determine displs + counts of bytes chunk to read on each rank.
base = file_size // size # Determine base chunk size for each process.
remainder = file_size % size # Determine remainder bytes.
counts = base * np.ones( # Construct array with each rank's chunk counts.
(size,), dtype=int
)
if (
remainder > 0
): # Equally distribute remainder over respective ranks to balance load.
counts[:remainder] += 1
displs = np.concatenate( # Determine displs via cumulative sum from counts.
(np.zeros((1,), dtype=int), np.cumsum(counts, dtype=int)[:-1])
)
if rank == 0:
print(f"File size is {file_size} bytes.")
if rank == 0 and verbose:
print(f"Displs {displs}, counts {counts} for reading bytes chunks from file.")
with open(path_to_data, "rb") as f: # Open csv file to read from.
# Determine line starts in bytes chunks on each rank.
line_starts, r, lineter_len = _determine_line_starts(f, comm, displs, counts)
# On rank 0, add very first line.
if rank == 0:
line_starts = [0] + line_starts
if verbose:
print(f"[{rank}/{size}]: {len(line_starts)} line starts in chunk.")
# Find correct starting point, considering header lines.
# All-gather numbers of line starts in each chunk in `total_lines` array.
total_lines = np.empty(size, dtype=int)
comm.Allgather(
[np.array(len(line_starts), dtype=int), MPI.INT], [total_lines, MPI.INT]
)
cumsum = list(np.cumsum(total_lines))
# Determine rank where actual data lines start,
# i.e. remove ranks only containing header lines.
start = next(i for i in range(size) if cumsum[i] > header_lines)
if verbose:
print(
f"[{rank}/{size}]: total_lines is {total_lines}.\n"
f"[{rank}/{size}]: cumsum is {cumsum}.\n"
f"[{rank}/{size}]: start is {start}."
)
if rank < start: # Ranks containing only header lines.
line_starts = []
if rank == start: # Rank containing header + data lines.
rem = header_lines - (0 if start == 0 else cumsum[start - 1])
line_starts = line_starts[rem:]
# Share line starts of data samples across all ranks via Allgatherv.
line_starts += displs[
rank
] # Shift line starts on each rank according to displs.
if verbose:
print(
f"[{rank}/{size}]: {len(line_starts)} line starts of shape {line_starts.shape} and type "
f"{line_starts.dtype} in local chunk: {line_starts}"
)
count_linestarts = np.array( # Determine local number of line starts.
len(line_starts), dtype=int
)
counts_linestarts = (
np.empty( # Initialize array to all-gather local numbers of line starts.
size, dtype=int
)
)
comm.Allgather([count_linestarts, MPI.INT], [counts_linestarts, MPI.INT])
n_linestarts = np.sum(
counts_linestarts
) # Determine overall number of line starts.
displs_linestarts = (
np.concatenate( # Determine displacements of line starts from counts.
(
np.zeros((1,), dtype=int),
np.cumsum(counts_linestarts, dtype=int)[:-1],
),
dtype=int,
)
)
if verbose and rank == 0:
print(
f"Overall {n_linestarts} linestarts.\n"
f"Number of linestarts in each chunk is {counts_linestarts}.\n"
f"Displs of linestarts in each chunk is {displs_linestarts}."
)
all_line_starts = (
np.empty( # Initialize array to all-gatherv line starts from all ranks.
(n_linestarts,), dtype=line_starts.dtype
)
)
if verbose and rank == 0:
print(
f"Recvbuf {all_line_starts}, {all_line_starts.shape}, {all_line_starts.dtype}."
)
comm.Allgatherv(
line_starts,
[
all_line_starts,
counts_linestarts,
displs_linestarts,
from_numpy_dtype(line_starts.dtype),
],
)
# Line starts were determined as those positions following a line end.
# But: There is no line after last line end in file.
# Thus, remove last entry from all_line_starts.
all_line_starts = all_line_starts[:-1]
if rank == 0:
print(f"After Allgatherv: All line starts: {all_line_starts}")
## CONSTRUCT ARRAY WITH LINE STARTS AND LENGTHS (OR COUNTS) IN BYTES.
## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE.
## lines_byte = ...
## MAKE INDEX-BASED GLOBAL TRAIN-TEST SPLIT.
## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE.
## TAKE CARE OF CHOOSING THE SEEDS PROPERLY TO ENSURE A GLOBALLY CONSISTENS SPLIT OVER ALL RANKS.
## n_train_samples, n_test_samples, train_indices, test_indices = ...
n_train_samples_local = (
n_train_samples // size
) # Determine local train dataset size.
remainder_train = n_train_samples % size # Balance load.
if rank < remainder_train:
n_train_samples_local += 1
# Construct global held-out test dataset (same on each rank).
## CONSTRUCT ACTUAL TEST SET FROM INDEX-BASED TRAIN-TEST SPLIT.
## TO DO SO, USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE TO READ THE
## ACTUAL DATA ENTRIES SELECTIVELY FROM THE FILE (AS SPECIFIED BY THE INDICES).
## test_lines = ...
test_samples = np.array(test_lines)[:, 1:]
test_targets = np.array(test_lines)[:, 0]
if verbose:
print(
f"[{rank}/{size}]: Test samples: {test_samples[0]}\n"
f"Test targets: {test_targets[0]}\n"
)
# Construct train dataset (different on each rank!).
## DRAW `train_samples_local` SAMPLES FROM ALL TRAIN SAMPLES WITH REPETITION.
## THE RESULTING LOCAL TRAIN DATASET SHOULD BE DIFFERENT ON EACH RANK.
## DO THIS IN AN INDICES-BASED FASHION.
## train_indices_local = ...
## USE ONE OF THE HELPER FUNCTIONS DEFINED ABOVE TO READ THE ACTUAL DATA ENTRIES
## SELECTIVELY FROM THE FILE (AS SPECIFIED BY THE INDICES).
## train_lines_local = ...
train_samples_local = np.array(train_lines_local)[:, 1:]
train_targets_local = np.array(train_lines_local)[:, 0]
# Now, each rank holds a local train set (samples + targets)
# and the global held-out test set (samples + targets).
return train_samples_local, train_targets_local, test_samples, test_targets
```
%% Cell type:code id:25acd308 tags:
``` python
# ROOT-BASED DATALOADER WITH SCATTERV
# Put together with imports and other dataloaders into file `dataloaders.py`.
def load_data_csv_root(
path_to_data: Union[str, pathlib.Path],
header_lines: int,
comm: MPI.Comm,
seed: int,
train_split: float = 0.75,
verbose: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Load data from CSV file via root process.
Parameters
----------
path_to_data : str | pathlib.Path
The path to the CSV file.
header_lines : int
The number of header lines.
comm : MPI.Comm
The communicator to use.
seed : int
The seed used for train-test split.
train_split : float
The train-test split fraction.
verbose : bool
The verbosity level
Returns
-------
np.ndarray
The rank-dependent train samples.
np.ndarray
The rank-dependent train targets.
np.ndarray
The global test samples.
np.ndarray
The global test targets.
"""
rank, size = comm.rank, comm.size # Set up communicator stuff.
if rank == 0:
## LOAD COMPLETE DATA INTO NUMPY ARRAY.
## data = ...
## DIVIDE DATA INTO SAMPLES AND TARGETS.
## First column contains targets, followed by the 18 features forming the samples.
## samples = data[...]
## targets = ...
## PERFORM TRAIN-TEST SPLIT USING THE `train_test_split` HELPER FUNCTION IN sklearn.
## samples_train, samples_test, targets_train, targets_test = ...
##
## DETERMINE THE NUMBER OF SAMPLES IN THE TRAIN SET.
## n_train_samples = ...
## DETERMINE THE NUMBER OF SAMPLES IN THE TEST SET.
## n_test_samples = ...
## DETERMINE THE NUMBER OF FEATURES.
## n_features = ...
n_train_samples_local = (
n_train_samples // size
) # Determine rank-local number of train samples.
remainder_train = n_train_samples % size
train_counts = n_train_samples_local * np.ones(
size, dtype=int
) # Determine load-balanced counts and displacements.
for idx in range(remainder_train):
train_counts[idx] += 1
train_counts[:remainder_train] += 1
train_displs = np.concatenate(
(np.zeros(1, dtype=int), np.cumsum(train_counts, dtype=int)[:-1]), dtype=int
)
print(
f"There are {n_train_samples} train and {n_test_samples} test samples.\n"
f"Local train samples: {train_counts}"
)
# Construct train dataset.
## SAMPLE `n_train_samples` INDICES FROM TRAIN SET WITH REPLACEMENT.
## train_indices = ...
## CONSTRUCT ACTUAL TRAIN DATASET FROM DRAWN INDICES (REPETITIONS POSSIBLE!)
## samples_train_shuffled = ...
## targets_train_shuffled = ...
send_buf_train_samples = [
samples_train_shuffled,
train_counts * n_features,
train_displs * n_features,
from_numpy_dtype(samples_train_shuffled.dtype),
]
send_buf_train_targets = [
targets_train_shuffled,
train_counts,
train_displs,
from_numpy_dtype(targets_train_shuffled.dtype),
]
else:
train_counts = None
train_counts = np.empty(size, dtype=int)
n_features = None
n_test_samples = None
send_buf_train_samples = None
send_buf_train_targets = None
## BROADCAST NUMBER OF FEATURES FROM ROOT TO ALL OTHERS.
## n_features = ...
## BROADCAST NUMBER OF TEST SAMPLES FROM ROOT TO ALL OTHERS.
## n_test_samples = ...
## BROADCAST TRAIN COUNTS FROM ROOT TO ALL OTHERS.
## train_counts = ...
## ...
samples_train_local = np.empty((train_counts[rank], n_features), dtype=float)
targets_train_local = np.empty((train_counts[rank],), dtype=float)
recv_buf_train_samples = [
samples_train_local,
from_numpy_dtype(samples_train_local.dtype),
]
recv_buf_train_targets = [
targets_train_local,
from_numpy_dtype(targets_train_local.dtype),
]
if rank != 0:
samples_test = np.empty((n_test_samples, n_features), dtype=float)
targets_test = np.empty((n_test_samples,), dtype=float)
## SCATTERV LOCAL TRAIN SAMPLES `samples_train_local` FROM ROOT AT RANK 0 TO ALL OTHERS.
## TO DO SO, MAKE USE OF THE SEND AND RECEIVE BUFFERS DEFINED ABOVE.
## SCATTERV LOCAL TRAIN TARGETS `targets_train_local` FROM ROOT AT RANK 0 TO ALL OTHERS.
## TO DO SO, MAKE USE OF THE SEND AND RECEIVE BUFFERS DEFINED ABOVE.
## BROADCAST TEST SAMPLES `samples_test` FROM ROOT AT RANK 0 TO ALL OTHERS.
## BROADCAST TEST TARGETS `targets_test` FROM ROOT AT RANK 0 TO ALL OTHERS.
if verbose:
print(
f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n"
f"Train targets after Scatterv have shape {targets_train_local.shape}.\n"
f"Test samples after Bcast have shape {samples_test.shape}.\n"
f"Test targets after Bcast have shape {targets_test.shape}."
)
return samples_train_local, targets_train_local, samples_test, targets_test
```
%% Cell type:code id:fe69716a tags:
``` python
"""Test dataloaders."""
import argparse
from mpi4py import MPI
from dataloaders import load_data_csv_parallel, load_data_csv_root
from distributed_forest import MPITimer
if __name__ == "__main__":
# Set up communicator.
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
# Data file specifications
header_lines = 0
sep = ","
encoding = "utf-8"
path_to_data = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv"
parser = argparse.ArgumentParser(prog="Distributed Random Forest")
parser.add_argument(
"--train_split",
type=float,
default=0.75,
help="The fraction of the dataset used for training.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="The verbosity.",
)
args = parser.parse_args()
seed = size
print(f"[{rank}/{size}]: Loading data using truly parallel dataloader...")
with MPITimer(comm, name="truly parallel data loading"):
(
train_samples_local_par,
train_targets_local_par,
test_samples_par,
test_targets_par,
) = load_data_csv_parallel(
path_to_data, header_lines, comm, args.train_split, args.verbose
)
print(f"[{rank}/{size}]: Loading data using root-based dataloader...")
with MPITimer(comm, name="root-based data loading"):
(
train_samples_local_root,
train_targets_local_root,
test_samples_root,
test_targets_root,
) = load_data_csv_root(
path_to_data, header_lines, comm, seed, args.train_split, args.verbose
)
print(
f"[{rank}/{size}]: DONE.\n"
f"Parallel: Local train samples / targets have shapes "
f"{train_samples_local_par.shape} / {train_targets_local_par.shape}.\n"
f"Parallel: Global test samples / targets have shapes {test_samples_par.shape} / {test_targets_par.shape}.\n"
f"Root: Local train samples / targets have shapes "
f"{train_samples_local_root.shape} / {train_targets_local_root.shape}.\n"
f"Root: Global test samples / targets have shapes {test_samples_root.shape} / {test_targets_root.shape}."
)
```
%% Cell type:code id:0bed7db5 tags:
``` python
#!/bin/bash
#SBATCH --job-name=dataloader # job name
#SBATCH --partition=dev_multiple # queue for the resource allocation
#SBATCH --nodes=2
#SBATCH --time=10:00 # wall-clock time limit
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --cpus-per-task=40
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python ${PYDIR}/test_dataloaders.py
```
%% Cell type:markdown id:7c0910e3 tags:
### Aufgabe 2
Laden Sie die Daten mit den verschiedenen Dataloadern und trainieren Sie einen verteilen Random Forest auf 1, 2, 4, 8, 16, 32 und 64 rein CPU-basierten Knoten des bwUniClusters. Untenstehend finden Sie ein entsprechendes Code-Gerüst sowie ein beispielhaftes Submit-Bash-Skript für 2 Knoten. Vervollständigen Sie den Code an den markierten Stellen.
**Normale Kommentare mit '#' beschreiben wie üblich den Code, in Zeilen mit '##' müssen Sie Code ergänzen.**
Erstellen Sie ein vollständiges Python-Skript mit allen Funktionsdefinitionen sowie dem Main-Teil. Messen und vergleichen Sie:
- die Datenladedauer
- die Trainingsdauer
- die lokale Accuracy der Sub Random Forests auf den Testdaten (auf jedem Rank)
- die globale Accuracy des finalen globalen Random Forest auf den Testdaten
Plotten Sie die oben genannten Größen über die Anzahl der verwendeten Knoten. Was fällt Ihnen auf? Welche Trends können Sie beobachten und wie lassen sich diese erklären?
%% Cell type:code id:deed7fc1 tags:
``` python
# IMPORTS
# Put together with helper functions and main (see below) into `distributed_forest.py`.
import argparse
import string
import time
from typing import Tuple
import numpy as np
from mpi4py import MPI
from sklearn.ensemble import RandomForestClassifier
from dataloaders import load_data_csv_parallel, load_data_csv_root
```
%% Cell type:code id:d6590961-723f-41a3-b364-2488f9dd2694 tags:
``` python
# HELPER FUNCTIONS
# Put together with imports (see above) and main (see below) into `distributed_forest.py`.
class MPITimer:
"""A distributed context-manager enabled timer."""
def __init__(
self,
comm: MPI.Comm,
print_on_exit: bool = True,
name: str = "",
output_format: str = "Elapsed time {name}: global average {elapsed_time_average:.2g}s, "
"local {elapsed_time_local:.2g}s",
) -> None:
"""
Create a new distributed context-manager enabled timer.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
print_on_exit : bool
Whether to print the measured time in __exit__.
name : str
Label describing what this timer measured, can be used for printing the results.
output_format : str
Format string template used for printing the output. May reference all attributes of the
"""
self.comm = comm
self.output_format = output_format
self.print_on_exit = print_on_exit
self.name = name
self.start_time = None
self.end_time = None
self.elapsed_time_local = None
self.elapsed_time_average = None
def start(self) -> None:
"""Start the timer by setting the start time."""
self.start_time = time.perf_counter()
def stop(self) -> None:
"""Stop the timer by setting the end time and updating elapsed_time_local."""
self.end_time = time.perf_counter()
self.elapsed_time_local = self.end_time - self.start_time
def allreduce_for_average_time(self) -> None:
"""Compute the global average using allreduce and update elapsed_time_average."""
self.elapsed_time_average = (
self.comm.allreduce(self.elapsed_time_local, op=MPI.SUM) / self.comm.size
)
def print(self) -> None:
"""Print the elapsed time using the given template."""
template_keywords = {
key for (_, key, _, _) in string.Formatter().parse(self.output_format)
}
template_kwargs = {
key: value for key, value in vars(self).items() if key in template_keywords
}
print(self.output_format.format(**template_kwargs))
def __enter__(self) -> "MPITimer":
"""
Called on entering this context (e.g. with a 'with' statement), starts the timer.
Returns
-------
MPITimer
This timer object.
"""
self.start()
return self
def __exit__(self, *args) -> None:
"""
Called on exiting this context (e.g. after a 'with' statement), stops the timer, computes the global average
and optionally prints the result on rank 0.
Parameters
----------
args :
Unused, only to fulfill ``__exit__`` interface.
"""
self.stop()
self.allreduce_for_average_time()
if self.print_on_exit and self.comm.rank == 0:
self.print()
class DistributedRandomForest:
"""
Distributed random forest class.
Attributes
----------
acc_global : float | None
The global accuracy.
acc_local : float | None
Each local trained model's accuracy.
clf : RandomForestClassifier | None
The sklearn classifier.
comm : MPI.Comm
The communicator.
n_trees_base : int
The base number of rank-local trees.
n_trees_global : int
The number of trees in the global forest.
n_trees_local : int
The number of local trees.
n_trees_remainder : int
The remaining number of trees to distribute.
random_state : int
The base random state for sklearn random forest.
Methods
-------
_distribute_trees()
Distribute trees evenly over all processors.
_train_local_classifier()
Train local random forest classifier.
_predict_locally()
Get predictions of all sub estimators in random forest.
_get_predicted_class_hist()
Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.
_calc_majority_vote_hist()
Calculate majority vote from sample-wise histograms of predicted classes.
train()
Train random forest in parallel.
test()
Test trained distributed global random forest.
"""
def __init__(
self,
n_trees_global: int,
comm: MPI.Comm,
random_state: int,
) -> None:
"""
Initialize distributed random forest object.
Parameters
----------
n_trees_global : int
The number of trees in the global forest.
comm : MPI.Comm
The communicator to use.
random_state : int
The base random state for sklearn RF.
"""
self.n_trees_global = n_trees_global
self.comm = comm
(
self.n_trees_base,
self.n_trees_remainder,
self.n_trees_local,
) = self._distribute_trees()
self.random_state = random_state + self.comm.rank
self.clf = None # Local random forest classifier
self.acc_global = None # Accuracy of global classifier
self.acc_local = None # Accuracy of each rank-local classifier
def _distribute_trees(self) -> Tuple[int, int, int]:
"""
Distribute trees evenly over all processors.
Returns
-------
int
The base number of rank-local trees.
int
The remaining number of trees to distribute.
int
The final number of rank-local trees.
"""
size, rank = self.comm.size, self.comm.rank
## DETERMINE BASE NUMBER OF TREES ON EACH RANK FROM GLOBAL NUMBER OF TREES AND COMM SIZE.
## n_trees_base = ... # base number of rank-local trees
## DETERMINE THE REMAINDER TO BE DISTRIBUTED EVENLY OVER THE FIRST RANKS.
## n_trees_remainder = ... # remaining number of trees to distribute
## DETERMINE THE FINAL LOAD-BALANCED LOCAL NUMBER OF TREES ON EACH RANK FROM THE
## BASE NUMBER AND THE REMAINDER.
## n_trees_local = ... # final number of local trees
return n_trees_base, n_trees_remainder, n_trees_local
def _train_local_classifier(
self, train_samples: np.ndarray, train_targets: np.ndarray
) -> RandomForestClassifier:
"""
Train local random forest classifier.
Params
------
train_samples : numpy.ndarray
samples of train dataset
train_targets : numpy.ndarray
targets of train dataset
Returns
-------
sklearn.ensemble.RandomForestClassifier
trained model
"""
## SET UP RANDOM FOREST CLASSSIFIER IN sklearn WITH RANK-LOCAL NUMBER OF TREES
## AND RANDOM SEED.
## clf =
## TRAIN CLASSIFIER ON TRAINING DATASET.
## RETURN TRAINED CLASSIFIER.
def _predict_locally(self, samples: np.ndarray) -> np.ndarray:
"""
Get predictions of all sub estimators in random forest.
Parameters
----------
samples : numpy.ndarray
The samples whose class to predict.
Returns
-------
numpy.ndarray
The predictions of each sub estimator on the samples.
"""
return np.array(
[tree.predict(samples) for tree in self.clf.estimators_], dtype="H"
)
def _get_predicted_class_hist(
self, tree_wise_predictions: np.ndarray, n_classes: int
) -> np.ndarray:
"""
Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.
Parameters
----------
tree_wise_predictions : numpy.ndarray
The tree-wise predictions.
n_classes : int
The number of classes in the dataset.
Returns
-------
numpy.ndarray
The sample-wise distributions of the predicted classes.
"""
## GET SAMPLE-WISE PREDICTIONS FROM TRANSPOSED TREE-WISE PREDICTIONS.
## sample_wise_predictions = ...
predicted_class_hists_local = np.array(
[
np.bincount(sample_pred, minlength=n_classes)
for sample_pred in sample_wise_predictions
]
)
# The histogram arrays have `n_test_samples` entries with `n_classes` elements each.
# The local histogram holds the class prediction distribution for each test sample over the local forest.
# Now we want to sum up those sample-wise distributions to obtain the global histogram over the global forest.
# From this global histogram, we can determine the global majority vote for each sample.
## INITIALIZE RECEIVE BUFFER FOR GLOBAL SAMPLE-WISE DISTRIBUTION OF THE PREDICTED CLASSES.
## ALL-REDUCE LOCAL SAMPLE-WISE DISTRIBUTIONS OF PREDICTED CLASSES TO OBTAIN THE CORRESPONDING GLOBAL DISTRIBUTION.
return predicted_class_hists_global
@staticmethod
def _calc_majority_vote_hist(predicted_class_hists: np.ndarray) -> np.ndarray:
"""
Calculate majority vote from sample-wise histograms of predicted classes.
Parameters
----------
predicted_class_hists : numpy.ndarray
The sample-wise histograms of the predicted classes.
Returns
-------
numpy.ndarray
The majority votes for all samples in histogram input.
"""
return np.array(
[np.argmax(sample_hist) for sample_hist in predicted_class_hists]
)
def train(
self,
train_samples: np.ndarray,
train_targets: np.ndarray,
) -> None:
"""
Train random forest in parallel.
Parameters
----------
train_samples : numpy.ndarray
The rank-local train samples.
train_targets : numpy.ndarray
The corresponding train targets.
"""
# Set up communicator.
rank, size = self.comm.rank, self.comm.size
# Set up and train local forest.
print(
f"[{rank}/{size}]: Set up and train local random forest with "
f"{self.n_trees_local} trees and random state {self.random_state}."
)
## TRAIN LOCAL CLASSIFIER ON LOCAL TRAINING DATA USING METHOD DEFINED ABOVE.
def test(
self,
test_samples: np.ndarray,
test_targets: np.ndarray,
n_classes: int,
) -> None:
"""
Test trained distributed global random forest.
Parameters
----------
test_samples : numpy.ndarray
The rank-local test samples.
test_targets : numpy.ndarray
The corresponding test targets.
n_classes : int
The number of classes in the dataset.
"""
rank, size = self.comm.rank, self.comm.size
# Get class predictions of sub estimators in each forest.
print(f"[{rank}/{size}]: Get predictions of individual sub estimators.")
## GET THE PREDICTIONS OF THE INDIVIDUAL SUB ESTIMATORS, I.E., TREES FOR THE TEST SAMPLES USING THE METHOD DEFINED ABOVE.
## tree_predictions_local = ..
print(f"[{rank}/{size}]: Calculate majority vote via histograms.")
## CALCULATE THE HISTOGRAM-BASED MAJORITY VOTE USING THE METHODS DEFINED ABOVE.
## GET THE DISTRIBUTIONS OF THE PREDICTED CLASSES FOR EACH TEST SAMPLE.
## predicted_class_hists = ...
## GET THE MAJORITY VOTES FROM THE PREDICTED HISTOGRAMS.
## majority_vote = ...
# Calculate accuracies.
self.acc_global = (test_targets == majority_vote).mean()
## CALCULATE THE ACCURACY OF EACH RANK-LOCAL CLASSIFIER.
## self.acc_local = ...
print(
f"[{rank}/{size}]: Local accuracy is {self.acc_local}, global accuracy is {self.acc_global}."
)
```
%% Cell type:code id:5d6694aa-0424-47f1-a546-d6fc12682a1e tags:
``` python
if __name__ == "__main__":
# SETTINGS
comm = MPI.COMM_WORLD # Set up communicator.
rank, size = comm.rank, comm.size
if rank == 0:
print(
"######################################################\n"
"# Distributed Random Forest in Scikit-Learn with MPI #\n"
"######################################################\n"
)
n_classes = 2
header_lines = 0 # Data file specifications
sep = ","
encoding = "utf-8"
path_to_data = "../../SUSY.csv"
parser = argparse.ArgumentParser(prog="Distributed Random Forest")
parser.add_argument(
"--dataloader",
type=str,
choices=["root", "parallel"],
default="root",
help="The dataloader to use.",
)
parser.add_argument(
"--num_trees",
type=int,
default=100,
help="The global number of trees.",
)
parser.add_argument(
"--train_split",
type=float,
default=0.75,
help="The fraction of the dataset used for training.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="The verbosity.",
)
args = parser.parse_args()
seed = size
# -------------- Load data. --------------
print(f"[{rank}/{size}]: Loading data...")
with MPITimer(comm, name="data loading"):
if args.dataloader == "parallel":
if rank == 0:
print("Using truly parallel dataloader...")
(
train_samples_local,
train_targets_local,
test_samples,
test_targets,
) = load_data_csv_parallel(
path_to_data, header_lines, comm, args.train_split, args.verbose
)
elif args.dataloader == "root":
if rank == 0:
print("Using root-based dataloader with Scatterv...")
(
train_samples_local,
train_targets_local,
test_samples,
test_targets,
) = load_data_csv_root(
path_to_data, header_lines, comm, seed, args.train_split, args.verbose
)
else:
raise ValueError
print(
f"[{rank}/{size}]: DONE.\n"
f"Local train samples and targets have shapes {train_samples_local.shape} and {train_targets_local.shape}.\n"
f"Global test samples and targets have shapes {test_samples.shape} and {test_targets.shape}.\n"
f"Labels are {train_targets_local}"
)
# -------------- Set up random forest. --------------
with MPITimer(comm, name="forest creation"):
distributed_random_forest = DistributedRandomForest(
n_trees_global=args.num_trees,
comm=comm,
random_state=seed,
)
# -------------- Train random forest. --------------
with MPITimer(comm, name="training"):
distributed_random_forest.train(
train_samples_local,
train_targets_local,
)
# -------------- Evaluate random forest. --------------
print(f"[{rank}/{size}]: Evaluate random forest.")
with MPITimer(comm, name="test"): # Test trained model on test data.
distributed_random_forest.test(
test_samples,
test_targets,
n_classes,
)
```
%% Cell type:code id:dfe87a2f tags:
``` python
#!/bin/bash
#SBATCH --job-name=RF2 # Job name
#SBATCH --partition=multiple # Queue for the resource allocation
#SBATCH --nodes=2 # Number of nodes
#SBATCH --time=70:00 # Wall-clock time limit
#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node
#SBATCH --cpus-per-task=40 # CPUs per task
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python ${PYDIR}/distributed_forest.py --dataloader parallel # Use truly parallel dataloader.
mpirun python ${PYDIR}/distributed_forest.py --dataloader root # Use root-based dataloader.
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment