Commit 95eaea80 authored by Marie Weiel

rename variables

parent a72777b7
@@ -23,7 +23,7 @@ from sklearn.model_selection import train_test_split
 def _determine_line_starts(
-    f: BinaryIO, comm: MPI.Comm, displs: np.ndarray, counts: np.ndarray
+    f: BinaryIO, comm: MPI.Comm, byte_displs: np.ndarray, byte_counts: np.ndarray
 ) -> Tuple[list, bytes, int]:
     """
     Determine line starts in bytes from CSV file.
@@ -34,9 +34,9 @@ def _determine_line_starts(
         The handle of CSV file to read from
     comm : MPI.Comm
         The communicator to use.
-    displs : np.ndarray
+    byte_displs : np.ndarray
         The displacements of byte chunk starts on each rank, length is `comm.size`.
-    counts : np.ndarray
+    byte_counts : np.ndarray
         The counts of the byte chunk on each rank, length is `comm.size`.

     Returns
@@ -57,22 +57,22 @@ def _determine_line_starts(
     lineter_len = 1  # Set number of line termination chars.
     line_starts = []  # Set up list to save line starts.
-    f.seek(displs[rank])  # Jump to pre-assigned position in file.
-    r = f.read(counts[rank])  # Read number of bytes from starting position.
+    f.seek(byte_displs[rank])  # Jump to pre-assigned position in file.
+    bytes_chunk = f.read(byte_counts[rank])  # Read number of bytes from starting position.

-    for pos, l in enumerate(r):  # Determine line breaks in bytes chunk.
+    for pos, l in enumerate(bytes_chunk):  # Determine line breaks in bytes chunk.
         if chr(l) == "\n":  # Line terminated by '\n' only.
-            if not chr(r[pos - 1]) == "\r":  # No \r\n.
+            if not chr(bytes_chunk[pos - 1]) == "\r":  # No \r\n.
                 line_starts.append(pos + 1)
         elif chr(l) == "\r":
             if (
-                pos + 1 < len(r) and chr(r[pos + 1]) == "\n"
+                pos + 1 < len(bytes_chunk) and chr(bytes_chunk[pos + 1]) == "\n"
             ):  # Line terminated by '\r\n'.
                 line_starts.append(pos + 2)
                 lineter_len = 2
             else:  # Line terminated by '\r' only.
                 line_starts.append(pos + 1)
-    return line_starts, r, lineter_len
+    return line_starts, bytes_chunk, lineter_len


 def _get_byte_pos_from_line_starts(
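Note on the scan above: a line start is recorded as the position just past each terminator, covering `\n`, Windows-style `\r\n`, and bare `\r`. A minimal single-process sketch of the same logic on a hypothetical in-memory chunk (no MPI involved):

```python
# Hypothetical chunk mixing all three line terminators.
chunk = b"a,1\r\nb,2\nc,3\rd,4\n"

line_starts = []
lineter_len = 1
for pos, byte in enumerate(chunk):
    if chr(byte) == "\n":
        if not chr(chunk[pos - 1]) == "\r":  # Plain '\n'; '\r\n' was handled at the '\r'.
            line_starts.append(pos + 1)
    elif chr(byte) == "\r":
        if pos + 1 < len(chunk) and chr(chunk[pos + 1]) == "\n":  # Windows '\r\n'.
            line_starts.append(pos + 2)
            lineter_len = 2
        else:  # Bare '\r'.
            line_starts.append(pos + 1)

print(line_starts)  # [5, 9, 13, 17]: each entry points just past a terminator.
```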
@@ -95,7 +95,7 @@ def _get_byte_pos_from_line_starts(
     np.ndarray
         An array containing vectors of (start, count) for lines in byte.
     """
-    lines_byte = []  # list for line starts and counts
+    lines_byte = []  # List for line starts and counts
     for idx in range(len(line_starts)):  # Loop through all line starts.
         if idx == len(line_starts) - 1:  # Special case for last line.
             temp = [
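This helper turns the line starts into per-line (start, byte count) pairs. Since only the head of its body appears in this hunk, the following is an assumption-laden sketch of the idea, with the last line measured against the file size as the special case above suggests:

```python
import numpy as np

def byte_pos_from_line_starts(line_starts, file_size, lineter_len):
    """Map line starts to (start, count) byte pairs; counts exclude terminators."""
    lines_byte = []
    for idx, start in enumerate(line_starts):
        # Last line has no successor start, so measure against the file size.
        end = file_size if idx == len(line_starts) - 1 else line_starts[idx + 1]
        lines_byte.append([start, end - start - lineter_len])
    return np.array(lines_byte)

print(byte_pos_from_line_starts([0, 4, 8], file_size=12, lineter_len=1))
# [[0 3]
#  [4 3]
#  [8 3]]
```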
@@ -236,127 +236,123 @@ def load_data_csv_parallel(
     rank, size = comm.rank, comm.size  # Set up communicator stuff.
     file_size = os.stat(path_to_data).st_size  # Get file size in bytes.

-    # Determine displs + counts of bytes chunk to read on each rank.
-    base = file_size // size  # Determine base chunk size for each process.
-    remainder = file_size % size  # Determine remainder bytes.
-    counts = base * np.ones(  # Construct array with each rank's chunk counts.
+    # Determine displacements + counts of bytes chunk to read on each rank.
+    n_bytes_local = file_size // size  # Determine number of bytes to read for each process.
+    remainder_bytes = file_size % size  # Determine remainder bytes.
+    byte_counts = n_bytes_local * np.ones(  # Construct array with each rank's chunk counts.
         (size,), dtype=int
     )
     if (
-        remainder > 0
+        remainder_bytes > 0
     ):  # Equally distribute remainder over respective ranks to balance load.
-        counts[:remainder] += 1
-    displs = np.concatenate(  # Determine displs via cumulative sum from counts.
-        (np.zeros((1,), dtype=int), np.cumsum(counts, dtype=int)[:-1])
+        byte_counts[:remainder_bytes] += 1
+    byte_displs = np.concatenate(  # Determine displs via cumulative sum from counts.
+        (np.zeros((1,), dtype=int), np.cumsum(byte_counts, dtype=int)[:-1])
     )
     if rank == 0:
         print(f"File size is {file_size} bytes.")
     if rank == 0 and verbose:
-        print(f"Displs {displs}, counts {counts} for reading bytes chunks from file.")
+        print(f"Displs {byte_displs}, counts {byte_counts} for reading bytes chunks from file.")

     with open(path_to_data, "rb") as f:  # Open csv file to read from.
         # Determine line starts in bytes chunks on each rank.
-        line_starts, r, lineter_len = _determine_line_starts(f, comm, displs, counts)
+        line_starts_local, byte_chunk, lineter_len = _determine_line_starts(f, comm, byte_displs, byte_counts)
         # On rank 0, add very first line.
         if rank == 0:
-            line_starts = [0] + line_starts
+            line_starts_local = [0] + line_starts_local
         if verbose:
-            print(f"[{rank}/{size}]: {len(line_starts)} line starts in chunk.")
+            print(f"[{rank}/{size}]: {len(line_starts_local)} line starts in chunk.")

         # Find correct starting point, considering header lines.
-        # All-gather numbers of line starts in each chunk in `total_lines` array.
-        total_lines = np.empty(size, dtype=int)
+        # All-gather numbers of line starts in each chunk.
+        counts_linestarts = np.empty(size, dtype=int)
         comm.Allgather(
-            [np.array(len(line_starts), dtype=int), MPI.INT], [total_lines, MPI.INT]
+            [np.array(len(line_starts_local), dtype=int), MPI.INT], [counts_linestarts, MPI.INT]
         )
-        cumsum = list(np.cumsum(total_lines))
-        # Determine rank where actual data lines start,
-        # i.e. remove ranks only containing header lines.
-        start = next(i for i in range(size) if cumsum[i] > header_lines)
+        # Determine rank where actual data lines start, i.e., remove ranks only containing header lines.
+        cumsum_linestarts = list(np.cumsum(counts_linestarts))
+        start_rank = next(i for i in range(size) if cumsum_linestarts[i] > header_lines)
         if verbose:
             print(
-                f"[{rank}/{size}]: total_lines is {total_lines}.\n"
-                f"[{rank}/{size}]: cumsum is {cumsum}.\n"
-                f"[{rank}/{size}]: start is {start}."
+                f"[{rank}/{size}]: There are {counts_linestarts} line starts overall.\n"
+                f"[{rank}/{size}]: Their cumsum is {cumsum_linestarts}.\n"
+                f"[{rank}/{size}]: The start rank is {start_rank}."
             )
-        if rank < start:  # Ranks containing only header lines.
-            line_starts = []
-        if rank == start:  # Rank containing header + data lines.
-            rem = header_lines - (0 if start == 0 else cumsum[start - 1])
-            line_starts = line_starts[rem:]
+        if rank < start_rank:  # Ranks containing only header lines.
+            line_starts_local = []
+        if rank == start_rank:  # Rank containing header + data lines.
+            lines_to_remove = header_lines - (0 if start_rank == 0 else cumsum_linestarts[start_rank - 1])
+            line_starts_local = line_starts_local[lines_to_remove:]

         # Share line starts of data samples across all ranks via Allgatherv.
-        line_starts += displs[
+        line_starts_local += byte_displs[
             rank
         ]  # Shift line starts on each rank according to displs.
         if verbose:
             print(
-                f"[{rank}/{size}]: {len(line_starts)} line starts of shape {line_starts.shape} and type "
-                f"{line_starts.dtype} in local chunk: {line_starts}"
+                f"[{rank}/{size}]: {len(line_starts_local)} line starts of shape {line_starts_local.shape} and type "
+                f"{line_starts_local.dtype} in local chunk: {line_starts_local}"
             )

-        count_linestarts = np.array(  # Determine local number of line starts.
-            len(line_starts), dtype=int
-        )
-        counts_linestarts = (
-            np.empty(  # Initialize array to all-gather local numbers of line starts.
+        counts_linestarts_corrected = (
+            np.empty(  # Initialize array to all-gather local numbers of line starts (corrected for header lines).
                 size, dtype=int
             )
         )
-        comm.Allgather([count_linestarts, MPI.INT], [counts_linestarts, MPI.INT])
-        n_linestarts = np.sum(
-            counts_linestarts
-        )  # Determine overall number of line starts.
-        displs_linestarts = (
+        comm.Allgather(
+            [np.array(len(line_starts_local), dtype=int), MPI.INT],
+            [counts_linestarts_corrected, MPI.INT]
+        )
+        displs_linestarts_corrected = (
             np.concatenate(  # Determine displacements of line starts from counts.
                 (
                     np.zeros((1,), dtype=int),
-                    np.cumsum(counts_linestarts, dtype=int)[:-1],
+                    np.cumsum(counts_linestarts_corrected, dtype=int)[:-1],
                 ),
                 dtype=int,
             )
         )
         if verbose and rank == 0:
             print(
-                f"Overall {n_linestarts} linestarts.\n"
-                f"Number of linestarts in each chunk is {counts_linestarts}.\n"
-                f"Displs of linestarts in each chunk is {displs_linestarts}."
+                f"Overall {np.sum(counts_linestarts_corrected)} linestarts.\n"
+                f"Number of linestarts in each chunk is {counts_linestarts_corrected}.\n"
+                f"Displs of linestarts in each chunk is {displs_linestarts_corrected}."
             )

-        all_line_starts = (
+        line_starts_global = (
             np.empty(  # Initialize array to all-gatherv line starts from all ranks.
-                (n_linestarts,), dtype=line_starts.dtype
+                (np.sum(counts_linestarts_corrected),), dtype=line_starts_local.dtype
             )
         )
         if verbose and rank == 0:
             print(
-                f"Recvbuf {all_line_starts}, {all_line_starts.shape}, {all_line_starts.dtype}."
+                f"Recvbuf {line_starts_global}, {line_starts_global.shape}, {line_starts_global.dtype}."
             )
         comm.Allgatherv(
-            line_starts,
+            line_starts_local,
             [
-                all_line_starts,
-                counts_linestarts,
-                displs_linestarts,
-                from_numpy_dtype(line_starts.dtype),
+                line_starts_global,
+                counts_linestarts_corrected,
+                displs_linestarts_corrected,
+                from_numpy_dtype(line_starts_local.dtype),
             ],
         )
         # Line starts were determined as those positions following a line end.
         # But: There is no line after last line end in file.
-        # Thus, remove last entry from all_line_starts.
-        all_line_starts = all_line_starts[:-1]
+        # Thus, remove last entry from line_starts_global.
+        line_starts_global = line_starts_global[:-1]
         if rank == 0:
-            print(f"After Allgatherv: All line starts: {all_line_starts}")
+            print(f"After Allgatherv: All line starts: {line_starts_global}")
         # Construct array with line starts and lengths in bytes.
         print(
             f"[{rank}/{size}]: Construct array with line starts and lengths in bytes."
         )
-        lines_byte = _get_byte_pos_from_line_starts(
-            all_line_starts, file_size, lineter_len
+        lines_in_byte = _get_byte_pos_from_line_starts(
+            line_starts_global, file_size, lineter_len
         )

         # Make global train-test split.
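The hunk above follows the classic mpi4py count/displacement pattern: all-gather the per-rank lengths, build displacements as the exclusive cumulative sum of the counts, then Allgatherv the variable-length arrays into one global buffer on every rank. A self-contained sketch of that pattern (data and file name are made up; run with, e.g., `mpirun -n 4 python allgatherv_sketch.py`):

```python
"""Sketch of the Allgather + Allgatherv pattern used above."""
import numpy as np
from mpi4py import MPI
from mpi4py.util.dtlib import from_numpy_dtype

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

local = np.arange(rank + 1, dtype=np.int64)  # Rank r contributes r + 1 elements.

# All-gather the per-rank element counts (dtype inferred by mpi4py)...
counts = np.empty(size, dtype=np.int64)
comm.Allgather(np.array(local.size, dtype=np.int64), counts)
# ...and build displacements as the exclusive cumulative sum of the counts.
displs = np.concatenate((np.zeros(1, dtype=np.int64), np.cumsum(counts)[:-1]))

# Allgatherv concatenates all variable-length chunks on every rank.
global_arr = np.empty(int(counts.sum()), dtype=local.dtype)
comm.Allgatherv(local, [global_arr, counts, displs, from_numpy_dtype(local.dtype)])
if rank == 0:
    print(global_arr)  # With 4 ranks: [0 0 1 0 1 2 0 1 2 3]
```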
@@ -367,7 +363,7 @@ def load_data_csv_parallel(
             train_indices,
             test_indices,
         ) = _split_indices_train_test(
-            n_samples=len(lines_byte), train_split=train_split, seed=size
+            n_samples=len(lines_in_byte), train_split=train_split, seed=size
         )

         n_train_samples_local = (
@@ -380,14 +376,15 @@ def load_data_csv_parallel(
         # Construct held-out test dataset (same on each rank).
         print(f"[{rank}/{size}]: Decode {n_test_samples} test samples from file.")
         test_lines = _decode_bytes_array(
-            lines_byte[test_indices], f, sep=",", encoding="utf-8"
+            lines_in_byte[test_indices], f, sep=",", encoding="utf-8"
         )
         test_samples = np.array(test_lines)[:, 1:]
         test_targets = np.array(test_lines)[:, 0]
         if verbose:
-            print_str = f"[{rank}/{size}]: Test samples: {test_samples[0]}\n"
-            print_str += f"Test targets: {test_targets[0]}\n"
-            print(print_str)
+            print(
+                f"[{rank}/{size}]: Test samples: {test_samples[0]}\n"
+                f"Test targets: {test_targets[0]}\n"
+            )

         # Construct train dataset (different on each rank).
         rng = np.random.default_rng(seed=rank)
@@ -395,7 +392,7 @@ def load_data_csv_parallel(
         train_indices_local = rng.choice(train_indices, size=n_train_samples_local)
         print(f"[{rank}/{size}]: Decode train lines from file.")
         train_lines_local = _decode_bytes_array(
-            lines_byte[train_indices_local], f, sep=",", encoding="utf-8"
+            lines_in_byte[train_indices_local], f, sep=",", encoding="utf-8"
         )
         train_samples_local = np.array(train_lines_local)[:, 1:]
         train_targets_local = np.array(train_lines_local)[:, 0]
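Because the generator is seeded with the rank, each process draws its own subsample of the shared train indices. A small single-process sketch of that behavior (hypothetical sizes; note that `rng.choice` samples with replacement by default, matching the call above):

```python
import numpy as np

train_indices = np.arange(100)  # Hypothetical pool of global train indices.
n_train_samples_local = 5

for rank in range(4):  # Stand-in for what each of 4 MPI ranks would do.
    rng = np.random.default_rng(seed=rank)  # Rank-specific seed...
    train_indices_local = rng.choice(train_indices, size=n_train_samples_local)
    print(rank, train_indices_local)  # ...so each rank gets a different draw.
```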
@@ -467,8 +464,7 @@ def load_data_csv_root(
         train_counts = n_train_samples_local * np.ones(
             size, dtype=int
         )  # Determine load-balanced counts and displacements.
-        for idx in range(remainder_train):
-            train_counts[idx] += 1
+        train_counts[:remainder_train] += 1
         train_displs = np.concatenate(
             (np.zeros(1, dtype=int), np.cumsum(train_counts, dtype=int)[:-1]), dtype=int
         )
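The vectorized `train_counts[:remainder_train] += 1` replaces the Python loop and is the same base-plus-remainder load balancing used for the byte chunks earlier. A sketch with hypothetical numbers:

```python
import numpy as np

def balanced_counts_displs(n_items: int, size: int):
    # Every rank gets n_items // size items; the remainder is spread one item
    # each over the first n_items % size ranks. Displacements follow as the
    # exclusive cumulative sum of the counts.
    counts = (n_items // size) * np.ones(size, dtype=int)
    counts[: n_items % size] += 1
    displs = np.concatenate((np.zeros(1, dtype=int), np.cumsum(counts)[:-1]))
    return counts, displs

print(balanced_counts_displs(10, 4))  # (array([3, 3, 2, 2]), array([0, 3, 6, 8]))
```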
@@ -497,7 +493,7 @@ def load_data_csv_root(
         ]
     else:
-        train_counts = None
+        train_counts = np.empty(size, dtype=int)
         n_features = None
         n_test_samples = None
         send_buf_train_samples = None
@@ -505,7 +501,7 @@ def load_data_csv_root(
     n_features = comm.bcast(n_features, root=0)
     n_test_samples = comm.bcast(n_test_samples, root=0)
-    train_counts = comm.bcast(train_counts, root=0)
+    comm.Bcast(train_counts, root=0)
     samples_train_local = np.empty((train_counts[rank], n_features), dtype=float)
     targets_train_local = np.empty((train_counts[rank],), dtype=float)
     recv_buf_train_samples = [
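The switch from `comm.bcast` to `comm.Bcast` for `train_counts` is why the `else` branch above now allocates `np.empty(size, dtype=int)` instead of `None`: lowercase `bcast` pickles a Python object and returns a new one, while uppercase `Bcast` fills a preallocated buffer in place on every rank. A sketch contrasting the two (hypothetical values; run with, e.g., `mpirun -n 4 python bcast_sketch.py`):

```python
"""Sketch contrasting comm.bcast (pickle-based) with comm.Bcast (buffer-based)."""
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

# Lowercase bcast: any picklable object; non-root ranks pass a placeholder
# and rebind the name to the returned value.
n_features = 784 if rank == 0 else None
n_features = comm.bcast(n_features, root=0)

# Uppercase Bcast: fills an existing buffer in place, so EVERY rank must
# allocate an array of matching shape and dtype before the call.
train_counts = np.empty(size, dtype=int)
if rank == 0:
    train_counts[:] = np.arange(size)
comm.Bcast(train_counts, root=0)

print(f"[{rank}/{size}]: n_features={n_features}, train_counts={train_counts}")
```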
@@ -524,11 +520,10 @@ def load_data_csv_root(
     comm.Bcast(samples_test, root=0)
     comm.Bcast(targets_test, root=0)
     if verbose:
-        print_str = f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n"
-        print_str += (
-            f"Train targets after Scatterv have shape {targets_train_local.shape}.\n"
-        )
-        print_str += f"Test samples after Bcast have shape {samples_test.shape}.\n"
-        print_str += f"Test targets after Bcast have shape {targets_test.shape}."
-        print(print_str)
+        print(
+            f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n"
+            f"Train targets after Scatterv have shape {targets_train_local.shape}.\n"
+            f"Test samples after Bcast have shape {samples_test.shape}.\n"
+            f"Test targets after Bcast have shape {targets_test.shape}."
+        )
     return samples_train_local, targets_train_local, samples_test, targets_test
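The Scatterv that produces `samples_train_local` consumes the root's counts and displacements; a minimal sketch of that pattern with hypothetical shapes (counts and displacements are in elements, hence the factor `n_features`; run with, e.g., `mpirun -n 4 python scatterv_sketch.py`):

```python
"""Sketch of Scatterv with load-balanced counts/displacements."""
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

n_samples, n_features = 10, 3  # Hypothetical global train set shape.
train_counts = (n_samples // size) * np.ones(size, dtype=int)
train_counts[: n_samples % size] += 1
train_displs = np.concatenate((np.zeros(1, dtype=int), np.cumsum(train_counts)[:-1]))

if rank == 0:  # Only the root holds the full dataset and the send buffer.
    samples = np.arange(n_samples * n_features, dtype=float).reshape(n_samples, n_features)
    # Scale row counts/displs by n_features to get per-element values.
    send_buf = [samples, train_counts * n_features, train_displs * n_features, MPI.DOUBLE]
else:
    send_buf = None

samples_local = np.empty((train_counts[rank], n_features), dtype=float)
comm.Scatterv(send_buf, samples_local, root=0)
print(f"[{rank}/{size}]: received {samples_local.shape[0]} rows.")
```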