Commit 0a254010 authored by Marie Weiel

update sheet 2
%% Cell type:markdown id:ea5b6890 tags:

# Skalierbare Methoden der Künstlichen Intelligenz
Dr. Charlotte Debus (charlotte.debus@kit.edu)
Dr. Markus Götz (markus.goetz@kit.edu)
Dr. Marie Weiel (marie.weiel@kit.edu)
Dr. Kaleb Phipps (kaleb.phipps@kit.edu)

## Exercise 2 on 03.12.24: Pairwise Distances and Parallel Sorting by Regular Sampling

In this second exercise, we look at the parallel computation of pairwise distances (see lecture of 14.11.24) and the "Parallel Sorting by Regular Sampling" (PSRS) algorithm (see lecture of 07.11.24).

### Task 1

Below you find a parallel implementation of an algorithm for computing pairwise distances in `Python3`. We use 50,000 samples of the [SUSY dataset](https://archive.ics.uci.edu/dataset/279/susy), which you can find in the HDF5 file `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5` on the bwUniCluster. The SUSY dataset contains a total of 5,000,000 samples from Monte Carlo simulations of high-energy particle collisions. Each sample has 18 features, consisting of kinematic properties typically measured by particle detectors as well as quantities derived from these measurements. Run the code on one, two, four, eight, and 16 CPU nodes in the "single" and "multiple" partitions of the bwUniCluster, respectively. Investigate the strong and weak scaling behavior of the algorithm and visualize it graphically, e.g., with `matplotlib.pyplot` in `Python3`.

**Reminder (see also lecture of 31.10.24):** In strong scaling, the problem size is kept constant while the number of processes is increased, i.e., we investigate to what extent a problem of fixed size can be solved faster by adding more compute resources. In weak scaling, the problem size per process is kept constant while the number of processes is increased, i.e., we investigate to what extent a larger problem can be solved in the same time by adding more compute resources. This means that you must scale the overall problem size proportionally to the number of processes when investigating the weak scaling behavior!
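To evaluate your measurements, you can, for instance, compute the speedup $S(p) = t(1)/t(p)$ and the parallel efficiency $E(p) = S(p)/p$ from the measured runtimes and plot both over the number of nodes. Below is a minimal plotting sketch; the runtimes used here are placeholders that you need to replace with your own measurements.

``` python
import matplotlib.pyplot as plt

nodes = [1, 2, 4, 8, 16]
# Placeholder values: Replace with your measured process-averaged runtimes in seconds.
runtimes = [100.0, 52.0, 27.0, 15.0, 9.0]

speedup = [runtimes[0] / t for t in runtimes]         # S(p) = t(1) / t(p)
efficiency = [s / p for s, p in zip(speedup, nodes)]  # E(p) = S(p) / p

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
ax1.plot(nodes, speedup, "o-", label="measured")
ax1.plot(nodes, nodes, "k--", label="ideal")  # Ideal linear speedup for reference
ax1.set_xlabel("Number of nodes $p$")
ax1.set_ylabel("Speedup $S(p)$")
ax1.legend()
ax2.plot(nodes, efficiency, "o-")
ax2.set_xlabel("Number of nodes $p$")
ax2.set_ylabel("Parallel efficiency $E(p)$")
fig.tight_layout()
plt.savefig("scaling.png")
```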
**Procedure (analogous to the first exercise sheet):**
- First, load the required modules on the bwUniCluster.
- Then set up a virtual environment with `Python` in which you install the required packages. You can also reuse your virtual environment from the last exercise sheet here.
- Based on the code below, create a `Python` script that you submit to the cluster via SLURM using a `bash` script (see exercise of 05.11.24). Below you find an exemplary template of such a submit script for a multi-node job, including the required modules. If you want to use a different number of nodes, you need to modify the `#SBATCH` options accordingly. You can find more information [here](https://wiki.bwhpc.de/e/BwUniCluster_2.0_Slurm_common_Features).
%% Cell type:code id:bd16f08b-584d-4913-86bb-f5bb6d4822b6 tags:

``` python
#!/bin/bash

#SBATCH --job-name=cdist              # Job name
#SBATCH --partition=multiple          # Queue for resource allocation
#SBATCH --nodes=2                     # Number of nodes to be used
#SBATCH --time=4:00                   # Wall-clock time limit
#SBATCH --ntasks-per-node=1           # Maximum count of tasks per node
#SBATCH --cpus-per-task=40            # Number of CPUs per MPI task
#SBATCH --mail-type=ALL               # Notify user by email when certain event types occur.

export VENVDIR=<path/to/your/venv/folder>   # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script>   # Export path to directory containing Python script.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}

# Set up modules.
module purge                          # Unload all currently loaded modules.
module load compiler/gnu/13.3         # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1

source ${VENVDIR}/bin/activate  # Activate your virtual environment.

mpirun python -u ${PYDIR}/cdist.py
```
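%% Cell type:markdown tags:

Assuming you save the template above as, e.g., `submit_cdist.sh` (the file name is up to you), you can submit the job from a login node with `sbatch submit_cdist.sh` and monitor its state with `squeue`.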
%% Cell type:code id:9e172b4f tags:

``` python
"""Parallel calculation of pairwise distances."""
import time

import h5py
import torch
from mpi4py import MPI

torch.set_default_dtype(torch.float32)


def dist(x: torch.Tensor, y: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
    """
    Calculate pairwise distances between all rows (samples, i.e., along axis 0) of two tensors x and y in parallel.

    The distance matrix is calculated tile-wise with ring communication between processes, each holding a piece of x
    and/or y.

    Parameters
    ----------
    x : torch.Tensor
        First 2d tensor (of shape m/p x f). m is the total number of samples in x, distributed over p processors.
        f is the number of features.
    y : torch.Tensor
        Second 2d tensor (of shape n/p x f). n is the total number of samples in y, distributed over p processors.
        The number of features f must be the same as for x.
    comm : MPI.Comm
        Communicator to use. Default is ``MPI.COMM_WORLD``.

    Returns
    -------
    torch.Tensor
        Rank-local chunk of the global distance matrix (of shape m/p x n).
    """
    # Check whether the two input tensors are compatible.
    if x.ndim != 2 or y.ndim != 2:
        raise ValueError("Input tensors must be two-dimensional.")
    if x.shape[1] != y.shape[1]:
        raise ValueError(f"Input tensors must have the same number of features but {x.shape[1]} != {y.shape[1]}.")

    size, rank = comm.size, comm.rank  # Set up communication.

    if size == 1:  # Use torch functionality in non-parallel case.
        return torch.cdist(x, y)

    else:  # Parallel case
        # --- Setup and Matrix Initialization ---
        mp, f = x.shape  # Get number of samples in local chunk of x and number of features.
        np = y.shape[0]  # Get number of samples in local chunk of y.

        # Each process initializes a local matrix, `local_distances`, of shape `(mp, n)`, where `mp` is the local chunk
        # size of `x`, and `n` is the total number of samples in `y`. Each rank thus calculates the distance matrix
        # chunk of size `mp x n`, i.e., rank 0 has distances from its own local `x` to all other `y`'s.

        # Determine overall number of samples in y.
        n = comm.allreduce(np, op=MPI.SUM)
        print(f"Overall number of samples is {n}.")
        # Initialize rank-local chunk of mp x n distance matrix with zeros.
        local_distances = torch.zeros((mp, n))

        # --- Managing Chunks and Displacements ---
        # Determine where to put each result in the rank-local distance matrix chunk.
        # Determine number of samples (rows) in each rank-local y.
        y_counts = torch.tensor(comm.allgather(np), dtype=torch.int)
        # Calculate corresponding displacements from counts to record the starting index of each chunk in y. Thus, each
        # process can identify where in the result matrix it should write the distances.
        y_displ = (0,) + tuple(torch.cumsum(y_counts, dim=0, dtype=torch.int)[:-1])

        # --- Ring Communication Pattern ---
        # Calculate distances in a "ring" pattern. Each process calculates distances for its local x chunk against its
        # local y chunk (diagonal calculation). Then, through `size - 1` iterations, each process sends its y chunk to
        # the next process in the "ring" while receiving a new y chunk from the previous process. This continues until
        # each process has calculated distances between its x chunk and all chunks of y across all processes.

        # 0th iteration: Calculate diagonal of global distance matrix.
        # Each process calculates distances for its local x chunk against its local y chunk.
        print(f"Rank [{rank}/{size}]: Calculate diagonal blocks in global distance matrix...")
        # Extract actual result columns in distance matrix chunk for each rank.
        cols = (y_displ[rank], y_displ[rank + 1] if (rank + 1) != size else n)
        local_distances[:, cols[0]: cols[1]] = torch.cdist(x, y)

        print(f"Rank [{rank}/{size}]: Start tile-wise ring communication...")
        # Remaining `size - 1` iterations: Send rank-local part of y to next process in circular fashion.
        for iter_idx in range(1, size):
            receiver = (rank + iter_idx) % size  # Determine receiving process.
            sender = (rank - iter_idx) % size  # Determine sending process.
            # Determine columns of rank-local distance matrix chunk to write result to.
            col1 = y_displ[sender]
            col2 = y_displ[sender + 1] if sender != size - 1 else n
            # All but the first `iter_idx` processes are first receiving, then sending.
            if (rank // iter_idx) != 0:
                stat = MPI.Status()
                # Probe for incoming message containing the next chunk of y to consider.
                comm.Probe(source=sender, tag=iter_idx, status=stat)
                # Determine number of samples to receive (= overall number of floats to receive / number of features).
                count = int(stat.Get_count(MPI.FLOAT) / f)
                # Initialize tensor for incoming chunk of y with zeros.
                moving = torch.zeros((count, f))
                comm.Recv(moving, source=sender, tag=iter_idx)
            # Send rank-local chunk of y to next process.
            comm.Send(y, dest=receiver, tag=iter_idx)
            # The first `iter_idx` processes can now receive after sending.
            if (rank // iter_idx) == 0:
                stat = MPI.Status()
                comm.Probe(source=sender, tag=iter_idx, status=stat)
                count = int(stat.Get_count(MPI.FLOAT) / f)
                moving = torch.zeros((count, f))
                comm.Recv(moving, source=sender, tag=iter_idx)
            # Calculate distances between stationary chunk of x and currently considered, moving chunk of y.
            # Write result at correct position in distance matrix.
            local_distances[:, col1: col2] = torch.cdist(x, moving)
        print(f"Rank [{rank}/{size}]: [DONE]")
        return local_distances


if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank, size = comm.rank, comm.size
    data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5"
    dataset = "data"

    if rank == 0:
        print(
            "######################\n"
            "# Pairwise distances #\n"
            "######################\n"
            f"COMM_WORLD size is {size}.\n"
            f"Loading data... {data_path}[{dataset}]"
        )

    # Parallel data loader for SUSY data.
    with h5py.File(data_path, "r") as handle:
        chunk = int(handle[dataset].shape[0] / size)
        if rank == size - 1:
            data = torch.FloatTensor(handle[dataset][rank * chunk:])
        else:
            data = torch.FloatTensor(handle[dataset][rank * chunk:(rank + 1) * chunk])

    print(f"\t[OK]\nRank [{rank}/{size}]: Local data chunk has shape {list(data.shape)}...")

    if rank == 0:
        print("Start distance calculations...")
    # Calculate distances of all SUSY samples w.r.t. each other and measure runtime.
    start = time.perf_counter()
    distances = dist(data, data, comm)
    local_runtime = time.perf_counter() - start
    # Calculate process-averaged runtime.
    average_runtime = comm.allreduce(local_runtime, op=MPI.SUM) / size
    print(f"Rank [{rank}/{size}]: Local distance matrix has shape {list(distances.shape)}.")
    if rank == 0:
        print(f"Process-averaged run time:\t{average_runtime} s")
```
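%% Cell type:markdown tags:

To see which processes exchange data in which iteration of the communication pattern above, you can replay the sender and receiver assignment without MPI. The following self-contained sketch prints the communication partners of each rank per iteration, here assuming four processes:

%% Cell type:code tags:

``` python
size = 4  # Assumed number of MPI processes.

for iter_idx in range(1, size):
    print(f"Iteration {iter_idx}:")
    for rank in range(size):
        receiver = (rank + iter_idx) % size  # Rank to which this rank sends its local y chunk.
        sender = (rank - iter_idx) % size  # Rank whose local y chunk this rank receives.
        print(f"  Rank {rank} sends its y chunk to rank {receiver} and receives the y chunk of rank {sender}.")
```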
%% Cell type:markdown id:6c7900f7 tags:

### Task 2

Below you find a parallel implementation of the PSRS algorithm. This code shall be used to sort different sequences of integers. You can find them in the HDF5 file `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5` on the bwUniCluster. The file contains five different integer sequences as datasets `['duplicates_10', 'duplicates_5', 'many_duplicates', 'no_duplicates', 'triplicates']`, each of which contains a different fraction of duplicates and triplicates, respectively. All sequences consist of $10^9$ elements.

Run the code for all five datasets on four CPU nodes in the "multiple" partition of the bwUniCluster using the submit script below. The dataset can be specified via a command-line argument of the Python script, e.g., `mpirun python psrs.py --dataset no_duplicates`. Measure and compare the runtimes. What do you notice?
%% Cell type:code id:691eeb17 tags:

``` python
#!/bin/bash

#SBATCH --job-name=psrs               # Job name
#SBATCH --partition=dev_multiple      # Queue for the resource allocation.
#SBATCH --time=5:00                   # Wall-clock time limit
#SBATCH --mem=90000                   # Memory per node
#SBATCH --nodes=4                     # Number of nodes to be used
#SBATCH --cpus-per-task=40            # Number of CPUs per MPI task
#SBATCH --ntasks-per-node=1           # Number of tasks per node
#SBATCH --mail-type=ALL               # Notify user by email when certain event types occur.

export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=<path/to/your/venv/folder>   # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script>   # Export path to directory containing Python script.

# Set up modules.
module purge                          # Unload all currently loaded modules.
module load compiler/gnu/13.3         # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1

source ${VENVDIR}/bin/activate  # Activate your virtual environment.

mpirun python ${PYDIR}/psrs.py --dataset no_duplicates  # Specify dataset via command-line argument.
```
%% Cell type:code id:529e07b3 tags:

``` python
"""Parallel Sorting by Regular Sampling."""
import argparse
import time

import h5py
import numpy
import torch
from mpi4py import MPI

# Mapping of torch dtypes to corresponding MPI datatypes. Note that MPI has no native 16-bit float types, which is why
# bfloat16 and float16 tensors are transferred bitwise as 16-bit integers.
__mpi_type_mappings = {
    torch.bool: MPI.BOOL,
    torch.uint8: MPI.UNSIGNED_CHAR,
    torch.int8: MPI.SIGNED_CHAR,
    torch.int16: MPI.SHORT,
    torch.int32: MPI.INT,
    torch.int64: MPI.LONG,
    torch.bfloat16: MPI.INT16_T,
    torch.float16: MPI.INT16_T,
    torch.float32: MPI.FLOAT,
    torch.float64: MPI.DOUBLE,
    torch.complex64: MPI.COMPLEX,
    torch.complex128: MPI.DOUBLE_COMPLEX,
}


def sort(a: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
    """
    Sort the elements of a in ascending order by their value.

    The sorting is not stable, which means that equal elements in the result may have a different ordering than in the
    original array.

    Parameters
    ----------
    a : torch.Tensor
        The 1D input array to be sorted.
    comm : MPI.Comm
        The communicator to use.

    Returns
    -------
    torch.Tensor
        Sorted local results.
    """
    size, rank = comm.size, comm.rank
    if size == 1:
        local_sorted, _ = torch.sort(a)
        return local_sorted
    else:
        ###########
        # PHASE 1 #
        ###########
        # P is comm size, n is overall number of samples.
        # Each rank sorts its local chunk and chooses P regular samples as representatives.
        if rank == 0:
            print(
                "###########\n"
                "# PHASE 1 #\n"
                "###########"
            )
        local_sorted, local_indices = torch.sort(a)
        print(f"Rank {rank}/{size}: Local sorting done...[OK]")
        n_local = torch.numel(local_sorted)  # Number of elements in local chunk.
        print(f"Rank {rank}/{size}: Number of elements in local chunk is {n_local}.")
        n_global = comm.allreduce(n_local, op=MPI.SUM)
        # Each rank chooses P regular samples.
        # For this, separate sorted tensor into P equal-length partitions.
        # Regular samples have indices 0, w, 2w, ..., (P-1)w where w = n/P^2.
        # Here: `size` = P
        w = int(n_global / size ** 2)
        partitions = [idx * w for idx in range(0, size)]
        regular_samples_local = local_sorted[partitions]
        print(
            f"Rank {rank}/{size}: There are {len(partitions)} local regular samples: {regular_samples_local}"
        )
        # Root gathers regular samples.
        # Each processor has drawn `size` regular samples and we have `size` processors overall.
        num_regular_samples_global = size * size
        if rank == 0:
            print(f"Overall number of regular samples is {num_regular_samples_global}.")
        # Initialize buffer to gather all regular samples on root.
        regular_samples_global = torch.zeros(num_regular_samples_global, dtype=a.dtype)
        comm.Gather(regular_samples_local, regular_samples_global, root=0)
        if rank == 0:
            print("On root: Regular samples gathered...[OK]")

        ###########
        # PHASE 2 #
        ###########
        # Root
        # - sorts gathered regular samples,
        # - chooses P-1 pivots at index positions P + rho, 2P + rho, ..., (P-1)P + rho with rho = P/2 - 1, and
        # - shares them with the other processes.
        if rank == 0:
            print(
                "###########\n"
                "# PHASE 2 #\n"
                "###########"
            )
        # Initialize buffer for sharing pivots later.
        global_pivots = torch.zeros((size - 1,), dtype=local_sorted.dtype)
        if rank == 0:
            # Sort gathered regular samples on root.
            sorted_regular_samples_global, _ = torch.sort(regular_samples_global)
            print(f"On root: Regular samples are {sorted_regular_samples_global}.")
            # Choose P-1 pivot indices (P = `size`).
            rho = int(size / 2) - 1
            global_partitions = [
                idx * size + rho for idx in range(1, size)
            ]
            global_pivots = sorted_regular_samples_global[global_partitions]
            if len(global_partitions) == size - 1:
                print(
                    f"On root: There are {len(global_partitions)} global pivots: {global_pivots}"
                )
        comm.Bcast(
            global_pivots, root=0
        )  # Broadcast copy of pivots to all processes from root.
        if rank == 0:
            print("Pivots broadcast to all processes...")

        ###########
        # PHASE 3 #
        ###########
        if rank == 0:
            print(
                "###########\n"
                "# PHASE 3 #\n"
                "###########\n"
                "Each processor forms P disjunct partitions of locally sorted elements using pivots as splits."
            )
        # Each processor forms P disjunct partitions of locally sorted elements using P-1 pivots as splits.
        # Initialize buffer to store partition mask of local samples.
        # `lt_partitions` is a tensor of shape [P, n_local]. We use it to assign local samples to the partitions as
        # split by the global pivots. This is done via a boolean mask.
        lt_partitions = torch.empty((size, local_sorted.shape[0]), dtype=torch.int64)
        last = torch.zeros_like(local_sorted, dtype=torch.int64)
        # Iterate over all pivots and store index of first pivot greater than element's value.
        if rank == 0:
            print("Iterate over pivots to find index of first pivot > element's value.")
        for idx, pivot in enumerate(global_pivots):
            # torch.lt(input, other, *, out=None) computes `input < other` element-wise.
            # Returns boolean tensor that is True where input is less than other and False elsewhere.
            lt = torch.lt(local_sorted, pivot).int()
            lt_partitions[idx] = lt - last
            last = lt
        lt_partitions[size - 1] = torch.ones_like(local_sorted, dtype=last.dtype) - last
        # `lt_partitions` contains P elements: The first one encodes which elements in `local_sorted` are smaller than
        # the 1st (= smallest) pivot, the second one encodes which elements are larger than the 1st and smaller than
        # the 2nd pivot, ..., and the last one encodes which elements are larger than the last (= largest) pivot.
        # Now each processor has partitioned its locally sorted data into P partitions.
        # To obtain the globally sorted result, each processor needs to send its i-th partition to processor i so that
        # each processor i finally holds all samples from partition i.
        # This all-to-all communication requires information about how many values will be sent for each partition.
        # Processor i keeps the i-th partition and sends the j-th partition to processor j.
        local_partition_sizes = torch.sum(
            lt_partitions, dim=1
        )  # How many values are in each local partition?
        print(
            f"Rank {rank}/{size}: Local # elements to be sent to other ranks (keep own section): {local_partition_sizes}"
        )
        global_partition_sizes = torch.zeros_like(
            local_partition_sizes
        )  # How many values are in each global partition?
        comm.Allreduce(local_partition_sizes, global_partition_sizes, op=MPI.SUM)
        if rank == 0:
            print(
                f"Global # of elements on all ranks (partition matrix): {global_partition_sizes}"
            )
        # Construct vector holding information about which local value will be shipped where.
        communication_index_vector = torch.empty_like(local_sorted, dtype=torch.int64)
        # Loop over `lt_partitions` (binary encoding of which element is in which partition formed by pivots).
        for idx_rank, partition_mask_value in enumerate(lt_partitions):
            communication_index_vector[partition_mask_value > 0] = idx_rank
        # Elements in 0th partition (< first pivot) get 0, i.e., will be collected at rank 0, elements in 1st
        # partition (> first pivot + < second pivot) get 1, i.e., will be collected at rank 1, ...
        print(f"Rank {rank}/{size}: Ship element to rank: {communication_index_vector}")
        # Counts and displacements for Alltoallv are rank-specific!
        # send_counts_local on rank i: Integer array, entry j specifies number of values to be sent to rank j.
        # recv_counts_local on rank i: Integer array, entry j specifies number of values to be received from rank j.
        # Determine how many elements are to be shipped to which rank.
        send_counts_local = numpy.zeros(size, dtype=int)
        # Determine number of elements in each local partition.
        for idx_rank in numpy.arange(size):
            send_counts_local[idx_rank] = int((communication_index_vector == idx_rank).sum(dim=0))
        # Determine local send displacements from local send counts.
        send_displ_local = numpy.zeros(size, dtype=int)
        send_displ_local[1:] = numpy.cumsum(send_counts_local, axis=0)[:-1]
        # Allgather local send counts to determine how many elements are sent by other ranks to this rank and thus
        # are to be received.
        send_counts_global = numpy.zeros((size, size), dtype=int)
        comm.Allgather([send_counts_local, MPI.INT], [send_counts_global, MPI.INT])
        # Determine global receive counts from transposed global send counts.
        recv_counts_global = numpy.transpose(send_counts_global)
        # Extract local receive counts.
        recv_counts_local = recv_counts_global[rank]
        # Determine local receive displacements from local receive counts.
        recv_displ_local = numpy.zeros(size, dtype=int)
        recv_displ_local[1:] = numpy.cumsum(recv_counts_local, axis=0)[:-1]
        # Initialize receive buffer for the final result chunk of this rank.
        value_buffer = torch.zeros((global_partition_sizes[rank],), dtype=local_sorted.dtype)
        # Communicating torch tensors using buffered MPI variants requires accessing the tensor's storage pointer!
        send_buf = [
            MPI.memory.fromaddress(local_sorted.data_ptr(), 0),
            (send_counts_local.tolist(), send_displ_local.tolist()),
            __mpi_type_mappings[local_sorted.dtype],
        ]
        recv_buf = [
            MPI.memory.fromaddress(value_buffer.data_ptr(), 0),
            (recv_counts_local.tolist(), recv_displ_local.tolist()),
            __mpi_type_mappings[value_buffer.dtype],
        ]
        comm.Alltoallv(send_buf, recv_buf)
        result, _ = torch.sort(value_buffer)
        return result

if __name__ == "__main__":
    data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5"
    parser = argparse.ArgumentParser(prog="Parallel Sorting by Regular Sampling")
    parser.add_argument(
        "--dataset",
        type=str,
        default="no_duplicates",  # Must be one of the five datasets contained in the HDF5 file.
        help="The dataset to be sorted.",
    )
    args = parser.parse_args()
    comm = MPI.COMM_WORLD
    rank, size = comm.rank, comm.size

    with h5py.File(data_path, "r") as f:
        chunk = int(f[args.dataset].shape[0] / size)
        if rank == size - 1:
            data = torch.tensor(f[args.dataset][rank * chunk:])
        else:
            data = torch.tensor(f[args.dataset][rank * chunk:(rank + 1) * chunk])

    if rank == 0:
        print(
            "########\n"
            "# PSRS #\n"
            "########"
        )
    print(f"Local data on rank {rank} = {data}")
    if rank == 0:
        print("Start sorting...")
    start = time.perf_counter()
    result = sort(data)
    elapsed_local = time.perf_counter() - start
    elapsed_global = comm.allreduce(elapsed_local, op=MPI.SUM)
    elapsed_global /= size
    if rank == 0:
        print(f"Sorting done...\nRank-averaged run time: {elapsed_global} s")
    print(f"Sorted chunk on rank {rank}/{size}: {result}")
```
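%% Cell type:markdown tags:

To make the sampling and pivot selection in phases 1 and 2 more tangible, the following self-contained sketch (no MPI; the rank-local chunks are simulated as a list of tensors) replays the corresponding logic from the code above for three simulated processes and $n = 27$ made-up integers:

%% Cell type:code tags:

``` python
import torch

# Simulate P = 3 ranks, each holding a local chunk of 9 integers (n = 27).
chunks = [
    torch.tensor([16, 2, 17, 24, 33, 28, 30, 1, 0]),
    torch.tensor([27, 9, 25, 34, 23, 19, 18, 11, 7]),
    torch.tensor([21, 13, 8, 35, 12, 29, 6, 3, 4]),
]
size = len(chunks)  # P
n_global = sum(chunk.numel() for chunk in chunks)

# PHASE 1: Each rank sorts its chunk locally and draws P regular samples at indices 0, w, 2w, ... with w = n/P^2.
w = n_global // size ** 2
regular_samples = []
for chunk in chunks:
    local_sorted, _ = torch.sort(chunk)
    regular_samples.append(local_sorted[[idx * w for idx in range(size)]])
    print(f"Locally sorted chunk: {local_sorted.tolist()}, regular samples: {regular_samples[-1].tolist()}")

# PHASE 2: Root sorts the gathered regular samples and picks P-1 pivots at positions P + rho, 2P + rho, ...
sorted_regular_samples, _ = torch.sort(torch.cat(regular_samples))
rho = size // 2 - 1
global_pivots = sorted_regular_samples[[idx * size + rho for idx in range(1, size)]]
print(f"Sorted regular samples: {sorted_regular_samples.tolist()}")
print(f"Global pivots: {global_pivots.tolist()}")  # Each rank i then collects all elements in partition i.
```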