Commit c1d1f7bf authored by Kaleb Phipps

add solutions for logit exercise

parent 0e4f5c9d
"""
Data-parallel logistic regression
"""
import argparse
import time
from typing import Union
import h5py
from mpi4py import MPI
import numpy as np
np.random.seed(842424) # Fix random seed for reproducibility.
def sigmoid(z: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Compute sigmoid.
Parameters
----------
z : Union[float, np.ndarray]
The input for the sigmoid function.
Returns
-------
Union[float, np.ndarray]
The input's sigmoid function value.
"""
return 1.0 / (1.0 + np.exp(-z))
def lr_predict(w: np.ndarray, x: np.ndarray) -> np.ndarray:
"""
Return prediction of logit model for data x using model weights w.
Parameters
----------
x : np.ndarray[float]
The dataset (after bias trick), shape = [n_samples, n_features + 1].
The 0th input should be 1.0 to take the bias into account in a simple dot product.
w : np.ndarray[float]
The parameters, i.e., weights to be learned (after bias trick), shape = [n_features + 1, ].
There is one weight for every input dimension plus a bias.
Returns
-------
np.ndarray[float]
The predicted activations of the logit model for the input dataset, shape = [n_samples, ],
i.e., the sigmoid of the dot product of the weights and the input data.
"""
return sigmoid(x @ w)
def mse(y_est: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Compute mean-square-error loss.
Parameters
----------
y_est : np.ndarray[float]
The predictions, shape = [n_samples, ].
y : np.ndarray[float]
The ground-truth labels, shape = [n_samples, ].
Returns
-------
float
The mean-square-error loss for the considered batch.
"""
return (
(1.0 / y.shape[0]) * (y - y_est).T @ (y - y_est)
) # Return MSE loss for considered batch.
def lr_loss(
w: np.ndarray, x: np.ndarray, y: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
"""
Return the loss and the gradient with respect to the weights.
Parameters
----------
w : np.ndarray[float]
The model's weights to be learned, where weights[0] is the bias.
x : np.ndarray[float]
The input data of shape [N x D+1], 0th element of each sample is assumed to be 1 (bias trick).
y : np.ndarray[float]
The ground-truth labels of shape [N,].
Returns
-------
np.ndarray[float]
The scalar mean-square-error loss for the input batch of samples.
np.ndarray[float]
The gradient of the loss with respect to the weights for the batch.
"""
y_est = lr_predict(w, x) # Compute logit prediction for all samples in batch.
loss = mse(y_est, y) # Compute MSE loss over all samples in batch.
# Compute gradient vector of loss w.r.t. weights.
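# Note: sigmoid'(z) = y_est * (1 - y_est) = y_est**2 * exp(-z),
# which is why the factor y_est * y_est * np.exp(-x @ w) appears below.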
gradient = (
(-2.0 / y.shape[0]) * ((y - y_est) * y_est * y_est * np.exp(-x @ w)).T @ x
)
return loss, gradient
def lr_train(
w: np.ndarray,
x: np.ndarray,
y: np.ndarray,
comm: MPI.Comm = MPI.COMM_WORLD,
epochs: int = 100,
eta: float = 0.001,
b: int = 10,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""
Train the model, i.e., iteratively update the weights in the direction of the negative gradient for the given number of epochs.
Parameters
----------
w : np.ndarray[float]
The model weights to be learned, where weights[0] is the bias.
x : np.ndarray[float]
The input data of shape [N x D+1], where each sample's 0th element is assumed to be 1 for bias trick.
y : np.ndarray[float]
The ground-truth labels of shape [N,].
comm : MPI.Comm
The MPI communicator used for data-parallel gradient and metric averaging.
epochs : int
The number of epochs to be trained.
eta : float
The learning rate.
b : int
The local (per-rank) batch size.
Returns
-------
np.ndarray[float]
The trained weights.
np.ndarray[float]
The history array with each epoch's loss.
np.ndarray[float]
The history array with each epoch's accuracy.
float
The average training time per epoch.
"""
size, rank = comm.size, comm.rank
# Note: x is expected to already include the bias column (bias trick applied by the caller).
n_samples = y.shape[0] # Determine number of samples.
n_batches = n_samples // b # Determine number of full batches in data (drop last).
print(f"Rank {rank}/{size}: Data is divided into {n_batches} batches.")
loss_history = np.zeros(epochs)
acc_history = np.zeros(epochs)
training_time_per_epoch = 0.0 # Initialize training time per epoch.
for epoch in range(epochs): # Loop over epochs.
# The number of epochs is a hyperparameter of gradient descent
# that controls the number of complete passes through the training dataset.
# The batch size is a hyperparameter of gradient descent
# that controls the number of training samples to work through before the
# model’s internal parameters are updated.
loss_sum = 0.0 # Initialize accumulated loss for this epoch.
accuracy = 0.0 # Initialize accuracy for this epoch.
start = time.perf_counter()
for nb in range(n_batches):
x_ = x[nb * b : (nb + 1) * b]
y_ = y[nb * b : (nb + 1) * b]
loss, gradient = lr_loss(w, x_, y_)
loss_sum += loss
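# Threshold sigmoid outputs at 0.5 to get binary class predictions and count correct ones.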
corr = np.sum((lr_predict(w, x_) + 0.5).astype(int) == y_)
accuracy += corr
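# Data-parallel update: average the local gradients over all ranks so that
# every rank applies the same weight update to its model replica.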
gradient_global = np.zeros_like(gradient)
comm.Allreduce(gradient, gradient_global, op=MPI.SUM)
gradient_global /= size
w -= eta * gradient_global
end = time.perf_counter()
# Calculate loss + accuracy after each epoch.
loss_sum /= n_batches
accuracy /= n_samples
accuracy *= 100
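# Average the epoch's loss and accuracy over all ranks for reporting.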
loss_sum_global = comm.allreduce(loss_sum, op=MPI.SUM) / size
accuracy_global = comm.allreduce(accuracy, op=MPI.SUM) / size
loss_history[epoch] = loss_sum_global
acc_history[epoch] = accuracy_global
training_time_per_epoch += end - start
# Print the training status every tenth epoch.
if rank == 0:
if epoch % 10 == 0:
print(
f"Epoch: {epoch}, Loss: {loss_sum_global}, Accuracy: {accuracy_global}"
)
training_time_per_epoch /= epochs
training_time_per_epoch_global = (
comm.allreduce(training_time_per_epoch, op=MPI.SUM) / size
)
return w, loss_history, acc_history, training_time_per_epoch_global
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="Logit")
parser.add_argument(
"--epochs",
type=int,
default=100,
help="The number of epochs to train.",
)
parser.add_argument(
"--batch_size",
type=int,
default=10,
help="The batch size.",
)
args = parser.parse_args()
comm = MPI.COMM_WORLD # Set up communicator.
size, rank = comm.size, comm.rank
if rank == 0:
print(
"################################\n"
"# Parallel Logistic Regression #\n"
"################################"
)
print(
f"We train for {args.epochs} epochs with an effective batch size of {args.batch_size}."
)
path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/logit_data_n100000_d2.h5"
with h5py.File(path, "r") as f: # Load data in sample-parallel fashion.
chunk = int(f["data"].shape[0] / size)
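# Each rank reads a contiguous chunk of the dataset; the last rank additionally takes any remainder samples.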
if rank == size - 1:
data = np.array(f["data"][rank * chunk :])
labels = np.array(f["labels"][rank * chunk :])
else:
data = np.array(f["data"][rank * chunk : (rank + 1) * chunk])
labels = np.array(f["labels"][rank * chunk : (rank + 1) * chunk])
print(
f"Rank {rank}/{size}: Local data has {data.shape[0]} samples with {data.shape[1]} features and "
f"{labels.shape[0]} labels.\n0th elements are: {data[0]}\n{labels[0]}"
)
# Bias trick: Prepend data with 1's for additional bias dimension.
ones = np.ones(
(
data.shape[0],
1,
)
)
data_bt = np.hstack([ones, data])
# Initialize model parameters randomly.
# After bias trick, weights have shape [n_features+1, ]
if rank == 0:
weights = np.random.rand(data_bt.shape[1])
else:
weights = np.zeros(data_bt.shape[1])
# Broadcast weights from root to other processors.
comm.Bcast(weights, root=0)
b_local = args.batch_size // size # Calculate local batch size.
print(f"Rank {rank}/{size}: Local batch size is {b_local}.")
# Train model.
(weights, loss_history, acc_history, training_time_per_epoch) = lr_train(
weights, data_bt, labels, b=b_local, epochs=args.epochs
)
if rank == 0:
print(
f"Final loss: {loss_history[-1]}, final accuracy: {acc_history[-1]}\n"
f"Average training time per epoch: {training_time_per_epoch} s"
)
import argparse
import time
from typing import Union, Tuple
import h5py
import numpy as np
np.random.seed(842424) # Fix random seed for reproducibility.
def sigmoid(z: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Compute sigmoid.
Parameters
----------
z : Union[float, np.ndarray]
The input for the sigmoid function.
Returns
-------
Union[float, np.ndarray]
The input's sigmoid function value.
"""
return 1.0 / (1.0 + np.exp(-z))
def lr_predict(w: np.ndarray, x: np.ndarray) -> np.ndarray:
"""
Return prediction of logit model for data x using model weights w.
Parameters
----------
x : np.ndarray[float]
The dataset (after bias trick), shape = [n_samples, n_features + 1].
The 0th input should be 1.0 to take the bias into account in a simple dot product.
w : np.ndarray[float]
The parameters, i.e., weights to be learned (after bias trick), shape = [n_features + 1, ].
There is one weight for every input dimension plus a bias.
Returns
-------
np.ndarray[float]
The predicted activations of the logit model for the input dataset, shape = [n_samples, ],
i.e., the sigmoid of the dot product of the weights and the input data.
"""
return sigmoid(x @ w)
def mse(y_est: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Compute mean-square-error loss.
Parameters
----------
y_est : np.ndarray[float]
The predictions, shape = [n_samples, ].
y : np.ndarray[float]
The ground-truth labels, shape = [n_samples, ].
Returns
-------
float
The mean-square-error loss for the considered batch.
"""
return (
(1.0 / y.shape[0]) * (y - y_est).T @ (y - y_est)
) # Return MSE loss for considered batch.
def lr_loss(
w: np.ndarray, x: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Return the loss and the gradient with respect to the weights.
Parameters
----------
w : np.ndarray[float]
The model's weights to be learned, where weights[0] is the bias.
x : np.ndarray[float]
The input data of shape [N x D+1], 0th element of each sample is assumed to be 1 (bias trick).
y : np.ndarray[float]
The ground-truth labels of shape [N,].
Returns
-------
np.ndarray[float]
The scalar mean-square-error loss for the input batch of samples.
np.ndarray[float]
The gradient of the loss with respect to the weights for the batch.
"""
y_est = lr_predict(w, x) # Compute logit prediction for all samples in batch.
loss = mse(y_est, y) # Compute MSE loss over all samples in batch.
# Compute gradient vector of loss w.r.t. weights.
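# Note: sigmoid'(z) = y_est * (1 - y_est) = y_est**2 * exp(-z),
# which is why the factor y_est * y_est * np.exp(-x @ w) appears below.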
gradient = (
(-2.0 / y.shape[0]) * ((y - y_est) * y_est * y_est * np.exp(-x @ w)).T @ x
)
return loss, gradient
def lr_train(
w: np.ndarray,
x: np.ndarray,
y: np.ndarray,
epochs: int = 100,
eta: float = 0.001,
b: int = 10,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""
Train the model, i.e., iteratively update the weights in the direction of the negative gradient for the given number of epochs.
Parameters
----------
w : np.ndarray[float]
The model weights to be learned, where weights[0] is the bias.
x : np.ndarray[float]
The input data of shape [N x D+1], where each sample's 0th element is assumed to be 1 for bias trick.
y : np.ndarray[float]
The ground-truth labels of shape [N,].
epochs : int
The number of epochs to be trained.
eta : float
The learning rate.
b : int
The batch size.
Returns
-------
np.ndarray[float]
The trained weights.
np.ndarray[float]
The history array with each epoch's loss.
np.ndarray[float]
The history array with each epoch's accuracy.
float
The average training time per epoch.
"""
n_samples = y.shape[0] # Determine total number of training samples.
n_batches = n_samples // b # Determine number of full batches in data (drop last).
print(f"Data is divided into {n_batches} batches.")
loss_history = np.zeros(epochs)
acc_history = np.zeros(epochs)
training_time_per_epoch = 0.0 # Initialize training time per epoch.
for epoch in range(epochs): # Loop over epochs.
# The number of epochs is a hyperparameter of gradient descent
# that controls the number of complete passes through the training dataset.
# The batch size is a hyperparameter of gradient descent
# that controls the number of training samples to work through before the
# model’s internal parameters are updated.
loss_sum = 0.0 # Initialize accumulated loss for this epoch.
accuracy = 0.0 # Initialize accuracy for this epoch.
start = time.perf_counter() # Start timer.
for nb in range(n_batches):
x_ = x[nb * b : (nb + 1) * b]
y_ = y[nb * b : (nb + 1) * b]
loss, gradient = lr_loss(w, x_, y_)
loss_sum += loss
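# Threshold sigmoid outputs at 0.5 to get binary class predictions and count correct ones.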
corr = np.sum((lr_predict(w, x_) + 0.5).astype(int) == y_)
accuracy += corr
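# Update weights with a plain (serial) stochastic gradient descent step.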
w -= eta * gradient
end = time.perf_counter() # Stop timer.
# Calculate loss + accuracy after each epoch.
loss_sum /= n_batches
accuracy /= n_samples
accuracy *= 100
# Append loss + accuracy of current epoch to history arrays.
loss_history[epoch] = loss_sum
acc_history[epoch] = accuracy
training_time_per_epoch += end - start
# Print the training status at an interval that scales with the total number of epochs.
if epochs < 100:
mod = 5
elif 100 <= epochs < 1000:
mod = 10
elif 1000 <= epochs < 10000:
mod = 100
elif 10000 <= epochs < 100000:
mod = 1000
else:
mod = 10000
if epoch % mod == 0:
print(f"Epoch: {epoch}, Loss: {loss_sum}, Accuracy: {accuracy}")
training_time_per_epoch /= epochs
return w, loss_history, acc_history, training_time_per_epoch
if __name__ == "__main__":
path = "/pfs/work7/workspace/scratch/ku4408-VL-ScalableAI/data/logit_data_n10000_d2.h5"
parser = argparse.ArgumentParser(prog="Logit")
parser.add_argument(
"--epochs",
type=int,
default=100,
help="The number of epochs to train.",
)
parser.add_argument(
"--batch_size",
type=int,
default=10,
help="The batch size.",
)
args = parser.parse_args()
with h5py.File(path, "r") as f:
data = np.array(f["data"])
labels = np.array(f["labels"])
print(
f"We have {data.shape[0]} samples with {data.shape[1]} features and {labels.shape[0]} labels."
)
# Bias trick: Prepend data with 1's for additional bias dimension.
ones = np.ones(
(
data.shape[0],
1,
)
)
data_bt = np.hstack([ones, data])
weights = np.random.rand(data_bt.shape[1]) # Initialize model parameters randomly.
weights, loss_history, acc_history, time_per_epoch = lr_train(
weights, data_bt, labels, epochs=args.epochs, b=args.batch_size
)
print(f"Final loss is {loss_history[-1]}, final accuracy is {acc_history[-1]}.")
print(f"Training time per epoch is {time_per_epoch} s.")
#!/bin/bash
#SBATCH --job-name=logit_parallel # Job name
#SBATCH --partition=dev_multiple # Queue for the resource allocation
#SBATCH --nodes=4 # Number of nodes
#SBATCH --time=5:00 # Wall-clock time limit
#SBATCH --cpus-per-task=40 # Number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=40
export VENVDIR=<path/to/your/venv> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
mpirun python ${PYDIR}/logit_parallel.py --epochs 100 --batch_size 100
#!/bin/bash
#SBATCH --job-name=logit_serial # Job name
#SBATCH --partition=dev_single # Queue for the resource allocation.
#SBATCH --time=5:00 # Wall-clock time limit
#SBATCH --cpus-per-task=40 # Number of CPUs required per MPI task
#SBATCH --ntasks-per-node=1 # Maximum count of tasks per node
#SBATCH --mail-type=ALL # Notify user by email when certain event types occur.
export OMP_NUM_THREADS=40
export VENVDIR=<path/to/your/venv> # Export path to your virtual environment.
export PYDIR=<path/to/your/python/script> # Export path to directory containing Python script.
# Set up modules.
module purge # Unload all currently loaded modules.
module load compiler/gnu/13.3 # Load required modules.
module load mpi/openmpi/4.1
module load devel/cuda/12.4
module load lib/hdf5/1.14.4-gnu-13.3-openmpi-4.1
source ${VENVDIR}/bin/activate # Activate your virtual environment.
python ${PYDIR}/logit_serial.py --epochs 100 --batch_size 10