# Gradient Descent

Basic version: compute the gradient of the loss, scale it by the learning rate, then update the parameters in the opposite direction of the gradient.
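In symbols, for parameters $\theta$, loss $J(\theta)$, and learning rate $\eta$, each step is

$\theta \leftarrow \theta - \eta \, \nabla_{\theta} J(\theta)$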
```python
import numpy as np


# --- Batch Gradient Descent ---
class BatchGD:
    def __init__(
        self, max_iter: int = 1000, eta: float = 0.001, tol: float = 1e-6
    ) -> None:
        self.max_iter = max_iter  # maximum number of iterations
        self.eta = eta  # learning rate
        self.tol = tol  # tolerance for the stopping criterion
        self.weights = self.bias = None  # set by fit()

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        m, n = X.shape  # number of samples (m) and features (n)
        self.weights = np.random.rand(n)  # initialize weights randomly
        self.bias = np.random.rand(1)[0]  # initialize bias randomly
        costs = []  # cost value for each iteration
        for i in range(1, self.max_iter + 1):
            y_pred = np.dot(X, self.weights) + self.bias  # $y = wx + b$
            mse = (1 / m) * np.sum(np.square(y_pred - y))  # mean squared error
            costs.append(mse)
            # gradients of the MSE w.r.t. weights and bias
            dw = (2 / m) * np.dot(X.T, y_pred - y)
            db = (2 / m) * np.sum(y_pred - y)
            temp = self.weights  # keep old weights for the convergence check
            self.weights = self.weights - self.eta * dw
            self.bias = self.bias - self.eta * db
            if i % 100 == 0:
                # print cost every 100 iterations
                print(f"Epoch: {i}\tCost: {mse}")
            if np.all(np.abs(self.weights - temp) <= self.tol):
                # stop if the weight change is below tolerance
                print(f"Stopped at iteration {i}!")
                break
        print(
            f"dim of each var:\t weight: {self.weights.shape}\t bias: {self.bias.shape}\t"
            f" X: {X.shape}\t y: {y.shape}\t dw: {dw.shape}\t db: {db.shape}"
        )

    def predict(self, X: np.ndarray) -> np.ndarray:
        # predict using the learned weights and bias
        return np.dot(X, self.weights) + self.bias
```
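A quick sanity check of the class (a minimal sketch; the synthetic data and true coefficients here are made up for illustration):

```python
# fit BatchGD on made-up linear data: y = 3*x1 - 2*x2 + 1 plus small noise
rng = np.random.default_rng(0)
X = rng.random((200, 2))
y = X @ np.array([3.0, -2.0]) + 1.0 + 0.01 * rng.standard_normal(200)

model = BatchGD(max_iter=5000, eta=0.1)
model.fit(X, y)
print(model.weights, model.bias)  # should approach [3, -2] and 1
```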
```python
# --- Stochastic Gradient Descent ---
class StochasticGD:
    def __init__(self, max_iter=100, t0=5, t1=50, tol=1e-7):
        self.max_iter = max_iter
        self.tol = tol
        self.t0, self.t1 = t0, t1  # learning-schedule hyperparameters
        self.weights = self.bias = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        m, n = X.shape
        self.weights = np.random.rand(n)
        self.bias = np.random.rand(1)[0]
        losses, flag = [], False
        for epoch in range(1, self.max_iter + 1):
            if flag:
                break
            perm_idx = np.random.permutation(m)  # shuffle each epoch
            X, y = X[perm_idx], y[perm_idx]
            if epoch % 10 == 0:
                print(f"Epoch: {epoch}\tLoss: {losses[-1]}")
            for i in range(m):
                # draw one random sample per update
                rand_idx = np.random.randint(m)
                X_i = X[rand_idx : rand_idx + 1]
                y_i = y[rand_idx : rand_idx + 1]
                y_pred = np.dot(X_i, self.weights) + self.bias
                loss = np.sum(np.square(y_pred - y_i))
                losses.append(loss)
                # gradients of the squared error for a single sample
                dw = 2 * np.dot(X_i.T, y_pred - y_i)
                db = 2 * np.sum(y_pred - y_i)
                temp = self.weights
                eta = self.learning_schedule(m * epoch + i)  # decaying learning rate
                self.weights = self.weights - eta * dw
                self.bias = self.bias - eta * db
                if np.all(np.abs(self.weights - temp) <= self.tol):
                    print(f"Stopped at iteration {epoch}!")
                    flag = True
                    break

    def learning_schedule(self, t: int) -> float:
        return self.t0 / (t + self.t1)  # learning rate shrinks as t grows

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.dot(X, self.weights) + self.bias
```
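The schedule $\eta(t) = t_0 / (t + t_1)$ starts at $t_0 / t_1 = 0.1$ with the defaults and decays toward zero, which damps the noisy single-sample updates over time. A quick look at the decay (illustrative):

```python
sgd = StochasticGD()
for t in [0, 100, 1000, 10000]:
    print(t, sgd.learning_schedule(t))
# 0 -> 0.1, 100 -> ~0.033, 1000 -> ~0.0048, 10000 -> ~0.0005
```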
```python
# --- Mini-Batch Gradient Descent ---
class MiniBatchGD:
    def __init__(self, max_iter=100, eta=0.001, tol=1e-7, batch_size=32):
        self.max_iter = max_iter
        self.tol = tol
        self.eta = eta
        self.batch_size = batch_size
        self.weights = self.bias = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        m, n = X.shape
        self.weights = np.random.rand(n)
        self.bias = np.random.rand(1)[0]
        costs, flag = [], False
        for epoch in range(1, self.max_iter + 1):
            if flag:
                break
            # shuffle first so each epoch sees different batches
            perm_idx = np.random.permutation(m)
            X, y = X[perm_idx], y[perm_idx]
            if epoch % 100 == 0:
                print(f"Epoch: {epoch}\tLoss: {costs[-1]}")
            for i in range(0, m, self.batch_size):
                # take one batch at a time
                X_mini = X[i : i + self.batch_size]
                y_mini = y[i : i + self.batch_size]
                b = len(X_mini)  # the last batch may be smaller than batch_size
                y_pred = np.dot(X_mini, self.weights) + self.bias
                cost = (1 / b) * np.sum(np.square(y_pred - y_mini))  # batch MSE
                costs.append(cost)
                # (n, b) @ (b,) -> (n,), same shape as the weights
                dw = (2 / b) * np.dot(X_mini.T, y_pred - y_mini)
                db = (2 / b) * np.sum(y_pred - y_mini)
                temp = self.weights
                self.weights = self.weights - self.eta * dw
                self.bias = self.bias - self.eta * db
                if np.all(np.abs(self.weights - temp) <= self.tol):
                    print(f"Stopped at iteration {epoch}!")
                    flag = True
                    break

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.dot(X, self.weights) + self.bias
```
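All three classes share the same `fit`/`predict` interface, so they can be run side by side on the same data (a minimal sketch; the toy data below is made up for illustration):

```python
rng = np.random.default_rng(42)
X = rng.random((500, 3))
y = X @ np.array([2.0, -1.0, 0.5]) + 0.3

for Model in (BatchGD, StochasticGD, MiniBatchGD):
    model = Model(max_iter=100)
    model.fit(X, y)
    mse = np.mean((model.predict(X) - y) ** 2)
    print(Model.__name__, mse)
```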
## Comparison

- BatchGD: computes the gradient on the entire training set for each update
- SGD: computes the gradient on a single random example for each update
- Mini-BatchGD: computes the gradient on a small batch; a middle ground between the previous two
| | BatchGD | SGD | Mini-BatchGD |
|---|---|---|---|
| speed | slow | fastest | in between |
| reaches the optimum? | guaranteed (convex loss) | no guarantee | no guarantee |
| needs shuffling? | no | yes | yes |
| can get stuck in local minima? | yes | no | yes |
## Gradient Exploding and Clipping

Why gradients explode:

- high learning rate: each weight update becomes too large
- model structure: in recurrent models (RNN, LSTM), gradients are multiplied across many time steps, so errors accumulate
- loss function: a poorly scaled loss produces outsized gradients
- data preprocessing: unnormalized inputs inflate gradient magnitudes
Two common approaches (sketched below):

- gradient clipping: a component exceeds an upper/lower bound ➡ clip it to that bound
- gradient scaling: typically, the gradient's L2 norm exceeds a threshold ➡ rescale the whole vector to the threshold
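A minimal NumPy sketch of both approaches (the bound and threshold values are arbitrary, for illustration):

```python
import numpy as np


def clip_by_value(grad: np.ndarray, bound: float = 1.0) -> np.ndarray:
    # gradient clipping: clamp each component into [-bound, bound]
    return np.clip(grad, -bound, bound)


def clip_by_norm(grad: np.ndarray, threshold: float = 1.0) -> np.ndarray:
    # gradient scaling: if the L2 norm exceeds the threshold,
    # rescale the whole vector so its norm equals the threshold
    norm = np.linalg.norm(grad)
    return grad * (threshold / norm) if norm > threshold else grad


g = np.array([3.0, -4.0])   # L2 norm = 5
print(clip_by_value(g))     # [ 1. -1.]   -- direction changes
print(clip_by_norm(g))      # [ 0.6 -0.8] -- direction preserved
```

Note the design difference: clipping by value can change the gradient's direction, while scaling by norm only shrinks its magnitude and keeps the direction intact.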