Compare revisions (target project: marie.weiel/scalableai2425)
Commits on Source (2), showing 1553 additions and 27 deletions
"""
Serial implementation of k-means clustering in PyTorch
"""
import time
import h5py
import torch
class KMeans:
"""
Serial k-means clustering in PyTorch.
Attributes
----------
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to perform.
tol : float
The tolerance for the convergence criterion.
_centroids : Union[None, torch.Tensor]
The current centroids.
_matching_centroids : Union[None, torch.Tensor]
Assigned centroids for all samples in dataset.
_inertia : float
The inertia (quantity to be checked for convergence).
Methods
-------
_initialize_centroids(x)
Randomly initialize centroids.
_fit_to_cluster(x)
Get the closest centroid for each sample in dataset.
fit(x)
Perform k-means clustering.
"""
def __init__(
self, n_clusters: int = 8, max_iter: int = 300, tol: float = -1.0
) -> None:
"""
Configure k-means clustering algorithm.
Parameters
----------
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self._centroids = None
self._matching_centroids = None
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
# Shuffle data and choose first `n_clusters` samples as initial centroids.
self._centroids = x[torch.randperm(x.shape[0])[: self.n_clusters]]
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
Indices of matching centroids for each sample in dataset.
"""
distances = torch.cdist(
x, self._centroids
) # Calculate Euclidean distance of each data sample to each current centroid.
return distances.argmin(
dim=1, keepdim=True
) # Return index of the closest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
Perform k-means clustering of given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._centroids.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine index of the closest centroid for each sample in dataset.
print(f"Iteration {idx}...")
self._matching_centroids = self._fit_to_cluster(
x
) # Array of length `n_samples` providing index of closest centroid for each sample in dataset.
# Update centroids.
for i in range(self.n_clusters): # Loop over clusters.
# Determine all points in current cluster.
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Array of length `n_samples` with binary encoding of whether each sample belongs to cluster i or not.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Compute vectorial sum of all points in current cluster.
points_in_cluster = selection_mask.sum(axis=0, keepdim=True).clamp(
1, torch.iinfo(torch.int64).max
) # Compute number of points in current cluster.
new_cluster_centers[i : i + 1, :] = (
assigned_points / points_in_cluster
) # Compute new centroids.
# Check whether centroid movement has converged.
self._inertia = (
(self._centroids - new_cluster_centers) ** 2
).sum() # Update inertia.
self._centroids = new_cluster_centers.clone()
if (
self.tol is not None and self._inertia <= self.tol
): # Check whether inertia is smaller than tolerance.
break
return self
if __name__ == "__main__":
print(
"##############################\n"
"# PyTorch k-Means Clustering #\n"
"##############################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu"
) # Set device as available.
if torch.cuda.is_available():
print("Running on GPU...")
else:
print("Running on CPU...")
print(f"Loading dataset from {data_path}[{dataset}]...")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
# Open file for reading. We use the Cityscapes dataset.
with h5py.File(data_path, "r") as handle:
print("Open h5 file...")
data = torch.tensor(
handle[dataset][:300], device=device
) # Default device is "cpu", set to "cuda" for GPU.
print("Torch tensor created.")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(n_clusters=num_clusters, max_iter=num_iterations)
print("Start fitting the data...")
start = time.perf_counter() # Start timer.
kmeans_clusterer.fit(data) # Perform k-means clustering.
print(
f"DONE.\nRun time: \t{time.perf_counter() - start} s"
) # Print measured runtime.
##############################
# PyTorch k-Means Clustering #
##############################
Running on CPU...
Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]...
Open h5 file...
Torch tensor created.
Start fitting the data...
Iteration 0...
Iteration 1...
Iteration 2...
Iteration 3...
Iteration 4...
Iteration 5...
Iteration 6...
Iteration 7...
Iteration 8...
Iteration 9...
Iteration 10...
Iteration 11...
Iteration 12...
Iteration 13...
Iteration 14...
Iteration 15...
Iteration 16...
Iteration 17...
Iteration 18...
Iteration 19...
DONE.
Run time: 892.7324264449999 s
============================= JOB FEEDBACK =============================
NodeName=uc2n265
Job ID: 24553849
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 1
Cores per node: 2
CPU Utilized: 00:12:41
CPU Efficiency: 40.31% of 00:31:28 core-walltime
Job Wall-clock time: 00:15:44
Memory Utilized: 20.56 GB
Memory Efficiency: 52.63% of 39.06 GB
##############################
# PyTorch k-Means Clustering #
##############################
Running on GPU...
Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]...
Open h5 file...
Torch tensor created.
Start fitting the data...
Iteration 0...
Iteration 1...
Iteration 2...
Iteration 3...
Iteration 4...
Iteration 5...
Iteration 6...
Iteration 7...
Iteration 8...
Iteration 9...
Iteration 10...
Iteration 11...
Iteration 12...
Iteration 13...
Iteration 14...
Iteration 15...
Iteration 16...
Iteration 17...
Iteration 18...
Iteration 19...
DONE.
Run time: 12.904788984917104 s
============================= JOB FEEDBACK =============================
NodeName=uc2n513
Job ID: 24553850
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 1
Cores per node: 10
CPU Utilized: 00:00:16
CPU Efficiency: 1.39% of 00:19:10 core-walltime
Job Wall-clock time: 00:01:55
Memory Utilized: 4.37 GB
Memory Efficiency: 4.76% of 91.80 GB
#!/bin/bash
#SBATCH --job-name=kmeans_cpu # job name
#SBATCH --partition=single # queue for resource allocation
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory
#SBATCH --nodes=1 # number of nodes to be used
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
python -u ${PYDIR}/kmeans.py # Run your Python script.
#!/bin/bash
#SBATCH --job-name=kmeans_gpu
#SBATCH --partition=gpu_4
#SBATCH --gres=gpu:1 # number of requested GPUs (GPU nodes shared between multiple jobs)
#SBATCH --time=10:00 # wall-clock time limit
#SBATCH --nodes=1
#SBATCH --mail-type=ALL
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
python -u ${PYDIR}/kmeans.py # Run your Python script.
"""
Sample-parallel implementation of k-means clustering in PyTorch using MPI
"""
import time
import h5py
import torch
from mpi4py import MPI
class KMeans:
"""
Sample-parallel k-means clustering in PyTorch with MPI.
Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines
locally which local sample belongs to which cluster before updating the centroids globally based on these local
assignments and continuing with the next iteration.
Sample parallelism for k-means requires communication at two points in the code, i.e.:
1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be
all-gathered by all ranks.
2. Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the
centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to
be all-reduced to all ranks.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure sample-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
For sample parallelism, each rank holds an exclusive fraction of samples from the global dataset.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
assert self.n_clusters >= self.comm.size # Assume `n_clusters` >= `comm.size`.
n_clusters_local = (
self.n_clusters // self.comm.size
) # Determine number of local cluster centers per rank.
# Choose first `n_cluster_local` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x.shape[0]` gives number of samples in data.
x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]]
print(
f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = "
f"{list(x_local.shape)}."
)
# --- Communication required for sample-parallel: All-gather local centroids to all ranks ---
self._cluster_centers = torch.empty(
(self.n_clusters, x.shape[1]), dtype=torch.float
) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids.
self.comm.Allgather(
[x_local, MPI.FLOAT], [self._cluster_centers, MPI.FLOAT]
) # All-gather local centroids to all ranks.
if self.comm.rank == 0:
print(
f"Global cluster centers have shape [#samples, #features] = {list(self._cluster_centers.shape)}.",
)
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
Indices of matching centroids for each sample in dataset.
"""
distances = torch.cdist(
x, self._cluster_centers
) # Calculate Euclidean distances between samples and centroids.
return distances.argmin(
dim=1, keepdim=True
) # Determine index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
Perform k-means clustering of given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(
f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...",
)
self._matching_centroids = self._fit_to_cluster(
x
) # Determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Local directed sum of samples in cluster i.
# Local number of samples in cluster i.
points_in_cluster = (
selection_mask.sum(axis=0, keepdim=True)
.clamp(0.0, torch.iinfo(torch.int64).max)
.type(torch.int64)
) # Clamp to 0 before communication
# --- Communication required for sample-parallel: Allreduce local results ---
assigned_points_global = torch.empty(
assigned_points.shape, dtype=torch.float
) # Initialize variable to store global directed sum of points assigned to cluster i.
points_in_cluster_global = torch.empty(
points_in_cluster.shape, dtype=torch.int64
) # Initialize variable to store global number of points assigned to cluster i.
self.comm.Allreduce(
assigned_points, assigned_points_global, op=MPI.SUM
) # All-reduce local directed sums of points assigned to cluster i.
self.comm.Allreduce(
points_in_cluster, points_in_cluster_global, op=MPI.SUM
) # All-reduce local numbers of points assigned to cluster i.
# Compute new centroids.
new_cluster_centers[
i : i + 1, :
] = assigned_points_global / points_in_cluster_global.clamp(
1.0, torch.iinfo(torch.int64).max
)
# Check whether centroid movement has converged.
self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"##############################################\n"
"# Sample-parallel PyTorch k-means clustering #\n"
"##############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[0] / size
) # Calculate local number of samples in each chunk.
# Load data chunks from file.
if rank == size - 1:
data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float)
else:
data = torch.tensor(
handle[dataset][rank * chunk : (rank + 1) * chunk], dtype=torch.float
)
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter()-start} s"
) # Print measured runtime.
"""
Sample-parallel implementation of k-means clustering in PyTorch using MPI (Allgatherv variant supporting cluster counts not divisible by the number of ranks)
"""
import time
import h5py
import numpy as np
import torch
from mpi4py import MPI
class KMeans:
"""
Sample-parallel k-means clustering in PyTorch with MPI.
Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines
locally which local sample belongs to which cluster before updating the centroids globally based on these local
assignments and continuing with the next iteration.
Sample parallelism for k-means requires communication at two points in the code, i.e.:
1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be
all-gathered by all ranks.
2. Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the
centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to
be all-reduced to all ranks.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure sample-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
assert self.n_clusters >= self.comm.size # Assume `n_clusters` >= `comm.size`.
# Determine number of local cluster centers per rank.
n_clusters_base = (
self.n_clusters // self.comm.size
) # Determine base number of local cluster centers per rank.
n_clusters_remain = self.n_clusters % self.comm.size # Determine remainder.
n_clusters_local = n_clusters_base
if self.comm.rank < n_clusters_remain: # Distribute remainder over first ranks.
n_clusters_local += 1
# Choose first `n_cluster_local` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x` = data, `x.shape[0]` gives number of samples in data.
x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]]
print(
f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = "
f"{list(x_local.shape)}."
)
# --- Communication required for sample-parallel: All-gather local centroids to all ranks ---
cluster_centers = torch.empty(
(self.n_clusters, x.shape[1]), dtype=torch.float
) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids.
# For vector variants: recvbuf = [data, counts, displacements, type]
# counts and displacements are integer tuples with as many elements as tasks in communicator.
# counts[i] designates the size of i-th segment,
# displacements[i] the number of elements in data used as first element of i-th section.
# Set up counts array (of length comm.size) containing the number of elements to be received from each rank.
counts = n_clusters_base * torch.ones(self.comm.size)
counts[:n_clusters_remain] += 1
counts = torch.mul(
counts, x.shape[1]
)
if self.comm.rank == 0:
print(f"Recvbuf counts = {counts}")
# Set up displs array (of length comm.size); entry i specifies the displacement (relative to recvbuf)
# at which to place the incoming data from process i.
displ = torch.zeros(self.comm.size)
displ[1:] = torch.cumsum(counts, dim=0)[
:-1
]
if self.comm.rank == 0:
print(f"Recvbuf displacements = {displ}")
sendbuf = [x_local, MPI.FLOAT] # Set up send buffer.
recvbuf = [
cluster_centers,
np.asarray(counts, dtype=int),
np.asarray(displ, dtype=int),
MPI.FLOAT,
] # Set up receive buffer.
self.comm.Allgatherv(
sendbuf, recvbuf
) # All-gather local centroids to all ranks.
if self.comm.rank == 0:
print(
f"Global cluster centers have shape [#samples, #features] = {list(cluster_centers.shape)}."
)
self._cluster_centers = cluster_centers # Set centroids as class attribute.
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
Indices of matching centroids for each sample in dataset.
"""
distances = torch.cdist(
x, self._cluster_centers
) # Calculate Euclidean distances between samples and centroids.
return distances.argmin(
dim=1, keepdim=True
) # Determine index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
Perform k-means clustering of given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(
f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...",
)
self._matching_centroids = self._fit_to_cluster(
x
) # Determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Local directed sum of samples in cluster i.
# Local number of samples in cluster i.
points_in_cluster = (
selection_mask.sum(axis=0, keepdim=True)
.clamp(0.0, torch.iinfo(torch.int64).max)
.type(torch.int64)
) # Clamp to 0 before communication
# ------------ Communication required for sample-parallel: Allreduce local results ------------
assigned_points_global = torch.empty(
assigned_points.shape, dtype=torch.float
) # Initialize variable to store global directed sum of points assigned to cluster i.
points_in_cluster_global = torch.empty(
points_in_cluster.shape, dtype=torch.int64
) # Initialize variable to store global number of points assigned to cluster i.
self.comm.Allreduce(
assigned_points, assigned_points_global, op=MPI.SUM
) # All-reduce local directed sums of points assigned to cluster i.
self.comm.Allreduce(
points_in_cluster, points_in_cluster_global, op=MPI.SUM
) # All-reduce local numbers of points assigned to cluster i.
# Compute new centroids.
new_cluster_centers[
i : i + 1, :
] = assigned_points_global / points_in_cluster_global.clamp(
1.0, torch.iinfo(torch.int64).max
)
# Check whether centroid movement has converged.
self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"##############################################\n"
"# Sample-parallel PyTorch k-means clustering #\n"
"##############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[0] / size
) # Calculate local number of samples in each chunk.
# Load data chunks from file (distributed along sample axis).
if rank == size - 1:
data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float)
else:
data = torch.tensor(
handle[dataset][rank * chunk : (rank + 1) * chunk],
dtype=torch.float,
)
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 9
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter() - start} s"
) # Print measured runtime.
##############################################
# PyTorch sample-parallel k-means clustering #
##############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Local cluster centers have shape [#samples, #features] = [3, 6291456].
Recvbuf counts = tensor([18874368., 12582912., 12582912., 12582912.])
Recvbuf displacements = tensor([ 0., 18874368., 31457280., 44040192.])
Rank 3/4: [OK]
Rank 2/4: [OK]
Rank 1/4: [OK]
Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Global cluster centers have shape [#samples, #features] = [9, 6291456].
Rank 2/4: Iteration 0...
Rank 1/4: Iteration 0...
Rank 0/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 0/4: Iteration 1...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 1/4: Iteration 2...
Rank 2/4: Iteration 2...
Rank 0/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 0/4: Iteration 3...
Rank 2/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 1/4: Iteration 4...
Rank 0/4: Iteration 4...
Rank 2/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 2/4: Iteration 5...
Rank 0/4: Iteration 5...
Rank 1/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 1/4: Iteration 6...
Rank 0/4: Iteration 6...
Rank 2/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 2/4: Iteration 7...
Rank 0/4: Iteration 7...
Rank 1/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 1/4: Iteration 8...
Rank 0/4: Iteration 8...
Rank 2/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 0/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 1/4: Iteration 10...
Rank 2/4: Iteration 10...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 2/4: Iteration 11...
Rank 1/4: Iteration 11...
Rank 0/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 1/4: Iteration 12...
Rank 0/4: Iteration 12...
Rank 2/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 0/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 1/4: Iteration 14...
Rank 0/4: Iteration 14...
Rank 2/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 2/4: Iteration 15...
Rank 0/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 0/4: Iteration 16...
Rank 1/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 1/4: Iteration 18...
Rank 0/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 2/4: Iteration 19...
Rank 1/4: Iteration 19...
Rank 0/4: Iteration 19...
Rank 3/4: Iteration 19...
DONE.
Run time: 205.8100015646778 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553925
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:14:02
CPU Efficiency: 1.07% of 21:52:00 core-walltime
Job Wall-clock time: 00:04:06
Memory Utilized: 4.62 GB
Memory Efficiency: 2.96% of 156.25 GB
##############################################
# PyTorch sample-parallel k-means clustering #
##############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 2/4: [OK]
Rank 1/4: [OK]
Rank 3/4: [OK]
Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Global cluster centers have shape [#samples, #features] = [8, 6291456].
Rank 1/4: Iteration 0...
Rank 2/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 0/4: Iteration 0...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 0/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 2/4: Iteration 2...
Rank 1/4: Iteration 2...
Rank 0/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 2/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 0/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 2/4: Iteration 4...
Rank 0/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 1/4: Iteration 4...
Rank 1/4: Iteration 5...
Rank 2/4: Iteration 5...
Rank 0/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 2/4: Iteration 6...
Rank 0/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 1/4: Iteration 6...
Rank 1/4: Iteration 7...
Rank 2/4: Iteration 7...
Rank 0/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 2/4: Iteration 8...
Rank 1/4: Iteration 8...
Rank 0/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 0/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 2/4: Iteration 10...
Rank 1/4: Iteration 10...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 1/4: Iteration 11...
Rank 2/4: Iteration 11...
Rank 0/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 2/4: Iteration 12...
Rank 0/4: Iteration 12...
Rank 1/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 2/4: Iteration 13...
Rank 0/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 14...
Rank 1/4: Iteration 14...
Rank 0/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 0/4: Iteration 15...
Rank 2/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 1/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 0/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 1/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 0/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 1/4: Iteration 19...
Rank 0/4: Iteration 19...
Rank 2/4: Iteration 19...
Rank 3/4: Iteration 19...
DONE.
Run time: 187.21997308498248 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553855
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:12:47
CPU Efficiency: 1.05% of 20:16:00 core-walltime
Job Wall-clock time: 00:03:48
Memory Utilized: 15.63 GB (estimated maximum)
Memory Efficiency: 10.00% of 156.25 GB (39.06 GB/node)
#!/bin/bash
#SBATCH --job-name=kmeans_sample # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_sample_parallel.py # Run your Python script.
#!/bin/bash
#SBATCH --job-name=kmeans_sample_v # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_sample_parallel_allgatherv.py # Run your Python script.
"""
Feature-parallel implementation of k-means clustering in PyTorch using MPI
"""
import time
import h5py
import torch
from mpi4py import MPI
class KMeans:
"""
Feature-parallel k-means clustering in PyTorch with MPI.
Each rank holds an exclusive chunk of the complete dataset (distributed along the feature axis, i.e., each rank
holds all samples but only an exclusive fraction of their features) and determines locally which sample belongs to
which cluster before updating the centroids globally based on these local assignments and continuing with the next
iteration.
Feature parallelism for k-means requires communication at three points in the code, i.e.:
1. Centroid initialization: Either one rank determines the initial centroids and broadcasts them to all other ranks,
or all ranks randomly choose the initial centroids using the same seed.
2. Cluster assignment: To determine the nearest centroid for each sample, we need to calculate its distance to each
cluster center considering all features. To achieve this, we compute the contribution of each local fraction of
features to the squared distance before summing these local squared distances up to obtain a global value.
3. Convergence check: Again, the local contributions to the inertia need to be summed up to obtain a global value.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure feature-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
For feature parallelism, all samples of the dataset are partially present on each rank.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
print(
f"Rank {self.comm.rank}/{self.comm.size}: Shape of data [#samples, #features] = {list(x.shape)}"
)
# As all samples are partially present on each rank, only shuffle on root to determine centroids.
# Alternatively, all ranks can shuffle using the same seed.
if self.comm.rank == 0:
indices = torch.randperm(x.shape[0])[
: self.n_clusters
] # Choose first `n_cluster` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x.shape[0]` gives number of samples in data.
else:
indices = torch.empty(self.n_clusters, dtype=torch.int64)
self.comm.Bcast(indices, root=0)
self._cluster_centers = x[indices] # Set centroids as class attribute.
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
Indices of matching centroids for each sample in dataset.
"""
squared_distances_local = (
torch.cdist(x, self._cluster_centers) ** 2
) # Calculate squared distance between samples and centroids for locally present fraction of features.
squared_distances_global = torch.empty(
squared_distances_local.shape, dtype=torch.float
) # Initialize variable for global squared distance between samples and centroids for all features.
self.comm.Allreduce(
squared_distances_local, squared_distances_global, op=MPI.SUM
) # Sum up squared distances over features.
return squared_distances_global.argmin(
dim=1, keepdim=True
) # Return index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
Perform k-means clustering of given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...")
self._matching_centroids = self._fit_to_cluster(
x
) # Redundantly determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Sum up samples in cluster i.
points_in_cluster = selection_mask.sum(
axis=0, keepdim=True
).clamp( # Determine overall number of samples in cluster i.
1.0, torch.iinfo(torch.int64).max
)
# Compute new centroids.
new_cluster_centers[
i : i + 1, :
] = assigned_points / points_in_cluster.clamp(
1.0, torch.iinfo(torch.int64).max
)
# Check whether centroid movement has converged.
inertia_local = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
inertia_global = torch.empty(inertia_local.shape)
self.comm.Allreduce(inertia_local, inertia_global, op=MPI.SUM)
self._inertia = inertia_global
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"###############################################\n"
"# Feature-parallel PyTorch k-means clustering #\n"
"###############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[1] / size
) # Calculate local number of features in each chunk.
if (
rank == size - 1
): # Load data chunks from file (distributed along feature axis).
data = torch.tensor(handle[dataset][:, rank * chunk :])
else:
data = torch.tensor(handle[dataset][:, rank * chunk : (rank + 1) * chunk])
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter() - start} s"
) # Print measured runtime.
###############################################
# Feature-parallel PyTorch k-means clustering #
###############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 1/4: [OK]
Rank 1/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 3/4: [OK]
Rank 3/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 2/4: [OK]
Rank 2/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 0/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 2/4: Iteration 0...
Rank 1/4: Iteration 0...
Rank 0/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 0/4: Iteration 2...
Rank 1/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 2/4: Iteration 2...
Rank 0/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 2/4: Iteration 3...
Rank 0/4: Iteration 4...
Rank 1/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 2/4: Iteration 4...
Rank 0/4: Iteration 5...
Rank 2/4: Iteration 5...
Rank 1/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 0/4: Iteration 6...
Rank 1/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 2/4: Iteration 6...
Rank 0/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 2/4: Iteration 7...
Rank 1/4: Iteration 7...
Rank 0/4: Iteration 8...
Rank 2/4: Iteration 8...
Rank 1/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 0/4: Iteration 9...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 1/4: Iteration 10...
Rank 2/4: Iteration 10...
Rank 0/4: Iteration 11...
Rank 2/4: Iteration 11...
Rank 1/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 0/4: Iteration 12...
Rank 1/4: Iteration 12...
Rank 2/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 0/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 0/4: Iteration 14...
Rank 1/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 2/4: Iteration 14...
Rank 0/4: Iteration 15...
Rank 2/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 0/4: Iteration 16...
Rank 1/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 18...
Rank 1/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 0/4: Iteration 19...
Rank 1/4: Iteration 19...
Rank 3/4: Iteration 19...
Rank 2/4: Iteration 19...
DONE.
Run time: 174.21738585596904 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553857
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:11:56
CPU Efficiency: 1.04% of 19:06:40 core-walltime
Job Wall-clock time: 00:03:35
Memory Utilized: 5.43 GB
Memory Efficiency: 3.47% of 156.25 GB
#!/bin/bash
#SBATCH --job-name=kmeans_feature # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_feature_parallel.py # Run your Python script.
%% Cell type:markdown id:ea5b6890 tags:
# Scalable Methods of Artificial Intelligence
Dr. Charlotte Debus (charlotte.debus@kit.edu)
Dr. Markus Götz (markus.goetz@kit.edu)
Dr. Marie Weiel (marie.weiel@kit.edu)
Dr. Kaleb Phipps (kaleb.phipps@kit.edu)
## Exercise 2 on 03.12.24: Pairwise Distances and Parallel Sorting by Regular Sampling
In this second exercise, we deal with the parallel computation of pairwise distances (see lecture of 14.11.24) and the "Parallel Sorting by Regular Sampling" (PSRS) algorithm (see lecture of 07.11.24).
### Task 1
Below you find a parallel implementation of an algorithm for computing pairwise distances in `Python3`. We use 50,000 samples of the [SUSY dataset](https://archive.ics.uci.edu/dataset/279/susy), which you can find in the HDF5 file `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5` on the bwUniCluster. The SUSY dataset contains a total of 5,000,000 samples from Monte Carlo simulations of high-energy particle collisions. Each sample has 18 features consisting of kinematic properties typically measured by particle detectors as well as quantities derived from these measurements. Run the code on one, two, four, eight, and 16 CPU-based nodes in the "single" and "multiple" partitions of the bwUniCluster, respectively. Investigate the weak and the strong scaling behavior of the algorithm and visualize the results graphically, e.g., with `matplotlib.pyplot` in `Python3`.
**Reminder (see also the lecture of 31.10.24):** In strong scaling, the problem size is kept constant while the number of processes is increased, i.e., you investigate to what extent a problem of constant size can be solved faster by adding more compute resources. In weak scaling, the problem size per process is kept constant while the number of processes is increased, i.e., you investigate to what extent a larger problem can be solved in the same time by adding more compute resources. This means that you have to scale the problem size proportionally when investigating the weak scaling behavior!
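As a starting point for the graphical evaluation, here is a minimal sketch; the node counts and runtimes below are placeholder values that you would replace with your own measured, process-averaged runtimes:
``` python
"""Minimal sketch: plot strong-scaling speedup and efficiency from measured runtimes (placeholder values)."""
import matplotlib.pyplot as plt

nodes = [1, 2, 4, 8, 16]                    # Node counts used in the experiments.
runtimes = [100.0, 52.0, 27.0, 15.0, 9.0]   # Placeholder: measured process-averaged runtimes in seconds.

speedup = [runtimes[0] / t for t in runtimes]         # Speedup relative to the single-node run.
efficiency = [s / n for s, n in zip(speedup, nodes)]  # Parallel efficiency = speedup / #nodes.

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 3))
ax1.plot(nodes, speedup, "o-", label="measured")
ax1.plot(nodes, nodes, "k--", label="ideal")          # Ideal linear speedup for reference.
ax1.set_xlabel("Number of nodes")
ax1.set_ylabel("Speedup")
ax1.legend()
ax2.plot(nodes, efficiency, "o-")
ax2.set_xlabel("Number of nodes")
ax2.set_ylabel("Parallel efficiency")
fig.tight_layout()
fig.savefig("scaling.png")
```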
**Procedure (analogous to the first exercise sheet):**
- First, load the required modules on the bwUniCluster.
- Then set up a virtual environment with `Python` in which you install the required packages. You may also reuse your virtual environment from the last exercise sheet here.
- Based on the code below, create a `Python` script and submit it to the cluster via SLURM using a `bash` script (see the exercise of 05.11.24). Below you find an example template of the submit script for a multi-node job, including the required modules. If you want to use a different number of nodes, you have to modify the `#SBATCH` options accordingly. You can find more information [here](https://wiki.bwhpc.de/e/BwUniCluster_2.0_Slurm_common_Features).
%% Cell type:code id:bd16f08b-584d-4913-86bb-f5bb6d4822b6 tags:
``` python
#!/bin/bash
#SBATCH --job-name=cdist # Job name
#SBATCH --partition=multiple # Queue for resource allocation
#SBATCH --nodes=2 # Number of nodes to be used
#SBATCH --time=4:00 # Wall-clock time limit
#SBATCH --mem=90000 # Memory per node
#SBATCH --cpus-per-task=40 # Number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export IBV_FORK_SAFE=1
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun --mca mpi_warn_on_fork 0 python -u ${PYDIR}/cdist.py
```
%% Cell type:code id:9e172b4f tags:
``` python
"""Parallel calculation of pairwise distances"""
import time
import h5py
import torch
from mpi4py import MPI
torch.set_default_dtype(torch.float32)
def dist(x: torch.Tensor, y: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
"""
Calculate pairwise distances between all rows (samples, i.e., along axis 0) of two tensors x and y in parallel.
The distance matrix is calculated tile-wise with ring communication between processes, each holding a piece of x
and/or y.
Parameters
----------
x : torch.Tensor
First 2d tensor (of shape m/p x f). m is the total number of samples in x, distributed over p processors.
f is the number of features.
y : torch.Tensor
Second 2d tensor (of shape n/p x f). n is the total number of samples in y, distributed over p processors.
The number of features f must be the same as for x.
comm : MPI.Comm
Communicator to use. Default is ``MPI.COMM_WORLD``.
"""
# Check whether two input tensors are compatible.
if x.dim() != 2 or y.dim() != 2:
raise ValueError("Input tensors must be two-dimensional.")
if x.shape[1] != y.shape[1]:
raise ValueError(f"Input tensors must have the same number of features but {x.shape[1]} != {y.shape[1]}.")
size, rank = comm.size, comm.rank # Set up communication.
if size == 1: # Use torch functionality in non-parallel case.
return torch.cdist(x, y)
else: # Parallel case
# --- Setup and Matrix Initialization ---
mp, f = x.shape # Get number of samples in local chunk of x and number of features.
np = y.shape[0] # Get number of samples in local chunk of y.
# Each process initializes a local matrix, `local_distances`, of shape `(mp, n)`, where `mp` is the local chunk
# size of `x`, and `n` is the total number of samples in `y`. Each rank thus calculates the distance matrix
# chunk of size `mp x n`, i.e., rank 0 has distances from its own local `x` to all other `y`'s.
# Determine overall number of samples in y.
n = comm.allreduce(np, op=MPI.SUM)
print(f"Overall number of samples is {n}.")
# Initialize rank-local chunk of mp x n distance matrix with zeros.
local_distances = torch.zeros((mp, n))
# --- Managing Chunks and Displacements ---
# Determine where to put each result in the rank-local distance matrix chunk.
# Determine number of samples (rows) in each rank-local y.
y_counts = torch.tensor(comm.allgather(torch.numel(y) // f), dtype=torch.int)
# Calculate corresponding displacements from counts to record the starting index of each chunk in y. Thus, each
# process can identify where in the result matrix it should write the distances.
y_displ = (0,) + tuple(torch.cumsum(y_counts, dim=0, dtype=torch.int)[:-1])
# Extract actual result columns in distance matrix chunk for each rank.
cols = (y_displ[rank], y_displ[rank + 1] if (rank + 1) != size else n)
# --- Ring Communication Pattern ---
# Calculate distances in a "ring" pattern. Each process calculates distances for its local x chunk against its
# local y chunk (diagonal calculation). Then, through `size - 1` iterations, each process sends its y chunk to
# the next process in the "ring" while receiving a new y chunk from the previous process. This continues until
# each process has calculated distances between its x chunk and all chunks of y across all processes.
stationary = y
# 0th iteration: Calculate diagonal of global distance matrix.
# Each process calculates distances for its local x chunk against its local y chunk.
print(f"Rank [{rank}/{size}]: Calculate diagonal blocks...")
local_distances[:, cols[0]: cols[1]] = torch.cdist(x, stationary)
print(f"Rank [{rank}/{size}]: Start tile-wise ring communication...")
# Remaining `size-1` iterations: Send rank-local part of y to next process in circular fashion.
for iter_idx in range(1, size):
receiver = (rank + iter_idx) % size # Determine receiving process.
sender = (rank - iter_idx) % size # Determine sending process.
# Determine columns of rank-local distance matrix chunk to write result to.
col1 = y_displ[sender]
col2 = y_displ[sender + 1] if sender != size - 1 else n
columns = (col1, col2)
# All but first `iter_idx` processes are first receiving, then sending.
if (rank // iter_idx) != 0:
stat = MPI.Status()
# Probe for incoming message containing the next chunk of y to consider.
comm.Probe(source=sender, tag=iter_idx, status=stat)
# Determine number of samples to receive (= overall number of floats to receive / number of features).
count = int(stat.Get_count(MPI.FLOAT) / f)
# Initialize tensor for incoming chunk of y with zeros.
moving = torch.zeros((count, f))
comm.Recv(moving, source=sender, tag=iter_idx)
# Send rank-local chunk of y to next process.
comm.Send(stationary, dest=receiver, tag=iter_idx)
# First `iter_idx` processes can now receive after sending.
if (rank // iter_idx) == 0:
stat = MPI.Status()
comm.Probe(source=sender, tag=iter_idx, status=stat)
count = int(stat.Get_count(MPI.FLOAT) / f)
moving = torch.zeros((count, f))
comm.Recv(moving, source=sender, tag=iter_idx)
# Calculate distances between stationary chunk of x and currently considered, moving chunk of y.
# Write result at correct position in distance matrix.
local_distances[:, columns[0]: columns[1]] = torch.cdist(x, moving)
print(f"Rank [{rank}/{size}]: [DONE]")
return local_distances
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/SUSY_50k.h5"
dataset = "data"
if rank == 0:
print(
"######################\n"
"# Pairwise distances #\n"
"######################\n"
f"COMM_WORLD size is {size}.\n"
f"Loading data... {data_path}[{dataset}]"
)
# Parallel data loader for SUSY data.
with h5py.File(data_path, "r") as handle:
chunk = int(handle[dataset].shape[0]/size)
if rank == size - 1:
data = torch.FloatTensor(handle[dataset][rank*chunk:])
else:
data = torch.FloatTensor(handle[dataset][rank*chunk:(rank+1)*chunk])
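# Hypothetical example: with 50,000 samples and size = 4, chunk = 12,500; ranks 0-2 load rows
# [rank * 12,500, (rank + 1) * 12,500), while the last rank loads all remaining rows.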
print(f"\t[OK]\nRank [{rank}/{size}]: Local data chunk has shape {list(data.shape)}...")
if rank == 0:
print("Start distance calculations...")
# Calculate distances of all SUSY samples w.r.t. each other and measure runtime.
start = time.perf_counter()
distances = dist(data, data, comm)
local_runtime = time.perf_counter() - start
# Calculate process-averaged runtime.
average_runtime = comm.allreduce(local_runtime, op=MPI.SUM) / size
print(f"Rank [{rank}/{size}]: Local distance matrix has shape {list(distances.shape)}.")
if rank == 0:
print(f"Process-averaged run time:\t{average_runtime} s")
```
%% Cell type:markdown id:6c7900f7 tags:
### Exercise 2
Below you find a parallel implementation of the PSRS algorithm. This code is to be used to sort different sequences of integers, which you can find in the HDF5 file `/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5` on the bwUniCluster. The file contains five different integer sequences as the datasets `['duplicates_10', 'duplicates_5', 'many_duplicates', 'no_duplicates', 'triplicates']`, each with a different fraction of duplicates or triplicates. All sequences consist of $10^9$ elements.
Run the code for all five datasets on four CPU-based nodes in the "multiple" partition of the bwUniCluster using the submit script below. The dataset can be specified via a command-line argument of the Python script, e.g., `mpirun python psrs.py --dataset no_duplicates`. Measure and compare the runtimes. What do you notice?
%% Cell type:code id:691eeb17 tags:
``` python
#!/bin/bash
#SBATCH --job-name=psrs # Job name
#SBATCH --partition=dev_multiple # Queue for the resource allocation.
#SBATCH --time=5:00 # Wall-clock time limit
#SBATCH --mem=90000 # Memory per node
#SBATCH --nodes=4 # Number of nodes to be used
#SBATCH --cpus-per-task=40 # Number of CPUs per MPI task
#SBATCH --ntasks-per-node=1 # Number of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export IBV_FORK_SAFE=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=<path/to/your/venv/folder> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun --mca mpi_warn_on_fork 0 python ${PYDIR}/psrs.py --dataset no_duplicates # Specify dataset via command-line argument.
```
%% Cell type:code id:529e07b3 tags:
``` python
"""Parallel Sorting by Regular Sampling"""
import argparse
import time
import h5py
import numpy
import torch
from mpi4py import MPI
__mpi_type_mappings = {
torch.bool: MPI.BOOL,
torch.uint8: MPI.UNSIGNED_CHAR,
torch.int8: MPI.SIGNED_CHAR,
torch.int16: MPI.SHORT,
torch.int32: MPI.INT,
torch.int64: MPI.LONG,
torch.bfloat16: MPI.INT16_T,
torch.float16: MPI.INT16_T,
torch.float32: MPI.FLOAT,
torch.float64: MPI.DOUBLE,
torch.complex64: MPI.COMPLEX,
torch.complex128: MPI.DOUBLE_COMPLEX,
}
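# Note on the mappings above (an assumption about their intent): MPI has no standard 16-bit float type, so
# torch.bfloat16 and torch.float16 are transferred bitwise as 16-bit integers. This is fine for pure data
# movement as used below, but would not be correct for arithmetic reductions.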
def sort(a: torch.Tensor, comm: MPI.Comm = MPI.COMM_WORLD) -> torch.Tensor:
"""
Sort the elements of a 1D tensor in ascending order by value.
The sorting is not stable, i.e., equal elements in the result may be ordered differently than in the
original array.
Parameters
----------
a : torch.Tensor
The 1D input array to be sorted.
comm : MPI.Comm
The communicator to use.
Returns
-------
torch.Tensor
Sorted local results.
"""
size, rank = comm.size, comm.rank
if size == 1:
local_sorted, _ = torch.sort(a)
return local_sorted
else:
###########
# PHASE 1 #
###########
# p is comm size, n is overall number of samples.
# Each rank sorts its local chunk and chooses p regular samples as representatives.
if rank == 0:
print(
"###########\n"
"# PHASE 1 #\n"
"###########"
)
local_sorted, local_indices = torch.sort(a)
print(f"Rank {rank}/{size}: Local sorting done...[OK]")
n_local = torch.tensor(
torch.numel(local_sorted), dtype=torch.int
) # Number of elements in local chunk.
print(f"Rank {rank}/{size}: Number of elements in local chunk is {n_local}.")
counts = torch.zeros(
size, dtype=torch.int
) # Initialize array for local element numbers.
comm.Allgather([n_local, MPI.INT], [counts, MPI.INT])
n_global = comm.allreduce(n_local, op=MPI.SUM)
# Each rank chooses p regular samples from its locally sorted chunk.
# Regular samples have indices 0, w, 2w, ..., (p-1)w where w = n/p^2.
# Here: `size` = p and `n_global` = n is the overall number of samples.
w = int(n_global / size ** 2)
partitions = [idx * w for idx in range(0, size)]
regular_samples_local = local_sorted[partitions]
print(
f"Rank {rank}/{size}: There are {len(partitions)} local regular samples: {regular_samples_local}"
)
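# Hypothetical example: with p = size = 4 ranks and n_global = 64, w = 64 / 16 = 4, so each rank picks its
# locally sorted elements at indices 0, 4, 8, and 12 as regular samples.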
# Root gathers regular samples.
num_regs_global = int(
comm.allreduce(torch.numel(regular_samples_local), op=MPI.SUM)
) # Get overall number of regular samples.
if rank == 0:
print(f"Overall number of regular samples is {num_regs_global}.")
reg_samples_global = torch.zeros(num_regs_global, dtype=a.dtype)
comm.Gather(regular_samples_local, reg_samples_global, root=0)
if rank == 0:
print("On root: Regular samples gathered...[OK]")
###########
# PHASE 2 #
###########
# Root sorts gathered regular samples, chooses pivots, and shares them with other processes.
if rank == 0:
print(
"###########\n"
"# PHASE 2 #\n"
"###########"
)
global_pivots = torch.zeros((size - 1,), dtype=local_sorted.dtype)
if rank == 0:
sorted_regs_global, _ = torch.sort(reg_samples_global)
print(f"On root: Regular samples are {sorted_regs_global}.")
# Choose p-1 pivot indices (p = MPI size).
global_partitions = [
int(x * num_regs_global / size) for x in range(1, size)
]
global_pivots = sorted_regs_global[global_partitions]
if len(global_partitions) == size - 1:
print(
f"On root: There are {len(global_partitions)} global pivots: {global_pivots}"
)
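# Hypothetical example: with p = size = 4 ranks, root gathers 16 regular samples; the pivot indices are
# int(1 * 16 / 4) = 4, int(2 * 16 / 4) = 8, and int(3 * 16 / 4) = 12, i.e., 3 pivots splitting the sorted
# regular samples into 4 buckets of equal size.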
comm.Bcast(
global_pivots, root=0
) # Broadcast copy of pivots to all processes from root.
if rank == 0:
print("Pivots broadcast to all processes...")
###########
# PHASE 3 #
###########
if rank == 0:
print(
"###########\n"
"# PHASE 3 #\n"
"###########\n"
"Each processor forms p disjunct partitions of locally sorted elements using pivots as splits."
)
# Each processor forms p disjoint partitions of its locally sorted elements using the p-1 pivots as splits.
lt_partitions = torch.empty((size, local_sorted.shape[0]), dtype=torch.int64)
last = torch.zeros_like(local_sorted, dtype=torch.int64)
# Iterate over all pivots and store index of first pivot greater than element's value
if rank == 0:
print("Iterate over pivots to find index of first pivot > element's value.")
for idx, p in enumerate(global_pivots):
# torch.lt(input, other, *, out=None) computes `input < other` element-wise.
# Returns boolean tensor that is True where input is less than other and False elsewhere.
lt = torch.lt(local_sorted, p).int()
if idx > 0:
lt_partitions[idx] = lt - last
else:
lt_partitions[idx] = lt
last = lt
lt_partitions[size - 1] = torch.ones_like(local_sorted, dtype=last.dtype) - last
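# Hypothetical example: for local_sorted = [1, 3, 5, 7] and pivots [2, 6] (size = 3), the loop yields
# lt_partitions = [[1, 0, 0, 0], [0, 1, 1, 0], [0, 0, 0, 1]], i.e., 1 goes to rank 0, 3 and 5 go to rank 1,
# and 7 goes to rank 2.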
# lt_partitions contains p rows: The first encodes which elements in local_sorted are smaller than the 1st
# (= smallest) pivot, the second encodes which elements are larger than the 1st and smaller than the 2nd
# pivot, ..., and the last encodes which elements are larger than the last (= largest) pivot. Now set up a
# matrix holding the information how many values will be sent for each partition. Processor i keeps the ith
# partition and sends the jth partition to processor j.
local_partitions = torch.sum(
lt_partitions, dim=1
) # How many values will be sent where (local)?
print(
f"Rank {rank}/{size}: Local # elements to be sent to other ranks (keep own section): {local_partitions}"
)
partition_matrix = torch.zeros_like(
local_partitions
) # How many values will be sent where (global)?
comm.Allreduce(local_partitions, partition_matrix, op=MPI.SUM)
if rank == 0:
print(
f"Global # of elements on all ranks (partition matrix): {partition_matrix}"
)
# Matrix holding info which value will be shipped where.
index_matrix = torch.empty_like(local_sorted, dtype=torch.int64)
# Loop over `lt_partitions` (binary encoding of which element lies in which partition formed by the pivots).
for i, x in enumerate(lt_partitions):
index_matrix[x > 0] = i
# Elements in 0th partition (< first pivot) get 0, i.e., will be collected at rank 0, elements in 1st
# partition (> than first + < than second pivot) get 1, i.e., will be collected at rank 1,...
print(f"Rank {rank}/{size}: Ship element to rank: {index_matrix}")
send_counts_local = numpy.zeros(size, dtype=int)
for s in numpy.arange(size):
send_counts_local[s] = int((index_matrix == s).sum(dim=0))
send_displ = numpy.zeros(size, dtype=int)
send_displ[1:] = numpy.cumsum(send_counts_local, axis=0)[:-1]
send_counts_global = numpy.zeros((size, size), dtype=int)
comm.Allgather([send_counts_local, MPI.INT], [send_counts_global, MPI.INT])
recv_counts_global = numpy.transpose(send_counts_global)
recv_counts_local = recv_counts_global[rank]
recv_displ = numpy.zeros(size, dtype=int)
recv_displ[1:] = numpy.cumsum(recv_counts_local, axis=0)[:-1]
# Counts + displacements for Alltoallv are rank-specific!
# send_counts_local on rank i: Integer array, entry j specifies number of values to be sent to rank j.
# recv_counts_local on rank i: Integer array, entry j specifies number of values to be received from rank j.
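# Hypothetical example with size = 2: if send_counts_global = [[3, 5], [2, 6]], rank 0 keeps 3 values and
# sends 5 values to rank 1, while rank 1 sends 2 values to rank 0 and keeps 6. The transpose gives
# recv_counts_global = [[3, 2], [5, 6]], so, e.g., recv_counts_local on rank 1 is [5, 6].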
val_buf = torch.zeros((partition_matrix[rank],), dtype=local_sorted.dtype)
send_buf = [
MPI.memory.fromaddress(local_sorted.data_ptr(), 0),
(send_counts_local.tolist(), send_displ.tolist()),
__mpi_type_mappings[local_sorted.dtype],
]
recv_buf = [
MPI.memory.fromaddress(val_buf.data_ptr(), 0),
(recv_counts_local.tolist(), recv_displ.tolist()),
__mpi_type_mappings[val_buf.dtype],
]
comm.Alltoallv(send_buf, recv_buf)
result, _ = torch.sort(val_buf)
return result
if __name__ == "__main__":
data_path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/psrs_data.h5"
parser = argparse.ArgumentParser(prog="Parallel Sorting by Regular Samples")
parser.add_argument(
"--dataset",
type=str,
default="duplicates_1",
help="The dataset to be sorted.",
)
args = parser.parse_args()
comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
with h5py.File(data_path, "r") as f:
chunk = int(f[args.dataset].shape[0] / size)
if rank == size - 1:
data = torch.tensor(f[args.dataset][rank * chunk:])
else:
data = torch.tensor(f[args.dataset][rank * chunk:(rank + 1) * chunk])
if rank == 0:
print(
"########\n"
"# PSRS #\n"
"########"
)
print(f"Local data on rank {rank} = {data}")
if rank == 0:
print("Start sorting...")
start = time.perf_counter()
result = sort(data)
elapsed_local = time.perf_counter() - start
elapsed_global = comm.allreduce(elapsed_local, op=MPI.SUM)
elapsed_global /= size
if rank == 0:
print(f"Sorting done...\nRank-averaged run time: {elapsed_global} s")
print(f"Sorted chunk on rank {rank}/{size}: {result}")
```
......