Gradient Descent

Basic version: compute the gradient, scale it by the learning rate, then update the parameters.

import numpy as np
 
 
# --- Batch Gradient Descent ---
class BatchGD:
    """Linear regression trained with batch (full-dataset) gradient descent.

    Each iteration predicts with ``X @ weights + bias``, measures the mean
    squared error over *all* samples, and moves the parameters one step of
    size ``eta`` against the gradient of that error.
    """

    def __init__(
        self, max_iter: int = 1000, eta: float = 0.001, tol: float = 1e-6
    ) -> None:
        """Store the hyper-parameters; the model itself is created by ``fit``.

        Args:
            max_iter: Maximum number of gradient steps.
            eta: Learning rate (step size).
            tol: Training stops as soon as no weight moves by more than
                this amount in a single step.
        """
        self.max_iter = max_iter
        self.eta = eta
        self.tol = tol
        self.weights = self.bias = None  # populated by fit()
        # MSE measured before each step of the most recent fit() call.
        self.costs_ = []

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit weights and bias to ``X`` / ``y`` by gradient descent on the MSE.

        Weights and bias are re-initialised from ``np.random.rand`` on every
        call. Progress is printed every 100 iterations, and a one-line
        summary of array shapes is printed when training ends.

        Args:
            X: 2-D array; rows are samples, columns are features.
            y: Targets. NOTE(review): the arithmetic assumes a 1-D array
                with one entry per row of ``X``; a column vector would
                broadcast against the predictions - confirm with callers.
        """
        m, n = X.shape
        self.weights = np.random.rand(n)
        self.bias = np.random.rand(1)[0]
        self.costs_ = []

        # Pre-seed the gradients so the summary print after the loop still
        # works when the loop body never runs (max_iter <= 0).
        dw, db = np.zeros(n), np.float64(0.0)

        for i in range(1, self.max_iter + 1):
            y_pred = np.dot(X, self.weights) + self.bias  # $y = wx + b$
            residual = y_pred - y

            mse = (1 / m) * np.sum(np.square(residual))
            self.costs_.append(mse)

            # Gradients of the MSE with respect to weights and bias.
            dw = (2 / m) * np.dot(X.T, residual)
            db = (2 / m) * np.sum(residual)

            # Keep a reference to the old weights; the update below builds a
            # new array, so this one is not modified.
            previous = self.weights
            self.weights = self.weights - self.eta * dw
            self.bias = self.bias - self.eta * db

            if i % 100 == 0:
                print(f"Epoch: {i}\tCost: {mse}")

            # Converged: every weight moved by at most `tol` in this step.
            if np.all(np.abs(self.weights - previous) <= self.tol):
                print(f"Stopped at iteration {i} !")
                break

        print(
            f"dim of each var:\t weight:{self.weights.shape}\t bias: {self.bias.shape}\t X:{X.shape}\t y:{y.shape}\t dw:{dw.shape}\t db: {db.shape}"
        )

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return ``X @ weights + bias`` using the parameters learned by ``fit``."""
        return np.dot(X, self.weights) + self.bias
 
 
# --- Stochastic Gradient Descent ---
class StochasticGD:
    """Linear regression trained with stochastic gradient descent.

    Every update uses a single randomly drawn sample, and the learning rate
    decays over time as ``t0 / (t + t1)``.
    """

    def __init__(self, max_iter=100, t0=5, t1=50, tol=1e-7):
        """Store the hyper-parameters; the model itself is created by ``fit``.

        Args:
            max_iter: Maximum number of epochs (one epoch = ``m`` updates).
            t0: Numerator of the learning-rate schedule.
            t1: Offset in the denominator of the learning-rate schedule.
            tol: Training stops as soon as no weight moves by more than
                this amount in a single update.
        """
        self.max_iter = max_iter
        self.tol = tol
        # Honour the caller's schedule instead of hard-coding 5 and 50.
        self.t0, self.t1 = t0, t1
        self.weights = self.bias = None  # populated by fit()
        # Squared error of each sample visited during the last fit() call.
        self.losses_ = []

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit weights and bias with one-sample-at-a-time gradient steps.

        Weights and bias are re-initialised from ``np.random.rand`` on every
        call. After every 10th completed epoch the most recent sample loss
        is printed. Training ends after ``max_iter`` epochs, or immediately
        once an update changes no weight by more than ``tol``.

        Args:
            X: 2-D array; rows are samples, columns are features.
            y: Targets. NOTE(review): assumed to be 1-D with one entry per
                row of ``X`` - confirm with callers.
        """
        m, n = X.shape
        self.weights = np.random.rand(n)
        self.bias = np.random.rand(1)[0]
        self.losses_ = []
        converged = False

        for epoch in range(1, self.max_iter + 1):
            for i in range(m):
                # Draw one sample uniformly at random (with replacement).
                # Shuffling the data first would not change this draw, so
                # no per-epoch shuffle/copy of X and y is performed.
                rand_idx = np.random.randint(m)
                X_i = X[rand_idx : rand_idx + 1]
                y_i = y[rand_idx : rand_idx + 1]

                residual = np.dot(X_i, self.weights) + self.bias - y_i
                self.losses_.append(np.sum(np.square(residual)))

                dw = np.dot(X_i.T, residual)
                db = np.sum(residual)

                # epoch is 1-based, so the schedule starts at t = m.
                eta = self.learning_schedule(m * epoch + i)

                # The update below builds a new array, so `previous` keeps
                # the pre-update weights for the convergence check.
                previous = self.weights
                self.weights = self.weights - eta * dw
                self.bias = self.bias - eta * db

                if np.all(np.abs(self.weights - previous) <= self.tol):
                    print(f"Stopped at iteration {epoch} !")
                    converged = True
                    break

            if converged:
                # Leave the epoch loop too; otherwise training would
                # continue despite the stop message.
                break

            if epoch % 10 == 0 and self.losses_:
                # Report the latest loss. The history has one entry per
                # sample update, so it must not be indexed by epoch.
                print(f"Epoch: {epoch}\tLoss: {self.losses_[-1]}")

    def learning_schedule(self, t: int) -> float:
        """Return the learning rate ``t0 / (t + t1)`` for step ``t``."""
        return self.t0 / (t + self.t1)  # learning rate will get smaller

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return ``X @ weights + bias`` using the parameters learned by ``fit``."""
        return np.dot(X, self.weights) + self.bias
 
 
# --- Mini-Batch Gradient Descent ---
class MiniBatchGD:
    """Linear regression trained with mini-batch gradient descent.

    Each epoch shuffles the data and walks through it in consecutive slices
    of ``batch_size`` samples, taking one gradient step per slice.
    """

    def __init__(self, max_iter=100, eta=0.001, tol=1e-7, batch_size=32):
        """Store the hyper-parameters; the model itself is created by ``fit``.

        Args:
            max_iter: Maximum number of epochs (full passes over the data).
            eta: Learning rate (step size).
            tol: Training stops as soon as no weight moves by more than
                this amount in a single update.
            batch_size: Number of samples per gradient step; the last batch
                of an epoch may be smaller.
        """
        self.max_iter = max_iter
        self.tol = tol
        self.eta = eta
        self.batch_size = batch_size
        self.weights = self.bias = None  # populated by fit()
        # Mean squared error of each batch processed by the last fit() call.
        self.costs_ = []

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit weights and bias with per-batch gradient steps on the MSE.

        Weights and bias are re-initialised from ``np.random.rand`` on every
        call. After every 100th completed epoch the most recent batch cost
        is printed. Training ends after ``max_iter`` epochs, or immediately
        once an update changes no weight by more than ``tol``.

        Args:
            X: 2-D array; rows are samples, columns are features.
            y: Targets. NOTE(review): assumed to be 1-D with one entry per
                row of ``X`` - confirm with callers.
        """
        m, n = X.shape
        self.weights = np.random.rand(n)
        self.bias = np.random.rand(1)[0]
        self.costs_ = []
        converged = False

        for epoch in range(1, self.max_iter + 1):
            # Shuffle first so every epoch sees different batches. Fancy
            # indexing copies, so the caller's arrays are left untouched.
            order = np.random.permutation(m)
            X_shuffled, y_shuffled = X[order], y[order]

            for start in range(0, m, self.batch_size):
                X_mini = X_shuffled[start : start + self.batch_size]
                y_mini = y_shuffled[start : start + self.batch_size]
                # The final slice can be shorter than batch_size; average
                # over the samples actually present so that batch's cost and
                # gradient are not scaled down.
                batch_len = len(X_mini)

                residual = np.dot(X_mini, self.weights) + self.bias - y_mini
                self.costs_.append(np.sum(np.square(residual)) / batch_len)

                dw = 2 / batch_len * np.dot(X_mini.T, residual)
                db = 2 / batch_len * np.sum(residual)

                # The update below builds a new array, so `previous` keeps
                # the pre-update weights for the convergence check.
                previous = self.weights
                self.weights = self.weights - self.eta * dw
                self.bias = self.bias - self.eta * db

                if np.all(np.abs(self.weights - previous) <= self.tol):
                    print(f"Stopped at iteration {epoch} !")
                    converged = True
                    break

            if converged:
                break

            if epoch % 100 == 0 and self.costs_:
                # Report the latest cost. The history has one entry per
                # batch, so it must not be indexed by epoch.
                print(f"Epoch: {epoch}\tLoss: {self.costs_[-1]}")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return ``X @ weights + bias`` using the parameters learned by ``fit``."""
        return np.dot(X, self.weights) + self.bias
 

comparison

  • BatchGD: use the entire dataset to compute each gradient
  • SGD: use a single random example per update
  • Mini-BatchGD: use a small subset (mini-batch) per update; a middle ground between the previous two
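|              | BatchGD   | SGD          | Mini-BatchGD |
|--------------|-----------|--------------|--------------|
| fast?        | slow      | fastest      | in between   |
| optimal      | guarantee | no guarantee | no guarantee |
| need shuffle | no        | yes          | yes          |
| local minima | yes       | no           | yes          |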

gradient exploding and clipping

why exploding:

  • high learning rate: update to weight is too large
  • model structure: RNN, LSTM, error will accumulate
  • loss function
  • preprocessing of data (no normalization)

two common approaches:

  1. gradient clipping (any value that exceeds the upper/lower bound ➡ clip it to that bound)
  2. gradient scaling (the gradient norm, typically L2, exceeds a threshold ➡ rescale the whole gradient)