Commit fa54ef86 authored by Marie Weiel

upload solutions
"""
Serial implementation of k-means clustering in PyTorch
"""
import time
import h5py
import torch
class KMeans:
"""
Serial k-means clustering in PyTorch.
Attributes
----------
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to perform.
tol : float
The tolerance for the convergence criterion.
_centroids : Union[None, torch.Tensor]
The current centroids.
_matching_centroids : Union[None, torch.Tensor]
        Assigned centroids for all samples in the dataset.
_inertia : float
The inertia (quantity to be checked for convergence).
Methods
-------
_initialize_centroids(x)
Randomly initialize centroids.
_fit_to_cluster(x)
        Get the closest centroid for each sample in the dataset.
fit(x)
Perform k-means clustering.
"""
def __init__(
self, n_clusters: int = 8, max_iter: int = 300, tol: float = -1.0
) -> None:
"""
Configure k-means clustering algorithm.
Parameters
----------
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self._centroids = None
self._matching_centroids = None
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
# Shuffle data and choose first `n_clusters` samples as initial centroids.
self._centroids = x[torch.randperm(x.shape[0])[: self.n_clusters]]
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
        Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
            Indices of matching centroids for each sample in the dataset.
"""
distances = torch.cdist(
x, self._centroids
) # Calculate Euclidean distance of each data sample to each current centroid.
return distances.argmin(
dim=1, keepdim=True
) # Return index of the closest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
        Perform k-means clustering of the given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._centroids.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine index of the closest centroid for each sample in dataset.
print(f"Iteration {idx}...")
self._matching_centroids = self._fit_to_cluster(
x
) # Array of length `n_samples` providing index of closest centroid for each sample in dataset.
# Update centroids.
for i in range(self.n_clusters): # Loop over clusters.
# Determine all points in current cluster.
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Array of length `n_samples` with binary encoding of whether each sample belongs to cluster i or not.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Compute vectorial sum of all points in current cluster.
points_in_cluster = selection_mask.sum(axis=0, keepdim=True).clamp(
1, torch.iinfo(torch.int64).max
) # Compute number of points in current cluster.
new_cluster_centers[i : i + 1, :] = (
assigned_points / points_in_cluster
) # Compute new centroids.
# Check whether centroid movement has converged.
self._inertia = (
(self._centroids - new_cluster_centers) ** 2
).sum() # Update inertia.
self._centroids = new_cluster_centers.clone()
if (
self.tol is not None and self._inertia <= self.tol
): # Check whether inertia is smaller than tolerance.
break
return self
if __name__ == "__main__":
print(
"##############################\n"
"# PyTorch k-Means Clustering #\n"
"##############################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu"
) # Set device as available.
if torch.cuda.is_available():
print("Running on GPU...")
else:
print("Running on CPU...")
print(f"Loading dataset from {data_path}[{dataset}]...")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
# Open file for reading. We use the Cityscapes dataset.
with h5py.File(data_path, "r") as handle:
print("Open h5 file...")
data = torch.tensor(
handle[dataset][:300], device=device
) # Default device is "cpu", set to "cuda" for GPU.
print("Torch tensor created.")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(n_clusters=num_clusters, max_iter=num_iterations)
print("Start fitting the data...")
start = time.perf_counter() # Start timer.
kmeans_clusterer.fit(data) # Perform k-means clustering.
print(
f"DONE.\nRun time: \t{time.perf_counter() - start} s"
) # Print measured runtime.
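Before looking at the cluster runs below, the serial clusterer can be sanity-checked on a small synthetic dataset; a minimal sketch, assuming the script above is saved as `kmeans.py` (as in the batch scripts further down) so the class can be imported:

import torch

from kmeans import KMeans  # Serial class above; assumes the script is saved as `kmeans.py`.

torch.manual_seed(42)  # Make the random centroid initialization reproducible.
# Create three well-separated Gaussian blobs with 100 two-dimensional samples each.
blobs = [torch.randn(100, 2) + offset for offset in (-5.0, 0.0, 5.0)]
x = torch.cat(blobs)

clusterer = KMeans(n_clusters=3, max_iter=50, tol=1e-4).fit(x)
print(clusterer._centroids)  # Inspect the (private) centroids; they should lie close to (-5, -5), (0, 0), and (5, 5).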
##############################
# PyTorch k-Means Clustering #
##############################
Running on CPU...
Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]...
Open h5 file...
Torch tensor created.
Start fitting the data...
Iteration 0...
Iteration 1...
Iteration 2...
Iteration 3...
Iteration 4...
Iteration 5...
Iteration 6...
Iteration 7...
Iteration 8...
Iteration 9...
Iteration 10...
Iteration 11...
Iteration 12...
Iteration 13...
Iteration 14...
Iteration 15...
Iteration 16...
Iteration 17...
Iteration 18...
Iteration 19...
DONE.
Run time: 892.7324264449999 s
============================= JOB FEEDBACK =============================
NodeName=uc2n265
Job ID: 24553849
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 1
Cores per node: 2
CPU Utilized: 00:12:41
CPU Efficiency: 40.31% of 00:31:28 core-walltime
Job Wall-clock time: 00:15:44
Memory Utilized: 20.56 GB
Memory Efficiency: 52.63% of 39.06 GB
##############################
# PyTorch k-Means Clustering #
##############################
Running on GPU...
Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]...
Open h5 file...
Torch tensor created.
Start fitting the data...
Iteration 0...
Iteration 1...
Iteration 2...
Iteration 3...
Iteration 4...
Iteration 5...
Iteration 6...
Iteration 7...
Iteration 8...
Iteration 9...
Iteration 10...
Iteration 11...
Iteration 12...
Iteration 13...
Iteration 14...
Iteration 15...
Iteration 16...
Iteration 17...
Iteration 18...
Iteration 19...
DONE.
Run time: 12.904788984917104 s
============================= JOB FEEDBACK =============================
NodeName=uc2n513
Job ID: 24553850
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 1
Cores per node: 10
CPU Utilized: 00:00:16
CPU Efficiency: 1.39% of 00:19:10 core-walltime
Job Wall-clock time: 00:01:55
Memory Utilized: 4.37 GB
Memory Efficiency: 4.76% of 91.80 GB
#!/bin/bash
#SBATCH --job-name=kmeans_cpu # job name
#SBATCH --partition=single # queue for resource allocation
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory
#SBATCH --nodes=1 # number of nodes to be used
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
python -u ${PYDIR}/kmeans.py # Run your Python script.
#!/bin/bash
#SBATCH --job-name=kmeans_gpu
#SBATCH --partition=gpu_4
#SBATCH --gres=gpu:1 # number of requested GPUs (GPU nodes shared between multiple jobs)
#SBATCH --time=10:00 # wall-clock time limit
#SBATCH --nodes=1
#SBATCH --mail-type=ALL
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
python -u ${PYDIR}/kmeans.py # Run your Python script.
"""
Sample-parallel implementation of k-means clustering in PyTorch using MPI
"""
import time
import h5py
import torch
from mpi4py import MPI
class KMeans:
"""
Sample-parallel k-means clustering in PyTorch with MPI.
    Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines
    locally which of its samples belongs to which cluster before updating the centroids globally based on these
    local assignments and continuing with the next iteration.
Sample parallelism for k-means requires communication at two points in the code, i.e.:
1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be
all-gathered by all ranks.
2. Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the
centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to
be all-reduced to all ranks.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure sample-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
For sample parallelism, each rank holds an exclusive fraction of samples from the global dataset.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
        assert (
            self.n_clusters % self.comm.size == 0
        )  # Plain `Allgather` requires equally sized contributions, so this variant assumes `n_clusters` is divisible by `comm.size`; the `Allgatherv` variant below lifts this restriction.
        n_clusters_local = (
            self.n_clusters // self.comm.size
        )  # Determine number of local cluster centers per rank.
        # Choose first `n_clusters_local` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x.shape[0]` gives number of samples in data.
x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]]
print(
f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = "
f"{list(x_local.shape)}."
)
# --- Communication required for sample-parallel: All-gather local centroids to all ranks ---
self._cluster_centers = torch.empty(
(self.n_clusters, x.shape[1]), dtype=torch.float
) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids.
self.comm.Allgather(
[x_local, MPI.FLOAT], [self._cluster_centers, MPI.FLOAT]
) # All-gather local centroids to all ranks.
if self.comm.rank == 0:
print(
f"Global cluster centers have shape [#samples, #features] = {list(self._cluster_centers.shape)}.",
)
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
        Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
            Indices of matching centroids for each sample in the dataset.
"""
distances = torch.cdist(
x, self._cluster_centers
) # Calculate Euclidean distances between samples and centroids.
return distances.argmin(
dim=1, keepdim=True
) # Determine index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
        Perform k-means clustering of the given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(
f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...",
)
self._matching_centroids = self._fit_to_cluster(
x
) # Determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Local directed sum of samples in cluster i.
# Local number of samples in cluster i.
points_in_cluster = (
selection_mask.sum(axis=0, keepdim=True)
.clamp(0.0, torch.iinfo(torch.int64).max)
.type(torch.int64)
                )  # Do not clamp to 1 yet: zero local counts must stay zero so the global sum after the all-reduce is correct.
# --- Communication required for sample-parallel: Allreduce local results ---
assigned_points_global = torch.empty(
assigned_points.shape, dtype=torch.float
) # Initialize variable to store global directed sum of points assigned to cluster i.
points_in_cluster_global = torch.empty(
points_in_cluster.shape, dtype=torch.int64
) # Initialize variable to store global number of points assigned to cluster i.
self.comm.Allreduce(
assigned_points, assigned_points_global, op=MPI.SUM
) # All-reduce local directed sums of points assigned to cluster i.
self.comm.Allreduce(
points_in_cluster, points_in_cluster_global, op=MPI.SUM
) # All-reduce local numbers of points assigned to cluster i.
# Compute new centroids.
new_cluster_centers[
i : i + 1, :
] = assigned_points_global / points_in_cluster_global.clamp(
1.0, torch.iinfo(torch.int64).max
)
# Check whether centroid movement has converged.
self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"##############################################\n"
"# Sample-parallel PyTorch k-means clustering #\n"
"##############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[0] / size
) # Calculate local number of samples in each chunk.
# Load data chunks from file.
if rank == size - 1:
data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float)
else:
data = torch.tensor(
handle[dataset][rank * chunk : (rank + 1) * chunk], dtype=torch.float
)
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter()-start} s"
) # Print measured runtime.
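Note that the uppercase, buffer-based `Allgather` used in `_initialize_centroids` requires equally sized contributions from every rank, which is why this script asserts divisibility; the `Allgatherv` variant below lifts that restriction. A toy sketch of the `Allgather` buffer semantics with torch tensors (hypothetical file name `allgather_demo.py`, to be launched with, e.g., `mpirun -n 4 python allgather_demo.py`):

import torch
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Each rank contributes one row filled with its rank id; all contributions must be equally sized.
local = torch.full((1, 3), float(comm.rank))
gathered = torch.empty((comm.size, 3), dtype=torch.float)
comm.Allgather([local, MPI.FLOAT], [gathered, MPI.FLOAT])
if comm.rank == 0:
    print(gathered)  # Row i holds rank i's contribution; the result is identical on every rank.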
"""
Sample-parallel implementation of k-means clustering in PyTorch using MPI
"""
import time
import h5py
import numpy as np
import torch
from mpi4py import MPI
class KMeans:
"""
Sample-parallel k-means clustering in PyTorch with MPI.
    Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines
    locally which of its samples belongs to which cluster before updating the centroids globally based on these
    local assignments and continuing with the next iteration.
Sample parallelism for k-means requires communication at two points in the code, i.e.:
1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be
all-gathered by all ranks.
2. Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the
centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to
be all-reduced to all ranks.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure sample-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
    def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
assert self.n_clusters >= self.comm.size # Assume `n_clusters` >= `comm.size`.
# Determine number of local cluster centers per rank.
n_clusters_base = (
self.n_clusters // self.comm.size
) # Determine base number of local cluster centers per rank.
n_clusters_remain = self.n_clusters % self.comm.size # Determine remainder.
n_clusters_local = n_clusters_base
if self.comm.rank < n_clusters_remain: # Distribute remainder over first ranks.
n_clusters_local += 1
        # Choose first `n_clusters_local` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x` = data, `x.shape[0]` gives number of samples in data.
x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]]
print(
f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = "
f"{list(x_local.shape)}."
)
# --- Communication required for sample-parallel: All-gather local centroids to all ranks ---
cluster_centers = torch.empty(
(self.n_clusters, x.shape[1]), dtype=torch.float
) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids.
# For vector variants: recvbuf = [data, counts, displacements, type]
# counts and displacements are integer tuples with as many elements as tasks in communicator.
# counts[i] designates the size of i-th segment,
# displacements[i] the number of elements in data used as first element of i-th section.
# Set up counts array (of length comm.size) containing the number of elements to be received from each rank.
counts = n_clusters_base * torch.ones(self.comm.size)
counts[:n_clusters_remain] += 1
counts = torch.mul(
counts, x.shape[1]
)
        if self.comm.rank == 0:
            print(f"Recvbuf counts = {counts}")
# Set up displs array (of length comm.size); entry i specifies the displacement (relative to recvbuf)
# at which to place the incoming data from process i.
displ = torch.zeros(self.comm.size)
displ[1:] = torch.cumsum(counts, dim=0)[
:-1
]
        if self.comm.rank == 0:
            print(f"Recvbuf displacements = {displ}")
sendbuf = [x_local, MPI.FLOAT] # Set up send buffer.
recvbuf = [
cluster_centers,
np.asarray(counts, dtype=int),
np.asarray(displ, dtype=int),
MPI.FLOAT,
] # Set up receive buffer.
self.comm.Allgatherv(
sendbuf, recvbuf
) # All-gather local centroids to all ranks.
        if self.comm.rank == 0:
            print(
                f"Global cluster centers have shape [#samples, #features] = {list(cluster_centers.shape)}."
            )
self._cluster_centers = cluster_centers # Set centroids as class attribute.
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
        Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
            Indices of matching centroids for each sample in the dataset.
"""
distances = torch.cdist(
x, self._cluster_centers
) # Calculate Euclidean distances between samples and centroids.
return distances.argmin(
dim=1, keepdim=True
) # Determine index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
        Perform k-means clustering of the given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(
f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...",
)
self._matching_centroids = self._fit_to_cluster(
x
) # Determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Local directed sum of samples in cluster i.
# Local number of samples in cluster i.
points_in_cluster = (
selection_mask.sum(axis=0, keepdim=True)
.clamp(0.0, torch.iinfo(torch.int64).max)
.type(torch.int64)
                )  # Do not clamp to 1 yet: zero local counts must stay zero so the global sum after the all-reduce is correct.
# ------------ Communication required for sample-parallel: Allreduce local results ------------
assigned_points_global = torch.empty(
assigned_points.shape, dtype=torch.float
) # Initialize variable to store global directed sum of points assigned to cluster i.
points_in_cluster_global = torch.empty(
points_in_cluster.shape, dtype=torch.int64
) # Initialize variable to store global number of points assigned to cluster i.
self.comm.Allreduce(
assigned_points, assigned_points_global, op=MPI.SUM
) # All-reduce local directed sums of points assigned to cluster i.
self.comm.Allreduce(
points_in_cluster, points_in_cluster_global, op=MPI.SUM
) # All-reduce local numbers of points assigned to cluster i.
# Compute new centroids.
new_cluster_centers[
i : i + 1, :
] = assigned_points_global / points_in_cluster_global.clamp(
1.0, torch.iinfo(torch.int64).max
)
# Check whether centroid movement has converged.
self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"##############################################\n"
"# Sample-parallel PyTorch k-means clustering #\n"
"##############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[0] / size
) # Calculate local number of samples in each chunk.
# Load data chunks from file (distributed along sample axis).
if rank == size - 1:
data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float)
else:
data = torch.tensor(
handle[dataset][rank * chunk : (rank + 1) * chunk],
dtype=torch.float,
)
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 9
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter() - start} s"
) # Print measured runtime.
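The counts and displacements bookkeeping for `Allgatherv` can be verified offline without MPI; a minimal sketch of the same arithmetic for 9 clusters on 4 ranks, with a hypothetical feature count of 5 for readability:

import torch

n_clusters, comm_size, n_features = 9, 4, 5  # Feature count shrunk for readability.
n_clusters_base = n_clusters // comm_size  # Two centroids per rank...
n_clusters_remain = n_clusters % comm_size  # ...plus one extra on the first rank.
counts = n_clusters_base * torch.ones(comm_size)
counts[:n_clusters_remain] += 1
counts *= n_features  # Counts are in elements, not centroid rows.
displ = torch.zeros(comm_size)
displ[1:] = torch.cumsum(counts, dim=0)[:-1]  # Each segment starts where the previous one ends.
print(counts)  # tensor([15., 10., 10., 10.])
print(displ)  # tensor([ 0., 15., 25., 35.])

The job log below shows the same 3:2:2:2 pattern, scaled by the real feature count of 6291456.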
##############################################
# PyTorch sample-parallel k-means clustering #
##############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Local cluster centers have shape [#samples, #features] = [3, 6291456].
Recvbuf counts = tensor([18874368., 12582912., 12582912., 12582912.])
Recvbuf displacements = tensor([ 0., 18874368., 31457280., 44040192.])
Rank 3/4: [OK]
Rank 2/4: [OK]
Rank 1/4: [OK]
Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Global cluster centers have shape [#samples, #features] = [9, 6291456].
Rank 2/4: Iteration 0...
Rank 1/4: Iteration 0...
Rank 0/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 0/4: Iteration 1...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 1/4: Iteration 2...
Rank 2/4: Iteration 2...
Rank 0/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 0/4: Iteration 3...
Rank 2/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 1/4: Iteration 4...
Rank 0/4: Iteration 4...
Rank 2/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 2/4: Iteration 5...
Rank 0/4: Iteration 5...
Rank 1/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 1/4: Iteration 6...
Rank 0/4: Iteration 6...
Rank 2/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 2/4: Iteration 7...
Rank 0/4: Iteration 7...
Rank 1/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 1/4: Iteration 8...
Rank 0/4: Iteration 8...
Rank 2/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 0/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 1/4: Iteration 10...
Rank 2/4: Iteration 10...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 2/4: Iteration 11...
Rank 1/4: Iteration 11...
Rank 0/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 1/4: Iteration 12...
Rank 0/4: Iteration 12...
Rank 2/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 0/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 1/4: Iteration 14...
Rank 0/4: Iteration 14...
Rank 2/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 2/4: Iteration 15...
Rank 0/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 0/4: Iteration 16...
Rank 1/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 1/4: Iteration 18...
Rank 0/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 2/4: Iteration 19...
Rank 1/4: Iteration 19...
Rank 0/4: Iteration 19...
Rank 3/4: Iteration 19...
DONE.
Run time: 205.8100015646778 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553925
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:14:02
CPU Efficiency: 1.07% of 21:52:00 core-walltime
Job Wall-clock time: 00:04:06
Memory Utilized: 4.62 GB
Memory Efficiency: 2.96% of 156.25 GB
##############################################
# PyTorch sample-parallel k-means clustering #
##############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 2/4: [OK]
Rank 1/4: [OK]
Rank 3/4: [OK]
Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Local cluster centers have shape [#samples, #features] = [2, 6291456].
Global cluster centers have shape [#samples, #features] = [8, 6291456].
Rank 1/4: Iteration 0...
Rank 2/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 0/4: Iteration 0...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 0/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 2/4: Iteration 2...
Rank 1/4: Iteration 2...
Rank 0/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 2/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 0/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 2/4: Iteration 4...
Rank 0/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 1/4: Iteration 4...
Rank 1/4: Iteration 5...
Rank 2/4: Iteration 5...
Rank 0/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 2/4: Iteration 6...
Rank 0/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 1/4: Iteration 6...
Rank 1/4: Iteration 7...
Rank 2/4: Iteration 7...
Rank 0/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 2/4: Iteration 8...
Rank 1/4: Iteration 8...
Rank 0/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 0/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 2/4: Iteration 10...
Rank 1/4: Iteration 10...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 1/4: Iteration 11...
Rank 2/4: Iteration 11...
Rank 0/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 2/4: Iteration 12...
Rank 0/4: Iteration 12...
Rank 1/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 2/4: Iteration 13...
Rank 0/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 14...
Rank 1/4: Iteration 14...
Rank 0/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 0/4: Iteration 15...
Rank 2/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 1/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 0/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 1/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 0/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 1/4: Iteration 19...
Rank 0/4: Iteration 19...
Rank 2/4: Iteration 19...
Rank 3/4: Iteration 19...
DONE.
Run time: 187.21997308498248 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553855
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:12:47
CPU Efficiency: 1.05% of 20:16:00 core-walltime
Job Wall-clock time: 00:03:48
Memory Utilized: 15.63 GB (estimated maximum)
Memory Efficiency: 10.00% of 156.25 GB (39.06 GB/node)
#!/bin/bash
#SBATCH --job-name=kmeans_sample # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_sample_parallel.py # Run your Python script.
#!/bin/bash
#SBATCH --job-name=kmeans_sample_v # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_sample_parallel_allgatherv.py # Run your Python script.
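Before moving on to the feature-parallel variant, the invariant behind the sample-parallel centroid update (communication point 2 in the docstrings above) can be checked serially: the global cluster mean equals the sum of per-rank masked sums divided by the sum of per-rank counts. A minimal sketch without MPI, splitting the sample axis into two hypothetical ranks:

import torch

torch.manual_seed(0)
x = torch.randn(10, 3)  # 10 samples, 3 features.
assignments = torch.tensor([0, 1, 0, 1, 0, 1, 0, 0, 1, 1]).unsqueeze(1)
mask = (assignments == 0).type(torch.int64)  # Binary mask for cluster 0.

full_mean = (x * mask).sum(axis=0) / mask.sum()  # Global mean of cluster 0.
chunks = [(0, 5), (5, 10)]  # Two "ranks" holding 5 samples each.
local_sums = sum((x[a:b] * mask[a:b]).sum(axis=0) for a, b in chunks)
local_counts = sum(mask[a:b].sum() for a, b in chunks)
print(torch.allclose(full_mean, local_sums / local_counts))  # True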
"""
Feature-parallel implementation of k-means clustering in PyTorch using MPI
"""
import time
import h5py
import torch
from mpi4py import MPI
class KMeans:
"""
Feature-parallel k-means clustering in PyTorch with MPI.
    Each rank holds an exclusive chunk of the complete dataset (distributed along the feature axis, i.e., each rank
    holds all samples but only an exclusive fraction of their features) and determines locally which sample belongs
    to which cluster before updating the centroids globally based on these local assignments and continuing with the
    next iteration.
    Feature parallelism for k-means requires communication at three points in the code, i.e.:
    1. Centroid initialization: Either one rank determines the initial centroids and broadcasts them to all other
    ranks, or all ranks randomly choose the initial centroids using the same seed.
    2. Cluster assignment: To determine the nearest centroid for each sample, we need to calculate its distance to
    each cluster center considering all features. To achieve this, we compute the contribution of each local fraction
    of features to the squared distance before summing these local squared distances up to obtain a global value.
    3. Convergence check: Again, the local contributions to the inertia need to be summed up to obtain a global value.
Attributes
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters.
max_iter : int
The maximum number of iterations.
tol : float
The convergence criterion.
_inertia : float
The inertia (quantity to be checked for convergence).
_cluster_centers : Union[None, torch.Tensor]
The centroids.
_matching_centroids : torch.Tensor
Indices of nearest centroid for each sample.
"""
def __init__(
self,
comm: MPI.Comm = MPI.COMM_WORLD,
n_clusters: int = 8,
max_iter: int = 300,
tol: float = -1.0,
) -> None:
"""
Configure feature-parallel k-means clustering algorithm.
Parameters
----------
comm : MPI.Comm
The MPI communicator.
n_clusters : int
The number of clusters, i.e., k.
max_iter : int
The maximum number of iterations to be performed.
tol : float
The tolerance for the convergence criterion.
"""
self.comm = comm # MPI communicator
self.n_clusters = n_clusters # Number of clusters
self.max_iter = max_iter # Maximum number of iterations
self.tol = tol # Tolerance for convergence criterion
self._inertia = float("nan")
self._cluster_centers = None
self._matching_centroids = None
def _initialize_centroids(self, x: torch.Tensor) -> None:
"""
Randomly initialize the centroids.
For feature parallelism, all samples of the dataset are partially present on each rank.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
"""
print(
f"Rank {self.comm.rank}/{self.comm.size}: Shape of data [#samples, #features] = {list(x.shape)}"
)
# As all samples are partially present on each rank, only shuffle on root to determine centroids.
# Alternatively, all ranks can shuffle using the same seed.
        if self.comm.rank == 0:
indices = torch.randperm(x.shape[0])[
: self.n_clusters
            ]  # Choose first `n_clusters` randomly shuffled indices as centroid indices.
# `torch.randperm(n)` returns random permutation of integers from 0 to n - 1.
# `x.shape[0]` gives number of samples in data.
else:
indices = torch.empty(self.n_clusters, dtype=torch.int64)
self.comm.Bcast(indices, root=0)
self._cluster_centers = x[indices] # Set centroids as class attribute.
def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor:
"""
        Determine the closest centroid for each sample in the dataset as measured by the Euclidean distance.
Parameters
----------
x : torch.Tensor
The dataset to be clustered.
Returns
-------
torch.Tensor
            Indices of matching centroids for each sample in the dataset.
"""
squared_distances_local = (
torch.cdist(x, self._cluster_centers) ** 2
) # Calculate squared distance between samples and centroids for locally present fraction of features.
squared_distances_global = torch.empty(
squared_distances_local.shape, dtype=torch.float
) # Initialize variable for global squared distance between samples and centroids for all features.
self.comm.Allreduce(
squared_distances_local, squared_distances_global, op=MPI.SUM
) # Sum up squared distances over features.
return squared_distances_global.argmin(
dim=1, keepdim=True
) # Return index of nearest centroid for each sample.
def fit(self, x: torch.Tensor) -> "KMeans":
"""
        Perform k-means clustering of the given dataset.
Parameters
----------
x : torch.Tensor
The dataset to cluster.
Returns
-------
KMeans
The fitted KMeans object containing final centroids.
"""
self._initialize_centroids(x) # Initialize centroids.
new_cluster_centers = self._cluster_centers.clone()
# Iteratively fit points to centroids.
for idx in range(self.max_iter):
# Determine nearest centroids.
print(f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...")
self._matching_centroids = self._fit_to_cluster(
x
) # Redundantly determine index of nearest centroid for each sample.
# Update centroids.
for i in range(self.n_clusters):
# Locally determine samples in currently considered cluster i (binary encoding).
selection_mask = (self._matching_centroids == i).type(torch.int64)
# Locally accumulate samples and total number of samples in cluster i.
assigned_points = (x * selection_mask).sum(
axis=0, keepdim=True
) # Sum up samples in cluster i.
                points_in_cluster = selection_mask.sum(
                    axis=0, keepdim=True
                ).clamp(
                    1, torch.iinfo(torch.int64).max
                )  # Determine overall number of samples in cluster i, clamping to avoid division by zero.
                # Compute new centroids.
                new_cluster_centers[i : i + 1, :] = (
                    assigned_points / points_in_cluster
                )
# Check whether centroid movement has converged.
inertia_local = ((self._cluster_centers - new_cluster_centers) ** 2).sum()
inertia_global = torch.empty(inertia_local.shape)
self.comm.Allreduce(inertia_local, inertia_global, op=MPI.SUM)
self._inertia = inertia_global
self._cluster_centers = new_cluster_centers.clone()
if self.tol is not None and self._inertia <= self.tol:
break
return self
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank == 0:
print(
"###############################################\n"
"# Feature-parallel PyTorch k-means clustering #\n"
"###############################################"
)
data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5"
dataset = "cityscapes_data"
if rank == 0:
print(f"\nLoading data... {data_path}[{dataset}]\n")
# Data is available in HDF5 format.
# An HDF5 file is a container for two kinds of objects:
# - datasets: array-like collections of data
# - groups: folder-like containers holding datasets and other groups
# Most fundamental thing to remember when using h5py is:
# Groups work like dictionaries, and datasets work like NumPy arrays.
with h5py.File(data_path, "r") as handle:
chunk = int(
handle[dataset].shape[1] / size
) # Calculate local number of features in each chunk.
    if (
        rank == size - 1
    ):  # Load data chunks from file (distributed along feature axis); cast to float to match the all-reduce buffers.
        data = torch.tensor(handle[dataset][:, rank * chunk :], dtype=torch.float)
    else:
        data = torch.tensor(
            handle[dataset][:, rank * chunk : (rank + 1) * chunk], dtype=torch.float
        )
print(f"Rank {rank}/{size}: \t[OK]")
# k-means hyperparameters
num_clusters = 8
num_iterations = 20
kmeans_clusterer = KMeans(
comm=comm, n_clusters=num_clusters, max_iter=num_iterations
)
if rank == 0:
print("Start fitting the data...")
start = time.perf_counter() # Start runtime measurement.
kmeans_clusterer.fit(data) # Perform actual k-means clustering.
if rank == 0:
print(
f"DONE.\nRun time:\t{time.perf_counter() - start} s"
) # Print measured runtime.
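The correctness of the feature-parallel assignment rests on the fact that squared Euclidean distances decompose additively over disjoint feature chunks; a quick serial check of this identity (no MPI required, toy dimensions chosen arbitrarily):

import torch

torch.manual_seed(0)
x = torch.randn(300, 8)  # 300 samples, 8 features.
centroids = torch.randn(4, 8)  # 4 centroids.
full = torch.cdist(x, centroids) ** 2  # Squared distances over all features.
# Split the feature axis into two chunks, as two ranks would hold them.
chunked = sum(
    torch.cdist(x[:, i : i + 4], centroids[:, i : i + 4]) ** 2 for i in (0, 4)
)
print(torch.allclose(full, chunked, atol=1e-3))  # True: chunk contributions sum to the full squared distance.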
###############################################
# Feature-parallel PyTorch k-means clustering #
###############################################
Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]
Rank 1/4: [OK]
Rank 1/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 3/4: [OK]
Rank 3/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 2/4: [OK]
Rank 2/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 0/4: [OK]
Start fitting the data...
Rank 0/4: Shape of data [#samples, #features] = [300, 1572864]
Rank 0/4: Iteration 0...
Rank 3/4: Iteration 0...
Rank 2/4: Iteration 0...
Rank 1/4: Iteration 0...
Rank 0/4: Iteration 1...
Rank 3/4: Iteration 1...
Rank 1/4: Iteration 1...
Rank 2/4: Iteration 1...
Rank 0/4: Iteration 2...
Rank 1/4: Iteration 2...
Rank 3/4: Iteration 2...
Rank 2/4: Iteration 2...
Rank 0/4: Iteration 3...
Rank 1/4: Iteration 3...
Rank 3/4: Iteration 3...
Rank 2/4: Iteration 3...
Rank 0/4: Iteration 4...
Rank 1/4: Iteration 4...
Rank 3/4: Iteration 4...
Rank 2/4: Iteration 4...
Rank 0/4: Iteration 5...
Rank 2/4: Iteration 5...
Rank 1/4: Iteration 5...
Rank 3/4: Iteration 5...
Rank 0/4: Iteration 6...
Rank 1/4: Iteration 6...
Rank 3/4: Iteration 6...
Rank 2/4: Iteration 6...
Rank 0/4: Iteration 7...
Rank 3/4: Iteration 7...
Rank 2/4: Iteration 7...
Rank 1/4: Iteration 7...
Rank 0/4: Iteration 8...
Rank 2/4: Iteration 8...
Rank 1/4: Iteration 8...
Rank 3/4: Iteration 8...
Rank 0/4: Iteration 9...
Rank 1/4: Iteration 9...
Rank 2/4: Iteration 9...
Rank 3/4: Iteration 9...
Rank 0/4: Iteration 10...
Rank 3/4: Iteration 10...
Rank 1/4: Iteration 10...
Rank 2/4: Iteration 10...
Rank 0/4: Iteration 11...
Rank 2/4: Iteration 11...
Rank 1/4: Iteration 11...
Rank 3/4: Iteration 11...
Rank 0/4: Iteration 12...
Rank 1/4: Iteration 12...
Rank 2/4: Iteration 12...
Rank 3/4: Iteration 12...
Rank 0/4: Iteration 13...
Rank 1/4: Iteration 13...
Rank 2/4: Iteration 13...
Rank 3/4: Iteration 13...
Rank 0/4: Iteration 14...
Rank 1/4: Iteration 14...
Rank 3/4: Iteration 14...
Rank 2/4: Iteration 14...
Rank 0/4: Iteration 15...
Rank 2/4: Iteration 15...
Rank 1/4: Iteration 15...
Rank 3/4: Iteration 15...
Rank 0/4: Iteration 16...
Rank 1/4: Iteration 16...
Rank 3/4: Iteration 16...
Rank 2/4: Iteration 16...
Rank 0/4: Iteration 17...
Rank 3/4: Iteration 17...
Rank 2/4: Iteration 17...
Rank 1/4: Iteration 17...
Rank 0/4: Iteration 18...
Rank 1/4: Iteration 18...
Rank 3/4: Iteration 18...
Rank 2/4: Iteration 18...
Rank 0/4: Iteration 19...
Rank 1/4: Iteration 19...
Rank 3/4: Iteration 19...
Rank 2/4: Iteration 19...
DONE.
Run time: 174.21738585596904 s
============================= JOB FEEDBACK =============================
NodeName=uc2n[001-004]
Job ID: 24553857
Cluster: uc2
User/Group: ku4408/scc
State: COMPLETED (exit code 0)
Nodes: 4
Cores per node: 80
CPU Utilized: 00:11:56
CPU Efficiency: 1.04% of 19:06:40 core-walltime
Job Wall-clock time: 00:03:35
Memory Utilized: 5.43 GB
Memory Efficiency: 3.47% of 156.25 GB
#!/bin/bash
#SBATCH --job-name=kmeans_feature # job name
#SBATCH --partition=dev_multiple # queue for resource allocation
#SBATCH --nodes=4 # number of nodes to be used
#SBATCH --time=30:00 # wall-clock time limit
#SBATCH --mem=40000 # memory per node
#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
export VENVDIR=~/scai-venv # Export path to your virtual environment.
export PYDIR=./ # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python -u ${PYDIR}/kmeans_feature_parallel.py # Run your Python script.