Theory Notebook
Converted from `theory.ipynb` for web reading.
Stochastic Processes
"A stochastic process is a mathematical model for a randomly evolving system. To understand modern AI is to understand how randomness unfolds over time."
Interactive exploration of stochastic processes: filtrations, martingales, random walks, Poisson processes, Brownian motion, OU process, Gaussian processes, and ML connections (diffusion models, SGD dynamics, RL value functions).
Code cell 2
# Global plotting configuration for the notebook.
# Prefer seaborn theming when installed; otherwise fall back to matplotlib's
# bundled seaborn-like stylesheet. Either way, apply shared rcParams so every
# figure renders consistently, then seed NumPy for reproducible simulations.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

try:
    import seaborn as sns
    sns.set_theme(style="whitegrid", palette="colorblind")
    HAS_SNS = True
except ImportError:
    # seaborn missing: use matplotlib's re-implementation of the style
    plt.style.use("seaborn-v0_8-whitegrid")
    HAS_SNS = False

# Notebook-wide figure defaults (sizes, fonts, spines, save options).
_RC_OVERRIDES = {
    "figure.figsize": (10, 6),
    "figure.dpi": 120,
    "font.size": 13,
    "axes.titlesize": 15,
    "axes.labelsize": 13,
    "xtick.labelsize": 11,
    "ytick.labelsize": 11,
    "legend.fontsize": 11,
    "legend.framealpha": 0.85,
    "lines.linewidth": 2.0,
    "axes.spines.top": False,
    "axes.spines.right": False,
    "savefig.bbox": "tight",
    "savefig.dpi": 150,
}
mpl.rcParams.update(_RC_OVERRIDES)

np.random.seed(42)  # reproducibility for all random draws below
print("Plot setup complete.")
Code cell 3
# Numerical/statistics setup for the analysis cells.
# matplotlib is treated as optional: HAS_MPL gates every plotting section.
import numpy as np
from scipy import stats, linalg
from scipy.linalg import cho_factor, cho_solve

try:
    import matplotlib.pyplot as plt
    import matplotlib
    plt.style.use('seaborn-v0_8-whitegrid')
    # Same defaults as the seaborn-based setup cell, applied in one call.
    plt.rcParams.update({'figure.figsize': [10, 6], 'font.size': 12})
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

# Compact array printing for the verification printouts below.
np.set_printoptions(precision=6, suppress=True)
np.random.seed(42)  # reproducible simulations
print('Setup complete.')
1. Stochastic Process Taxonomy
A stochastic process is classified by:
- Time set $T$: discrete ($T = \{0, 1, 2, \dots\}$) or continuous ($T = [0, \infty)$)
- State space $S$: discrete (countable) or continuous ($S \subseteq \mathbb{R}^d$)
| | Discrete State | Continuous State |
|---|---|---|
| Discrete Time | Markov chains, SRW | AR models, SGD iterates |
| Continuous Time | Poisson process | BM, OU, diffusion SDEs |
Code cell 5
# === 1. Simple Random Walk (Discrete Time, Discrete State) ===
# S_n = X_1 + ... + X_n with iid steps X_i uniform on {-1, +1}.
# Plots sample paths, compares the marginal at n=100 with the CLT normal,
# and runs a KS test on the standardised walk S_n / sqrt(n).
np.random.seed(42)
N_PATHS = 5
N_STEPS = 200
steps = np.random.choice([-1, 1], size=(N_PATHS, N_STEPS))
# Prepend S_0 = 0 so each path has N_STEPS + 1 points.
SRW = np.hstack([np.zeros((N_PATHS, 1)), steps.cumsum(axis=1)])
if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    ax = axes[0]
    for i in range(N_PATHS):
        ax.plot(SRW[i], alpha=0.7, lw=1.5)
    ax.axhline(0, color='k', lw=1)
    ax.set_xlabel('Step $n$')
    ax.set_ylabel('$S_n$')
    ax.set_title(f'Simple Random Walk ({N_PATHS} paths, {N_STEPS} steps)')
    # Distribution at step 100
    ax = axes[1]
    n_mc = 50000
    s100 = np.random.choice([-1,1], size=(n_mc, 100)).sum(axis=1)
    ax.hist(s100, bins=30, density=True, alpha=0.7, label=f'$S_{{100}}$ empirical')
    x = np.linspace(-30, 30, 300)
    # CLT: S_100 is approximately N(0, 100), i.e. standard deviation 10.
    ax.plot(x, stats.norm.pdf(x, 0, 10), 'r-', lw=2, label='$N(0,100)$ (CLT)')
    ax.set_xlabel('$S_{100}$')
    ax.set_ylabel('Density')
    ax.set_title('Distribution at $n=100$ vs CLT')
    ax.legend()
    plt.tight_layout()
    plt.show()
# CLT verification
n = 100
s_n = np.random.choice([-1,1], size=(100000, n)).sum(axis=1)
# One-sample Kolmogorov-Smirnov test of S_n/sqrt(n) against N(0, 1).
ks_stat, ks_pval = stats.kstest(s_n/np.sqrt(n), 'norm')
print(f'S_n/sqrt(n) vs N(0,1): KS-stat={ks_stat:.4f}, p-value={ks_pval:.4f}')
print(f'PASS — CLT holds' if ks_pval > 0.05 else 'FAIL — CLT not satisfied')
2. Filtrations and Adaptedness
A filtration $(\mathcal{F}_t)$ is an increasing family of $\sigma$-algebras representing accumulated information. A process $(X_t)$ is adapted if $X_t$ is $\mathcal{F}_t$-measurable — i.e. $X_t$ is known at time $t$.
Key intuition:
- = 'what has been observed up to time '
- Adapted = no look-ahead bias
- Stopping time = decision rule that doesn't require future knowledge
Stopping time examples:
- $\tau = c$: constant ✓
- $\tau = \inf\{n : S_n \ge a\}$ (first passage time) ✓
- $\tau = \sup\{n \le N : S_n > 0\}$ (last positive time) ✗ (needs future)
Code cell 7
# === 2. Stopping Times: First Passage Time ===
# tau = inf{n : S_n >= threshold} for a simple random walk started at 0.
# The SRW is null recurrent: P(tau < inf) = 1 but E[tau] = inf, so the
# simulation is truncated at max_steps and reports censored statistics.
np.random.seed(42)
N_MC = 8000
threshold = 10
max_steps = 5000
# Vectorized first passage simulation: tau = inf{n: S_n >= threshold}
positions = np.zeros(N_MC, dtype=int)
first_passage_times = np.full(N_MC, max_steps, dtype=int)  # max_steps = censored sentinel
active = np.ones(N_MC, dtype=bool)  # walks that have not yet hit the level
for n in range(1, max_steps + 1):
    active_idx = np.flatnonzero(active)
    if len(active_idx) == 0:
        break  # every walk has stopped early
    # Advance only the still-active walks by one +/-1 step.
    positions[active_idx] += np.random.choice([-1, 1], size=len(active_idx))
    hit = active_idx[positions[active_idx] >= threshold]
    first_passage_times[hit] = n
    active[hit] = False
# Theoretical: E[tau] = infinity for SRW (null recurrent), but P(tau < infinity)=1.
uncensored = first_passage_times[first_passage_times < max_steps]
print(f'First passage to {threshold}:')
print(f' Fraction reaching {threshold}: {len(uncensored)/N_MC:.4f} (theory: 1.0 by recurrence)')
print(f' Median time: {np.median(uncensored):.0f} steps')
print(f' Mean time (truncated): {uncensored.mean():.0f} steps')
if HAS_MPL:
    plt.figure(figsize=(8, 4))
    # Log10 scale because the hitting-time distribution is heavy-tailed.
    plt.hist(np.log10(uncensored + 1), bins=50, density=True, alpha=0.7)
    plt.xlabel('$\\log_{10}(\\tau)$')
    plt.ylabel('Density')
    plt.title(f'First passage time distribution (threshold={threshold})')
    plt.tight_layout()
    plt.show()
print('\nFirst passage times are heavy-tailed - the true mean diverges for null-recurrent SRW.')
3. Martingales
A process $(M_n)$ is a martingale with respect to a filtration $(\mathcal{F}_n)$ if $E[M_{n+1} \mid \mathcal{F}_n] = M_n$.
Verification checklist:
- $M_n$ is $\mathcal{F}_n$-measurable (adapted)
- $E[|M_n|] < \infty$ for all $n$
- $E[M_{n+1} \mid \mathcal{F}_n] = M_n$ a.s.
Key consequence: $E[M_n] = E[M_0]$ for all $n$ (constant mean).
Code cell 9
# === 3.1 Martingale Verification ===
# Monte Carlo check that E[M_n] stays constant in n for three classical
# martingales built from random walks: the walk itself, the compensated
# square S_n^2 - n, and an exponential/likelihood-ratio martingale.
np.random.seed(42)
N_MC = 100000
N = 50 # steps
# --- Martingale 1: SRW S_n ---
steps = np.random.choice([-1., 1.], size=(N_MC, N))
S = np.hstack([np.zeros((N_MC, 1)), steps.cumsum(axis=1)])
# --- Martingale 2: S_n^2 - n (variance process) ---
# E[S_{n+1}^2 | F_n] = S_n^2 + 1, so subtracting n compensates the drift.
V = S**2 - np.arange(N+1)
# --- Martingale 3: Likelihood ratio M_n = (q/p)^{S_n} for biased walk ---
p, q = 0.3, 0.7 # q/p = 7/3 as the base
# For p=0.3 biased walk: (1-p)/p = q/p = 7/3
# M_n = ((1-p)/p)^{S_n^{biased}} -- need biased steps first
# (one-step check: E[M_{n+1}|F_n] = M_n * (p*(q/p) + q*(p/q)) = M_n * (q+p) = M_n)
biased_steps = np.where(np.random.rand(N_MC, N) < p, 1., -1.)
S_bias = np.hstack([np.zeros((N_MC,1)), biased_steps.cumsum(axis=1)])
M_lr = (q/p)**S_bias
print('Martingale property: E[M_n] should be constant')
print(f'\nS_n (SRW) means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f' n={k}: E[S_n] = {S[:,k].mean():.4f} (theory: 0)')
print(f'\nV_n = S_n^2 - n means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f' n={k}: E[V_n] = {V[:,k].mean():.4f} (theory: 0)')
print(f'\nLikelihood ratio M_n means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f' n={k}: E[M_n] = {M_lr[:,k].mean():.4f} (theory: 1.0)')
print('\nAll three satisfy E[M_n] = E[M_0] — martingale property verified.')
3.2 Doob's Optional Stopping Theorem
Theorem: For martingale and bounded stopping time :
Gambler's Ruin application: SRW on starting at , stop at boundary.
- →
- →
Code cell 11
# === 3.2 Gambler's Ruin: OST Verification ===
# SRW on {0,...,N_top} started at k, stopped on hitting either boundary.
# Optional stopping applied to S_n gives P(win) = k/N_top; applied to
# S_n^2 - n it gives E[tau] = k*(N_top - k) = 2100 here.
np.random.seed(42)
N_GAMES = 20000
k = 30 # starting fortune
N_top = 100 # target fortune
max_steps = 50000
fortune = np.full(N_GAMES, k, dtype=int)
steps = np.zeros(N_GAMES, dtype=int)  # per-game step counter (= tau at stopping)
active = np.ones(N_GAMES, dtype=bool)
for _ in range(max_steps):
    active_idx = np.flatnonzero(active)
    if len(active_idx) == 0:
        break  # all games resolved
    fortune[active_idx] += np.random.choice([-1, 1], size=len(active_idx))
    steps[active_idx] += 1
    # A game stays active while strictly inside the open interval (0, N_top).
    active[active_idx] = (fortune[active_idx] > 0) & (fortune[active_idx] < N_top)
resolved = ~active
p_win_sim = np.mean(fortune[resolved] == N_top)
p_win_theory = k / N_top
E_tau_sim = steps[resolved].mean()
E_tau_theory = k * (N_top - k)
print("Gambler's Ruin: k=30, N=100")
print(f' Resolved games: {resolved.mean():.4f}')
print(f' P(win) simulation: {p_win_sim:.4f} theory: {p_win_theory:.4f}')
print(f' E[tau] simulation: {E_tau_sim:.1f} theory: {E_tau_theory:.1f}')
ok_win = abs(p_win_sim - p_win_theory) < 0.015
ok_tau = abs(E_tau_sim - E_tau_theory) < 80
print(f'\nPASS - P(win) correct: {ok_win}')
print(f'PASS - E[tau] correct: {ok_tau}')
# Nearly all games must resolve for the censored estimates to be trustworthy.
assert resolved.mean() > 0.995
assert ok_win and ok_tau
3.3 The Doob Martingale and McDiarmid
For function of iid variables:
If has bounded differences , Azuma gives:
This IS McDiarmid's inequality. It applies to any function with bounded differences, not just sums.
Code cell 13
# === 3.3 Doob Martingale: max function ===
# McDiarmid's inequality demonstrated on a non-sum function: the maximum of
# n iid U[0,1] variables. Each coordinate has bounded difference c_k = 1,
# so P(f - E[f] >= t) <= exp(-2 t^2 / sum c_k^2) = exp(-2 t^2 / n).
np.random.seed(42)
n = 50 # number of iid U[0,1] variables
N_MC = 100_000
# f(X_1,...,X_n) = max(X_1,...,X_n), X_i ~ U[0,1]
# E[f] = n/(n+1)
# Bounded differences: c_k = 1 (changing one X_k changes max by at most 1)
# McDiarmid bound: P(f - E[f] >= t) <= exp(-2t^2/n)
X = np.random.uniform(0, 1, (N_MC, n))
f = X.max(axis=1)
E_f = n / (n + 1) # exact
E_f_sim = f.mean()
print(f'f = max(X_1,...,X_{n}), X_i ~ U[0,1]')
print(f'E[f] exact: {E_f:.6f}')
print(f'E[f] simulated: {E_f_sim:.6f}')
print()
ts = [0.02, 0.05, 0.1]
print('McDiarmid vs empirical tail:')
print(f' {"t":>6} {"Empirical":>12} {"McDiarmid":>12} {"Valid?":>8}')
for t in ts:
    # Empirical upper tail vs the (loose, with c_k = 1) McDiarmid bound.
    empirical = (f - E_f >= t).mean()
    bound = np.exp(-2 * t**2 / n) # sum c_k^2 = n*1^2 = n; c_k=1
    print(f' {t:>6.2f} {empirical:>12.6f} {bound:>12.6f} {"OK" if bound >= empirical else "FAIL":>8}')
4. Poisson Process
The Poisson process models event counts with:
- Independent increments
Key properties:
- Inter-arrival times iid $\mathrm{Exp}(\lambda)$
- $N(t) - \lambda t$ is a martingale (compensated Poisson)
- Superposition: $N_1(t) + N_2(t) \sim \mathrm{Poisson}((\lambda_1 + \lambda_2)t)$
- Thinning with prob $p$: Poisson($\lambda$) → Poisson($p\lambda$)
Code cell 15
# === 4. Poisson Process Simulation ===
np.random.seed(42)

def simulate_poisson_process(lam, T, n_paths=5):
    """Simulate homogeneous Poisson(lam) arrival times on [0, T].

    Uses the inter-arrival construction: gaps between events are iid
    Exp(lam). Returns a list of n_paths lists; each path starts with the
    sentinel time 0.0 followed by the arrival times in increasing order
    (all <= T). The first arrival falling beyond T is discarded.
    """
    all_paths = []
    for _ in range(n_paths):
        times = [0.0]
        clock = 0
        while True:
            clock += np.random.exponential(1.0 / lam)  # next Exp(lam) gap
            if clock > T:
                break  # past the horizon — stop this path
            times.append(clock)
        all_paths.append(times)
    return all_paths
# Simulate sample paths, check the inter-arrival law, and verify thinning.
lam = 3.0
T = 5.0
paths = simulate_poisson_process(lam, T, n_paths=8)
if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    ax = axes[0]
    t_grid = np.linspace(0, T, 500)
    for path in paths:
        arr = np.array(path)
        # Subtract 1 to discount the 0.0 sentinel prepended to each path.
        counts = [(arr <= t).sum() - 1 for t in t_grid]
        ax.step(t_grid, counts, alpha=0.7)
    ax.plot(t_grid, lam * t_grid, 'r--', lw=2, label=f'$E[N(t)] = \\lambda t = {lam}t$')
    ax.set_xlabel('$t$')
    ax.set_ylabel('$N(t)$')
    ax.set_title(f'Poisson Process ($\\lambda={lam}$)')
    ax.legend()
    # Verify inter-arrival distribution
    ax = axes[1]
    all_arrivals = []
    for path in paths:
        arr = np.array(path)
        gaps = np.diff(arr)
        all_arrivals.extend(gaps)
    all_arrivals = np.array(all_arrivals)
    x = np.linspace(0, 2, 200)
    ax.hist(all_arrivals, bins=30, density=True, alpha=0.7, label='Inter-arrivals')
    # Inter-arrivals should follow Exp(lam), i.e. scale = 1/lam.
    ax.plot(x, stats.expon.pdf(x, scale=1/lam), 'r-', lw=2, label=f'Exp({lam})')
    ax.set_xlabel('Inter-arrival time')
    ax.set_ylabel('Density')
    ax.set_title('Inter-arrival Distribution')
    ax.legend()
    plt.tight_layout()
    plt.show()
# Verify thinning
# Binomial thinning of a Poisson count is again Poisson with rate p*lam.
N_MC = 100000
p_thin = 0.4
N_T = np.random.poisson(lam * T, N_MC) # N(T) ~ Poisson(lambda*T)
N_thin = np.random.binomial(N_T, p_thin)
theory_rate = lam * p_thin
print(f'Thinning: Poisson({lam}) thinned by p={p_thin}')
print(f' E[N_thin(T)] simulated: {N_thin.mean():.3f}')
print(f' E[N_thin(T)] theory: {theory_rate * T:.3f}')
print(f' KS test vs Poisson({theory_rate*T:.1f}):', end=' ')
# Two-sample KS between the thinned counts and a fresh Poisson(p*lam*T) sample.
ks, pv = stats.ks_2samp(N_thin, np.random.poisson(theory_rate*T, N_MC))
print(f'p={pv:.3f} ({"PASS" if pv > 0.05 else "FAIL"})')
4. Brownian Motion
Brownian motion is the continuous-time limit of the random walk — the canonical example of a continuous stochastic process with independent increments.
Wiener axioms:
- a.s.
- Independent increments: for
- Stationary increments:
- Continuous paths a.s.
Quadratic variation: $[B]_t = t$ — BM accumulates exactly $t$ units of 'roughness' by time $t$. This makes BM nowhere differentiable yet well-defined as an integrator. In Itô calculus: $(dB_t)^2 = dt$.
For AI: The forward process of DDPM is , which is a discrete-time version of an Ornstein-Uhlenbeck process driven by Brownian motion.
Code cell 17
# === 4.1 Brownian Motion Simulation ===
import numpy as np
np.random.seed(42)
try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

def simulate_bm(T=1.0, n_steps=1000, n_paths=5):
    """Sample standard Brownian motion paths on [0, T].

    Each path is the cumulative sum of iid N(0, dt) increments with
    dt = T / n_steps. Returns an array of shape (n_paths, n_steps + 1)
    whose first column is B_0 = 0.
    """
    dt = T / n_steps
    dB = np.random.normal(0, np.sqrt(dt), (n_paths, n_steps))
    origin = np.zeros((n_paths, 1))
    return np.hstack([origin, dB.cumsum(axis=1)])
# Verify the Wiener axioms numerically: zero mean, Var[B_T] = T, and
# quadratic variation concentrating at T; then plot paths and a marginal.
T, n_steps = 1.0, 1000
bm_paths = simulate_bm(T, n_steps, n_paths=5)
t_grid = np.linspace(0, T, n_steps + 1)
# Verify BM properties
# 1) E[B_t] = 0
many_paths = simulate_bm(T, n_steps, n_paths=5000)
mean_at_T = many_paths[:, -1].mean()
var_at_T = many_paths[:, -1].var()
print(f'E[B_T]: {mean_at_T:.4f} (expected 0)')
print(f'Var[B_T]: {var_at_T:.4f} (expected {T:.1f})')
assert abs(mean_at_T) < 0.05, 'Mean too far from 0'
assert abs(var_at_T - T) < 0.05, 'Variance too far from T'
print('PASS - BM mean=0, Var=T')
# 2) Quadratic variation: sum (B_{t_i+1} - B_{t_i})^2 -> T
dB = np.diff(many_paths, axis=1)
qv_per_path = (dB**2).sum(axis=1)
print(f'\nQuadratic variation (mean): {qv_per_path.mean():.4f} (expected {T:.1f})')
print(f'Quadratic variation (std): {qv_per_path.std():.4f} (should be small)')
print('PASS - QV concentrates at T')
if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    for path in bm_paths:
        axes[0].plot(t_grid, path, alpha=0.7, lw=0.8)
    axes[0].set_xlabel('t'); axes[0].set_ylabel('B_t')
    axes[0].set_title('Brownian Motion Paths')
    # Marginal at the midpoint: B_{T/2} ~ N(0, T/2).
    axes[1].hist(many_paths[:, n_steps//2], bins=50, density=True, alpha=0.7)
    x = np.linspace(-2, 2, 200)
    from scipy.stats import norm
    axes[1].plot(x, norm.pdf(x, 0, np.sqrt(T/2)), 'r-', lw=2, label='N(0, T/2)')
    axes[1].set_title('Distribution of B_{T/2}')
    axes[1].legend()
    plt.tight_layout(); plt.show()
print('\nPlots rendered.')
5. Ornstein-Uhlenbeck Process and DDPM
The Ornstein-Uhlenbeck (OU) process is the unique stationary Gaussian Markov process:
It adds mean-reversion to Brownian motion. Key properties:
- Stationary distribution: $N\!\left(\mu, \frac{\sigma^2}{2\theta}\right)$
- Correlation decay: $\mathrm{corr}(X_s, X_{s+t}) = e^{-\theta t}$
- Marginal at time $t$: $X_t \sim N\!\left(\mu + (x_0 - \mu)e^{-\theta t},\ \frac{\sigma^2}{2\theta}\left(1 - e^{-2\theta t}\right)\right)$
Connection to DDPM: The DDPM forward process is a variance-preserving SDE:
This is an OU process with time-varying coefficients. As , regardless of — noise is added until the data distribution is destroyed.
Code cell 19
# === 5.1 OU Process and DDPM Forward Process ===
import numpy as np
np.random.seed(42)

def simulate_ou(x0, theta, mu, sigma, T, n_steps):
    """Euler-Maruyama path of dX = theta*(mu - X) dt + sigma dW on [0, T].

    Starts at x0 and takes n_steps steps of size dt = T/n_steps, drawing
    one N(0, dt) Brownian increment per step. Returns an array of length
    n_steps + 1 containing the discretised path.
    """
    dt = T / n_steps
    path = np.zeros(n_steps + 1)
    path[0] = x0
    sqrt_dt = np.sqrt(dt)
    for i in range(n_steps):
        dW = np.random.normal(0, sqrt_dt)  # Brownian increment
        drift = theta * (mu - path[i]) * dt  # mean-reversion pull
        path[i + 1] = path[i] + drift + sigma * dW
    return path
# OU paths from several starting points, a stationary-variance check, and
# the DDPM forward process as a discretised variance-preserving OU SDE.
theta, mu, sigma = 2.0, 0.0, 1.0
T, n_steps = 3.0, 3000
# Simulate from multiple starting points
x0_vals = [-3.0, -1.5, 0.0, 1.5, 3.0]
t_grid = np.linspace(0, T, n_steps + 1)
paths = [simulate_ou(x0, theta, mu, sigma, T, n_steps) for x0 in x0_vals]
# Verify stationary variance
# OU stationary law is N(mu, sigma^2 / (2*theta)); run long (T=5) to mix.
stat_var_theory = sigma**2 / (2 * theta)
N_stat = 2000
stat_paths = np.array([simulate_ou(0.0, theta, mu, sigma, 5.0, 5000)[-1] for _ in range(N_stat)])
print(f'Stationary variance (theory): {stat_var_theory:.4f}')
print(f'Stationary variance (sim): {stat_paths.var():.4f}')
assert abs(stat_paths.var() - stat_var_theory) < 0.05, 'OU stationary variance wrong'
print('PASS - OU stationary distribution verified')
# DDPM forward process: discrete VP-SDE
print('\n--- DDPM Forward Process ---')
T_diff = 1000 # diffusion steps
beta_min, beta_max = 1e-4, 0.02
betas = np.linspace(beta_min, beta_max, T_diff)
alpha_bars = np.cumprod(1 - betas)  # alpha_bar_t = prod_{s<=t} (1 - beta_s)
x0 = np.array([2.0, -1.0]) # 2D example
for t in [0, 100, 500, 999]:
    # q(x_t | x_0) = N(sqrt(alpha_bar_t) * x_0, (1 - alpha_bar_t) * I)
    mean = np.sqrt(alpha_bars[t]) * x0
    var = 1 - alpha_bars[t]
    print(f' t={t:4d}: alpha_bar={alpha_bars[t]:.4f}, ')
    print(f' E[x_t]=sqrt(alpha_bar)*x0={mean}, Var={var:.4f}')
print(f'\nAt t=999: signal virtually gone, ')
print(f' alpha_bar={alpha_bars[-1]:.6f} -> x_T ~ N(0,I)')
print('PASS - DDPM forward process destroys signal as OU')
6. Gaussian Processes
A Gaussian process is a collection of random variables such that any finite subset is jointly Gaussian. Specified by:
- Mean function:
- Kernel (covariance function):
Posterior inference: Given observations , :
Common kernels:
| Kernel | Formula | Properties |
|---|---|---|
| RBF/SE | $\exp\!\big(-\tfrac{(x-x')^2}{2\ell^2}\big)$ | Infinitely differentiable |
| Matérn-5/2 | $\big(1 + \tfrac{\sqrt{5}r}{\ell} + \tfrac{5r^2}{3\ell^2}\big)\exp\!\big(-\tfrac{\sqrt{5}r}{\ell}\big)$ | Twice differentiable |
| Periodic | $\exp\!\big(-\tfrac{2\sin^2(\pi \lvert x-x'\rvert / p)}{\ell^2}\big)$ | Encodes periodicity |
For AI: GP priors appear in Bayesian optimisation for hyperparameter tuning. The attention kernel can be viewed as a data-dependent GP kernel.
Code cell 21
# === 6.1 Gaussian Process Posterior ===
import numpy as np
np.random.seed(42)

def rbf_kernel(x1, x2, l=1.0, sigma_f=1.0):
    """Squared-exponential (RBF) kernel matrix for 1-D input arrays.

    Entry (i, j) is sigma_f^2 * exp(-(x1_i - x2_j)^2 / (2 l^2)).
    """
    diff = x1[:, None] - x2[None, :]
    return sigma_f**2 * np.exp(-diff**2 / (2 * l**2))

def gp_posterior(X_train, y_train, X_test, l=1.0, sigma_f=1.0, sigma_n=0.1):
    """Exact GP regression posterior (mean, covariance) at X_test.

    Standard Cholesky implementation: with K = K_XX + sigma_n^2 I = L L^T,
    mu = K_xX K^{-1} y and cov = K_xx - K_xX K^{-1} K_Xx, computed via
    triangular solves rather than an explicit matrix inverse.
    """
    noise = sigma_n**2 * np.eye(len(X_train))
    K_XX = rbf_kernel(X_train, X_train, l, sigma_f) + noise
    K_xX = rbf_kernel(X_test, X_train, l, sigma_f)
    K_xx = rbf_kernel(X_test, X_test, l, sigma_f)
    chol = np.linalg.cholesky(K_XX)
    # Two triangular solves give K^{-1} y.
    weights = np.linalg.solve(chol.T, np.linalg.solve(chol, y_train))
    mean = K_xX @ weights
    # v = L^{-1} K_Xx, so v^T v = K_xX K^{-1} K_Xx.
    v = np.linalg.solve(chol, K_xX.T)
    return mean, K_xx - v.T @ v
# Training data from sin + noise
# Fit the GP to noisy sin samples, check interpolation at the training
# inputs, and report empirical 2-sigma coverage on the test grid.
X_train = np.array([-2.5, -1.5, -0.5, 0.5, 1.5, 2.5])
y_train = np.sin(X_train) + 0.1 * np.random.randn(len(X_train))
X_test = np.linspace(-3, 3, 100)
mu, cov = gp_posterior(X_train, y_train, X_test, l=1.0, sigma_f=1.0, sigma_n=0.1)
std = np.sqrt(np.diag(cov))  # pointwise posterior standard deviation
# Verify posterior: should interpolate near training points
for xi, yi in zip(X_train, y_train):
    # Nearest test-grid point stands in for evaluating exactly at xi.
    idx = np.argmin(np.abs(X_test - xi))
    err = abs(mu[idx] - yi)
    print(f' x={xi:+.1f}: y_train={yi:.3f}, GP_mean={mu[idx]:.3f}, err={err:.4f}')
    assert err < 0.15, f'GP not interpolating at x={xi}'
print('PASS - GP posterior interpolates training data')
# Coverage check: ~95% of test points should be in 2-sigma band
y_true_test = np.sin(X_test)
coverage = np.mean(np.abs(y_true_test - mu) < 2 * std)
print(f'\n95% coverage check: {100*coverage:.1f}% within 2σ band')
# NOTE(review): this PASS is printed unconditionally — coverage is reported
# but not asserted; consider asserting coverage >= some threshold.
print('PASS - GP posterior uncertainty is calibrated')
7. Stationarity and Spectral Theory
A process is wide-sense stationary (WSS) if:
- (constant mean)
- depends only on lag
Autocovariance function (ACVF):
Wiener-Khinchin theorem: For WSS , the power spectral density is the Fourier transform of the ACVF:
RoPE as stationary kernel: Rotary Position Embedding in LLMs encodes relative position via , which is a stationary covariance function — only the lag matters, not absolute positions.
Code cell 23
# === 7.1 Stationarity and ACVF ===
import numpy as np
from scipy import signal
np.random.seed(42)
# AR(1) process: X_t = phi * X_{t-1} + eps_t — WSS if |phi| < 1

def simulate_ar1(phi, sigma, n, x0=0.0):
    """Simulate n steps of the AR(1) recursion X_t = phi*X_{t-1} + sigma*eps_t.

    eps_t are iid standard normal draws; the series starts at x0. The
    process is wide-sense stationary (in the long run) when |phi| < 1.
    """
    series = np.zeros(n)
    series[0] = x0
    for t in range(1, n):
        shock = sigma * np.random.randn()  # one innovation per step
        series[t] = phi * series[t - 1] + shock
    return series
# Compare the empirical autocovariance of a simulated AR(1) path against
# the closed-form ACVF, then set up RoPE frequencies for the next demo.
phi, sigma, n = 0.8, 1.0, 5000
# Theoretical ACVF: C(tau) = sigma^2 * phi^|tau| / (1 - phi^2)
sigma2_stat = sigma**2 / (1 - phi**2)  # stationary variance C(0)
x = simulate_ar1(phi, sigma, n)
# Empirical ACVF
# Lag-0 uses the sample variance; lag > 0 uses the sample cross-covariance.
max_lag = 20
acvf_emp = np.array([np.cov(x[lag:], x[:n-lag])[0,1] if lag > 0 else x.var() for lag in range(max_lag+1)])
acvf_theory = sigma2_stat * phi**np.arange(max_lag+1)
print('AR(1) ACVF comparison (first 5 lags):')
print(f' Lag Theory Empirical')
for lag in range(6):
    print(f' {lag:3d} {acvf_theory[lag]:.4f} {acvf_emp[lag]:.4f}')
ok = np.allclose(acvf_theory[:10], acvf_emp[:10], atol=0.15)
print(f'PASS - ACVF matches theory: {ok}')
# RoPE stationarity demo
print('\n--- RoPE as Stationary Kernel ---')
d = 64
theta_base = 10000.0
i = np.arange(d // 2)
# Standard RoPE frequency schedule: theta_base^(-2i/d) per rotation pair.
freqs = 1.0 / (theta_base ** (2 * i / d))
def rope_similarity(m, n, freqs, q_vec=None):
    """Simplified RoPE attention score between positions m and n.

    Models Re(q * conj(q) * e^{i (m-n) theta_k}) summed over the frequency
    components, which depends on m and n only through the lag m - n — a
    stationary kernel over positions. If q_vec is None a random query of
    matching length is drawn.
    """
    if q_vec is None:
        q_vec = np.random.randn(len(freqs))
    relative_pos = m - n
    per_freq = q_vec**2 * np.cos(relative_pos * freqs)  # simplified real part
    return np.sum(per_freq)
# Demonstrate stationarity: two position pairs with the same lag (2)
# must produce identical scores for a fixed query vector.
q = np.random.randn(d // 2)
# Stationarity: rope_sim(m, n) depends only on m-n
s_5_3 = rope_similarity(5, 3, freqs, q)
s_12_10 = rope_similarity(12, 10, freqs, q)
print(f' RoPE sim(5,3) = {s_5_3:.6f}')
print(f' RoPE sim(12,10) = {s_12_10:.6f}')
print(f' Equal (lag=2): {abs(s_5_3 - s_12_10) < 1e-10}')
print('PASS - RoPE is a stationary kernel (depends only on lag)')
8. SGD as a Stochastic Process
Mini-batch SGD is a discrete-time stochastic process:
where is mean-zero gradient noise with covariance .
SDE approximation (continuous limit as ):
Near a minimum: Let (quadratic). The stationary distribution of the Langevin SDE is:
This is the noise temperature interpretation: smaller batch size means larger , which means wider stationary distribution = implicit regularisation.
Code cell 25
# === 8.1 SGD as Langevin Dynamics ===
import numpy as np
np.random.seed(42)
# 1D quadratic loss: L(theta) = (1/2) * H * theta^2
H = 2.0  # curvature (Hessian)
sigma2_grad = 1.0  # gradient noise variance

def sgd_step(theta, eta, sigma_grad):
    """One SGD update on the quadratic loss with additive gradient noise.

    The stochastic gradient is H*theta + sigma_grad*eps with eps ~ N(0, 1),
    so the iterate follows theta <- theta - eta * (grad + noise) — a
    discrete Langevin-type dynamics near the minimum at 0.
    """
    noisy_grad = H * theta + np.random.randn() * sigma_grad
    return theta - eta * noisy_grad
# Simulate SGD near minimum for multiple learning rates
# For each learning rate, burn in to the stationary regime and compare the
# empirical iterate variance with the small-eta theory eta*sigma^2/(2H).
n_steps = 50000
eta_vals = [0.05, 0.1, 0.2]
print('SGD stationary distribution (near quadratic minimum):')
print(f'Theory: Var(theta) = eta * sigma^2 / (2*H)')
print(f' (eta/2H = {0.1/(2*H):.4f} per unit sigma^2)\n')
for eta in eta_vals:
    theta = 0.0
    thetas = []
    # Burn-in
    for _ in range(5000):
        theta = sgd_step(theta, eta, np.sqrt(sigma2_grad))
    # Collect stationary samples
    for _ in range(n_steps):
        theta = sgd_step(theta, eta, np.sqrt(sigma2_grad))
        thetas.append(theta)
    thetas = np.array(thetas)
    var_emp = thetas.var()
    # Stationary var = eta * sigma2 / (2H) for small eta
    # (the exact discrete value is eta*sigma2/(H*(2 - eta*H)), so the
    # ratio printed below drifts above 1 as eta grows)
    var_theory = eta * sigma2_grad / (2 * H)
    print(f' eta={eta:.2f}: Var_emp={var_emp:.4f}, Var_theory={var_theory:.4f}, ')
    print(f' ratio={var_emp/var_theory:.3f}')
print('\nTakeaway: Larger eta -> wider stationary dist -> implicit regularisation.')
print('This is why SGD generalises better than GD at same train loss.')
9. RL Value Functions and Martingales
In reinforcement learning, the discounted return from state is:
The process is a martingale under the true value function . This means:
TD error as martingale difference:
Under : — TD errors are martingale differences!
Bellman residual measures how far is from . TD learning minimises this residual via stochastic approximation — a gradient descent on the martingale deviation.
Code cell 27
# === 9.1 TD Error as Martingale Difference ===
import numpy as np
np.random.seed(42)
# Simple chain MDP: states 0..N-1, actions left/right
# True V(s) computed by Bellman equation
N = 5 # states: 0,1,2,3,4
gamma = 0.9  # discount factor
# Reward: +1 for reaching state N-1 (state 4), 0 otherwise
# Transition: from state s, go right with prob 0.6
p_right = 0.6
# Compute true V* by value iteration
# Synchronous sweeps of the Bellman backup until (effective) convergence.
V = np.zeros(N)
for _ in range(1000):
    V_new = np.zeros(N)
    for s in range(N):
        if s == N-1: # terminal
            V_new[s] = 0.0
        else:
            # Reward is earned only on the transition into the terminal state.
            r_right = 1.0 if (s+1 == N-1) else 0.0
            r_left = 0.0
            s_right = min(s+1, N-1)
            s_left = max(s-1, 0)  # state 0 reflects to itself on a left move
            V_new[s] = (p_right * (r_right + gamma*V[s_right]) +
                        (1-p_right) * (r_left + gamma*V[s_left]))
    V = V_new
print('True value function V*:')
for s in range(N):
    print(f' V*({s}) = {V[s]:.4f}')
# Simulate trajectories and record per-step TD errors delta_t.
def get_td_errors(V_est, n_episodes=5000):
    """Roll out episodes in the chain MDP and collect (state, TD error) pairs.

    Uses the module-level MDP constants N, p_right and gamma. The TD error
    is delta = r + gamma*V_est(s') - V_est(s); under the true value
    function these are martingale differences, so their mean is ~0.
    Episodes start at a uniform random non-terminal state and are
    truncated at 50 steps.
    """
    records = []
    for _ in range(n_episodes):
        state = np.random.randint(0, N-1)  # random non-terminal start
        for _ in range(50):  # truncate long episodes
            if state == N-1:
                break  # absorbing terminal state
            moved_right = np.random.rand() < p_right
            if moved_right:
                nxt = min(state+1, N-1)
            else:
                nxt = max(state-1, 0)
            # Reward only on the transition that reaches the terminal state.
            reward = 1.0 if moved_right and (state == N-2) else 0.0
            td_error = reward + gamma * V_est[nxt] - V_est[state]
            records.append((state, td_error))
            state = nxt
    return np.array(records)
# Under the true V*, TD errors should average to ~0 (martingale differences);
# under a wrong value estimate (V=0) they acquire a systematic bias.
td_data = get_td_errors(V)
print(f'\nWith V=V*: E[delta_t] = {td_data[:,1].mean():.6f} (expected ~0)')
print(f'Var[delta_t] = {td_data[:,1].var():.4f}')
assert abs(td_data[:,1].mean()) < 0.02, 'TD errors not mean-zero under V*'
print('PASS - TD errors are martingale differences under V*')
# With wrong V: TD errors have nonzero mean
V_wrong = np.zeros(N)
td_wrong = get_td_errors(V_wrong)
print(f'\nWith V=0: E[delta_t] = {td_wrong[:,1].mean():.6f} (nonzero — V is wrong)')
10. Diffusion Models as SDEs
Modern generative diffusion models are trained as stochastic differential equations.
Variance-Preserving SDE (VP-SDE, DDPM):
Variance-Exploding SDE (VE-SDE, SMLD):
Reverse SDE (Anderson, 1982; used in generation):
The score function is learned by the neural network. DDPM, DDIM, and Consistency Models are all discretisations of this reverse SDE.
Denoising score matching: Train to predict by minimising .
Code cell 29
# === 10.1 Diffusion Model Forward/Reverse Process ===
import numpy as np
np.random.seed(42)
# DDPM forward process on 1D data: linear beta schedule and the cumulative
# products alpha_bar_t that parameterise q(x_t | x_0) in closed form.
T_diff = 100
beta_min, beta_max = 0.01, 0.5
betas = np.linspace(beta_min, beta_max, T_diff)
alphas = 1 - betas
alpha_bars = np.cumprod(alphas)

# Forward: q(x_t | x_0) = N(sqrt(alpha_bar)*x_0, (1-alpha_bar)*I)
def forward_diffusion(x0, t, alpha_bars):
    """Sample x_t ~ q(x_t | x_0) in one shot; returns (x_t, eps).

    eps is the standard normal noise actually used, so callers can check
    the score identity score = -eps / sqrt(1 - alpha_bar_t).
    """
    ab = alpha_bars[t]
    eps = np.random.randn(*x0.shape)
    noisy = np.sqrt(ab) * x0 + np.sqrt(1 - ab) * eps
    return noisy, eps

# Ground truth score for Gaussian: nabla log p(x_t|x_0) = -(x_t - sqrt(ab)*x0)/(1-ab)
def true_score(x_t, x0, t, alpha_bars):
    """Exact conditional score nabla_x log q(x_t | x_0) for the Gaussian forward."""
    ab = alpha_bars[t]
    residual = x_t - np.sqrt(ab) * x0
    return -residual / (1 - ab)
# 1D Gaussian data
# Show signal decay along the forward chain, verify the two equivalent
# score parameterizations, then run a coarse DDIM reverse pass with the
# oracle score.
x0 = np.array([2.0])
print('Forward process signal decay:')
for t in [0, 24, 49, 74, 99]:
    x_t, eps = forward_diffusion(x0, t, alpha_bars)
    # Signal-to-noise ratio of q(x_t | x_0).
    snr = alpha_bars[t] / (1 - alpha_bars[t])
    print(f' t={t:3d}: sqrt(ab)={np.sqrt(alpha_bars[t]):.4f}, '
          f'E[x_t]={np.sqrt(alpha_bars[t])*x0[0]:.4f}, SNR={snr:.4f}')
# Score function at t=50
t_test = 49
x_t, eps = forward_diffusion(x0, t_test, alpha_bars)
score = true_score(x_t, x0, t_test, alpha_bars)
# Equivalent epsilon-parameterization used by DDPM networks.
score_from_eps = -eps / np.sqrt(1 - alpha_bars[t_test])
print(f'\nt={t_test}: x_t={x_t[0]:.4f}')
print(f' Score via formula: {score[0]:.4f}')
print(f' Score via epsilon: {score_from_eps[0]:.4f}')
print(f' Match: {np.allclose(score, score_from_eps)}')
print('PASS - Score = -eps/sqrt(1-alpha_bar) (DDPM parameterization)')
# DDIM deterministic sampling (simplified 1D)
print('\nDDIM reverse process (1D, oracle score):')
x_T = np.random.randn(1) # start from noise
x = x_T.copy()
# NOTE(review): the loop jumps 10 timesteps per iteration but targets
# alpha_bars[t-1] (one step back) — the final state therefore stops short
# of t=0 and only approximately recovers x0; confirm this is intentional
# for the simplified demo.
for t in range(T_diff-1, 0, -10):
    # Oracle: predict x0 from x_t using true score (Tweedie formula)
    score_t = true_score(x, x0, t, alpha_bars)
    # eps_pred = -score * sqrt(1 - alpha_bar); x0_pred inverts the forward map.
    x0_pred = (x - np.sqrt(1 - alpha_bars[t]) * (-score_t * np.sqrt(1-alpha_bars[t]))) / np.sqrt(alpha_bars[t])
    ab_prev = alpha_bars[t-1] if t > 0 else 1.0
    # Deterministic DDIM update: re-noise x0_pred to the previous level.
    x = np.sqrt(ab_prev) * x0_pred + np.sqrt(1-ab_prev) * (-score_t * np.sqrt(1-alpha_bars[t]))
print(f' Final denoised: {x[0]:.4f} (target: {x0[0]:.4f})')
11. Optional Stopping and Doob's Inequalities
Optional Stopping Theorem (OST): If is a martingale and is a stopping time with and is uniformly bounded, then:
Doob's maximal inequality: For a non-negative submartingale :
Doob's inequality: For :
These are the stochastic process analogues of concentration inequalities — they bound worst-case deviations over a whole trajectory, not just at a fixed time.
Code cell 31
# === 11.1 Doob's Maximal Inequality Verification ===
import numpy as np
np.random.seed(42)
# SRW martingale: M_t = S_t (sum of +/-1 steps)
n_steps = 100
N_mc = 50000
lambda_val = 5.0
# Simulate paths
steps = np.random.choice([-1, 1], size=(N_mc, n_steps))
paths = np.cumsum(steps, axis=1)
max_paths = np.abs(paths).max(axis=1)
# Doob maximal: P(max |S_t| >= lambda) <= E[|S_T|] / lambda
# For SRW: E[|S_T|] ~ sqrt(2T/pi)
E_abs_ST = np.abs(paths[:, -1]).mean()
doob_bound = E_abs_ST / lambda_val
empirical = (max_paths >= lambda_val).mean()
print(f'Doob maximal inequality (lambda={lambda_val}):')
print(f' E[|S_T|] = {E_abs_ST:.4f}')
print(f' Doob bound = E[|S_T|]/lambda = {doob_bound:.4f}')
print(f' Empirical P = {empirical:.4f}')
assert doob_bound >= empirical, 'Doob bound violated!'
print(f' PASS - Doob bound holds ({doob_bound:.4f} >= {empirical:.4f})')
# Reflection principle: exact result P(max S_t >= k) = 2*P(S_T >= k) for SRW
from scipy.stats import norm
k = int(lambda_val)
# Approx: P(max >= k) ~ 2 * P(S_T >= k) via Brownian reflection
reflection_bound = 2 * norm.sf(lambda_val / np.sqrt(n_steps))
print(f'\nReflection principle bound: {reflection_bound:.4f}')
print(f'(Tighter than Doob for SRW due to symmetry)')
12. Connections: Concentration Inequalities via Martingales
The Doob martingale is the bridge between stochastic processes and concentration:
Given with bounded differences , define:
Then is a martingale with differences .
Azuma's inequality then gives:
This IS McDiarmid's inequality — proven via the martingale route.
The hierarchy:
Azuma (martingale differences)
↕ (same inequality, different proof routes)
McDiarmid (bounded differences on function of iid)
↓ (special case: f = sum)
Hoeffding (sum of bounded iid)
Code cell 33
# === 12.1 Azuma/McDiarmid/Hoeffding Comparison ===
import numpy as np
np.random.seed(42)
# All three applied to: f = empirical mean of n Bernoulli(0.5)
n = 100
t = 0.1 # deviation from mean
# Hoeffding: sum of [0,1]-bounded, range c=1
hoeff = np.exp(-2 * n * t**2) # equivalent: 2*exp(-2*n*t^2) two-sided
# McDiarmid: f = mean, each x_i in [0,1], c_i = 1/n
# sum c_i^2 = n*(1/n)^2 = 1/n
mcdiarmid = np.exp(-2 * t**2 / (1/n)) # = exp(-2*n*t^2)
# Azuma: martingale differences bounded by c_i = 1/n
azuma = np.exp(-2 * t**2 / (1/n))
print(f'For f=mean of n={n} Bernoulli(0.5), t={t}:')
print(f' Hoeffding: exp(-2nt^2) = {hoeff:.6f}')
print(f' McDiarmid: exp(-2t^2/(1/n)) = {mcdiarmid:.6f}')
print(f' Azuma: same formula = {azuma:.6f}')
print(f' All equal: {np.allclose([hoeff, mcdiarmid, azuma], hoeff)}')
print('PASS - Three inequalities coincide for sum of iid bounded')
# Now: non-sum function where only McDiarmid applies
# f(x_1,...,x_n) = max(x_1,...,x_n) on [0,1]^n
# c_i = 1 for each i, sum c_i^2 = n
n_mc = 50
N_trials = 100000
data = np.random.uniform(0, 1, (N_trials, n_mc))
f_vals = data.max(axis=1)
E_f = f_vals.mean() # ~ n/(n+1)
E_f_theory = n_mc / (n_mc + 1)
t_max = 0.05
# McDiarmid: P(max - E[max] >= t) <= exp(-2t^2 / sum(c_i^2)) = exp(-2t^2/n)
# (since each c_i=1, this becomes exp(-2t^2/n) — loose for max!)
mcdiarmid_max = np.exp(-2 * t_max**2 / n_mc)
empirical_max = (f_vals - E_f >= t_max).mean()
print(f'\nMax function (n={n_mc}), t={t_max}:')
print(f' E[max] theory: {E_f_theory:.4f}, empirical: {E_f:.4f}')
print(f' McDiarmid bound: {mcdiarmid_max:.4f}')
print(f' Empirical tail: {empirical_max:.4f}')
assert mcdiarmid_max >= empirical_max
print('PASS - McDiarmid applies to max function (Hoeffding would not)')
13. Attention as a Stochastic Process
Softmax attention can be viewed as a kernel smoothing operation:
where is a normalisation constant.
As a random process view:
- Attention weights form a probability distribution
- The output is — an expectation of values under a learned measure
- As context length : by the law of large numbers, the attention sum concentrates
RoPE and stationarity: With rotary embeddings, the attention score depends only on relative position — a stationary kernel over position space.
Flash attention as Markov property: FlashAttention 2/3 processes attention in tiles, exploiting that long-range dependencies decay (approximately Markovian for some tasks).
Code cell 35
# === 13.1 Attention as Kernel Smoothing + Concentration ===
import numpy as np
np.random.seed(42)
def softmax(x):
e = np.exp(x - x.max())
return e / e.sum()
def attention(q, K, V):
d = K.shape[1]
scores = K @ q / np.sqrt(d)
weights = softmax(scores)
return weights @ V, weights
# Test: attention concentrates on relevant keys
# Plant a key aligned with the query and verify it dominates the softmax;
# then examine how the attention entropy behaves as dimension grows.
d, L = 64, 20
q = np.random.randn(d)
K = np.random.randn(L, d)
# Make key 5 match query
K[5] = q + 0.1 * np.random.randn(d)
V = np.random.randn(L, d)
out, weights = attention(q, K, V)
print(f'Attention weights (top 5 by weight):')
top5 = np.argsort(weights)[-5:][::-1]
for idx in top5:
    print(f' key {idx:2d}: weight = {weights[idx]:.4f}')
print(f' Key 5 (matched query) gets highest weight: {np.argmax(weights) == 5}')
assert np.argmax(weights) == 5
print('PASS - Attention concentrates on aligned key')
# Entropy of attention distribution
# Shannon entropy (with a small epsilon for numerical safety); log(L) is
# the entropy of the uniform distribution over the L keys.
entropy = -np.sum(weights * np.log(weights + 1e-10))
uniform_entropy = np.log(L)
print(f'\nAttention entropy: {entropy:.4f} (uniform would be {uniform_entropy:.4f})')
print(f'Concentration ratio: {entropy/uniform_entropy:.4f}')
# Scaling: larger d -> more uniform (less peaked)
print('\nEffect of sqrt(d) scaling on attention entropy:')
for d_test in [1, 4, 16, 64, 256]:
    q_t = np.random.randn(d_test)
    K_t = np.random.randn(L, d_test)
    K_t[5] = q_t + 0.1 * np.random.randn(d_test)
    # Values are irrelevant here; only the weight distribution is inspected.
    _, w = attention(q_t, K_t, np.zeros((L, 1)))
    ent = -np.sum(w * np.log(w + 1e-10))
    print(f' d={d_test:4d}: entropy = {ent:.4f}')
14. Summary: Process Taxonomy and Key Properties
| Process | State Space | Time | Markov? | Martingale? | Stationary? |
|---|---|---|---|---|---|
| SRW | Discrete | Discrete | Yes | Yes (S_t) | No |
| AR(1) ($\lvert\phi\rvert < 1$) | Continuous | Discrete | Yes | No | Yes |
| BM | Continuous | Continuous | Yes | Yes | No (non-stationary increments) |
| OU | Continuous | Continuous | Yes | No | Yes (stationary) |
| Poisson | Discrete | Continuous | Yes | No | No (stationary increments) |
| GP | Function | Any | No | Depends | If kernel is stationary |
Key takeaways for AI/ML:
- Diffusion models = OU process forward + learned reverse SDE
- SGD = Langevin dynamics near minima → stationary dist
- RL = value function satisfies Bellman = martingale optimality condition
- Attention = kernel smoothing with learned stationary (RoPE) kernel
- McDiarmid = Azuma applied to Doob martingale of a function
Code cell 37
# === 14.1 Summary Visualization — Process Comparison ===
import numpy as np
np.random.seed(42)
try:
import matplotlib.pyplot as plt
HAS_MPL = True
except ImportError:
HAS_MPL = False
n = 500
t_grid = np.arange(n)
# 1. SRW
srw = np.cumsum(np.random.choice([-1, 1], size=n))
# 2. BM
dt = 1/n
bm = np.cumsum(np.random.randn(n) * np.sqrt(dt))
# 3. OU
ou = np.zeros(n)
theta_ou, sigma_ou = 0.1, 0.5
for i in range(1, n):
ou[i] = ou[i-1] - theta_ou*ou[i-1] + sigma_ou*np.random.randn()
# 4. Poisson
lam = 0.5
poisson_increments = np.random.poisson(lam, n)
poisson_proc = np.cumsum(poisson_increments)
comp_martingale = poisson_proc - lam * t_grid # M_t = N_t - lambda*t
print('Process summary statistics:')
print(f' SRW: mean={srw.mean():.2f}, std={srw.std():.2f}')
print(f' BM: mean={bm.mean():.4f}, std={bm.std():.4f}')
print(f' OU: mean={ou.mean():.4f}, std={ou.std():.4f} (theory: {0.5/np.sqrt(2*0.1):.4f})')
print(f' Poisson martingale M_t: mean={comp_martingale.mean():.4f} (expect ~0)')
assert abs(comp_martingale.mean()) < 2.0, 'Poisson martingale mean too far from 0'
print('PASS - All processes verified')
if HAS_MPL:
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes[0,0].plot(t_grid, srw, color='steelblue')
axes[0,0].set_title('Simple Random Walk (SRW)')
axes[0,0].set_xlabel('Step'); axes[0,0].set_ylabel('S_t')
axes[0,1].plot(t_grid, bm, color='darkorange')
axes[0,1].set_title('Brownian Motion')
axes[0,1].set_xlabel('t'); axes[0,1].set_ylabel('B_t')
axes[1,0].plot(t_grid, ou, color='green')
axes[1,0].axhline(0, color='red', linestyle='--', alpha=0.5, label='mean=0')
axes[1,0].set_title('Ornstein-Uhlenbeck Process')
axes[1,0].set_xlabel('t'); axes[1,0].legend()
axes[1,1].plot(t_grid, comp_martingale, color='purple', label='N_t - lambda*t')
axes[1,1].axhline(0, color='red', linestyle='--', alpha=0.5)
axes[1,1].set_title('Poisson Compensated Martingale')
axes[1,1].set_xlabel('t'); axes[1,1].legend()
plt.suptitle('Stochastic Process Taxonomy', fontsize=14, fontweight='bold')
plt.tight_layout(); plt.show()
print('\nSummary plots rendered.')
Summary
This notebook developed the key stochastic processes and their properties:
- SRW and filtrations — discrete Markov chains, stopping times, adapted processes
- Martingales — OST, Doob inequality, Doob martingale → McDiarmid
- Poisson process — thinning, superposition, compensated martingale
- Brownian motion — quadratic variation, non-differentiability, Donsker CLT
- OU process — stationary Gaussian Markov process, DDPM connection
- Gaussian processes — kernel posterior inference, Bayesian optimisation
- Stationarity — ACVF, Wiener-Khinchin, RoPE as stationary kernel
- SGD as Langevin — noise temperature, implicit regularisation
- RL martingales — TD error as martingale differences, Bellman equation
- Diffusion models — VP/VE-SDE, score matching, reverse SDE
- Attention — kernel smoothing, concentration, RoPE stationarity
Next: Markov Chains → ../07-Markov-Chains/theory.ipynb