Skip to content
Snippets Groups Projects
Commit 64534acb authored by Kaleb Phipps's avatar Kaleb Phipps
Browse files

Merge remote-tracking branch 'origin/main'

parents 50207328 60b7f205
No related branches found
No related tags found
No related merge requests found
#!/bin/bash
#SBATCH --job-name=kmeans_cpu # job name
#SBATCH --partition=single # queue for resource allocation
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory
#SBATCH --nodes=1 # number of nodes to be used
#SBATCH --job-name=kmeans_cpu # Job name
#SBATCH --partition=single # Queue for resource allocation
#SBATCH --time=30:00 # Wall-clock time limit
#SBATCH --ntasks=1 # Number of tasks
#SBATCH --cpus-per-task=40 # Number of CPUs per tasks
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export VENVDIR=~/scai-venv # Export path to your virtual environment.
......
......@@ -4,7 +4,6 @@
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
......
......@@ -4,7 +4,6 @@
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
......
......@@ -4,7 +4,6 @@
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
......
%% Cell type:markdown id:ea5b6890 tags:
# Skalierbare Methoden der Künstlichen Intelligenz
Dr. Charlotte Debus (charlotte.debus@kit.edu)
Dr. Markus Götz (markus.goetz@kit.edu)
Dr. Marie Weiel (marie.weiel@kit.edu)
Dr. Kaleb Phipps (kaleb.phipps@kit.edu)
## Übung 2 am 03.12.24: Pairwise Distances und Parallel Sorting by Regular Sampling
In der zweiten Übung beschäftigen wir uns mit der parallelen Berechnung paarweiser Distanzen ("pairwise distances", siehe Vorlesung vom 14.11.24) und dem "Parallel Sorting by Regular Sampling" (PSRS) Algorithmus (siehe Vorlesung vom 07.11.24).
### Aufgabe 1
Untenstehend finden Sie eine parallele Implementierung eines Algorithmus zur Berechnung paarweiser Distanzen in `Python3`. Wir verwenden 50 000 Samples des [SUSY-Datensatzes](https://archive.ics.uci.edu/dataset/279/susy). Diese finden Sie in der HDF5-Datei `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5` auf dem bwUniCluster. Der SUSY-Datensatz enthält insgesamt 5 000 000 Samples aus Monte-Carlo-Simulationen hochenergetischer Teilchenkollisionen. Jedes Sample hat 18 Features, bestehend aus kinematischen Eigenschaften, die typischerweise von Teilchendetektoren gemessen werden, sowie aus diesen Messungen abgeleiteten Größen. Führen Sie den Code auf einem, zwei, vier, acht sowie 16 CPU-basierten Knoten in den Partitionen "single" bzw. "multiple" des bwUniClusters aus. Untersuchen Sie das schwache sowie das starke Skalierungsverhalten des Algorithmus und stellen Sie diese grafisch dar, z.B. mit `matplotlib.pyplot` in `Python3`.
**Zur Erinnerung (siehe auch Vorlesung vom 31.10.24):** Bei der starken Skalierung wird die Problemgröße konstant gehalten, während man die Anzahl der Prozesse erhöht, d.h. es wird untersucht, inwieweit sich ein Problem konstanter Größe durch Hinzunahme von mehr Rechenressourcen schneller lösen lässt. Bei der schwachen Skalierung wird die Problemgröße pro Prozess konstant gehalten, während man die Anzahl der Prozesse erhöht, d.h. es wird untersucht, inwieweit sich ein größeres Problem durch Hinzunahme von mehr Rechenressourcen in gleicher Zeit lösen lässt. Das bedeutet, dass Sie die Problemgröße zur Untersuchung des schwachen Skalierungsverhaltens proportional anpassen müssen!
**Vorgehensweise (analog zum ersten Übungsblatt):**
- Laden Sie zunächst die benötigten Module auf dem bwUniCluster.
- Setzen Sie dann eine virtuelle Umgebung mit `Python` auf, in der Sie die benötigten Pakete installieren. An dieser Stelle können Sie auch Ihre virtuelle Umgebung vom letzten Übungsblatt nutzen.
- Erstellen Sie basierend auf untenstehendem Code ein `Python`-Skript, welches Sie mithilfe eines `bash`-Skripts über SLURM auf dem Cluster submittieren (siehe Übung vom 05.11.24). Nachfolgend finden Sie ein beispielhaftes Template für das Submit-Skript für einen Multi-Node-Job inklusive der benötigten Module. Wenn Sie eine andere Anzahl an Knoten verwenden möchten, müssen Sie die `#SBATCH`-Optionen entsprechend modifizieren. Weitere Informationen dazu finden Sie [hier](https://wiki.bwhpc.de/e/BwUniCluster_2.0_Slurm_common_Features).
%% Cell type:code id:bd16f08b-584d-4913-86bb-f5bb6d4822b6 tags:
``` python
#!/bin/bash
#SBATCH --job-name=cdist # Job name
#SBATCH --partition=multiple # Queue for resource allocation
#SBATCH --nodes=2 # Number of nodes to be used
#SBATCH --time=4:00 # Wall-clock time limit
#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node
#SBATCH --cpus-per-task=40 # Number of CPUs per MPI task
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/cdist.py
```
%% Cell type:code id:9e172b4f tags:
``` python
"""Parallel calculation of pairwise distances"""
import time
import h5py
import torch
from mpi4py import MPI
torch.set_default_dtype(torch.float32)
def dist(x: torch.Tensor, y: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
"""
Calculate pairwise distances between all rows (samples, i.e., along axis 0) of two tensors x and y in parallel.
The distance matrix is calculated tile-wise with ring communication between processes, each holding a piece of x
and/or y.
Parameters
----------
x : torch.Tensor
First 2d tensor (of shape m/p x f). m is the total number of samples in x, distributed over p processors.
f is the number of features.
y : torch.Tensor
Second 2d tensor (of shape n/p x f). n is the total number of samples in x, distributed over p processors.
The number of features f must be the same as for x.
comm : MPI.Comm
Communicator to use. Default is ``MPI.COMM_WORLD``.
"""
# Check whether two input tensors are compatible.
if len(x.shape) != len(y.shape) != 2:
raise ValueError("Input tensors must be two-dimensional.")
if x.shape[1] != y.shape[1]:
raise ValueError(f"Input tensors must have the same number of features but {x.shape[1]} != {y.shape[1]}.")
size, rank = comm.size, comm.rank # Set up communication.
if size == 1: # Use torch functionality in non-parallel case.
return torch.cdist(x, y)
else: # Parallel case
# --- Setup and Matrix Initialization ---
mp, f = x.shape # Get number of samples in local chunk of x and number of features.
np = y.shape[0] # Get number of samples in local chunk of y.
# Each process initializes a local matrix, `local_distances`, of shape `(mp, n)`, where `mp` is the local chunk
# size of `x`, and `n` is the total number of samples in `y`. Each rank thus calculates the distance matrix
# chunk of size `mp x n`, i.e., rank 0 has distances from its own local `x` to all other `y`'s.
# Determine overall number of samples in y.
n = comm.allreduce(np, op=MPI.SUM)
print(f"Overall number of samples is {n}.")
# Initialize rank-local chunk of mp x n distance matrix with zeros.
local_distances = torch.zeros((mp, n))
# --- Managing Chunks and Displacements ---
# Determine where to put each result in the rank-local distance matrix chunk.
# Determine number of samples (rows) in each rank-local y.
y_counts = torch.tensor(comm.allgather(np), dtype=torch.int)
# Calculate corresponding displacements from counts to record the starting index of each chunk in y. Thus, each
# process can identify where in the result matrix it should write the distances.
y_displ = (0,) + tuple(torch.cumsum(y_counts, dim=0, dtype=torch.int)[:-1])
# --- Ring Communication Pattern ---
# Calculate distances in a "ring" pattern. Each process calculates distances for its local x chunk against its
# local y chunk (diagonal calculation). Then, through `size - 1` iterations, each process sends its y chunk to
# the next process in the "ring" while receiving a new y chunk from the previous process. This continues until
# each process has calculated distances between its x chunk and all chunks of y across all processes.
# 0th iteration: Calculate diagonal of global distance matrix.
# Each process calculates distances for its local x chunk against its local y chunk.
print(f"Rank [{rank}/{size}]: Calculate diagonal blocks in global distance matrix...")
# Extract actual result columns in distance matrix chunk for each rank.
cols = (y_displ[rank], y_displ[rank + 1] if (rank + 1) != size else n)
local_distances[:, cols[0]: cols[1]] = torch.cdist(x, y)
print(f"Rank [{rank}/{size}]: Start tile-wise ring communication...")
# Remaining `size-1` iterations: Send rank-local part of y to next process in circular fashion.
for iter_idx in range(1, size):
receiver = (rank + iter_idx) % size # Determine receiving process.
sender = (rank - iter_idx) % size # Determine sending process.
# Determine columns of rank-local distance matrix chunk to write result to.
col1 = y_displ[sender]
col2 = y_displ[sender + 1] if sender != size - 1 else n
# All but first `iter_idx` processes are first receiving, then sending.
if (rank // iter_idx) != 0:
stat = MPI.Status()
# Probe for incoming message containing the next chunk of y to consider.
comm.Probe(source=sender, tag=iter_idx, status=stat)
# Determine number of samples to receive (= overall number of floats to receive / number of features).
count = int(stat.Get_count(MPI.FLOAT) / f)
# Initialize tensor for incoming chunk of y with zeros.
moving = torch.zeros((count, f))
comm.Recv(moving, source=sender, tag=iter_idx)
# Send rank-local chunk of y to next process.
comm.Send(y, dest=receiver, tag=iter_idx)
# First `iter_idx` processes can now receive after sending.
if (rank // iter_idx) == 0:
stat = MPI.Status()
comm.Probe(source=sender, tag=iter_idx, status=stat)
count = int(stat.Get_count(MPI.FLOAT) / f)
moving = torch.zeros((count, f))
comm.Recv(moving, source=sender, tag=iter_idx)
# Calculate distances between stationary chunk of x and currently considered, moving chunk of y.
# Write result at correct position in distance matrix.
local_distances[:, col1: col2] = torch.cdist(x, moving)
print(f"Rank [{rank}/{size}]: [DONE]")
return local_distances
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5"
dataset = "data"
if rank == 0:
print(
"######################\n"
"# Pairwise distances #\n"
"######################\n"
f"COMM_WORLD size is {size}.\n"
f"Loading data... {data_path}[{dataset}]"
)
# Parallel data loader for SUSY data.
with h5py.File(data_path, "r") as handle:
chunk = int(handle[dataset].shape[0]/size)
if rank == size - 1:
data = torch.FloatTensor(handle[dataset][rank*chunk:])
else:
data = torch.FloatTensor(handle[dataset][rank*chunk:(rank+1)*chunk])
print(f"\t[OK]\nRank [{rank}/{size}]: Local data chunk has shape {list(data.shape)}...")
if rank == 0:
print("Start distance calculations...")
# Calculate distances of all SUSY samples w.r.t. each other and measure runtime.
start = time.perf_counter()
distances = dist(data, data, comm)
local_runtime = time.perf_counter() - start
# Calculate process-averaged runtime.
average_runtime = comm.allreduce(local_runtime, op=MPI.SUM) / size
print(f"Rank [{rank}/{size}]: Local distance matrix has shape {list(distances.shape)}.")
if rank == 0:
print(f"Process-averaged run time:\t{average_runtime} s")
```
%% Cell type:markdown id:6c7900f7 tags:
### Aufgabe 2
Untenstehend finden Sie eine parallele Implementierung des PSRS-Algorithmus. Mithilfe dieses Codes sollen unterschiedliche Sequenzen von ganzen Zahlen sortiert werden. Diese finden Sie in der HDF5-Datei `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5` auf dem bwUniCluster. Die Datei enthält fünf verschiedene Ganzzahl-Sequenzen als Datensätze `['duplicates_10', 'duplicates_5', 'many_duplicates', 'no_duplicates', 'triplicates']`, die jeweils einen unterschiedlichen Anteil an Duplikaten bzw. Triplikaten enthalten. Alle Sequenzen bestehen aus $10^9$ Elementen.
Führen Sie den Code mithilfe des untenstehenden Submit-Skripts für alle fünf Datensätze auf vier CPU-basierten Knoten in der Partition "multiple" des bwUniClusters aus. Die Datensätze können über ein Command-Line-Argument des Python-Skripts spezifiziert werden, z.B. `mpirun python psrs.py --dataset no_duplicates`. Messen und vergleichen Sie die Laufzeiten. Was fällt Ihnen auf?
%% Cell type:code id:691eeb17 tags:
``` python
#!/bin/bash
#SBATCH --job-name=psrs # Job name
#SBATCH --partition=dev_multiple # Queue for the resource allocation.
#SBATCH --time=5:00 # Wall-clock time limit
#SBATCH --mem=90000 # Memory per node
#SBATCH --nodes=4 # Number of nodes to be used
#SBATCH --cpus-per-task=40 # Number of CPUs per MPI task
#SBATCH --ntasks-per-node=1 # Number of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python ${PYDIR}/psrs.py --dataset no_duplicates # Specify dataset via command-line argument.
```
%% Cell type:code id:529e07b3 tags:
``` python
"""Parallel Sorting by Regular Sampling"""
import argparse
import time
import h5py
import numpy
import torch
from mpi4py import MPI
__mpi_type_mappings = {
torch.bool: MPI.BOOL,
torch.uint8: MPI.UNSIGNED_CHAR,
torch.int8: MPI.SIGNED_CHAR,
torch.int16: MPI.SHORT,
torch.int32: MPI.INT,
torch.int64: MPI.LONG,
torch.bfloat16: MPI.INT16_T,
torch.float16: MPI.INT16_T,
torch.float32: MPI.FLOAT,
torch.float64: MPI.DOUBLE,
torch.complex64: MPI.COMPLEX,
torch.complex128: MPI.DOUBLE_COMPLEX,
}
def sort(a: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
"""
Sort a's elements along given dimension in ascending order by their value.
The sorting is not stable which means that equal elements in the result may have different ordering than in
original array.
Parameters
----------
a : torch.Tensor
The 1D input array to be sorted.
comm : MPI.Comm
The communicator to use.
Returns
-------
torch.Tensor
Sorted local results.
"""
size, rank = comm.size, comm.rank
if size == 1:
local_sorted, _ = torch.sort(a)
return local_sorted
else:
###########
# PHASE 1 #
###########
# P is comm size, n is overall number of samples.
# Each rank sorts its local chunk and chooses P regular samples as representatives.
if rank == 0:
print(
"###########\n"
"# PHASE 1 #\n"
"###########"
)
local_sorted, local_indices = torch.sort(a)
print(f"Rank {rank}/{size}: Local sorting done...[OK]")
n_local = torch.numel(local_sorted) # Number of elements in local chunk.
print(f"Rank {rank}/{size}: Number of elements in local chunk is {n_local}.")
n_global = comm.allreduce(n_local, op=MPI.SUM)
# Each rank chooses P regular samples.
# For this, separate sorted tensor into P+1 equal-length partitions.
# Regular samples have indices 0, w, 2w,...,(P−1)w where w=n/P^2.
# Here: `size` = P
w = int(n_global / size ** 2)
partitions = [idx * w for idx in range(0, size)]
regular_samples_local = local_sorted[partitions]
print(
f"Rank {rank}/{size}: There are {len(partitions)} local regular samples: {regular_samples_local}"
)
# Root gathers regular samples.
# Each processor has drawn `size` regular samples and we have `size` processors overall.
num_regular_samples_global = size * size
if rank == 0:
print(f"Overall number of regular samples is {num_regular_samples_global}.")
# Initialize buffer to gather all regular samples on root.
regular_samples_global = torch.zeros(num_regular_samples_global, dtype=a.dtype)
comm.Gather(regular_samples_local, regular_samples_global, root=0)
if rank == 0:
print("On root: Regular samples gathered...[OK]")
###########
# PHASE 2 #
###########
# Root
# - sorts gathered regular samples,
# - chooses P-1 pivots at index positions P + rho, 2P + rho, ..., (P-1)P + rho with rho = P/2 -1, and
# - shares them with other processes.
if rank == 0:
print(
"###########\n"
"# PHASE 2 #\n"
"###########"
)
# Initialize buffer for sharing pivots later.
global_pivots = torch.zeros((size - 1,), dtype=local_sorted.dtype)
if rank == 0:
# Sort gathered regular samples on root.
sorted_regular_samples_global, _ = torch.sort(regular_samples_global)
print(f"On root: Regular samples are {sorted_regular_samples_global}.")
# Choose P-1 pivot indices (P = `size`).
rho = int(size/2) - 1
global_partitions = [
idx * size + rho for idx in range(1, size)
]
global_pivots = sorted_regular_samples_global[global_partitions]
if len(global_partitions) == size - 1:
print(
f"On root: There are {len(global_partitions)} global pivots: {global_pivots}"
)
comm.Bcast(
global_pivots, root=0
) # Broadcast copy of pivots to all processes from root.
if rank == 0:
print("Pivots broadcast to all processes...")
###########
# PHASE 3 #
###########
if rank == 0:
print(
"###########\n"
"# PHASE 3 #\n"
"###########\n"
"Each processor forms P disjunct partitions of locally sorted elements using pivots as splits."
)
# Each processor forms P disjunct partitions of locally sorted elements using P-1 pivots as splits.
# Initialize buffer to store partition mask of local samples.
# `lt_partitions` is a tensor of shape [P, n_local]. We use it to assign local samples to the partitions as
# split by the global pivots. This is done via a boolean mask.
lt_partitions = torch.empty((size, local_sorted.shape[0]), dtype=torch.int64)
last = torch.zeros_like(local_sorted, dtype=torch.int64)
# Iterate over all pivots and store index of first pivot greater than element's value.
if rank == 0:
print("Iterate over pivots to find index of first pivot > element's value.")
for idx, pivot in enumerate(global_pivots):
# torch.lt(input, other, *, out=None) computes `input < other` element-wise.
# Returns boolean tensor that is True where input is less than other and False elsewhere.
lt = torch.lt(local_sorted, pivot).int()
lt_partitions[idx] = lt - last
last = lt
lt_partitions[size - 1] = torch.ones_like(local_sorted, dtype=last.dtype) - last
# `lt_partitions` contains P elements: the first one encodes which elements in `local_sorted` are smaller than
# the 1st (= smallest) pivot, the second one encodes which elements are larger than the 1st and smaller than the
# 2nd pivot, ..., and the last encodes which elements are larger than last ( = largest) pivot.
# Now each processor has partitioned its locally sorted data into P partitions.
# To obtain the globally sorted result, each processor needs to send its i-th partition to processor i so that
# each processor i finally holds all samples from partition i.
# This all-to-all communication requires information about how many values will be sent for each partition.
# Processor i keeps i-th partitions and sends j-th partition to processor j.
local_partition_sizes = torch.sum(
lt_partitions, dim=1
) # How many values are in each local partition?
print(
f"Rank {rank}/{size}: Local # elements to be sent to other ranks (keep own section): {local_partition_sizes}"
)
global_partition_sizes = torch.zeros_like(
local_partition_sizes
) # How many values are in each global partition?
comm.Allreduce(local_partition_sizes, global_partition_sizes, op=MPI.SUM)
if rank == 0:
print(
f"Global # of elements on all ranks (partition matrix): {global_partition_sizes}"
)
# Construct matrix holding information which local value will be shipped where.
communication_index_vector = torch.empty_like(local_sorted, dtype=torch.int64)
# Loop over `lt_partitions` (binary encoding of which elements is in which partition formed by pivots).
for idx_rank, partition_mask_value in enumerate(lt_partitions):
communication_index_vector[partition_mask_value > 0] = idx_rank
# Elements in 0th partition (< first pivot) get 0, i.e., will be collected at rank 0, elements in 1st
# partition (> than first + < than second pivot) get 1, i.e., will be collected at rank 1,...
print(f"Rank {rank}/{size}: Ship element to rank: {communication_index_vector}")
# Counts and displacements for Alltoallv are rank-specific!
# send_counts_local on rank i: Integer array, entry j specifies number of values to be sent to rank j.
# recv_counts_local on rank i: Integer array, entry j specifies number of values to be received from rank j.
# Determine how many elements are to be shipped to which rank.
send_counts_local = numpy.zeros(size, dtype=int)
# Determine number of elements in each local partition.
for idx_rank in numpy.arange(size):
send_counts_local[idx_rank] = int((communication_index_vector == idx_rank).sum(dim=0))
# Determine local send displacements from local send counts.
send_displ_local = numpy.zeros(size, dtype=int)
send_displ_local[1:] = numpy.cumsum(send_counts_local, axis=0)[:-1]
# Allgather local send counts to determine how many elements are sent by other ranks to this rank and thus are
# to be received.
send_counts_global = numpy.zeros((size, size), dtype=int)
comm.Allgather([send_counts_local, MPI.INT], [send_counts_global, MPI.INT])
# Determine global receive counts from transposed global send counts.
recv_counts_global = numpy.transpose(send_counts_global)
# Extract local receive counts.
recv_counts_local = recv_counts_global[rank]
# Determine local receive displacements from local receive counts.
recv_displ_local = numpy.zeros(size, dtype=int)
recv_displ_local[1:] = numpy.cumsum(recv_counts_local, axis=0)[:-1]
value_buffer = torch.zeros((global_partition_sizes[rank],), dtype=local_sorted.dtype)
# Communicating torch tensors using buffered MPI variants requires accessing the tensor's storage pointer!
send_buf = [
MPI.memory.fromaddress(local_sorted.data_ptr(), 0),
(send_counts_local.tolist(), send_displ_local.tolist()),
__mpi_type_mappings[local_sorted.dtype],
]
recv_buf = [
MPI.memory.fromaddress(value_buffer.data_ptr(), 0),
(recv_counts_local.tolist(), recv_displ_local.tolist()),
__mpi_type_mappings[value_buffer.dtype],
]
comm.Alltoallv(send_buf, recv_buf)
result, _ = torch.sort(value_buffer)
return result
if __name__ == "__main__":
data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5"
parser = argparse.ArgumentParser(prog="Parallel Sorting by Regular Samples")
parser.add_argument(
"--dataset",
type=str,
default="duplicates_1",
help="The dataset to be sorted.",
)
args = parser.parse_args()
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
with h5py.File(data_path, "r") as f:
chunk = int(f[args.dataset].shape[0] / size)
if rank == size - 1:
data = torch.tensor(f[args.dataset][rank * chunk:])
else:
data = torch.tensor(f[args.dataset][rank * chunk:(rank + 1) * chunk])
if rank == 0:
print(
"########\n"
"# PSRS #\n"
"########"
)
print(f"Local data on rank {rank} = {data}")
if rank == 0:
print("Start sorting...")
start = time.perf_counter()
result = sort(data)
elapsed_local = time.perf_counter() - start
elapsed_global = comm.allreduce(elapsed_local, op=MPI.SUM)
elapsed_global /= size
if rank == 0:
print(f"Sorting done...\nRank-averaged run time: {elapsed_global} s")
print(f"Sorted chunk on rank {rank}/{size}: {result}")
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment