diff --git a/3_ensembles/solutions/py/dataloaders.py b/3_ensembles/solutions/py/dataloaders.py index cfca140a0bc10b85d05d2d9eefb17340f21c04a0..8b2994f109c1bc8d5dc4d68e075da12efa2c1bb4 100644 --- a/3_ensembles/solutions/py/dataloaders.py +++ b/3_ensembles/solutions/py/dataloaders.py @@ -23,7 +23,7 @@ from sklearn.model_selection import train_test_split def _determine_line_starts( - f: BinaryIO, comm: MPI.Comm, displs: np.ndarray, counts: np.ndarray + f: BinaryIO, comm: MPI.Comm, byte_displs: np.ndarray, byte_counts: np.ndarray ) -> Tuple[list, bytes, int]: """ Determine line starts in bytes from CSV file. @@ -34,9 +34,9 @@ def _determine_line_starts( The handle of CSV file to read from comm : MPI.Comm The communicator to use. - displs : np.ndarray + byte_displs : np.ndarray The displacements of byte chunk starts on each rank, length is `comm.size`. - counts : np.ndarray + byte_counts : np.ndarray The counts of the byte chunk on each rank, length is `comm.size`. Returns @@ -57,22 +57,22 @@ def _determine_line_starts( lineter_len = 1 # Set number of line termination chars. line_starts = [] # Set up list to save line starts. - f.seek(displs[rank]) # Jump to pre-assigned position in file. - r = f.read(counts[rank]) # Read number of bytes from starting position. + f.seek(byte_displs[rank]) # Jump to pre-assigned position in file. + bytes_chunk = f.read(byte_counts[rank]) # Read number of bytes from starting position. - for pos, l in enumerate(r): # Determine line breaks in bytes chunk. + for pos, l in enumerate(bytes_chunk): # Determine line breaks in bytes chunk. if chr(l) == "\n": # Line terminated by '\n' only. - if not chr(r[pos - 1]) == "\r": # No \r\n. + if not chr(bytes_chunk[pos - 1]) == "\r": # No \r\n. line_starts.append(pos + 1) elif chr(l) == "\r": if ( - pos + 1 < len(r) and chr(r[pos + 1]) == "\n" + pos + 1 < len(bytes_chunk) and chr(bytes_chunk[pos + 1]) == "\n" ): # Line terminated by '\r\n'. line_starts.append(pos + 2) lineter_len = 2 else: # Line terminated by '\r' only. line_starts.append(pos + 1) - return line_starts, r, lineter_len + return line_starts, bytes_chunk, lineter_len def _get_byte_pos_from_line_starts( @@ -95,7 +95,7 @@ def _get_byte_pos_from_line_starts( np.ndarray An array containing vectors of (start, count) for lines in byte. """ - lines_byte = [] # list for line starts and counts + lines_byte = [] # List for line starts and counts for idx in range(len(line_starts)): # Loop through all line starts. if idx == len(line_starts) - 1: # Special case for last line. temp = [ @@ -236,127 +236,123 @@ def load_data_csv_parallel( rank, size = comm.rank, comm.size # Set up communicator stuff. file_size = os.stat(path_to_data).st_size # Get file size in bytes. - # Determine displs + counts of bytes chunk to read on each rank. - base = file_size // size # Determine base chunk size for each process. - remainder = file_size % size # Determine remainder bytes. - counts = base * np.ones( # Construct array with each rank's chunk counts. + # Determine displacements + counts of bytes chunk to read on each rank. + n_bytes_local = file_size // size # Determine number of bytes to read for each process. + remainder_bytes = file_size % size # Determine remainder bytes. + byte_counts = n_bytes_local * np.ones( # Construct array with each rank's chunk counts. (size,), dtype=int ) if ( - remainder > 0 + remainder_bytes > 0 ): # Equally distribute remainder over respective ranks to balance load. - counts[:remainder] += 1 - displs = np.concatenate( # Determine displs via cumulative sum from counts. - (np.zeros((1,), dtype=int), np.cumsum(counts, dtype=int)[:-1]) + byte_counts[:remainder_bytes] += 1 + byte_displs = np.concatenate( # Determine displs via cumulative sum from counts. + (np.zeros((1,), dtype=int), np.cumsum(byte_counts, dtype=int)[:-1]) ) if rank == 0: print(f"File size is {file_size} bytes.") if rank == 0 and verbose: - print(f"Displs {displs}, counts {counts} for reading bytes chunks from file.") + print(f"Displs {byte_displs}, counts {byte_counts} for reading bytes chunks from file.") with open(path_to_data, "rb") as f: # Open csv file to read from. # Determine line starts in bytes chunks on each rank. - line_starts, r, lineter_len = _determine_line_starts(f, comm, displs, counts) + line_starts_local, byte_chunk, lineter_len = _determine_line_starts(f, comm, byte_displs, byte_counts) # On rank 0, add very first line. if rank == 0: - line_starts = [0] + line_starts + line_starts_local = [0] + line_starts_local if verbose: - print(f"[{rank}/{size}]: {len(line_starts)} line starts in chunk.") + print(f"[{rank}/{size}]: {len(line_starts_local)} line starts in chunk.") # Find correct starting point, considering header lines. - # All-gather numbers of line starts in each chunk in `total_lines` array. - total_lines = np.empty(size, dtype=int) + # All-gather numbers of line starts in each chunk. + counts_linestarts = np.empty(size, dtype=int) comm.Allgather( - [np.array(len(line_starts), dtype=int), MPI.INT], [total_lines, MPI.INT] + [np.array(len(line_starts_local), dtype=int), MPI.INT], [counts_linestarts, MPI.INT] ) - cumsum = list(np.cumsum(total_lines)) - # Determine rank where actual data lines start, - # i.e. remove ranks only containing header lines. - start = next(i for i in range(size) if cumsum[i] > header_lines) + # Determine rank where actual data lines start, i.e., remove ranks only containing header lines. + cumsum_linestarts = list(np.cumsum(counts_linestarts)) + start_rank = next(i for i in range(size) if cumsum_linestarts[i] > header_lines) if verbose: print( - f"[{rank}/{size}]: total_lines is {total_lines}.\n" - f"[{rank}/{size}]: cumsum is {cumsum}.\n" - f"[{rank}/{size}]: start is {start}." + f"[{rank}/{size}]: There are {counts_linestarts} line starts overall.\n" + f"[{rank}/{size}]: Their cumsum is {cumsum_linestarts}.\n" + f"[{rank}/{size}]: The start rank is {start_rank}." ) - if rank < start: # Ranks containing only header lines. - line_starts = [] - if rank == start: # Rank containing header + data lines. - rem = header_lines - (0 if start == 0 else cumsum[start - 1]) - line_starts = line_starts[rem:] + if rank < start_rank: # Ranks containing only header lines. + line_starts_local = [] + if rank == start_rank: # Rank containing header + data lines. + lines_to_remove = header_lines - (0 if start_rank == 0 else cumsum_linestarts[start_rank - 1]) + line_starts_local = line_starts_local[lines_to_remove:] # Share line starts of data samples across all ranks via Allgatherv. - line_starts += displs[ + line_starts_local += byte_displs[ rank ] # Shift line starts on each rank according to displs. if verbose: print( - f"[{rank}/{size}]: {len(line_starts)} line starts of shape {line_starts.shape} and type " - f"{line_starts.dtype} in local chunk: {line_starts}" + f"[{rank}/{size}]: {len(line_starts_local)} line starts of shape {line_starts_local.shape} and type " + f"{line_starts_local.dtype} in local chunk: {line_starts_local}" ) - count_linestarts = np.array( # Determine local number of line starts. - len(line_starts), dtype=int - ) - counts_linestarts = ( - np.empty( # Initialize array to all-gather local numbers of line starts. + counts_linestarts_corrected = ( + np.empty( # Initialize array to all-gather local numbers of line starts (corrected for header lines). size, dtype=int ) ) - comm.Allgather([count_linestarts, MPI.INT], [counts_linestarts, MPI.INT]) - n_linestarts = np.sum( - counts_linestarts - ) # Determine overall number of line starts. - displs_linestarts = ( + comm.Allgather( + [np.array(len(line_starts_local), dtype=int), MPI.INT], + [counts_linestarts_corrected, MPI.INT] + ) + displs_linestarts_corrected = ( np.concatenate( # Determine displacements of line starts from counts. ( np.zeros((1,), dtype=int), - np.cumsum(counts_linestarts, dtype=int)[:-1], + np.cumsum(counts_linestarts_corrected, dtype=int)[:-1], ), dtype=int, ) ) if verbose and rank == 0: print( - f"Overall {n_linestarts} linestarts.\n" - f"Number of linestarts in each chunk is {counts_linestarts}.\n" - f"Displs of linestarts in each chunk is {displs_linestarts}." + f"Overall {np.sum(counts_linestarts_corrected)} linestarts.\n" + f"Number of linestarts in each chunk is {counts_linestarts_corrected}.\n" + f"Displs of linestarts in each chunk is {displs_linestarts_corrected}." ) - all_line_starts = ( + line_starts_global = ( np.empty( # Initialize array to all-gatherv line starts from all ranks. - (n_linestarts,), dtype=line_starts.dtype + (np.sum(counts_linestarts_corrected),), dtype=line_starts_local.dtype ) ) if verbose and rank == 0: print( - f"Recvbuf {all_line_starts}, {all_line_starts.shape}, {all_line_starts.dtype}." + f"Recvbuf {line_starts_global}, {line_starts_global.shape}, {line_starts_global.dtype}." ) comm.Allgatherv( - line_starts, + line_starts_local, [ - all_line_starts, - counts_linestarts, - displs_linestarts, - from_numpy_dtype(line_starts.dtype), + line_starts_global, + counts_linestarts_corrected, + displs_linestarts_corrected, + from_numpy_dtype(line_starts_local.dtype), ], ) # Line starts were determined as those positions following a line end. # But: There is no line after last line end in file. # Thus, remove last entry from all_line_starts. - all_line_starts = all_line_starts[:-1] + line_starts_global = line_starts_global[:-1] if rank == 0: - print(f"After Allgatherv: All line starts: {all_line_starts}") + print(f"After Allgatherv: All line starts: {line_starts_global}") # Construct array with line starts and lengths in bytes. print( f"[{rank}/{size}]: Construct array with line starts and lengths in bytes." ) - lines_byte = _get_byte_pos_from_line_starts( - all_line_starts, file_size, lineter_len + lines_in_byte = _get_byte_pos_from_line_starts( + line_starts_global, file_size, lineter_len ) # Make global train-test split. @@ -367,7 +363,7 @@ def load_data_csv_parallel( train_indices, test_indices, ) = _split_indices_train_test( - n_samples=len(lines_byte), train_split=train_split, seed=size + n_samples=len(lines_in_byte), train_split=train_split, seed=size ) n_train_samples_local = ( @@ -380,14 +376,15 @@ def load_data_csv_parallel( # Construct held-out test dataset (same on each rank). print(f"[{rank}/{size}]: Decode {n_test_samples} test samples from file.") test_lines = _decode_bytes_array( - lines_byte[test_indices], f, sep=",", encoding="utf-8" + lines_in_byte[test_indices], f, sep=",", encoding="utf-8" ) test_samples = np.array(test_lines)[:, 1:] test_targets = np.array(test_lines)[:, 0] if verbose: - print_str = f"[{rank}/{size}]: Test samples: {test_samples[0]}\n" - print_str += f"Test targets: {test_targets[0]}\n" - print(print_str) + print( + f"[{rank}/{size}]: Test samples: {test_samples[0]}\n" + f"Test targets: {test_targets[0]}\n" + ) # Construct train dataset (different on each rank). rng = np.random.default_rng(seed=rank) @@ -395,7 +392,7 @@ def load_data_csv_parallel( train_indices_local = rng.choice(train_indices, size=n_train_samples_local) print(f"[{rank}/{size}]: Decode train lines from file.") train_lines_local = _decode_bytes_array( - lines_byte[train_indices_local], f, sep=",", encoding="utf-8" + lines_in_byte[train_indices_local], f, sep=",", encoding="utf-8" ) train_samples_local = np.array(train_lines_local)[:, 1:] train_targets_local = np.array(train_lines_local)[:, 0] @@ -467,8 +464,7 @@ def load_data_csv_root( train_counts = n_train_samples_local * np.ones( size, dtype=int ) # Determine load-balanced counts and displacements. - for idx in range(remainder_train): - train_counts[idx] += 1 + train_counts[:remainder_train] += 1 train_displs = np.concatenate( (np.zeros(1, dtype=int), np.cumsum(train_counts, dtype=int)[:-1]), dtype=int ) @@ -497,7 +493,7 @@ def load_data_csv_root( ] else: - train_counts = None + train_counts = np.empty(size, dtype=int) n_features = None n_test_samples = None send_buf_train_samples = None @@ -505,7 +501,7 @@ def load_data_csv_root( n_features = comm.bcast(n_features, root=0) n_test_samples = comm.bcast(n_test_samples, root=0) - train_counts = comm.bcast(train_counts, root=0) + comm.Bcast(train_counts, root=0) samples_train_local = np.empty((train_counts[rank], n_features), dtype=float) targets_train_local = np.empty((train_counts[rank],), dtype=float) recv_buf_train_samples = [ @@ -524,11 +520,10 @@ def load_data_csv_root( comm.Bcast(samples_test, root=0) comm.Bcast(targets_test, root=0) if verbose: - print_str = f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n" - print_str += ( + print( + f"[{rank}/{size}]: Train samples after Scatterv have shape {samples_train_local.shape}.\n" f"Train targets after Scatterv have shape {targets_train_local.shape}.\n" + f"Test samples after Bcast have shape {samples_test.shape}.\n" + f"Test targets after Bcast have shape {targets_test.shape}." ) - print_str += f"Test samples after Bcast have shape {samples_test.shape}.\n" - print_str += f"Test targets after Bcast have shape {targets_test.shape}." - print(print_str) return samples_train_local, targets_train_local, samples_test, targets_test