Commit 5066c26a authored by Marie Weiel

add solution Python scripts

parent 24ceb630
"""
Sample data from a CSV file with replacement using multiple processors in a truly parallel fashion.
Each processor needs to know
- the overall number of samples,
- the absolute byte positions of the data samples' starts and ends as an array byte_pos = [[start0, end0], [start1, end1], ...].
Given this array:
- Perform a train-test split by randomly sampling `test_frac` indices from the overall number of samples xn = len(byte_pos).
These samples are held out to form the test dataset. The remaining samples form the global train dataset.
- Assume p processors, each of which holds a sub random forest. Each sub forest should be trained on `train_frac` * xn / p data
samples, where `train_frac` = 1 - `test_frac`. For this, each processor samples those `train_frac` * xn / p data
samples from the global train dataset with replacement, i.e., globally, the random forest is trained on `train_frac` * xn
samples; however, some of them may occur multiple times while others may not occur at all.
To do so, each processor needs access to the `byte_pos` array.
"""
import os
import pathlib
from typing import BinaryIO, List, Tuple, Union
from mpi4py import MPI
from mpi4py.util.dtlib import from_numpy_dtype
import numpy as np
from sklearn.model_selection import train_test_split
def _determine_line_starts(
f: BinaryIO, comm: MPI.Comm, displs: np.ndarray, counts: np.ndarray
) -> Tuple[list, bytes, int]:
"""
Determine line starts in bytes from CSV file.
Parameters
----------
f : BinaryIO
The handle of the CSV file to read from.
comm : MPI.Comm
The communicator to use.
displs : np.ndarray
The displacements of byte chunk starts on each rank, length is `comm.size`.
counts : np.ndarray
The counts of the byte chunk on each rank, length is `comm.size`.
Returns
-------
list
The line starts in each byte chunk.
bytes
The bytes chunk read in on each rank.
int
The number of chars used to indicate the line end in the file.
"""
rank = comm.rank # Set up communicator stuff.
# Read bytes chunk and count linebreaks.
# \r (Carriage Return) : Move cursor to line start w/o advancing to next line.
# \n (Line Feed) : Move cursor down to next line w/o returning to line start.
# \r\n (End Of Line) : Combination of \r and \n.
lineter_len = 1 # Set number of line termination chars.
line_starts = [] # Set up list to save line starts.
f.seek(displs[rank]) # Jump to pre-assigned position in file.
r = f.read(counts[rank]) # Read number of bytes from starting position.
for pos, l in enumerate(r): # Determine line breaks in bytes chunk.
if chr(l) == "\n": # Line terminated by '\n' only.
if not chr(r[pos - 1]) == "\r": # No \r\n.
line_starts.append(pos + 1)
elif chr(l) == "\r":
if (
pos + 1 < len(r) and chr(r[pos + 1]) == "\n"
): # Line terminated by '\r\n'.
line_starts.append(pos + 2)
lineter_len = 2
else: # Line terminated by '\r' only.
line_starts.append(pos + 1)
return line_starts, r, lineter_len
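# Illustrative example (hypothetical bytes, not part of the original script): for a chunk
# r = b"1,0\n2,1\n3,0", the loop above appends positions 4 and 8 to `line_starts`, i.e. the
# byte offsets directly after each '\n', and `lineter_len` remains 1 since no '\r\n' occurs.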
def _get_byte_pos_from_line_starts(
line_starts: np.ndarray, file_size: int, lineter_len: int
) -> np.ndarray:
"""
Get line starts and byte counts from the line start positions to read lines via seek and read.
Parameters
----------
line_starts : np.ndarray
The absolute positions of line starts in bytes.
file_size : int
The absolute file size in bytes.
lineter_len : int
The number of line termination characters.
Returns
-------
np.ndarray
An array containing (start, count) vectors in bytes for all lines.
"""
lines_byte = [] # list for line starts and counts
for idx in range(len(line_starts)): # Loop through all line starts.
if idx == len(line_starts) - 1: # Special case for last line.
temp = [
line_starts[idx], # Line start in bytes
file_size
- line_starts[idx]
- lineter_len, # Bytes count of line length via difference
]
else: # All other lines
temp = [
line_starts[idx], # Line start in bytes
line_starts[idx + 1]
- line_starts[idx]
- lineter_len, # Bytes count of line length via difference
]
lines_byte.append(temp)
return np.array(lines_byte)
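# Illustrative example (hypothetical numbers): line_starts = [0, 4, 8] with file_size = 12
# and lineter_len = 1 yields [[0, 3], [4, 3], [8, 3]], i.e. each line starts at the given
# byte offset and spans 3 bytes excluding its line terminator.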
def _decode_bytes_array(
byte_pos: np.ndarray, f: BinaryIO, sep: str = ",", encoding: str = "utf-8"
) -> List:
"""
Decode lines from byte positions and counts.
Parameters
----------
byte_pos : np.ndarray
The vectors of line starts and lengths in bytes.
f : BinaryIO
The handle of the CSV file to read from.
sep : str
The character used in the file to separate entries.
encoding : str
The encoding used to decode entries from bytes.
Returns
-------
list
The values read from the CSV file as numpy arrays of floats.
"""
lines = [] # List for saving decoded lines
byte_pos = byte_pos[
byte_pos[:, 0].argsort()
] # Sort line starts of data items to read from file in ascending order.
for item in byte_pos:
f.seek(item[0]) # Go to line start.
line = f.read(item[1]).decode(
encoding
) # Read specified number of bytes and decode.
if len(line) > 0:
sep_values = [
float(val) for val in line.split(sep)
] # Separate values in each line.
line = np.array(
sep_values
) # Convert list of separated values to numpy array.
lines.append(line) # Append numpy data entry to output list.
return lines
def _split_indices_train_test(
n_samples: int, train_split: float, seed: int
) -> Tuple[int, int, np.ndarray, np.ndarray]:
"""
Make index-based train test split with shuffle.
Parameters
----------
n_samples : int
The overall number of samples in the dataset.
train_split : float
The fraction of the dataset used for training (0 < `train_split` < 1).
seed : int
The seed used for the random number generator.
Returns
-------
n_train_samples : int
The number of train samples.
n_test_samples : int
The number of test samples.
train_indices : np.ndarray
The indices of the samples in the dataset used for training.
test_indices : np.ndarray
The indices of the samples in the dataset used for testing.
"""
n_train_samples = int(
train_split * n_samples
) # Determine number of train samples from split.
n_test_samples = n_samples - n_train_samples # Determine number of test samples.
rng = np.random.default_rng(
seed=seed
) # Set same seed over all ranks for consistent train-test split.
all_indices = np.arange(0, n_samples) # Construct array of all indices.
rng.shuffle(all_indices) # Shuffle them.
train_indices = all_indices[
:n_train_samples
] # First `n_train_samples` indices form train set.
test_indices = all_indices[n_train_samples:] # Remaining ones form test set.
return n_train_samples, n_test_samples, train_indices, test_indices
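# Illustrative example (hypothetical numbers): n_samples = 8 and train_split = 0.75 give
# n_train_samples = 6 and n_test_samples = 2; after shuffling, the first six shuffled
# indices form the train set and the last two form the test set. Using the same seed on
# all ranks makes this split identical everywhere.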
def load_data_csv_parallel(
path_to_data: Union[pathlib.Path, str],
header_lines: int,
comm: MPI.Comm = MPI.COMM_WORLD,
train_split: float = 0.75,
verbose: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Load data from CSV file in truly parallel fashion.
Parameters
----------
path_to_data : str | pathlib.Path
The path to the CSV file.
header_lines : int
The number of header lines.
comm : MPI.Comm
The communicator to use.
train_split : float
The train-test split fraction.
verbose : bool
Whether to print verbose output.
Returns
-------
train_samples_local : np.ndarray
The rank-dependent train samples.
train_targets_local : np.ndarray
The rank-dependent train targets.
test_samples : np.ndarray
The global test samples.
test_targets : np.ndarray
The global test targets.
"""
rank, size = comm.rank, comm.size # Set up communicator stuff.
file_size = os.stat(path_to_data).st_size # Get file size in bytes.
# Determine displs + counts of bytes chunk to read on each rank.
base = file_size // size # Determine base chunk size for each process.
remainder = file_size % size # Determine remainder bytes.
counts = base * np.ones( # Construct array with each rank's chunk counts.
(size,), dtype=int
)
if (
remainder > 0
): # Equally distribute remainder over respective ranks to balance load.
counts[:remainder] += 1
displs = np.concatenate( # Determine displs via cumulative sum from counts.
(np.zeros((1,), dtype=int), np.cumsum(counts, dtype=int)[:-1])
)
if rank == 0:
print(f"File size is {file_size} bytes.")
if rank == 0 and verbose:
print(f"Displs {displs}, counts {counts} for reading bytes chunks from file.")
with open(path_to_data, "rb") as f: # Open csv file to read from.
# Determine line starts in bytes chunks on each rank.
line_starts, r, lineter_len = _determine_line_starts(f, comm, displs, counts)
# On rank 0, add very first line.
if rank == 0:
line_starts = [0] + line_starts
if verbose:
print(f"[{rank}/{size}]: {len(line_starts)} line starts in chunk.")
# Find correct starting point, considering header lines.
# All-gather numbers of line starts in each chunk in `total_lines` array.
total_lines = np.empty(size, dtype=int)
comm.Allgather(
[np.array(len(line_starts), dtype=int), MPI.INT], [total_lines, MPI.INT]
)
cumsum = list(np.cumsum(total_lines))
# Determine rank where actual data lines start,
# i.e. remove ranks only containing header lines.
start = next(i for i in range(size) if cumsum[i] > header_lines)
if verbose:
print(
f"[{rank}/{size}]: total_lines is {total_lines}.\n"
f"[{rank}/{size}]: cumsum is {cumsum}.\n"
f"[{rank}/{size}]: start is {start}."
)
if rank < start: # Ranks containing only header lines.
line_starts = []
if rank == start: # Rank containing header + data lines.
rem = header_lines - (0 if start == 0 else cumsum[start - 1])
line_starts = line_starts[rem:]
# Share line starts of data samples across all ranks via Allgatherv.
line_starts = np.array(line_starts, dtype=int) + displs[rank]  # Shift line starts on each rank according to displs.
if verbose:
print(
f"[{rank}/{size}]: {len(line_starts)} line starts of shape {line_starts.shape} and type "
f"{line_starts.dtype} in local chunk: {line_starts}"
)
count_linestarts = np.array( # Determine local number of line starts.
len(line_starts), dtype=int
)
counts_linestarts = (
np.empty( # Initialize array to all-gather local numbers of line starts.
size, dtype=int
)
)
comm.Allgather([count_linestarts, MPI.INT], [counts_linestarts, MPI.INT])
n_linestarts = np.sum(
counts_linestarts
) # Determine overall number of line starts.
displs_linestarts = (
np.concatenate( # Determine displacements of line starts from counts.
(
np.zeros((1,), dtype=int),
np.cumsum(counts_linestarts, dtype=int)[:-1],
),
dtype=int,
)
)
if verbose and rank == 0:
print(
f"Overall {n_linestarts} linestarts.\n"
f"Number of linestarts in each chunk is {counts_linestarts}.\n"
f"Displs of linestarts in each chunk is {displs_linestarts}."
)
all_line_starts = (
np.empty( # Initialize array to all-gatherv line starts from all ranks.
(n_linestarts,), dtype=line_starts.dtype
)
)
if verbose and rank == 0:
print(
f"Recvbuf {all_line_starts}, {all_line_starts.shape}, {all_line_starts.dtype}."
)
comm.Allgatherv(
line_starts,
[
all_line_starts,
counts_linestarts,
displs_linestarts,
from_numpy_dtype(line_starts.dtype),
],
)
# Line starts were determined as those positions following a line end.
# But: There is no line after last line end in file.
# Thus, remove last entry from all_line_starts.
all_line_starts = all_line_starts[:-1]
if rank == 0:
print(f"After Allgatherv: All line starts: {all_line_starts}")
# Construct array with line starts and lengths in bytes.
print(
f"[{rank}/{size}]: Construct array with line starts and lengths in bytes."
)
lines_byte = _get_byte_pos_from_line_starts(
all_line_starts, file_size, lineter_len
)
# Make global train-test split.
print(f"[{rank}/{size}]: Make global train-test split.")
(
n_train_samples,
n_test_samples,
train_indices,
test_indices,
) = _split_indices_train_test(
n_samples=len(lines_byte), train_split=train_split, seed=size
)
n_train_samples_local = (
n_train_samples // size
) # Determine local train dataset size.
remainder_train = n_train_samples % size # Balance load.
if rank < remainder_train:
n_train_samples_local += 1
# Construct held-out test dataset (same on each rank).
print(f"[{rank}/{size}]: Decode {n_test_samples} test samples from file.")
test_lines = _decode_bytes_array(
lines_byte[test_indices], f, sep=",", encoding="utf-8"
)
test_samples = np.array(test_lines)[:, 1:]
test_targets = np.array(test_lines)[:, 0]
if verbose:
print_str = f"[{rank}/{size}]: Test samples: {test_samples[0]}\n"
print_str += f"Test targets: {test_targets[0]}\n"
print(print_str)
# Construct train dataset (different on each rank).
rng = np.random.default_rng(seed=rank)
print(f"[{rank}/{size}]: Draw local {n_train_samples_local} train indices.")
train_indices_local = rng.choice(train_indices, size=n_train_samples_local)
print(f"[{rank}/{size}]: Decode train lines from file.")
train_lines_local = _decode_bytes_array(
lines_byte[train_indices_local], f, sep=",", encoding="utf-8"
)
train_samples_local = np.array(train_lines_local)[:, 1:]
train_targets_local = np.array(train_lines_local)[:, 0]
# Now, each rank holds a local train set (samples + targets)
# and the global held-out test set (samples + targets).
return train_samples_local, train_targets_local, test_samples, test_targets
def load_data_csv_root(
path_to_data: Union[str, pathlib.Path],
header_lines: int,
comm: MPI.Comm,
seed: int,
train_split: float = 0.75,
verbose: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Load data from CSV file via root process.
Parameters
----------
path_to_data : str | pathlib.Path
The path to the CSV file.
header_lines : int
The number of header lines.
comm : MPI.Comm
The communicator to use.
seed : int
The seed used for train-test split.
train_split : float
The train-test split fraction.
verbose : bool
Whether to print verbose output.
Returns
-------
np.ndarray
The rank-dependent train samples.
np.ndarray
The rank-dependent train targets.
np.ndarray
The global test samples.
np.ndarray
The global test targets.
"""
rank, size = comm.rank, comm.size # Set up communicator stuff.
if rank == 0:
data = np.loadtxt(
path_to_data, dtype=float, delimiter=",", skiprows=header_lines
) # Load data into numpy array.
samples, targets = (
data[:, 1:],
data[:, 0],
) # Divide data into samples and targets.
samples_train, samples_test, targets_train, targets_test = train_test_split(
samples, targets, test_size=1 - train_split, random_state=seed
) # Perform train-test split.
n_train_samples = len(
samples_train
) # Determine number of samples in train set.
n_test_samples = len(samples_test) # Determine number of samples in test set.
n_features = samples_train.shape[1] # Determine number of features.
n_train_samples_local = (
n_train_samples // size
) # Determine rank-local number of train samples.
remainder_train = n_train_samples % size
train_counts = n_train_samples_local * np.ones(
size, dtype=int
) # Determine load-balanced counts and displacements.
for idx in range(remainder_train):
train_counts[idx] += 1
train_displs = np.concatenate(
(np.zeros(1, dtype=int), np.cumsum(train_counts, dtype=int)[:-1]), dtype=int
)
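# Illustrative example (hypothetical numbers): n_train_samples = 10 and size = 3 yield
# train_counts = [4, 3, 3] and train_displs = [0, 4, 7], i.e. Scatterv sends samples
# 0-3 to rank 0, 4-6 to rank 1, and 7-9 to rank 2.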
print(
f"There are {n_train_samples} train and {n_test_samples} test samples.\n"
f"Local train samples: {train_counts}"
)
# Sample `n_train_samples` indices from train set with replacement.
rng = np.random.default_rng(seed=size)
train_indices = rng.choice(range(n_train_samples), size=n_train_samples)
print(f"train_indices have shape {train_indices.shape}.")
# Construct train dataset from drawn indices (repetitions possible!).
samples_train_shuffled = samples_train[train_indices]
targets_train_shuffled = targets_train[train_indices]
send_buf_train_samples = [
samples_train_shuffled,
train_counts * n_features,
train_displs * n_features,
from_numpy_dtype(samples_train_shuffled.dtype),
]
send_buf_train_targets = [
targets_train_shuffled,
train_counts,
train_displs,
from_numpy_dtype(targets_train_shuffled.dtype),
]
else:
train_counts = None
n_features = None
n_test_samples = None
send_buf_train_samples = None
send_buf_train_targets = None
n_features = comm.bcast(n_features, root=0)
n_test_samples = comm.bcast(n_test_samples, root=0)
train_counts = comm.bcast(train_counts, root=0)
samples_train_local = np.empty((train_counts[rank], n_features), dtype=float)
targets_train_local = np.empty((train_counts[rank],), dtype=float)
recv_buf_train_samples = [
samples_train_local,
from_numpy_dtype(samples_train_local.dtype),
]
recv_buf_train_targets = [
targets_train_local,
from_numpy_dtype(targets_train_local.dtype),
]
if rank != 0:
samples_test = np.empty((n_test_samples, n_features), dtype=float)
targets_test = np.empty((n_test_samples,), dtype=float)
comm.Scatterv(send_buf_train_samples, recv_buf_train_samples, root=0)
comm.Scatterv(send_buf_train_targets, recv_buf_train_targets, root=0)
comm.Bcast(samples_test, root=0)
comm.Bcast(targets_test, root=0)
if verbose:
print_str = f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n"
print_str += (
f"Train targets after Scatterv have shape {targets_train_local.shape}.\n"
)
print_str += f"Test samples after Bcast have shape {samples_test.shape}.\n"
print_str += f"Test targets after Bcast have shape {targets_test.shape}."
print(print_str)
return samples_train_local, targets_train_local, samples_test, targets_test
"Distributed random forest in sklearn with MPI."
import argparse
import string
from typing import Tuple
import time
from mpi4py import MPI
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from dataloaders import load_data_csv_parallel, load_data_csv_root
class MPITimer:
"""A distributed context-manager enabled timer."""
def __init__(
self,
comm: MPI.Comm,
print_on_exit: bool = True,
name: str = "",
output_format: str = "Elapsed time {name}: global average {elapsed_time_average:.2g}s, "
"local {elapsed_time_local:.2g}s",
) -> None:
"""
Create a new distributed context-manager enabled timer.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
print_on_exit : bool
Whether to print the measured time in __exit__.
name : str
Label describing what this timer measures; can be used for printing the results.
output_format : str
Format string template used for printing the output. May reference all attributes of the timer object (e.g. `name`, `elapsed_time_local`, `elapsed_time_average`).
"""
self.comm = comm
self.output_format = output_format
self.print_on_exit = print_on_exit
self.name = name
self.start_time = None
self.end_time = None
self.elapsed_time_local = None
self.elapsed_time_average = None
def start(self) -> None:
"""Start the timer by setting the start time."""
self.start_time = time.perf_counter()
def stop(self) -> None:
"""Stop the timer by setting the end time and updating elapsed_time_local."""
self.end_time = time.perf_counter()
self.elapsed_time_local = self.end_time - self.start_time
def allreduce_for_average_time(self) -> None:
"""Compute the global average using allreduce and update elapsed_time_average."""
self.elapsed_time_average = (
self.comm.allreduce(self.elapsed_time_local, op=MPI.SUM) / self.comm.size
)
def print(self) -> None:
"""Print the elapsed time using the given template."""
template_keywords = {
key for (_, key, _, _) in string.Formatter().parse(self.output_format)
}
template_kwargs = {
key: value for key, value in vars(self).items() if key in template_keywords
}
print(self.output_format.format(**template_kwargs))
def __enter__(self) -> "MPITimer":
"""
Called on entering this context (e.g. with a 'with' statement), starts the timer.
Returns
-------
MPITimer
This timer object.
"""
self.start()
return self
def __exit__(self, *args) -> None:
"""
Called on exiting this context (e.g. after a 'with' statement), stops the timer, computes the global average
and optionally prints the result on rank 0.
Parameters
----------
args :
Unused, only to fulfill __exit__ interface.
"""
self.stop()
self.allreduce_for_average_time()
if self.print_on_exit and self.comm.rank == 0:
self.print()
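# Minimal usage sketch (illustrative, not part of the original script): time a code section
# collectively and print the global average on rank 0 when the context exits.
#
#     with MPITimer(MPI.COMM_WORLD, name="some step"):
#         do_work()  # Placeholder for the workload to be timed.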
class DistributedRandomForest:
"""
Distributed random forest class.
Attributes
----------
acc_global : None | float
The global accuracy.
acc_local : None | float
Each local trained model's accuracy.
clf : None | RandomForestClassifier
The sklearn classifier.
comm : MPI.Comm
The communicator.
n_trees_base : int
The base number of rank-local trees.
n_trees_global : int
The number of trees in the global forest.
n_trees_local : int
The number of local trees.
n_trees_remainder : int
The remaining number of trees to distribute.
random_state : int
The base random state for sklearn random forest.
Methods
-------
_distribute_trees()
Distribute trees evenly over all processors.
_train_local_classifier()
Train local random forest classifier.
_predict_locally()
Get predictions of all sub estimators in random forest.
_get_predicted_class_hist()
Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.
_calc_majority_vote_hist()
Calculate majority vote from sample-wise histograms of predicted classes.
train()
Train random forest in parallel.
test()
Test trained distributed global random forest.
"""
def __init__(
self,
n_trees_global: int,
comm: MPI.Comm,
random_state: int,
) -> None:
"""
Initialize distributed random forest object.
Parameters
----------
n_trees_global : int
The number of trees in the global forest.
comm : MPI.Comm
The communicator to use.
random_state : int
The base random state for sklearn RF.
"""
self.n_trees_global = n_trees_global
self.comm = comm
(
self.n_trees_base,
self.n_trees_remainder,
self.n_trees_local,
) = self._distribute_trees()
self.random_state = random_state + self.comm.rank
self.clf = None # local random forest classifier
self.acc_global = None # accuracy of global classifier
self.acc_local = None # accuracy of each rank-local classifier
def _distribute_trees(self) -> Tuple[int, int, int]:
"""
Distribute trees evenly over all processors.
Returns
-------
int
The base number of rank-local trees.
int
The remaining number of trees to distribute.
int
The final number of rank-local trees.
"""
size, rank = self.comm.size, self.comm.rank
n_trees_base = self.n_trees_global // size # base number of rank-local trees
n_trees_remainder = (
self.n_trees_global % size
) # remaining number of trees to distribute
n_trees_local = (
n_trees_base + 1 if rank < n_trees_remainder else n_trees_base
) # final number of local trees
return n_trees_base, n_trees_remainder, n_trees_local
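# Illustrative example (hypothetical numbers): n_trees_global = 100 on size = 3 ranks gives
# n_trees_base = 33 and n_trees_remainder = 1, so rank 0 trains 34 trees while ranks 1 and 2
# train 33 trees each.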
def _train_local_classifier(
self, train_samples: np.ndarray, train_targets: np.ndarray
) -> RandomForestClassifier:
"""
Train local random forest classifier.
Parameters
----------
train_samples : numpy.ndarray
The samples of the train dataset.
train_targets : numpy.ndarray
The targets of the train dataset.
Returns
-------
sklearn.ensemble.RandomForestClassifier
The trained model.
"""
clf = RandomForestClassifier(
n_estimators=self.n_trees_local, random_state=self.random_state
)
_ = clf.fit(train_samples, train_targets)
return clf
def _predict_locally(self, samples: np.ndarray) -> np.ndarray:
"""
Get predictions of all sub estimators in random forest.
Parameters
----------
samples : numpy.ndarray
The samples whose class to predict.
Returns
-------
numpy.ndarray
The predictions of each sub estimator on the samples.
"""
return np.array(
[tree.predict(samples) for tree in self.clf.estimators_], dtype="H"
)
def _get_predicted_class_hist(
self, tree_wise_predictions: np.ndarray, n_classes: int
) -> np.ndarray:
"""
Calculate global sample-wise distributions of predicted classes from rank-local tree-wise predictions.
Parameters
----------
tree_wise_predictions : numpy.ndarray
The tree-wise predictions.
n_classes : int
The number of classes in the dataset.
Returns
-------
numpy.ndarray
The sample-wise distributions of the predicted classes.
"""
sample_wise_predictions = tree_wise_predictions.transpose()
predicted_class_hists_local = np.array(
[
np.bincount(sample_pred, minlength=n_classes)
for sample_pred in sample_wise_predictions
]
)
# The hist arrays have `n_test_samples` entries with `n_classes` elements each.
# The local hist holds the class prediction distribution for each test sample over the local forest.
# Now we want to sum up those sample-wise distributions to obtain the global hist over the global forest.
# From this global hist, we can determine the global majority vote for each test sample.
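# Illustrative example (hypothetical numbers): with n_classes = 3 and two ranks whose local
# hists for one test sample are [2, 1, 0] and [0, 3, 1], the Allreduce below produces the
# global hist [2, 4, 1], from which class 1 is later selected as the majority vote.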
predicted_class_hists_global = np.zeros_like(predicted_class_hists_local)
self.comm.Allreduce(predicted_class_hists_local, predicted_class_hists_global)
return predicted_class_hists_global
@staticmethod
def _calc_majority_vote_hist(predicted_class_hists: np.ndarray) -> np.ndarray:
"""
Calculate majority vote from sample-wise histograms of predicted classes.
Parameters
----------
predicted_class_hists : numpy.ndarray
The sample-wise histograms of predicted classes.
Returns
-------
numpy.ndarray
The majority votes for all samples in the histogram input.
"""
return np.array(
[np.argmax(sample_hist) for sample_hist in predicted_class_hists]
)
def train(
self,
train_samples: np.ndarray,
train_targets: np.ndarray,
) -> None:
"""
Train random forest in parallel.
Parameters
----------
train_samples : numpy.ndarray
The rank-local train samples.
train_targets : numpy.ndarray
The corresponding train targets.
"""
# Set up communicator.
rank, size = self.comm.rank, self.comm.size
# Set up and train local forest.
print(
f"[{rank}/{size}]: Set up and train local random forest with "
f"{self.n_trees_local} trees and random state {self.random_state}."
)
self.clf = self._train_local_classifier(
train_samples=train_samples,
train_targets=train_targets,
)
def test(
self,
test_samples: np.ndarray,
test_targets: np.ndarray,
n_classes: int,
) -> None:
"""
Test trained distributed global random forest.
Parameters
----------
test_samples : numpy.ndarray
The rank-local test samples.
test_targets : numpy.ndarray
The corresponding test targets.
n_classes : int
The number of classes in the dataset.
"""
rank, size = self.comm.rank, self.comm.size
# Get class predictions of sub estimators in each forest.
print(f"[{rank}/{size}]: Get predictions of individual sub estimators.")
tree_predictions_local = self._predict_locally(test_samples)
print(f"[{rank}/{size}]: Calculate majority vote via histograms.")
predicted_class_hists = self._get_predicted_class_hist(
tree_predictions_local, n_classes
)
majority_vote = self._calc_majority_vote_hist(predicted_class_hists)
# Calculate accuracies.
self.acc_global = (test_targets == majority_vote).mean()
self.acc_local = self.clf.score(test_samples, test_targets)
print(
f"[{rank}/{size}]: Local accuracy is {self.acc_local}, global accuracy is {self.acc_global}."
)
if __name__ == "__main__":
# SETTINGS
comm = MPI.COMM_WORLD # Set up communicator.
rank, size = comm.rank, comm.size
if rank == 0:
print(
"######################################################\n"
"# Distributed Random Forest in Scikit-Learn with MPI #\n"
"######################################################\n"
)
n_classes = 2
header_lines = 0 # Data file specifications
sep = ","
encoding = "utf-8"
path_to_data = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv"
parser = argparse.ArgumentParser(prog="Distributed Random Forest")
parser.add_argument(
"--dataloader",
type=str,
choices=["root", "parallel"],
default="root",
help="The dataloader to use.",
)
parser.add_argument(
"--num_trees",
type=int,
default=100,
help="The global number of trees.",
)
parser.add_argument(
"--train_split",
type=float,
default=0.75,
help="The fraction of the dataset used for training.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="The verbosity.",
)
args = parser.parse_args()
seed = size
# -------------- Load data. --------------
print(f"[{rank}/{size}]: Loading data...")
with MPITimer(comm, name="data loading"):
if args.dataloader == "parallel":
if rank == 0:
print("Using truly parallel dataloader...")
(
train_samples_local,
train_targets_local,
test_samples,
test_targets,
) = load_data_csv_parallel(
path_to_data, header_lines, comm, args.train_split, args.verbose
)
elif args.dataloader == "root":
if rank == 0:
print("Using root-based dataloader with Scatterv...")
(
train_samples_local,
train_targets_local,
test_samples,
test_targets,
) = load_data_csv_root(
path_to_data, header_lines, comm, seed, args.train_split, args.verbose
)
else:
raise ValueError(f"Unknown dataloader: {args.dataloader}")
print(
f"[{rank}/{size}]: DONE.\n"
f"Local train samples and targets have shapes {train_samples_local.shape} and {train_targets_local.shape}.\n"
f"Global test samples and targets have shapes {test_samples.shape} and {test_targets.shape}.\n"
f"Labels are {train_targets_local}"
)
# -------------- Set up random forest. --------------
with MPITimer(comm, name="forest creation"):
distributed_random_forest = DistributedRandomForest(
n_trees_global=args.num_trees,
comm=comm,
random_state=seed,
)
# -------------- Train random forest. --------------
with MPITimer(comm, name="training"):
distributed_random_forest.train(
train_samples_local,
train_targets_local,
)
# -------------- Evaluate random forest. --------------
print(f"[{rank}/{size}]: Evaluate random forest.")
with MPITimer(comm, name="test"): # Test trained model on test data.
distributed_random_forest.test(
test_samples,
test_targets,
n_classes,
)
"""Serial random forest in sklearn."""
import argparse
import pathlib
from typing import Union
import time
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
def load_data_csv(
path_to_data: Union[str, pathlib.Path],
train_split: float = 0.75,
seed: int = 42,
):
"""
Load data from CSV file.
Parameters
----------
path_to_data : str | pathlib.Path
The path to the CSV file.
train_split : float
The train-test split fraction.
seed : int
The seed used for train-test split.
Returns
-------
np.ndarray
The train samples.
np.ndarray
The train targets.
np.ndarray
The test samples.
np.ndarray
The test targets.
"""
# Load data into numpy array.
data = np.loadtxt(path_to_data, dtype=float, delimiter=",")
# Divide data into samples and targets.
samples = data[:, 1:]
targets = data[:, 0]
# Perform train-test split.
samples_train, samples_test, targets_train, targets_test = train_test_split(
samples, targets, test_size=1 - train_split, random_state=seed
)
return samples_train, targets_train, samples_test, targets_test
if __name__ == "__main__":
print(
"########################################\n"
"# Serial Random Forest in Scikit-Learn #\n"
"########################################\n"
)
parser = argparse.ArgumentParser(prog="Serial Random Forest")
parser.add_argument(
"--num_trees",
type=int,
default=100,
help="The global number of trees.",
)
parser.add_argument(
"--train_split",
type=float,
default=0.75,
help="The fraction of the dataset used for training.",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="The random seed.",
)
args = parser.parse_args()
path_to_data = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv"
print("Loading data...")
start_load = time.perf_counter()
(
train_samples,
train_targets,
test_samples,
test_targets,
) = load_data_csv(path_to_data, args.train_split, args.seed)
elapsed_load = time.perf_counter() - start_load
print(
"DONE.\n"
f"Train samples and targets have shapes {train_samples.shape} and {train_targets.shape}.\n"
f"First ten elements are: {train_samples[:10]} and {train_targets[:10]}\n"
f"Test samples and targets have shapes {test_samples.shape} and {test_targets.shape}.\n"
f"First ten elements are: {test_samples[:10]} and {test_targets[:10]}\n"
f"Time for data loading is {elapsed_load} s.\n"
"Set up classifier."
)
classifier = RandomForestClassifier(
n_estimators=args.num_trees, random_state=args.seed
)
start_train = time.perf_counter()
print("Train.")
_ = classifier.fit(train_samples, train_targets)
acc = classifier.score(test_samples, test_targets)
elapsed_train = time.perf_counter() - start_train
print(f"Time for training is {elapsed_train} s.\n Accuracy is {acc}.")
"""Test dataloaders."""
import argparse
from mpi4py import MPI
from dataloaders import load_data_csv_parallel, load_data_csv_root
from distributed_forest import MPITimer
if __name__ == "__main__":
# Set up communicator.
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
# Data file specifications
header_lines = 0
sep = ","
encoding = "utf-8"
path_to_data = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY.csv"
parser = argparse.ArgumentParser(prog="Distributed Random Forest")
parser.add_argument(
"--train_split",
type=float,
default=0.75,
help="The fraction of the dataset used for training.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="The verbosity.",
)
args = parser.parse_args()
seed = size
print(f"[{rank}/{size}]: Loading data using truly parallel dataloader...")
with MPITimer(comm, name="truly parallel data loading"):
(
train_samples_local_par,
train_targets_local_par,
test_samples_par,
test_targets_par,
) = load_data_csv_parallel(
path_to_data, header_lines, comm, args.train_split, args.verbose
)
print(f"[{rank}/{size}]: Loading data using root-based dataloader...")
with MPITimer(comm, name="root-based data loading"):
(
train_samples_local_root,
train_targets_local_root,
test_samples_root,
test_targets_root,
) = load_data_csv_root(
path_to_data, header_lines, comm, seed, args.train_split, args.verbose
)
print(
f"[{rank}/{size}]: DONE.\n"
f"Parallel: Local train samples / targets have shapes "
f"{train_samples_local_par.shape} / {train_targets_local_par.shape}.\n"
f"Parallel: Global test samples / targets have shapes {test_samples_par.shape} / {test_targets_par.shape}.\n"
f"Root: Local train samples / targets have shapes "
f"{train_samples_local_root.shape} / {train_targets_local_root.shape}.\n"
f"Root: Global test samples / targets have shapes {test_samples_root.shape} / {test_targets_root.shape}."
)