From fa54ef86af6645e67686a4ee165d126e5707efde Mon Sep 17 00:00:00 2001 From: Marie Weiel <marie.weiel@kit.edu> Date: Mon, 18 Nov 2024 11:01:10 +0100 Subject: [PATCH] upload solutions --- 1_kmeans/solutions/A1/kmeans.py | 192 ++++++++++++ 1_kmeans/solutions/A1/slurm-kmeans-cpu.out | 45 +++ 1_kmeans/solutions/A1/slurm-kmeans-gpu.out | 45 +++ 1_kmeans/solutions/A1/submit_kmeans_cpu.sh | 21 ++ 1_kmeans/solutions/A1/submit_kmeans_gpu.sh | 22 ++ .../solutions/A2/kmeans_sample_parallel.py | 258 ++++++++++++++++ .../A2/kmeans_sample_parallel_allgatherv.py | 291 ++++++++++++++++++ ...lurm-kmeans-sample-parallel-allgatherv.out | 115 +++++++ .../A2/slurm-kmeans-sample-parallel.out | 113 +++++++ .../A2/submit_kmeans_sample_parallel.sh | 25 ++ ...ubmit_kmeans_sample_parallel_allgatherv.sh | 25 ++ .../solutions/A3/kmeans_feature_parallel.py | 243 +++++++++++++++ .../A3/slurm-kmeans-feature-parallel.out | 112 +++++++ .../A3/submit_kmeans_feature_parallel.sh | 25 ++ 14 files changed, 1532 insertions(+) create mode 100755 1_kmeans/solutions/A1/kmeans.py create mode 100644 1_kmeans/solutions/A1/slurm-kmeans-cpu.out create mode 100644 1_kmeans/solutions/A1/slurm-kmeans-gpu.out create mode 100644 1_kmeans/solutions/A1/submit_kmeans_cpu.sh create mode 100644 1_kmeans/solutions/A1/submit_kmeans_gpu.sh create mode 100644 1_kmeans/solutions/A2/kmeans_sample_parallel.py create mode 100644 1_kmeans/solutions/A2/kmeans_sample_parallel_allgatherv.py create mode 100644 1_kmeans/solutions/A2/slurm-kmeans-sample-parallel-allgatherv.out create mode 100644 1_kmeans/solutions/A2/slurm-kmeans-sample-parallel.out create mode 100644 1_kmeans/solutions/A2/submit_kmeans_sample_parallel.sh create mode 100644 1_kmeans/solutions/A2/submit_kmeans_sample_parallel_allgatherv.sh create mode 100644 1_kmeans/solutions/A3/kmeans_feature_parallel.py create mode 100644 1_kmeans/solutions/A3/slurm-kmeans-feature-parallel.out create mode 100644 1_kmeans/solutions/A3/submit_kmeans_feature_parallel.sh diff --git a/1_kmeans/solutions/A1/kmeans.py b/1_kmeans/solutions/A1/kmeans.py new file mode 100755 index 0000000..5e279d1 --- /dev/null +++ b/1_kmeans/solutions/A1/kmeans.py @@ -0,0 +1,192 @@ +""" +Serial implementation of k-means clustering in PyTorch +""" +import time + +import h5py +import torch + + +class KMeans: + """ + Serial k-means clustering in PyTorch. + + Attributes + ---------- + n_clusters : int + The number of clusters, i.e., k. + max_iter : int + The maximum number of iterations to perform. + tol : float + The tolerance for the convergence criterion. + _centroids : Union[None, torch.Tensor] + The current centroids. + _matching_centroids : Union[None, torch.Tensor] + Assigned centroids for all samples in dataset. + _inertia : float + The inertia (quantity to be checked for convergence). + + Methods + ------- + _initialize_centroids(x) + Randomly initialize centroids. + _fit_to_cluster(x) + Get the closest centroid for each sample in dataset. + fit(x) + Perform k-means clustering. + """ + + def __init__( + self, n_clusters: int = 8, max_iter: int = 300, tol: float = -1.0 + ) -> None: + """ + Configure k-means clustering algorithm. + + Parameters + ---------- + n_clusters : int + The number of clusters, i.e., k. + max_iter : int + The maximum number of iterations to be performed. + tol : float + The tolerance for the convergence criterion. 
+ """ + self.n_clusters = n_clusters # Number of clusters + self.max_iter = max_iter # Maximum number of iterations + self._centroids = None + self._matching_centroids = None + self.tol = tol # Tolerance for convergence criterion + self._inertia = float("nan") + + def _initialize_centroids(self, x: torch.Tensor) -> None: + """ + Randomly initialize the centroids. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + """ + # Shuffle data and choose first `n_clusters` samples as initial centroids. + self._centroids = x[torch.randperm(x.shape[0])[: self.n_clusters]] + + def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor: + """ + Determine the closest centroids for each sample in dataset as measured by their Euclidean distance. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + + Returns + ------- + torch.Tensor + Indices of matching centroids for each sample in dataset. + """ + distances = torch.cdist( + x, self._centroids + ) # Calculate Euclidean distance of each data sample to each current centroid. + return distances.argmin( + dim=1, keepdim=True + ) # Return index of the closest centroid for each sample. + + def fit(self, x: torch.Tensor) -> "KMeans": + """ + Perform k-means clustering of given dataset. + + Parameters + ---------- + x : torch.Tensor + The dataset to cluster. + + Returns + ------- + KMeans + The fitted KMeans object containing final centroids. + """ + self._initialize_centroids(x) # Initialize centroids. + new_cluster_centers = self._centroids.clone() + + # Iteratively fit points to centroids. + for idx in range(self.max_iter): + # Determine index of the closest centroid for each sample in dataset. + print(f"Iteration {idx}...") + self._matching_centroids = self._fit_to_cluster( + x + ) # Array of length `n_samples` providing index of closest centroid for each sample in dataset. + + # Update centroids. + for i in range(self.n_clusters): # Loop over clusters. + # Determine all points in current cluster. + selection_mask = (self._matching_centroids == i).type(torch.int64) + # Array of length `n_samples` with binary encoding of whether each sample belongs to cluster i or not. + + assigned_points = (x * selection_mask).sum( + axis=0, keepdim=True + ) # Compute vectorial sum of all points in current cluster. + points_in_cluster = selection_mask.sum(axis=0, keepdim=True).clamp( + 1, torch.iinfo(torch.int64).max + ) # Compute number of points in current cluster. + new_cluster_centers[i : i + 1, :] = ( + assigned_points / points_in_cluster + ) # Compute new centroids. + + # Check whether centroid movement has converged. + self._inertia = ( + (self._centroids - new_cluster_centers) ** 2 + ).sum() # Update inertia. + self._centroids = new_cluster_centers.clone() + if ( + self.tol is not None and self._inertia <= self.tol + ): # Check whether inertia is smaller than tolerance. + break + + return self + + +if __name__ == "__main__": + print( + "##############################\n" + "# PyTorch k-Means Clustering #\n" + "##############################" + ) + + data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5" + dataset = "cityscapes_data" + device = torch.device( + "cuda" if torch.cuda.is_available() else "cpu" + ) # Set device as available. + + if torch.cuda.is_available(): + print("Running on GPU...") + else: + print("Running on CPU...") + + print(f"Loading dataset from {data_path}[{dataset}]...") + # Data is available in HDF5 format. 
+ # An HDF5 file is a container for two kinds of objects: + # - datasets: array-like collections of data + # - groups: folder-like containers holding datasets and other groups + # Most fundamental thing to remember when using h5py is: + # Groups work like dictionaries, and datasets work like NumPy arrays. + + # Open file for reading. We use the Cityscapes dataset. + with h5py.File(data_path, "r") as handle: + print("Open h5 file...") + data = torch.tensor( + handle[dataset][:300], device=device + ) # Default device is "cpu", set to "cuda" for GPU. + print("Torch tensor created.") + + # k-means hyperparameters + num_clusters = 8 + num_iterations = 20 + + kmeans_clusterer = KMeans(n_clusters=num_clusters, max_iter=num_iterations) + print("Start fitting the data...") + start = time.perf_counter() # Start timer. + kmeans_clusterer.fit(data) # Perform k-means clustering. + print( + f"DONE.\nRun time: \t{time.perf_counter() - start} s" + ) # Print measured runtime. diff --git a/1_kmeans/solutions/A1/slurm-kmeans-cpu.out b/1_kmeans/solutions/A1/slurm-kmeans-cpu.out new file mode 100644 index 0000000..26f6a86 --- /dev/null +++ b/1_kmeans/solutions/A1/slurm-kmeans-cpu.out @@ -0,0 +1,45 @@ +############################## +# PyTorch k-Means Clustering # +############################## +Running on CPU... +Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]... +Open h5 file... +Torch tensor created. +Start fitting the data... +Iteration 0... +Iteration 1... +Iteration 2... +Iteration 3... +Iteration 4... +Iteration 5... +Iteration 6... +Iteration 7... +Iteration 8... +Iteration 9... +Iteration 10... +Iteration 11... +Iteration 12... +Iteration 13... +Iteration 14... +Iteration 15... +Iteration 16... +Iteration 17... +Iteration 18... +Iteration 19... +DONE. +Run time: 892.7324264449999 s + +============================= JOB FEEDBACK ============================= + +NodeName=uc2n265 +Job ID: 24553849 +Cluster: uc2 +User/Group: ku4408/scc +State: COMPLETED (exit code 0) +Nodes: 1 +Cores per node: 2 +CPU Utilized: 00:12:41 +CPU Efficiency: 40.31% of 00:31:28 core-walltime +Job Wall-clock time: 00:15:44 +Memory Utilized: 20.56 GB +Memory Efficiency: 52.63% of 39.06 GB diff --git a/1_kmeans/solutions/A1/slurm-kmeans-gpu.out b/1_kmeans/solutions/A1/slurm-kmeans-gpu.out new file mode 100644 index 0000000..fc20c96 --- /dev/null +++ b/1_kmeans/solutions/A1/slurm-kmeans-gpu.out @@ -0,0 +1,45 @@ +############################## +# PyTorch k-Means Clustering # +############################## +Running on GPU... +Loading dataset from /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data]... +Open h5 file... +Torch tensor created. +Start fitting the data... +Iteration 0... +Iteration 1... +Iteration 2... +Iteration 3... +Iteration 4... +Iteration 5... +Iteration 6... +Iteration 7... +Iteration 8... +Iteration 9... +Iteration 10... +Iteration 11... +Iteration 12... +Iteration 13... +Iteration 14... +Iteration 15... +Iteration 16... +Iteration 17... +Iteration 18... +Iteration 19... +DONE. 
+Run time: 12.904788984917104 s + +============================= JOB FEEDBACK ============================= + +NodeName=uc2n513 +Job ID: 24553850 +Cluster: uc2 +User/Group: ku4408/scc +State: COMPLETED (exit code 0) +Nodes: 1 +Cores per node: 10 +CPU Utilized: 00:00:16 +CPU Efficiency: 1.39% of 00:19:10 core-walltime +Job Wall-clock time: 00:01:55 +Memory Utilized: 4.37 GB +Memory Efficiency: 4.76% of 91.80 GB diff --git a/1_kmeans/solutions/A1/submit_kmeans_cpu.sh b/1_kmeans/solutions/A1/submit_kmeans_cpu.sh new file mode 100644 index 0000000..1b96d61 --- /dev/null +++ b/1_kmeans/solutions/A1/submit_kmeans_cpu.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +#SBATCH --job-name=kmeans_cpu # job name +#SBATCH --partition=single # queue for resource allocation +#SBATCH --time=30:00 # wall-clock time limit +#SBATCH --mem=40000 # memory +#SBATCH --nodes=1 # number of nodes to be used +#SBATCH --mail-type=ALL # Notify user by email when certain event types occur. + +export VENVDIR=~/scai-venv # Export path to your virtual environment. +export PYDIR=./ # Export path to directory containing Python script. + +# Set up modules. +module purge # Unload all currently loaded modules. +module load compiler/gnu/13.3 # Load required modules. +module load mpi/openmpi/4.1 +module load devel/cuda/12.4 +module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1 +source ${VENVDIR}/bin/activate # Activate your virtual environment. + +python -u ${PYDIR}/kmeans.py # Run your Python script. diff --git a/1_kmeans/solutions/A1/submit_kmeans_gpu.sh b/1_kmeans/solutions/A1/submit_kmeans_gpu.sh new file mode 100644 index 0000000..7641c7a --- /dev/null +++ b/1_kmeans/solutions/A1/submit_kmeans_gpu.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +#SBATCH --job-name=kmeans_gpu +#SBATCH --partition=gpu_4 +#SBATCH --gres=gpu:1 # number of requested GPUs (GPU nodes shared between multiple jobs) +#SBATCH --time=10:00 # wall-clock time limit +#SBATCH --nodes=1 +#SBATCH --mail-type=ALL + +export VENVDIR=~/scai-venv # Export path to your virtual environment. +export PYDIR=./ # Export path to directory containing Python script. + +# Set up modules. +module purge # Unload all currently loaded modules. +module load compiler/gnu/13.3 # Load required modules. +module load mpi/openmpi/4.1 +module load devel/cuda/12.4 +module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1 + +source ${VENVDIR}/bin/activate # Activate your virtual environment. + +python -u ${PYDIR}/kmeans.py # Run your Python script. diff --git a/1_kmeans/solutions/A2/kmeans_sample_parallel.py b/1_kmeans/solutions/A2/kmeans_sample_parallel.py new file mode 100644 index 0000000..5876433 --- /dev/null +++ b/1_kmeans/solutions/A2/kmeans_sample_parallel.py @@ -0,0 +1,258 @@ +""" +Sample-parallel implementation of k-means clustering in PyTorch using MPI +""" +import time + +import h5py +import torch +from mpi4py import MPI + + +class KMeans: + """ + Sample-parallel k-means clustering in PyTorch with MPI. + + Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines + locally which local sample belongs to which cluster before updating the centroids globally based on these local + assignments and continuing with the next iteration. + + Sample parallelism for k-means requires communication at two points in the code, i.e.: + 1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be + all-gathered by all ranks. + 2. 
Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the + centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to + be all-reduced to all ranks. + + Attributes + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters. + max_iter : int + The maximum number of iterations. + tol : float + The convergence criterion. + _inertia : float + The inertia (quantity to be checked for convergence). + _cluster_centers : Union[None, torch.Tensor] + The centroids. + _matching_centroids : torch.Tensor + Indices of nearest centroid for each sample. + """ + + def __init__( + self, + comm: MPI.Comm = MPI.COMM_WORLD, + n_clusters: int = 8, + max_iter: int = 300, + tol: float = -1.0, + ) -> None: + """ + Configure sample-parallel k-means clustering algorithm. + + Parameters + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters, i.e., k. + max_iter : int + The maximum number of iterations to be performed. + tol : float + The tolerance for the convergence criterion. + """ + self.comm = comm # MPI communicator + self.n_clusters = n_clusters # Number of clusters + self.max_iter = max_iter # Maximum number of iterations + self.tol = tol # Tolerance for convergence criterion + + self._inertia = float("nan") + self._cluster_centers = None + self._matching_centroids = None + + def _initialize_centroids(self, x: torch.Tensor) -> None: + """ + Randomly initialize the centroids. + + For sample parallelism, each rank holds an exclusive fraction of samples from the global dataset. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + """ + assert self.n_clusters >= self.comm.size # Assume `n_clusters` >= `comm.size`. + n_clusters_local = ( + self.n_clusters // self.comm.size + ) # Determine number of local cluster centers per rank. + # Choose first `n_cluster_local` randomly shuffled indices as centroid indices. + # `torch.randperm(n)` returns random permutation of integers from 0 to n - 1. + # `x.shape[0]` gives number of samples in data. + x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]] + print( + f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = " + f"{list(x_local.shape)}." + ) + # --- Communication required for sample-parallel: All-gather local centroids to all ranks --- + self._cluster_centers = torch.empty( + (self.n_clusters, x.shape[1]), dtype=torch.float + ) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids. + + self.comm.Allgather( + [x_local, MPI.FLOAT], [self._cluster_centers, MPI.FLOAT] + ) # All-gather local centroids to all ranks. + if self.comm.rank == 0: + print( + f"Global cluster centers have shape [#samples, #features] = {list(self._cluster_centers.shape)}.", + ) + + def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor: + """ + Determine the closest centroids for each sample in dataset as measured by their Euclidean distance. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + + Returns + ------- + torch.Tensor + Indices of matching centroids for each sample in dataset. + """ + distances = torch.cdist( + x, self._cluster_centers + ) # Calculate Euclidean distances between samples and centroids. + return distances.argmin( + dim=1, keepdim=True + ) # Determine index of nearest centroid for each sample. 
+ + def fit(self, x: torch.Tensor) -> "KMeans": + """ + Perform k-means clustering of given dataset. + + Parameters + ---------- + x : torch.Tensor + The dataset to cluster. + + Returns + ------- + KMeans + The fitted KMeans object containing final centroids. + """ + self._initialize_centroids(x) # Initialize centroids. + new_cluster_centers = self._cluster_centers.clone() + + # Iteratively fit points to centroids. + for idx in range(self.max_iter): + # Determine nearest centroids. + print( + f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...", + ) + self._matching_centroids = self._fit_to_cluster( + x + ) # Determine index of nearest centroid for each sample. + + # Update centroids. + for i in range(self.n_clusters): + # Locally determine samples in currently considered cluster i (binary encoding). + selection_mask = (self._matching_centroids == i).type(torch.int64) + + # Locally accumulate samples and total number of samples in cluster i. + assigned_points = (x * selection_mask).sum( + axis=0, keepdim=True + ) # Local directed sum of samples in cluster i. + + # Local number of samples in cluster i. + points_in_cluster = ( + selection_mask.sum(axis=0, keepdim=True) + .clamp(0.0, torch.iinfo(torch.int64).max) + .type(torch.int64) + ) # Clamp to 0 before communication + + # --- Communication required for sample-parallel: Allreduce local results --- + assigned_points_global = torch.empty( + assigned_points.shape, dtype=torch.float + ) # Initialize variable to store global directed sum of points assigned to cluster i. + points_in_cluster_global = torch.empty( + points_in_cluster.shape, dtype=torch.int64 + ) # Initialize variable to store global number of points assigned to cluster i. + self.comm.Allreduce( + assigned_points, assigned_points_global, op=MPI.SUM + ) # All-reduce local directed sums of points assigned to cluster i. + self.comm.Allreduce( + points_in_cluster, points_in_cluster_global, op=MPI.SUM + ) # All-reduce local numbers of points assigned to cluster i. + + # Compute new centroids. + new_cluster_centers[ + i : i + 1, : + ] = assigned_points_global / points_in_cluster_global.clamp( + 1.0, torch.iinfo(torch.int64).max + ) + + # Check whether centroid movement has converged. + self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum() + self._cluster_centers = new_cluster_centers.clone() + if self.tol is not None and self._inertia <= self.tol: + break + return self + + +if __name__ == "__main__": + comm = MPI.COMM_WORLD + rank = comm.rank + size = comm.size + + if rank == 0: + print( + "##############################################\n" + "# Sample-parallel PyTorch k-means clustering #\n" + "##############################################" + ) + + data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5" + dataset = "cityscapes_data" + + if rank == 0: + print(f"\nLoading data... {data_path}[{dataset}]\n") + + # Data is available in HDF5 format. + # An HDF5 file is a container for two kinds of objects: + # - datasets: array-like collections of data + # - groups: folder-like containers holding datasets and other groups + # Most fundamental thing to remember when using h5py is: + # Groups work like dictionaries, and datasets work like NumPy arrays. + with h5py.File(data_path, "r") as handle: + chunk = int( + handle[dataset].shape[0] / size + ) # Calculate local number of samples in each chunk. + # Load data chunks from file. 
+ if rank == size - 1: + data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float) + else: + data = torch.tensor( + handle[dataset][rank * chunk : (rank + 1) * chunk], dtype=torch.float + ) + + print(f"Rank {rank}/{size}: \t[OK]") + + # k-means hyperparameters + num_clusters = 8 + num_iterations = 20 + + kmeans_clusterer = KMeans( + comm=comm, n_clusters=num_clusters, max_iter=num_iterations + ) + if rank == 0: + print("Start fitting the data...") + start = time.perf_counter() # Start runtime measurement. + + kmeans_clusterer.fit(data) # Perform actual k-means clustering. + + if rank == 0: + print( + f"DONE.\nRun time:\t{time.perf_counter()-start} s" + ) # Print measured runtime. diff --git a/1_kmeans/solutions/A2/kmeans_sample_parallel_allgatherv.py b/1_kmeans/solutions/A2/kmeans_sample_parallel_allgatherv.py new file mode 100644 index 0000000..47bc44c --- /dev/null +++ b/1_kmeans/solutions/A2/kmeans_sample_parallel_allgatherv.py @@ -0,0 +1,291 @@ +""" +Sample-parallel implementation of k-means clustering in PyTorch using MPI +""" +import time + +import h5py +import numpy as np +import torch +from mpi4py import MPI + + +class KMeans: + """ + Sample-parallel k-means clustering in PyTorch with MPI. + + Each rank holds an exclusive chunk of the complete dataset (distributed along the sample axis) and determines + locally which local sample belongs to which cluster before updating the centroids globally based on these local + assignments and continuing with the next iteration. + + Sample parallelism for k-means requires communication at two points in the code, i.e.: + 1. Centroid initialization: Each rank determines a respective number of centroids from its local data chunk to be + all-gathered by all ranks. + 2. Centroid update: Each rank needs to know all samples from other ranks in a certain cluster to update the + centroids correctly, i.e., the samples assigned to a certain cluster on each local rank and their number need to + be all-reduced to all ranks. + + Attributes + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters. + max_iter : int + The maximum number of iterations. + tol : float + The convergence criterion. + _inertia : float + The inertia (quantity to be checked for convergence). + _cluster_centers : Union[None, torch.Tensor] + The centroids. + _matching_centroids : torch.Tensor + Indices of nearest centroid for each sample. + """ + + def __init__( + self, + comm: MPI.Comm = MPI.COMM_WORLD, + n_clusters: int = 8, + max_iter: int = 300, + tol: float = -1.0, + ) -> None: + """ + Configure sample-parallel k-means clustering algorithm. + + Parameters + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters, i.e., k. + max_iter : int + The maximum number of iterations to be performed. + tol : float + The tolerance for the convergence criterion. + """ + self.comm = comm # MPI communicator + self.n_clusters = n_clusters # Number of clusters + self.max_iter = max_iter # Maximum number of iterations + self.tol = tol # Tolerance for convergence criterion + + self._inertia = float("nan") + self._cluster_centers = None + self._matching_centroids = None + + def _initialize_centroids(self, x): + """ + Randomly initialize the centroids. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + """ + assert self.n_clusters >= self.comm.size # Assume `n_clusters` >= `comm.size`. + # Determine number of local cluster centers per rank. 
+ n_clusters_base = ( + self.n_clusters // self.comm.size + ) # Determine base number of local cluster centers per rank. + n_clusters_remain = self.n_clusters % self.comm.size # Determine remainder. + n_clusters_local = n_clusters_base + if self.comm.rank < n_clusters_remain: # Distribute remainder over first ranks. + n_clusters_local += 1 + # Choose first `n_cluster_local` randomly shuffled indices as centroid indices. + # `torch.randperm(n)` returns random permutation of integers from 0 to n - 1. + # `x` = data, `x.shape[0]` gives number of samples in data. + x_local = x[torch.randperm(x.shape[0])[:n_clusters_local]] + print( + f"Rank {self.comm.rank}/{self.comm.size}: Local cluster centers have shape [#samples, #features] = " + f"{list(x_local.shape)}." + ) + # --- Communication required for sample-parallel: All-gather local centroids to all ranks --- + cluster_centers = torch.empty( + (self.n_clusters, x.shape[1]), dtype=torch.float + ) # Initialize empty tensor of shape [# clusters, # features] on each rank to store all centroids. + # For vector variants: recvbuf = [data, counts, displacements, type] + # counts and displacements are integer tuples with as many elements as tasks in communicator. + # counts[i] designates the size of i-th segment, + # displacements[i] the number of elements in data used as first element of i-th section. + + # Set up counts array (of length comm.size) containing the number of elements to be received from each rank. + counts = n_clusters_base * torch.ones(self.comm.size) + counts[:n_clusters_remain] += 1 + counts = torch.mul( + counts, x.shape[1] + ) + if rank == 0: + print(f"Recvbuf counts = {counts}") + # Set up displs array (of length comm.size); entry i specifies the displacement (relative to recvbuf) + # at which to place the incoming data from process i. + displ = torch.zeros(self.comm.size) + displ[1:] = torch.cumsum(counts, dim=0)[ + :-1 + ] + if rank == 0: + print(f"Recvbuf displacements = {displ}") + sendbuf = [x_local, MPI.FLOAT] # Set up send buffer. + recvbuf = [ + cluster_centers, + np.asarray(counts, dtype=int), + np.asarray(displ, dtype=int), + MPI.FLOAT, + ] # Set up receive buffer. + self.comm.Allgatherv( + sendbuf, recvbuf + ) # All-gather local centroids to all ranks. + if rank == 0: + print( + f"Global cluster centers have shape [#samples, #features] = {list(cluster_centers.shape)}." + ) + self._cluster_centers = cluster_centers # Set centroids as class attribute. + + def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor: + """ + Determine the closest centroids for each sample in dataset as measured by their Euclidean distance. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + + Returns + ------- + torch.Tensor + Indices of matching centroids for each sample in dataset. + """ + distances = torch.cdist( + x, self._cluster_centers + ) # Calculate Euclidean distances between samples and centroids. + return distances.argmin( + dim=1, keepdim=True + ) # Determine index of nearest centroid for each sample. + + def fit(self, x: torch.Tensor) -> "KMeans": + """ + Perform k-means clustering of given dataset. + + Parameters + ---------- + x : torch.Tensor + The dataset to cluster. + + Returns + ------- + KMeans + The fitted KMeans object containing final centroids. + """ + self._initialize_centroids(x) # Initialize centroids. + new_cluster_centers = self._cluster_centers.clone() + + # Iteratively fit points to centroids. + for idx in range(self.max_iter): + # Determine nearest centroids. 
+ print( + f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...", + ) + self._matching_centroids = self._fit_to_cluster( + x + ) # Determine index of nearest centroid for each sample. + + # Update centroids. + for i in range(self.n_clusters): + # Locally determine samples in currently considered cluster i (binary encoding). + selection_mask = (self._matching_centroids == i).type(torch.int64) + + # Locally accumulate samples and total number of samples in cluster i. + assigned_points = (x * selection_mask).sum( + axis=0, keepdim=True + ) # Local directed sum of samples in cluster i. + + # Local number of samples in cluster i. + points_in_cluster = ( + selection_mask.sum(axis=0, keepdim=True) + .clamp(0.0, torch.iinfo(torch.int64).max) + .type(torch.int64) + ) # Clamp to 0 before communication + + # ------------ Communication required for sample-parallel: Allreduce local results ------------ + assigned_points_global = torch.empty( + assigned_points.shape, dtype=torch.float + ) # Initialize variable to store global directed sum of points assigned to cluster i. + points_in_cluster_global = torch.empty( + points_in_cluster.shape, dtype=torch.int64 + ) # Initialize variable to store global number of points assigned to cluster i. + self.comm.Allreduce( + assigned_points, assigned_points_global, op=MPI.SUM + ) # All-reduce local directed sums of points assigned to cluster i. + self.comm.Allreduce( + points_in_cluster, points_in_cluster_global, op=MPI.SUM + ) # All-reduce local numbers of points assigned to cluster i. + + # Compute new centroids. + new_cluster_centers[ + i : i + 1, : + ] = assigned_points_global / points_in_cluster_global.clamp( + 1.0, torch.iinfo(torch.int64).max + ) + + # Check whether centroid movement has converged. + self._inertia = ((self._cluster_centers - new_cluster_centers) ** 2).sum() + self._cluster_centers = new_cluster_centers.clone() + if self.tol is not None and self._inertia <= self.tol: + break + return self + + +if __name__ == "__main__": + comm = MPI.COMM_WORLD + rank = comm.rank + size = comm.size + + if rank == 0: + print( + "##############################################\n" + "# Sample-parallel PyTorch k-means clustering #\n" + "##############################################" + ) + + data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5" + dataset = "cityscapes_data" + + if rank == 0: + print(f"\nLoading data... {data_path}[{dataset}]\n") + + # Data is available in HDF5 format. + # An HDF5 file is a container for two kinds of objects: + # - datasets: array-like collections of data + # - groups: folder-like containers holding datasets and other groups + # Most fundamental thing to remember when using h5py is: + # Groups work like dictionaries, and datasets work like NumPy arrays. + with h5py.File(data_path, "r") as handle: + chunk = int( + handle[dataset].shape[0] / size + ) # Calculate local number of samples in each chunk. + # Load data chunks from file (distributed along sample axis). + if rank == size - 1: + data = torch.tensor(handle[dataset][rank * chunk :], dtype=torch.float) + else: + data = torch.tensor( + handle[dataset][rank * chunk : (rank + 1) * chunk], + dtype=torch.float, + ) + + print(f"Rank {rank}/{size}: \t[OK]") + + # k-means hyperparameters + num_clusters = 9 + num_iterations = 20 + + kmeans_clusterer = KMeans( + comm=comm, n_clusters=num_clusters, max_iter=num_iterations + ) + if rank == 0: + print("Start fitting the data...") + start = time.perf_counter() # Start runtime measurement. 
+ + kmeans_clusterer.fit(data) # Perform actual k-means clustering. + + if rank == 0: + print( + f"DONE.\nRun time:\t{time.perf_counter() - start} s" + ) # Print measured runtime. diff --git a/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel-allgatherv.out b/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel-allgatherv.out new file mode 100644 index 0000000..fe77754 --- /dev/null +++ b/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel-allgatherv.out @@ -0,0 +1,115 @@ +############################################## +# PyTorch sample-parallel k-means clustering # +############################################## + +Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data] + +Rank 0/4: [OK] +Start fitting the data... +Rank 0/4: Local cluster centers have shape [#samples, #features] = [3, 6291456]. +Recvbuf counts = tensor([18874368., 12582912., 12582912., 12582912.]) +Recvbuf displacements = tensor([ 0., 18874368., 31457280., 44040192.]) +Rank 3/4: [OK] +Rank 2/4: [OK] +Rank 1/4: [OK] +Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Global cluster centers have shape [#samples, #features] = [9, 6291456]. +Rank 2/4: Iteration 0... +Rank 1/4: Iteration 0... +Rank 0/4: Iteration 0... +Rank 3/4: Iteration 0... +Rank 0/4: Iteration 1... +Rank 1/4: Iteration 1... +Rank 2/4: Iteration 1... +Rank 3/4: Iteration 1... +Rank 1/4: Iteration 2... +Rank 2/4: Iteration 2... +Rank 0/4: Iteration 2... +Rank 3/4: Iteration 2... +Rank 0/4: Iteration 3... +Rank 2/4: Iteration 3... +Rank 1/4: Iteration 3... +Rank 3/4: Iteration 3... +Rank 1/4: Iteration 4... +Rank 0/4: Iteration 4... +Rank 2/4: Iteration 4... +Rank 3/4: Iteration 4... +Rank 2/4: Iteration 5... +Rank 0/4: Iteration 5... +Rank 1/4: Iteration 5... +Rank 3/4: Iteration 5... +Rank 1/4: Iteration 6... +Rank 0/4: Iteration 6... +Rank 2/4: Iteration 6... +Rank 3/4: Iteration 6... +Rank 2/4: Iteration 7... +Rank 0/4: Iteration 7... +Rank 1/4: Iteration 7... +Rank 3/4: Iteration 7... +Rank 1/4: Iteration 8... +Rank 0/4: Iteration 8... +Rank 2/4: Iteration 8... +Rank 3/4: Iteration 8... +Rank 1/4: Iteration 9... +Rank 2/4: Iteration 9... +Rank 0/4: Iteration 9... +Rank 3/4: Iteration 9... +Rank 1/4: Iteration 10... +Rank 2/4: Iteration 10... +Rank 0/4: Iteration 10... +Rank 3/4: Iteration 10... +Rank 2/4: Iteration 11... +Rank 1/4: Iteration 11... +Rank 0/4: Iteration 11... +Rank 3/4: Iteration 11... +Rank 1/4: Iteration 12... +Rank 0/4: Iteration 12... +Rank 2/4: Iteration 12... +Rank 3/4: Iteration 12... +Rank 0/4: Iteration 13... +Rank 1/4: Iteration 13... +Rank 2/4: Iteration 13... +Rank 3/4: Iteration 13... +Rank 1/4: Iteration 14... +Rank 0/4: Iteration 14... +Rank 2/4: Iteration 14... +Rank 3/4: Iteration 14... +Rank 2/4: Iteration 15... +Rank 0/4: Iteration 15... +Rank 1/4: Iteration 15... +Rank 3/4: Iteration 15... +Rank 0/4: Iteration 16... +Rank 1/4: Iteration 16... +Rank 2/4: Iteration 16... +Rank 3/4: Iteration 16... +Rank 2/4: Iteration 17... +Rank 1/4: Iteration 17... +Rank 0/4: Iteration 17... +Rank 3/4: Iteration 17... +Rank 1/4: Iteration 18... +Rank 0/4: Iteration 18... +Rank 2/4: Iteration 18... +Rank 3/4: Iteration 18... +Rank 2/4: Iteration 19... +Rank 1/4: Iteration 19... +Rank 0/4: Iteration 19... +Rank 3/4: Iteration 19... +DONE. 
+Run time: 205.8100015646778 s + +============================= JOB FEEDBACK ============================= + +NodeName=uc2n[001-004] +Job ID: 24553925 +Cluster: uc2 +User/Group: ku4408/scc +State: COMPLETED (exit code 0) +Nodes: 4 +Cores per node: 80 +CPU Utilized: 00:14:02 +CPU Efficiency: 1.07% of 21:52:00 core-walltime +Job Wall-clock time: 00:04:06 +Memory Utilized: 4.62 GB +Memory Efficiency: 2.96% of 156.25 GB diff --git a/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel.out b/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel.out new file mode 100644 index 0000000..7055a12 --- /dev/null +++ b/1_kmeans/solutions/A2/slurm-kmeans-sample-parallel.out @@ -0,0 +1,113 @@ +############################################## +# PyTorch sample-parallel k-means clustering # +############################################## + +Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data] + +Rank 2/4: [OK] +Rank 1/4: [OK] +Rank 3/4: [OK] +Rank 2/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Rank 1/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Rank 3/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Rank 0/4: [OK] +Start fitting the data... +Rank 0/4: Local cluster centers have shape [#samples, #features] = [2, 6291456]. +Global cluster centers have shape [#samples, #features] = [8, 6291456]. +Rank 1/4: Iteration 0... +Rank 2/4: Iteration 0... +Rank 3/4: Iteration 0... +Rank 0/4: Iteration 0... +Rank 1/4: Iteration 1... +Rank 2/4: Iteration 1... +Rank 0/4: Iteration 1... +Rank 3/4: Iteration 1... +Rank 2/4: Iteration 2... +Rank 1/4: Iteration 2... +Rank 0/4: Iteration 2... +Rank 3/4: Iteration 2... +Rank 2/4: Iteration 3... +Rank 1/4: Iteration 3... +Rank 0/4: Iteration 3... +Rank 3/4: Iteration 3... +Rank 2/4: Iteration 4... +Rank 0/4: Iteration 4... +Rank 3/4: Iteration 4... +Rank 1/4: Iteration 4... +Rank 1/4: Iteration 5... +Rank 2/4: Iteration 5... +Rank 0/4: Iteration 5... +Rank 3/4: Iteration 5... +Rank 2/4: Iteration 6... +Rank 0/4: Iteration 6... +Rank 3/4: Iteration 6... +Rank 1/4: Iteration 6... +Rank 1/4: Iteration 7... +Rank 2/4: Iteration 7... +Rank 0/4: Iteration 7... +Rank 3/4: Iteration 7... +Rank 2/4: Iteration 8... +Rank 1/4: Iteration 8... +Rank 0/4: Iteration 8... +Rank 3/4: Iteration 8... +Rank 1/4: Iteration 9... +Rank 2/4: Iteration 9... +Rank 0/4: Iteration 9... +Rank 3/4: Iteration 9... +Rank 2/4: Iteration 10... +Rank 1/4: Iteration 10... +Rank 0/4: Iteration 10... +Rank 3/4: Iteration 10... +Rank 1/4: Iteration 11... +Rank 2/4: Iteration 11... +Rank 0/4: Iteration 11... +Rank 3/4: Iteration 11... +Rank 2/4: Iteration 12... +Rank 0/4: Iteration 12... +Rank 1/4: Iteration 12... +Rank 3/4: Iteration 12... +Rank 2/4: Iteration 13... +Rank 0/4: Iteration 13... +Rank 3/4: Iteration 13... +Rank 1/4: Iteration 13... +Rank 2/4: Iteration 14... +Rank 1/4: Iteration 14... +Rank 0/4: Iteration 14... +Rank 3/4: Iteration 14... +Rank 0/4: Iteration 15... +Rank 2/4: Iteration 15... +Rank 3/4: Iteration 15... +Rank 1/4: Iteration 15... +Rank 1/4: Iteration 16... +Rank 2/4: Iteration 16... +Rank 0/4: Iteration 16... +Rank 3/4: Iteration 16... +Rank 2/4: Iteration 17... +Rank 1/4: Iteration 17... +Rank 0/4: Iteration 17... +Rank 3/4: Iteration 17... +Rank 1/4: Iteration 18... +Rank 2/4: Iteration 18... +Rank 0/4: Iteration 18... +Rank 3/4: Iteration 18... +Rank 1/4: Iteration 19... +Rank 0/4: Iteration 19... +Rank 2/4: Iteration 19... +Rank 3/4: Iteration 19... 
+DONE. +Run time: 187.21997308498248 s + +============================= JOB FEEDBACK ============================= + +NodeName=uc2n[001-004] +Job ID: 24553855 +Cluster: uc2 +User/Group: ku4408/scc +State: COMPLETED (exit code 0) +Nodes: 4 +Cores per node: 80 +CPU Utilized: 00:12:47 +CPU Efficiency: 1.05% of 20:16:00 core-walltime +Job Wall-clock time: 00:03:48 +Memory Utilized: 15.63 GB (estimated maximum) +Memory Efficiency: 10.00% of 156.25 GB (39.06 GB/node) diff --git a/1_kmeans/solutions/A2/submit_kmeans_sample_parallel.sh b/1_kmeans/solutions/A2/submit_kmeans_sample_parallel.sh new file mode 100644 index 0000000..7f45138 --- /dev/null +++ b/1_kmeans/solutions/A2/submit_kmeans_sample_parallel.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#SBATCH --job-name=kmeans_sample # job name +#SBATCH --partition=dev_multiple # queue for resource allocation +#SBATCH --nodes=4 # number of nodes to be used +#SBATCH --time=30:00 # wall-clock time limit +#SBATCH --mem=40000 # memory per node +#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task +#SBATCH --ntasks-per-node=1 # maximum count of tasks per node +#SBATCH --mail-type=ALL # Notify user by email when certain event types occur. + +export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} +export VENVDIR=~/scai-venv # Export path to your virtual environment. +export PYDIR=./ # Export path to directory containing Python script. + +# Set up modules. +module purge # Unload all currently loaded modules. +module load compiler/gnu/13.3 # Load required modules. +module load mpi/openmpi/4.1 +module load devel/cuda/12.4 +module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1 + +source ${VENVDIR}/bin/activate # Activate your virtual environment. + +mpirun python -u ${PYDIR}/kmeans_sample_parallel.py # Run your Python script. diff --git a/1_kmeans/solutions/A2/submit_kmeans_sample_parallel_allgatherv.sh b/1_kmeans/solutions/A2/submit_kmeans_sample_parallel_allgatherv.sh new file mode 100644 index 0000000..0eb7ead --- /dev/null +++ b/1_kmeans/solutions/A2/submit_kmeans_sample_parallel_allgatherv.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#SBATCH --job-name=kmeans_sample_v # job name +#SBATCH --partition=dev_multiple # queue for resource allocation +#SBATCH --nodes=4 # number of nodes to be used +#SBATCH --time=30:00 # wall-clock time limit +#SBATCH --mem=40000 # memory per node +#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task +#SBATCH --ntasks-per-node=1 # maximum count of tasks per node +#SBATCH --mail-type=ALL # Notify user by email when certain event types occur. + +export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} +export VENVDIR=~/scai-venv # Export path to your virtual environment. +export PYDIR=./ # Export path to directory containing Python script. + +# Set up modules. +module purge # Unload all currently loaded modules. +module load compiler/gnu/13.3 # Load required modules. +module load mpi/openmpi/4.1 +module load devel/cuda/12.4 +module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1 + +source ${VENVDIR}/bin/activate # Activate your virtual environment. + +mpirun python -u ${PYDIR}/kmeans_sample_parallel_allgatherv.py # Run your Python script. 
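For reference, the recvbuf = [data, counts, displacements, type] convention used by Allgatherv in kmeans_sample_parallel_allgatherv.py is easiest to see in isolation. Below is a minimal standalone sketch, not part of the solution files above; the toy sizes and the way rows are filled are assumptions for illustration only. It gathers a different number of rows from each rank, with counts and displacements specified in elements (rows times features), which is the same bookkeeping the solution performs with torch tensors before converting to integer NumPy arrays. Run with, e.g., mpirun -n 4 python <sketch>.py.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

n_features = 3  # Toy feature dimension (assumed).
n_total = 9     # Total number of rows to gather; deliberately not divisible by `size`.

# Each rank contributes a base share of rows; the first `remainder` ranks take one extra.
base, remainder = divmod(n_total, size)
n_local = base + (1 if rank < remainder else 0)
local_rows = np.full((n_local, n_features), float(rank), dtype=np.float32)

# Counts and displacements are given in *elements*, i.e., rows * n_features.
counts = np.array(
    [(base + (1 if r < remainder else 0)) * n_features for r in range(size)], dtype=int
)
displs = np.zeros(size, dtype=int)
displs[1:] = np.cumsum(counts)[:-1]

gathered = np.empty((n_total, n_features), dtype=np.float32)
comm.Allgatherv([local_rows, MPI.FLOAT], [gathered, counts, displs, MPI.FLOAT])

if rank == 0:
    print(gathered)  # Rows appear in rank order: rank 0's rows first, then rank 1's, etc.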
diff --git a/1_kmeans/solutions/A3/kmeans_feature_parallel.py b/1_kmeans/solutions/A3/kmeans_feature_parallel.py new file mode 100644 index 0000000..4bf41a9 --- /dev/null +++ b/1_kmeans/solutions/A3/kmeans_feature_parallel.py @@ -0,0 +1,243 @@ +""" +Feature-parallel implementation of k-means clustering in PyTorch using MPI +""" +import time + +import h5py +import torch +from mpi4py import MPI + + +class KMeans: + """ + Feature-parallel k-means clustering in PyTorch with MPI. + + Each rank holds an exclusive chunk of the complete dataset (distributed along the feature axis, i.e., each rank + holds all samples but only an exclusive fraction of their features). and determines + locally which local sample belongs to which cluster before updating the centroids globally based on these local + assignments and continuing with the next iteration. + + Feature parallelism for k-means requires communication at two points in the code, i.e.: + 1. Centroid initialization: Either, one rank determines the initial centroids and broadcasts them to all other ranks + or all ranks randomly choose the initial centroids using the same seed. + 2. Cluster assignment: To determine the nearest centroid for each sample, we need to calculate its distance to each + cluster center considering all features. To achieve this, we compute the contribution of each local fraction of + features to the squared distance before summing these local squared distances up to obtain a global value. + 3. Convergence check: Again, the local contributions to the inertia need to be summed up to obtain a global value. + + + Attributes + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters. + max_iter : int + The maximum number of iterations. + tol : float + The convergence criterion. + _inertia : float + The inertia (quantity to be checked for convergence). + _cluster_centers : Union[None, torch.Tensor] + The centroids. + _matching_centroids : torch.Tensor + Indices of nearest centroid for each sample. + """ + + def __init__( + self, + comm: MPI.Comm = MPI.COMM_WORLD, + n_clusters: int = 8, + max_iter: int = 300, + tol: float = -1.0, + ) -> None: + """ + Configure feature-parallel k-means clustering algorithm. + + Parameters + ---------- + comm : MPI.Comm + The MPI communicator. + n_clusters : int + The number of clusters, i.e., k. + max_iter : int + The maximum number of iterations to be performed. + tol : float + The tolerance for the convergence criterion. + """ + self.comm = comm # MPI communicator + self.n_clusters = n_clusters # Number of clusters + self.max_iter = max_iter # Maximum number of iterations + self.tol = tol # Tolerance for convergence criterion + + self._inertia = float("nan") + self._cluster_centers = None + self._matching_centroids = None + + def _initialize_centroids(self, x: torch.Tensor) -> None: + """ + Randomly initialize the centroids. + + For feature parallelism, all samples of the dataset are partially present on each rank. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + """ + print( + f"Rank {self.comm.rank}/{self.comm.size}: Shape of data [#samples, #features] = {list(x.shape)}" + ) + # As all samples are partially present on each rank, only shuffle on root to determine centroids. + # Alternatively, all ranks can shuffle using the same seed. + if rank == 0: + indices = torch.randperm(x.shape[0])[ + : self.n_clusters + ] # Choose first `n_cluster` randomly shuffled indices as centroid indices. 
+ + # `torch.randperm(n)` returns random permutation of integers from 0 to n - 1. + # `x.shape[0]` gives number of samples in data. + else: + indices = torch.empty(self.n_clusters, dtype=torch.int64) + self.comm.Bcast(indices, root=0) + self._cluster_centers = x[indices] # Set centroids as class attribute. + + def _fit_to_cluster(self, x: torch.Tensor) -> torch.Tensor: + """ + Determine the closest centroids for each sample in dataset as measured by their Euclidean distance. + + Parameters + ---------- + x : torch.Tensor + The dataset to be clustered. + + Returns + ------- + torch.Tensor + Indices of matching centroids for each sample in dataset. + """ + squared_distances_local = ( + torch.cdist(x, self._cluster_centers) ** 2 + ) # Calculate squared distance between samples and centroids for locally present fraction of features. + squared_distances_global = torch.empty( + squared_distances_local.shape, dtype=torch.float + ) # Initialize variable for global squared distance between samples and centroids for all features. + self.comm.Allreduce( + squared_distances_local, squared_distances_global, op=MPI.SUM + ) # Sum up squared distances over features. + return squared_distances_global.argmin( + dim=1, keepdim=True + ) # Return index of nearest centroid for each sample. + + def fit(self, x: torch.Tensor) -> "KMeans": + """ + Perform k-means clustering of given dataset. + + Parameters + ---------- + x : torch.Tensor + The dataset to cluster. + + Returns + ------- + KMeans + The fitted KMeans object containing final centroids. + """ + self._initialize_centroids(x) # Initialize centroids. + new_cluster_centers = self._cluster_centers.clone() + + # Iteratively fit points to centroids. + for idx in range(self.max_iter): + # Determine nearest centroids. + print(f"Rank {self.comm.rank}/{self.comm.size}: Iteration {idx}...") + self._matching_centroids = self._fit_to_cluster( + x + ) # Redundantly determine index of nearest centroid for each sample. + + # Update centroids. + for i in range(self.n_clusters): + # Locally determine samples in currently considered cluster i (binary encoding). + selection_mask = (self._matching_centroids == i).type(torch.int64) + + # Locally accumulate samples and total number of samples in cluster i. + assigned_points = (x * selection_mask).sum( + axis=0, keepdim=True + ) # Sum up samples in cluster i. + points_in_cluster = selection_mask.sum( + axis=0, keepdim=True + ).clamp( # Determine overall number of samples in cluster i. + 1.0, torch.iinfo(torch.int64).max + ) + + # Compute new centroids. + new_cluster_centers[ + i : i + 1, : + ] = assigned_points / points_in_cluster.clamp( + 1.0, torch.iinfo(torch.int64).max + ) + + # Check whether centroid movement has converged. + inertia_local = ((self._cluster_centers - new_cluster_centers) ** 2).sum() + inertia_global = torch.empty(inertia_local.shape) + self.comm.Allreduce(inertia_local, inertia_global, op=MPI.SUM) + self._inertia = inertia_global + self._cluster_centers = new_cluster_centers.clone() + if self.tol is not None and self._inertia <= self.tol: + break + return self + + +if __name__ == "__main__": + comm = MPI.COMM_WORLD + rank = comm.rank + size = comm.size + + if rank == 0: + print( + "###############################################\n" + "# Feature-parallel PyTorch k-means clustering #\n" + "###############################################" + ) + + data_path = "/pfs/work7/workspace/scratch/ku4408-VL_ScalableAI/data/cityscapes_300.h5" + dataset = "cityscapes_data" + + if rank == 0: + print(f"\nLoading data... 
{data_path}[{dataset}]\n") + + # Data is available in HDF5 format. + # An HDF5 file is a container for two kinds of objects: + # - datasets: array-like collections of data + # - groups: folder-like containers holding datasets and other groups + # Most fundamental thing to remember when using h5py is: + # Groups work like dictionaries, and datasets work like NumPy arrays. + with h5py.File(data_path, "r") as handle: + chunk = int( + handle[dataset].shape[1] / size + ) # Calculate local number of features in each chunk. + if ( + rank == size - 1 + ): # Load data chunks from file (distributed along feature axis). + data = torch.tensor(handle[dataset][:, rank * chunk :]) + else: + data = torch.tensor(handle[dataset][:, rank * chunk : (rank + 1) * chunk]) + + print(f"Rank {rank}/{size}: \t[OK]") + + # k-means hyperparameters + num_clusters = 8 + num_iterations = 20 + + kmeans_clusterer = KMeans( + comm=comm, n_clusters=num_clusters, max_iter=num_iterations + ) + if rank == 0: + print("Start fitting the data...") + start = time.perf_counter() # Start runtime measurement. + + kmeans_clusterer.fit(data) # Perform actual k-means clustering. + + if rank == 0: + print( + f"DONE.\nRun time:\t{time.perf_counter() - start} s" + ) # Print measured runtime. diff --git a/1_kmeans/solutions/A3/slurm-kmeans-feature-parallel.out b/1_kmeans/solutions/A3/slurm-kmeans-feature-parallel.out new file mode 100644 index 0000000..4f2f7be --- /dev/null +++ b/1_kmeans/solutions/A3/slurm-kmeans-feature-parallel.out @@ -0,0 +1,112 @@ +############################################### +# Feature-parallel PyTorch k-means clustering # +############################################### + +Loading data... /pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/cityscapes_300.h5[cityscapes_data] + +Rank 1/4: [OK] +Rank 1/4: Shape of data [#samples, #features] = [300, 1572864] +Rank 3/4: [OK] +Rank 3/4: Shape of data [#samples, #features] = [300, 1572864] +Rank 2/4: [OK] +Rank 2/4: Shape of data [#samples, #features] = [300, 1572864] +Rank 0/4: [OK] +Start fitting the data... +Rank 0/4: Shape of data [#samples, #features] = [300, 1572864] +Rank 0/4: Iteration 0... +Rank 3/4: Iteration 0... +Rank 2/4: Iteration 0... +Rank 1/4: Iteration 0... +Rank 0/4: Iteration 1... +Rank 3/4: Iteration 1... +Rank 1/4: Iteration 1... +Rank 2/4: Iteration 1... +Rank 0/4: Iteration 2... +Rank 1/4: Iteration 2... +Rank 3/4: Iteration 2... +Rank 2/4: Iteration 2... +Rank 0/4: Iteration 3... +Rank 1/4: Iteration 3... +Rank 3/4: Iteration 3... +Rank 2/4: Iteration 3... +Rank 0/4: Iteration 4... +Rank 1/4: Iteration 4... +Rank 3/4: Iteration 4... +Rank 2/4: Iteration 4... +Rank 0/4: Iteration 5... +Rank 2/4: Iteration 5... +Rank 1/4: Iteration 5... +Rank 3/4: Iteration 5... +Rank 0/4: Iteration 6... +Rank 1/4: Iteration 6... +Rank 3/4: Iteration 6... +Rank 2/4: Iteration 6... +Rank 0/4: Iteration 7... +Rank 3/4: Iteration 7... +Rank 2/4: Iteration 7... +Rank 1/4: Iteration 7... +Rank 0/4: Iteration 8... +Rank 2/4: Iteration 8... +Rank 1/4: Iteration 8... +Rank 3/4: Iteration 8... +Rank 0/4: Iteration 9... +Rank 1/4: Iteration 9... +Rank 2/4: Iteration 9... +Rank 3/4: Iteration 9... +Rank 0/4: Iteration 10... +Rank 3/4: Iteration 10... +Rank 1/4: Iteration 10... +Rank 2/4: Iteration 10... +Rank 0/4: Iteration 11... +Rank 2/4: Iteration 11... +Rank 1/4: Iteration 11... +Rank 3/4: Iteration 11... +Rank 0/4: Iteration 12... +Rank 1/4: Iteration 12... +Rank 2/4: Iteration 12... +Rank 3/4: Iteration 12... +Rank 0/4: Iteration 13... +Rank 1/4: Iteration 13... 
+Rank 2/4: Iteration 13... +Rank 3/4: Iteration 13... +Rank 0/4: Iteration 14... +Rank 1/4: Iteration 14... +Rank 3/4: Iteration 14... +Rank 2/4: Iteration 14... +Rank 0/4: Iteration 15... +Rank 2/4: Iteration 15... +Rank 1/4: Iteration 15... +Rank 3/4: Iteration 15... +Rank 0/4: Iteration 16... +Rank 1/4: Iteration 16... +Rank 3/4: Iteration 16... +Rank 2/4: Iteration 16... +Rank 0/4: Iteration 17... +Rank 3/4: Iteration 17... +Rank 2/4: Iteration 17... +Rank 1/4: Iteration 17... +Rank 0/4: Iteration 18... +Rank 1/4: Iteration 18... +Rank 3/4: Iteration 18... +Rank 2/4: Iteration 18... +Rank 0/4: Iteration 19... +Rank 1/4: Iteration 19... +Rank 3/4: Iteration 19... +Rank 2/4: Iteration 19... +DONE. +Run time: 174.21738585596904 s + +============================= JOB FEEDBACK ============================= + +NodeName=uc2n[001-004] +Job ID: 24553857 +Cluster: uc2 +User/Group: ku4408/scc +State: COMPLETED (exit code 0) +Nodes: 4 +Cores per node: 80 +CPU Utilized: 00:11:56 +CPU Efficiency: 1.04% of 19:06:40 core-walltime +Job Wall-clock time: 00:03:35 +Memory Utilized: 5.43 GB +Memory Efficiency: 3.47% of 156.25 GB diff --git a/1_kmeans/solutions/A3/submit_kmeans_feature_parallel.sh b/1_kmeans/solutions/A3/submit_kmeans_feature_parallel.sh new file mode 100644 index 0000000..5801f0f --- /dev/null +++ b/1_kmeans/solutions/A3/submit_kmeans_feature_parallel.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#SBATCH --job-name=kmeans_feature # job name +#SBATCH --partition=dev_multiple # queue for resource allocation +#SBATCH --nodes=4 # number of nodes to be used +#SBATCH --time=30:00 # wall-clock time limit +#SBATCH --mem=40000 # memory per node +#SBATCH --cpus-per-task=40 # number of CPUs required per MPI task +#SBATCH --ntasks-per-node=1 # maximum count of tasks per node +#SBATCH --mail-type=ALL # Notify user by email when certain event types occur. + +export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} +export VENVDIR=~/scai-venv # Export path to your virtual environment. +export PYDIR=./ # Export path to directory containing Python script. + +# Set up modules. +module purge # Unload all currently loaded modules. +module load compiler/gnu/13.3 # Load required modules. +module load mpi/openmpi/4.1 +module load devel/cuda/12.4 +module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1 + +source ${VENVDIR}/bin/activate # Activate your virtual environment. + +mpirun python -u ${PYDIR}/kmeans_feature_parallel.py # Run your Python script. -- GitLab
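For reference, the feature-parallel cluster assignment in kmeans_feature_parallel.py is exact because the squared Euclidean distance decomposes into a sum over features, so per-rank partial squared distances computed on disjoint feature slices can simply be all-reduced with MPI.SUM before taking the argmin. The following minimal single-process sketch, with toy sizes assumed purely for illustration, emulates the feature split with two slices and checks that the summed partial squares reproduce the serial result and the same assignments.

import torch

torch.manual_seed(0)
x = torch.randn(5, 8)  # 5 samples, 8 features
c = torch.randn(3, 8)  # 3 centroids

# Serial reference: full squared Euclidean distances.
full = torch.cdist(x, c) ** 2

# "Feature-parallel" emulation: disjoint feature slices, partial squared distances summed up.
partial = torch.cdist(x[:, :4], c[:, :4]) ** 2 + torch.cdist(x[:, 4:], c[:, 4:]) ** 2

print(torch.allclose(full, partial, atol=1e-5))                 # Expected: True
print(torch.equal(full.argmin(dim=1), partial.argmin(dim=1)))   # Expected: True (same assignments)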