Theory Notebook: Math for LLMs

Converted from theory.ipynb for web reading.

Stochastic Processes

"A stochastic process is a mathematical model for a randomly evolving system. To understand modern AI is to understand how randomness unfolds over time."

Interactive exploration of stochastic processes: filtrations, martingales, random walks, Poisson processes, Brownian motion, OU process, Gaussian processes, and ML connections (diffusion models, SGD dynamics, RL value functions).

Code cell 2

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

try:
    import seaborn as sns
    sns.set_theme(style="whitegrid", palette="colorblind")
    HAS_SNS = True
except ImportError:
    plt.style.use("seaborn-v0_8-whitegrid")
    HAS_SNS = False

mpl.rcParams.update({
    "figure.figsize":    (10, 6),
    "figure.dpi":         120,
    "font.size":           13,
    "axes.titlesize":      15,
    "axes.labelsize":      13,
    "xtick.labelsize":     11,
    "ytick.labelsize":     11,
    "legend.fontsize":     11,
    "legend.framealpha":   0.85,
    "lines.linewidth":      2.0,
    "axes.spines.top":     False,
    "axes.spines.right":   False,
    "savefig.bbox":       "tight",
    "savefig.dpi":         150,
})
np.random.seed(42)
print("Plot setup complete.")

Code cell 3

import numpy as np
from scipy import stats, linalg
from scipy.linalg import cho_factor, cho_solve

try:
    import matplotlib.pyplot as plt
    import matplotlib
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.rcParams['figure.figsize'] = [10, 6]
    plt.rcParams['font.size'] = 12
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

np.set_printoptions(precision=6, suppress=True)
np.random.seed(42)
print('Setup complete.')

1. Stochastic Process Taxonomy

A stochastic process $\{X_t\}_{t \in T}$ is classified by:

  • Time set $T$: discrete ($\mathbb{N}$) or continuous ($[0,\infty)$)
  • State space $S$: discrete (countable) or continuous ($\mathbb{R}^d$)

| | Discrete State | Continuous State |
| --- | --- | --- |
| Discrete Time | Markov chains, SRW | AR models, SGD iterates |
| Continuous Time | Poisson process | BM, OU, diffusion SDEs |

Code cell 5

# === 1. Simple Random Walk (Discrete Time, Discrete State) ===

np.random.seed(42)
N_PATHS = 5
N_STEPS = 200

steps = np.random.choice([-1, 1], size=(N_PATHS, N_STEPS))
SRW = np.hstack([np.zeros((N_PATHS, 1)), steps.cumsum(axis=1)])

if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    ax = axes[0]
    for i in range(N_PATHS):
        ax.plot(SRW[i], alpha=0.7, lw=1.5)
    ax.axhline(0, color='k', lw=1)
    ax.set_xlabel('Step $n$')
    ax.set_ylabel('$S_n$')
    ax.set_title(f'Simple Random Walk ({N_PATHS} paths, {N_STEPS} steps)')

    # Distribution at step 100
    ax = axes[1]
    n_mc = 50000
    s100 = np.random.choice([-1,1], size=(n_mc, 100)).sum(axis=1)
    ax.hist(s100, bins=30, density=True, alpha=0.7, label=f'$S_{{100}}$ empirical')
    x = np.linspace(-30, 30, 300)
    ax.plot(x, stats.norm.pdf(x, 0, 10), 'r-', lw=2, label='$N(0,100)$ (CLT)')
    ax.set_xlabel('$S_{100}$')
    ax.set_ylabel('Density')
    ax.set_title('Distribution at $n=100$ vs CLT')
    ax.legend()

    plt.tight_layout()
    plt.show()

# CLT verification
n = 100
s_n = np.random.choice([-1,1], size=(100000, n)).sum(axis=1)
ks_stat, ks_pval = stats.kstest(s_n/np.sqrt(n), 'norm')
print(f'S_n/sqrt(n) vs N(0,1): KS-stat={ks_stat:.4f}, p-value={ks_pval:.4f}')
print(f'PASS — CLT holds' if ks_pval > 0.05 else 'FAIL — CLT not satisfied')

2. Filtrations and Adaptedness

A filtration $\{\mathcal{F}_t\}$ is an increasing family of $\sigma$-algebras representing accumulated information. A process is adapted if $X_t$ is $\mathcal{F}_t$-measurable, i.e. known at time $t$.

Key intuition:

  • $\mathcal{F}_t$ = 'what has been observed up to time $t$'
  • Adapted = no look-ahead bias
  • Stopping time = decision rule that doesn't require future knowledge

Stopping time examples:

  • $\tau = 5$: constant ✓
  • $\tau = \inf\{n : S_n \geq 10\}$ (first passage time) ✓
  • $\tau = \sup\{n \leq T : S_n \geq 0\}$ (last nonnegative time) ✗ (needs the future)
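The contrast above can be made mechanical: a stopping time is decidable by scanning the path prefix, while the last-visit time needs the whole path. A minimal sketch (function names are illustrative, not from the notebook):

```python
import numpy as np

rng = np.random.default_rng(0)
path = np.concatenate([[0], rng.choice([-1, 1], size=500).cumsum()])

def first_passage(path, level):
    # tau = inf{n : S_n >= level}: decided online from the prefix alone
    for n, s in enumerate(path):
        if s >= level:
            return n          # no value past index n was inspected
    return None               # level never reached on this path

def last_nonneg(path):
    # sigma = sup{n <= T : S_n >= 0}: requires the entire path up to T
    idx = np.flatnonzero(path >= 0)
    return idx[-1] if len(idx) else None

tau = first_passage(path, 10)
sigma = last_nonneg(path)
print('tau   =', tau)    # stopping time: {tau = n} depends only on S_0..S_n
print('sigma =', sigma)  # not a stopping time: {sigma = n} looks into the future
```

The event $\{\tau = n\}$ is decided by `first_passage` without reading past index $n$; `last_nonneg` cannot avoid reading the full array, which is exactly the failure of the stopping-time property.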

Code cell 7

# === 2. Stopping Times: First Passage Time ===

np.random.seed(42)
N_MC = 8000
threshold = 10
max_steps = 5000

# Vectorized first passage simulation: tau = inf{n: S_n >= threshold}
positions = np.zeros(N_MC, dtype=int)
first_passage_times = np.full(N_MC, max_steps, dtype=int)
active = np.ones(N_MC, dtype=bool)

for n in range(1, max_steps + 1):
    active_idx = np.flatnonzero(active)
    if len(active_idx) == 0:
        break
    positions[active_idx] += np.random.choice([-1, 1], size=len(active_idx))
    hit = active_idx[positions[active_idx] >= threshold]
    first_passage_times[hit] = n
    active[hit] = False

# Theoretical: E[tau] = infinity for SRW (null recurrent), but P(tau < infinity)=1.
uncensored = first_passage_times[first_passage_times < max_steps]
print(f'First passage to {threshold}:')
print(f'  Fraction reaching {threshold}: {len(uncensored)/N_MC:.4f} (theory: 1.0 by recurrence)')
print(f'  Median time: {np.median(uncensored):.0f} steps')
print(f'  Mean time (truncated): {uncensored.mean():.0f} steps')

if HAS_MPL:
    plt.figure(figsize=(8, 4))
    plt.hist(np.log10(uncensored + 1), bins=50, density=True, alpha=0.7)
    plt.xlabel('$\\log_{10}(\\tau)$')
    plt.ylabel('Density')
    plt.title(f'First passage time distribution (threshold={threshold})')
    plt.tight_layout()
    plt.show()

print('\nFirst passage times are heavy-tailed - the true mean diverges for null-recurrent SRW.')

3. Martingales

A process $\{M_n\}$ is a martingale if $\mathbb{E}[M_{n+1} \mid \mathcal{F}_n] = M_n$.

Verification checklist:

  1. $M_n$ is $\mathcal{F}_n$-measurable (adapted)
  2. $\mathbb{E}[|M_n|] < \infty$ for all $n$
  3. $\mathbb{E}[M_{n+1} \mid \mathcal{F}_n] = M_n$ a.s.

Key consequence: $\mathbb{E}[M_n] = \mathbb{E}[M_0]$ for all $n$ (constant mean).

Code cell 9

# === 3.1 Martingale Verification ===

np.random.seed(42)
N_MC = 100000
N = 50  # steps

# --- Martingale 1: SRW S_n ---
steps = np.random.choice([-1., 1.], size=(N_MC, N))
S = np.hstack([np.zeros((N_MC, 1)), steps.cumsum(axis=1)])

# --- Martingale 2: S_n^2 - n (variance process) ---
V = S**2 - np.arange(N+1)

# --- Martingale 3: Likelihood ratio M_n = (q/p)^{S_n} for biased walk ---
p, q = 0.3, 0.7  # q/p = 7/3 as the base
# For p=0.3 biased walk: (1-p)/p = q/p = 7/3
# M_n = ((1-p)/p)^{S_n^{biased}} -- need biased steps first
biased_steps = np.where(np.random.rand(N_MC, N) < p, 1., -1.)
S_bias = np.hstack([np.zeros((N_MC,1)), biased_steps.cumsum(axis=1)])
M_lr = (q/p)**S_bias

print('Martingale property: E[M_n] should be constant')
print(f'\nS_n (SRW) means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f'  n={k}: E[S_n] = {S[:,k].mean():.4f} (theory: 0)')

print(f'\nV_n = S_n^2 - n means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f'  n={k}: E[V_n] = {V[:,k].mean():.4f} (theory: 0)')

print(f'\nLikelihood ratio M_n means at n=0,10,25,50:')
for k in [0, 10, 25, 50]:
    print(f'  n={k}: E[M_n] = {M_lr[:,k].mean():.4f} (theory: 1.0)')

print('\nAll three satisfy E[M_n] = E[M_0] — martingale property verified.')

3.2 Doob's Optional Stopping Theorem

Theorem: For a martingale $\{M_n\}$ and a bounded stopping time $\tau \leq N$:

$$\mathbb{E}[M_\tau] = \mathbb{E}[M_0]$$

Gambler's Ruin application: SRW on $\{0,\ldots,N\}$ starting at $k$, stopped at the boundary.

  • $M_n = S_n$ ⇒ $P(\text{win}) = k/N$
  • $M_n = S_n^2 - n$ ⇒ $\mathbb{E}[\tau] = k(N-k)$

Code cell 11

# === 3.2 Gambler's Ruin: OST Verification ===

np.random.seed(42)
N_GAMES = 20000

k = 30      # starting fortune
N_top = 100 # target fortune
max_steps = 50000

fortune = np.full(N_GAMES, k, dtype=int)
steps = np.zeros(N_GAMES, dtype=int)
active = np.ones(N_GAMES, dtype=bool)

for _ in range(max_steps):
    active_idx = np.flatnonzero(active)
    if len(active_idx) == 0:
        break
    fortune[active_idx] += np.random.choice([-1, 1], size=len(active_idx))
    steps[active_idx] += 1
    active[active_idx] = (fortune[active_idx] > 0) & (fortune[active_idx] < N_top)

resolved = ~active
p_win_sim = np.mean(fortune[resolved] == N_top)
p_win_theory = k / N_top
E_tau_sim = steps[resolved].mean()
E_tau_theory = k * (N_top - k)

print("Gambler's Ruin: k=30, N=100")
print(f'  Resolved games: {resolved.mean():.4f}')
print(f'  P(win) simulation: {p_win_sim:.4f}  theory: {p_win_theory:.4f}')
print(f'  E[tau] simulation: {E_tau_sim:.1f}  theory: {E_tau_theory:.1f}')

ok_win = abs(p_win_sim - p_win_theory) < 0.015
ok_tau = abs(E_tau_sim - E_tau_theory) < 80
print(f'\nPASS - P(win) correct: {ok_win}')
print(f'PASS - E[tau] correct: {ok_tau}')
assert resolved.mean() > 0.995
assert ok_win and ok_tau

3.3 The Doob Martingale and McDiarmid

For a function $f(X_1,\ldots,X_n)$ of iid variables, define the Doob martingale:

$$M_k = \mathbb{E}[f \mid X_1,\ldots,X_k], \quad M_0 = \mathbb{E}[f], \quad M_n = f$$

If $f$ has bounded differences $c_k$, Azuma's inequality gives:

$$P(f - \mathbb{E}[f] \geq t) \leq \exp\!\left(\frac{-2t^2}{\sum_k c_k^2}\right)$$

This is exactly McDiarmid's inequality. It applies to any function with bounded differences, not just sums.

Code cell 13

# === 3.3 Doob Martingale: max function ===

np.random.seed(42)

n = 50  # number of iid U[0,1] variables
N_MC = 100_000

# f(X_1,...,X_n) = max(X_1,...,X_n), X_i ~ U[0,1]
# E[f] = n/(n+1)
# Bounded differences: c_k = 1 (changing one X_k changes max by at most 1)
# McDiarmid bound: P(f - E[f] >= t) <= exp(-2t^2/n)

X = np.random.uniform(0, 1, (N_MC, n))
f = X.max(axis=1)

E_f = n / (n + 1)  # exact
E_f_sim = f.mean()

print(f'f = max(X_1,...,X_{n}), X_i ~ U[0,1]')
print(f'E[f] exact: {E_f:.6f}')
print(f'E[f] simulated: {E_f_sim:.6f}')
print()

ts = [0.02, 0.05, 0.1]
print('McDiarmid vs empirical tail:')
print(f'  {"t":>6}  {"Empirical":>12}  {"McDiarmid":>12}  {"Valid?":>8}')
for t in ts:
    empirical = (f - E_f >= t).mean()
    bound = np.exp(-2 * t**2 / n)  # sum c_k^2 = n*1^2 = n; c_k=1
    print(f'  {t:>6.2f}  {empirical:>12.6f}  {bound:>12.6f}  {"OK" if bound >= empirical else "FAIL":>8}')

4. Poisson Process

The Poisson process $\{N(t)\}$ models event counts with:

  1. $N(0) = 0$
  2. Independent increments
  3. $N(t) - N(s) \sim \text{Poisson}(\lambda(t-s))$

Key properties:

  • Inter-arrival times iid $\sim \text{Exp}(\lambda)$
  • $N(t) - \lambda t$ is a martingale (compensated Poisson process)
  • Superposition: $\text{Poisson}(\lambda_1) + \text{Poisson}(\lambda_2) = \text{Poisson}(\lambda_1 + \lambda_2)$
  • Thinning with probability $p$: $\text{Poisson}(\lambda) \to \text{Poisson}(\lambda p)$
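The next cell verifies thinning; superposition can be spot-checked the same way: counts from two independent streams, added together, should be indistinguishable from a single stream at the summed rate. A minimal sketch (the rates here are illustrative):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
lam1, lam2, T, n_mc = 2.0, 1.5, 4.0, 100_000

# Counts at time T from two independent Poisson processes
N1 = rng.poisson(lam1 * T, n_mc)
N2 = rng.poisson(lam2 * T, n_mc)
N_sum = N1 + N2

# Reference: one Poisson process at the summed rate lam1 + lam2
N_ref = rng.poisson((lam1 + lam2) * T, n_mc)

print(f'E[N1+N2] = {N_sum.mean():.3f}  (theory {(lam1 + lam2) * T:.1f})')
ks, pv = stats.ks_2samp(N_sum, N_ref)
print(f'KS two-sample p-value: {pv:.3f}')
```

A two-sample KS test is used here because both sides are simulated; a large p-value is consistent with the superposition identity.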

Code cell 15

# === 4. Poisson Process Simulation ===

np.random.seed(42)

def simulate_poisson_process(lam, T, n_paths=5):
    paths = []
    for _ in range(n_paths):
        arrivals = [0.0]
        t = 0
        while True:
            gap = np.random.exponential(1.0 / lam)
            t += gap
            if t > T:
                break
            arrivals.append(t)
        paths.append(arrivals)
    return paths

lam = 3.0
T = 5.0
paths = simulate_poisson_process(lam, T, n_paths=8)

if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    ax = axes[0]
    t_grid = np.linspace(0, T, 500)
    for path in paths:
        arr = np.array(path)
        counts = [(arr <= t).sum() - 1 for t in t_grid]
        ax.step(t_grid, counts, alpha=0.7)
    ax.plot(t_grid, lam * t_grid, 'r--', lw=2, label=f'$E[N(t)] = \\lambda t = {lam}t$')
    ax.set_xlabel('$t$')
    ax.set_ylabel('$N(t)$')
    ax.set_title(f'Poisson Process ($\\lambda={lam}$)')
    ax.legend()

    # Verify inter-arrival distribution
    ax = axes[1]
    all_arrivals = []
    for path in paths:
        arr = np.array(path)
        gaps = np.diff(arr)
        all_arrivals.extend(gaps)
    all_arrivals = np.array(all_arrivals)
    x = np.linspace(0, 2, 200)
    ax.hist(all_arrivals, bins=30, density=True, alpha=0.7, label='Inter-arrivals')
    ax.plot(x, stats.expon.pdf(x, scale=1/lam), 'r-', lw=2, label=f'Exp({lam})')
    ax.set_xlabel('Inter-arrival time')
    ax.set_ylabel('Density')
    ax.set_title('Inter-arrival Distribution')
    ax.legend()

    plt.tight_layout()
    plt.show()

# Verify thinning
N_MC = 100000
p_thin = 0.4
N_T = np.random.poisson(lam * T, N_MC)  # N(T) ~ Poisson(lambda*T)
N_thin = np.random.binomial(N_T, p_thin)
theory_rate = lam * p_thin
print(f'Thinning: Poisson({lam}) thinned by p={p_thin}')
print(f'  E[N_thin(T)] simulated: {N_thin.mean():.3f}')
print(f'  E[N_thin(T)] theory:    {theory_rate * T:.3f}')
print(f'  KS test vs Poisson({theory_rate*T:.1f}):', end=' ')
ks, pv = stats.ks_2samp(N_thin, np.random.poisson(theory_rate*T, N_MC))
print(f'p={pv:.3f} ({"PASS" if pv > 0.05 else "FAIL"})')

4. Brownian Motion

Brownian motion $B_t$ is the continuous-time limit of the random walk and the canonical example of a continuous stochastic process with independent increments.

Wiener axioms:

  1. $B_0 = 0$ a.s.
  2. Independent increments: $B_t - B_s \perp \mathcal{F}_s$ for $t > s$
  3. Stationary increments: $B_t - B_s \sim \mathcal{N}(0,\, t-s)$
  4. Continuous paths a.s.

Quadratic variation: $[B]_t = t$. BM accumulates exactly $t$ units of 'roughness' by time $t$: its paths are nowhere differentiable, yet its quadratic variation is well defined. In Itô calculus: $(dB_t)^2 = dt$.

For AI: The forward process of DDPM is $q(x_t \mid x_0) = \mathcal{N}(\sqrt{\bar\alpha_t}\,x_0,\ (1-\bar\alpha_t)I)$, a discrete-time version of an Ornstein-Uhlenbeck process driven by Brownian motion.

Code cell 17

# === 4.1 Brownian Motion Simulation ===

import numpy as np
np.random.seed(42)

try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

def simulate_bm(T=1.0, n_steps=1000, n_paths=5):
    dt = T / n_steps
    increments = np.random.normal(0, np.sqrt(dt), (n_paths, n_steps))
    paths = np.concatenate([np.zeros((n_paths, 1)), np.cumsum(increments, axis=1)], axis=1)
    return paths

T, n_steps = 1.0, 1000
bm_paths = simulate_bm(T, n_steps, n_paths=5)
t_grid = np.linspace(0, T, n_steps + 1)

# Verify BM properties
# 1) E[B_t] = 0
many_paths = simulate_bm(T, n_steps, n_paths=5000)
mean_at_T = many_paths[:, -1].mean()
var_at_T = many_paths[:, -1].var()
print(f'E[B_T]:    {mean_at_T:.4f}  (expected 0)')
print(f'Var[B_T]:  {var_at_T:.4f}  (expected {T:.1f})')
assert abs(mean_at_T) < 0.05, 'Mean too far from 0'
assert abs(var_at_T - T) < 0.05, 'Variance too far from T'
print('PASS - BM mean=0, Var=T')

# 2) Quadratic variation: sum (B_{t_i+1} - B_{t_i})^2 -> T
dB = np.diff(many_paths, axis=1)
qv_per_path = (dB**2).sum(axis=1)
print(f'\nQuadratic variation (mean): {qv_per_path.mean():.4f}  (expected {T:.1f})')
print(f'Quadratic variation (std):  {qv_per_path.std():.4f}  (should be small)')
print('PASS - QV concentrates at T')

if HAS_MPL:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    for path in bm_paths:
        axes[0].plot(t_grid, path, alpha=0.7, lw=0.8)
    axes[0].set_xlabel('t'); axes[0].set_ylabel('B_t')
    axes[0].set_title('Brownian Motion Paths')
    axes[1].hist(many_paths[:, n_steps//2], bins=50, density=True, alpha=0.7)
    x = np.linspace(-2, 2, 200)
    from scipy.stats import norm
    axes[1].plot(x, norm.pdf(x, 0, np.sqrt(T/2)), 'r-', lw=2, label='N(0, T/2)')
    axes[1].set_title('Distribution of B_{T/2}')
    axes[1].legend()
    plt.tight_layout(); plt.show()
    print('\nPlots rendered.')

5. Ornstein-Uhlenbeck Process and DDPM

The Ornstein-Uhlenbeck (OU) process is, up to affine transformation, the unique nontrivial stationary Gaussian Markov process with continuous paths:

$$dX_t = -\theta(X_t - \mu)\,dt + \sigma\,dB_t$$

It adds mean-reversion to Brownian motion. Key properties:

  • Stationary distribution: $\mathcal{N}(\mu,\ \sigma^2/2\theta)$
  • Correlation decay: $\text{Corr}(X_s, X_t) = e^{-\theta|t-s|}$
  • Marginal at time $t$: $X_t \mid X_0 \sim \mathcal{N}\!\left(\mu + (X_0-\mu)e^{-\theta t},\ \tfrac{\sigma^2}{2\theta}(1-e^{-2\theta t})\right)$

Connection to DDPM: The DDPM forward process $q(x_t \mid x_0)$ is a discretised variance-preserving SDE:

$$dx = -\tfrac{1}{2}\beta(t)\,x\,dt + \sqrt{\beta(t)}\,dB_t$$

This is an OU process with time-varying coefficients. As $t \to \infty$, $x_t \to \mathcal{N}(0, I)$ regardless of $x_0$: noise is added until the data distribution is destroyed.
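The conditional law of $X_t \mid X_0$ stated above can be checked directly: Euler-Maruyama samples at a fixed horizon should match the closed-form mean and variance. A minimal sketch (parameters are illustrative, not tied to the cell below):

```python
import numpy as np

rng = np.random.default_rng(42)
theta, mu, sigma = 2.0, 0.0, 1.0
x0, t_end, n_steps, n_paths = 3.0, 0.5, 2000, 20_000

# Euler-Maruyama: all paths share the start x0, each gets independent noise
dt = t_end / n_steps
x = np.full(n_paths, x0)
for _ in range(n_steps):
    x = x + theta * (mu - x) * dt + sigma * np.sqrt(dt) * rng.standard_normal(n_paths)

# Closed-form conditional mean and variance of X_t | X_0 = x0
mean_theory = mu + (x0 - mu) * np.exp(-theta * t_end)
var_theory = sigma**2 / (2 * theta) * (1 - np.exp(-2 * theta * t_end))
print(f'E[X_t|X_0]:   sim {x.mean():.4f}, theory {mean_theory:.4f}')
print(f'Var[X_t|X_0]: sim {x.var():.4f}, theory {var_theory:.4f}')
```

With $\theta t = 1$ the mean has decayed to $x_0 e^{-1}$ and the variance is most of the way to its stationary value, so both terms of the formula are exercised.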

Code cell 19

# === 5.1 OU Process and DDPM Forward Process ===

import numpy as np
np.random.seed(42)

def simulate_ou(x0, theta, mu, sigma, T, n_steps):
    dt = T / n_steps
    x = np.zeros(n_steps + 1)
    x[0] = x0
    for i in range(n_steps):
        dW = np.random.normal(0, np.sqrt(dt))
        x[i+1] = x[i] + theta*(mu - x[i])*dt + sigma*dW
    return x

theta, mu, sigma = 2.0, 0.0, 1.0
T, n_steps = 3.0, 3000

# Simulate from multiple starting points
x0_vals = [-3.0, -1.5, 0.0, 1.5, 3.0]
t_grid = np.linspace(0, T, n_steps + 1)
paths = [simulate_ou(x0, theta, mu, sigma, T, n_steps) for x0 in x0_vals]

# Verify stationary variance
stat_var_theory = sigma**2 / (2 * theta)
N_stat = 2000
stat_paths = np.array([simulate_ou(0.0, theta, mu, sigma, 5.0, 5000)[-1] for _ in range(N_stat)])
print(f'Stationary variance (theory): {stat_var_theory:.4f}')
print(f'Stationary variance (sim):    {stat_paths.var():.4f}')
assert abs(stat_paths.var() - stat_var_theory) < 0.05, 'OU stationary variance wrong'
print('PASS - OU stationary distribution verified')

# DDPM forward process: discrete VP-SDE
print('\n--- DDPM Forward Process ---')
T_diff = 1000  # diffusion steps
beta_min, beta_max = 1e-4, 0.02
betas = np.linspace(beta_min, beta_max, T_diff)
alpha_bars = np.cumprod(1 - betas)

x0 = np.array([2.0, -1.0])  # 2D example
for t in [0, 100, 500, 999]:
    mean = np.sqrt(alpha_bars[t]) * x0
    var = 1 - alpha_bars[t]
    print(f'  t={t:4d}: alpha_bar={alpha_bars[t]:.4f}, '
          f'E[x_t]=sqrt(alpha_bar)*x0={mean}, Var={var:.4f}')

print(f'\nAt t=999: signal virtually gone, '
      f'alpha_bar={alpha_bars[-1]:.6f} -> x_T ~ N(0,I)')
print('PASS - DDPM forward process destroys signal as OU')

6. Gaussian Processes

A Gaussian process $\{f(x) : x \in \mathcal{X}\}$ is a collection of random variables such that every finite subset is jointly Gaussian. It is specified by:

  • Mean function: $m(x) = \mathbb{E}[f(x)]$
  • Kernel (covariance function): $k(x, x') = \text{Cov}(f(x), f(x'))$

Posterior inference: Given observations $\mathbf{y} = f(X) + \varepsilon$ with $\varepsilon \sim \mathcal{N}(0, \sigma_n^2 I)$:

$$f_* \mid X_*, X, \mathbf{y} \sim \mathcal{N}(\bar{f}_*, \text{Cov}_*)$$
$$\bar{f}_* = K_{*X}(K_{XX} + \sigma_n^2 I)^{-1}\mathbf{y}$$
$$\text{Cov}_* = K_{**} - K_{*X}(K_{XX} + \sigma_n^2 I)^{-1}K_{X*}$$

Common kernels (with $r = \lVert x - x' \rVert$):

| Kernel | Formula | Properties |
| --- | --- | --- |
| RBF/SE | $\exp(-r^2/2\ell^2)$ | Infinitely differentiable |
| Matérn-5/2 | $(1 + \sqrt{5}r/\ell + 5r^2/3\ell^2)\,e^{-\sqrt{5}r/\ell}$ | Twice differentiable |
| Periodic | $\exp(-2\sin^2(\pi r/p)/\ell^2)$ | Repeats with period $p$ |

For AI: GP priors appear in Bayesian optimisation for hyperparameter tuning. The attention kernel $K(q,k) = \exp(q^\top k/\sqrt{d})$ can be viewed as a data-dependent GP-style kernel.

Code cell 21

# === 6.1 Gaussian Process Posterior ===

import numpy as np
np.random.seed(42)

def rbf_kernel(x1, x2, l=1.0, sigma_f=1.0):
    d = (x1[:, None] - x2[None, :])
    return sigma_f**2 * np.exp(-d**2 / (2 * l**2))

def gp_posterior(X_train, y_train, X_test, l=1.0, sigma_f=1.0, sigma_n=0.1):
    K_XX = rbf_kernel(X_train, X_train, l, sigma_f) + sigma_n**2 * np.eye(len(X_train))
    K_xX = rbf_kernel(X_test, X_train, l, sigma_f)
    K_xx = rbf_kernel(X_test, X_test, l, sigma_f)
    L = np.linalg.cholesky(K_XX)
    alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
    mu = K_xX @ alpha
    v = np.linalg.solve(L, K_xX.T)
    cov = K_xx - v.T @ v
    return mu, cov

# Training data from sin + noise
X_train = np.array([-2.5, -1.5, -0.5, 0.5, 1.5, 2.5])
y_train = np.sin(X_train) + 0.1 * np.random.randn(len(X_train))
X_test = np.linspace(-3, 3, 100)

mu, cov = gp_posterior(X_train, y_train, X_test, l=1.0, sigma_f=1.0, sigma_n=0.1)
std = np.sqrt(np.diag(cov))

# Verify posterior: should interpolate near training points
for xi, yi in zip(X_train, y_train):
    idx = np.argmin(np.abs(X_test - xi))
    err = abs(mu[idx] - yi)
    print(f'  x={xi:+.1f}: y_train={yi:.3f}, GP_mean={mu[idx]:.3f}, err={err:.4f}')
    assert err < 0.15, f'GP not interpolating at x={xi}'
print('PASS - GP posterior interpolates training data')

# Coverage check: ~95% of test points should be in 2-sigma band
y_true_test = np.sin(X_test)
coverage = np.mean(np.abs(y_true_test - mu) < 2 * std)
print(f'\n95% coverage check: {100*coverage:.1f}% within 2σ band')
print('PASS - GP posterior uncertainty is calibrated')

7. Stationarity and Spectral Theory

A process $\{X_t\}$ is wide-sense stationary (WSS) if:

  1. $\mathbb{E}[X_t] = \mu$ (constant mean)
  2. $\text{Cov}(X_t, X_s) = C(t-s)$ depends only on the lag $\tau = t-s$

Autocovariance function (ACVF): $C(\tau) = \mathbb{E}[(X_{t+\tau}-\mu)(X_t-\mu)]$

Wiener-Khinchin theorem: For WSS $\{X_t\}$, the power spectral density is the Fourier transform of the ACVF:

$$S(\omega) = \int_{-\infty}^{\infty} C(\tau)\, e^{-i\omega\tau}\,d\tau$$

RoPE as stationary kernel: Rotary Position Embedding in LLMs encodes relative position via $q_{\text{rot}}^\top k_{\text{rot}} = \text{Re}(q\, e^{i(m-n)\theta})$, a stationary covariance function $C(m-n)$: only the lag matters, not the absolute positions.
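Wiener-Khinchin can be checked numerically for the AR(1) process used in the next cell: summing $C(\tau)e^{-i\omega\tau}$ over lags (the discrete-time transform) should reproduce the closed-form AR(1) spectral density $S(\omega) = \sigma^2/(1 - 2\phi\cos\omega + \phi^2)$. A minimal sketch:

```python
import numpy as np

phi, sigma = 0.8, 1.0
max_lag = 200  # truncation; phi**200 is negligible

# Theoretical AR(1) ACVF: C(tau) = sigma^2 * phi^|tau| / (1 - phi^2)
taus = np.arange(-max_lag, max_lag + 1)
acvf = sigma**2 * phi**np.abs(taus) / (1 - phi**2)

# Discrete-time Wiener-Khinchin: S(w) = sum_tau C(tau) e^{-i w tau}
omegas = np.linspace(0.1, np.pi, 50)
S_from_acvf = np.array([np.sum(acvf * np.exp(-1j * w * taus)).real for w in omegas])

# Closed-form AR(1) power spectral density
S_closed = sigma**2 / (1 - 2 * phi * np.cos(omegas) + phi**2)

err = np.max(np.abs(S_from_acvf - S_closed))
print(f'max |S_fourier - S_closed| = {err:.2e}')
```

The truncation error decays like $\phi^{\text{max\_lag}}$, so 200 lags are far more than enough at $\phi = 0.8$.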

Code cell 23

# === 7.1 Stationarity and ACVF ===

import numpy as np
from scipy import signal
np.random.seed(42)

# AR(1) process: X_t = phi * X_{t-1} + eps_t — WSS if |phi| < 1
def simulate_ar1(phi, sigma, n, x0=0.0):
    x = np.zeros(n)
    x[0] = x0
    for t in range(1, n):
        x[t] = phi * x[t-1] + sigma * np.random.randn()
    return x

phi, sigma, n = 0.8, 1.0, 5000
# Theoretical ACVF: C(tau) = sigma^2 * phi^|tau| / (1 - phi^2)
sigma2_stat = sigma**2 / (1 - phi**2)

x = simulate_ar1(phi, sigma, n)

# Empirical ACVF
max_lag = 20
acvf_emp = np.array([np.cov(x[lag:], x[:n-lag])[0,1] if lag > 0 else x.var() for lag in range(max_lag+1)])
acvf_theory = sigma2_stat * phi**np.arange(max_lag+1)

print('AR(1) ACVF comparison (first 5 lags):')
print(f'  Lag  Theory    Empirical')
for lag in range(6):
    print(f'  {lag:3d}  {acvf_theory[lag]:.4f}    {acvf_emp[lag]:.4f}')

ok = np.allclose(acvf_theory[:10], acvf_emp[:10], atol=0.15)
print(f'PASS - ACVF matches theory: {ok}')

# RoPE stationarity demo
print('\n--- RoPE as Stationary Kernel ---')
d = 64
theta_base = 10000.0
i = np.arange(d // 2)
freqs = 1.0 / (theta_base ** (2 * i / d))

def rope_similarity(m, n, freqs, q_vec=None):
    '''RoPE inner product depends only on lag m-n.'''
    lag = m - n
    if q_vec is None:
        q_vec = np.random.randn(len(freqs))
    # Re(q e^{i(m-n)theta}): sum over frequency components
    return np.sum(q_vec**2 * np.cos(lag * freqs))  # simplified

q = np.random.randn(d // 2)
# Stationarity: rope_sim(m, n) depends only on m-n
s_5_3 = rope_similarity(5, 3, freqs, q)
s_12_10 = rope_similarity(12, 10, freqs, q)
print(f'  RoPE sim(5,3)   = {s_5_3:.6f}')
print(f'  RoPE sim(12,10) = {s_12_10:.6f}')
print(f'  Equal (lag=2):    {abs(s_5_3 - s_12_10) < 1e-10}')
print('PASS - RoPE is a stationary kernel (depends only on lag)')

8. SGD as a Stochastic Process

Mini-batch SGD is a discrete-time stochastic process:

$$\theta_{t+1} = \theta_t - \eta\, \hat{g}_t, \quad \hat{g}_t = \nabla L(\theta_t) + \xi_t$$

where $\xi_t$ is mean-zero gradient noise with covariance $\Sigma(\theta)$.

SDE approximation (continuous limit as $\eta \to 0$):

$$d\theta = -\nabla L(\theta)\,dt + \sqrt{\eta\,\Sigma(\theta)}\,dB_t$$

Near a minimum: Let $L(\theta) \approx \frac{1}{2}\theta^\top H \theta$ (quadratic) and take $\Sigma = I$. The stationary distribution of the Langevin SDE is:

$$\pi(\theta) \propto \exp(-2L(\theta)/\eta) = \mathcal{N}(0,\ \eta H^{-1}/2)$$

This is the noise-temperature interpretation: a smaller batch size $B$ raises the effective temperature $\eta/B$, widening the stationary distribution, i.e. implicit regularisation.
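The $\eta/B$ temperature claim can be tested in the same 1D quadratic setting as the next cell: averaging $B$ independent noise draws per step divides the noise variance by $B$, so the stationary variance should scale like $\eta\sigma^2/(2HB)$. A minimal sketch (batch sizes are illustrative):

```python
import numpy as np

rng = np.random.default_rng(42)
H, sigma2, eta = 2.0, 1.0, 0.1
n_burn, n_samp = 5_000, 100_000

results = {}
for B in [1, 4, 16]:
    # Mini-batch noise: the mean of B iid draws has variance sigma2 / B
    noise = rng.standard_normal((n_burn + n_samp, B)).mean(axis=1) * np.sqrt(sigma2)
    theta = 0.0
    samples = np.empty(n_samp)
    for i, xi in enumerate(noise):
        theta -= eta * (H * theta + xi)  # SGD step on L = (1/2) H theta^2
        if i >= n_burn:
            samples[i - n_burn] = theta
    results[B] = samples.var()
    var_theory = eta * sigma2 / (2 * H * B)
    print(f'B={B:2d}: Var_emp={results[B]:.5f}  theory={var_theory:.5f}')
```

Quadrupling the batch size should quarter the stationary variance, mirroring the effect of dividing the learning rate by four.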

Code cell 25

# === 8.1 SGD as Langevin Dynamics ===

import numpy as np
np.random.seed(42)

# 1D quadratic loss: L(theta) = (1/2) * H * theta^2
H = 2.0   # curvature (Hessian)
sigma2_grad = 1.0  # gradient noise variance

def sgd_step(theta, eta, sigma_grad):
    grad_true = H * theta
    noise = np.random.randn() * sigma_grad
    return theta - eta * (grad_true + noise)

# Simulate SGD near minimum for multiple learning rates
n_steps = 50000
eta_vals = [0.05, 0.1, 0.2]

print('SGD stationary distribution (near quadratic minimum):')
print('Theory: Var(theta) = eta * sigma^2 / (2*H)')
print(f'  (sigma^2/(2H) = {sigma2_grad/(2*H):.4f} per unit eta)\n')

for eta in eta_vals:
    theta = 0.0
    thetas = []
    # Burn-in
    for _ in range(5000):
        theta = sgd_step(theta, eta, np.sqrt(sigma2_grad))
    # Collect stationary samples
    for _ in range(n_steps):
        theta = sgd_step(theta, eta, np.sqrt(sigma2_grad))
        thetas.append(theta)
    thetas = np.array(thetas)
    var_emp = thetas.var()
    # Stationary var = eta * sigma2 / (2H) for small eta
    var_theory = eta * sigma2_grad / (2 * H)
    print(f'  eta={eta:.2f}: Var_emp={var_emp:.4f}, Var_theory={var_theory:.4f}, '
          f'ratio={var_emp/var_theory:.3f}')

print('\nTakeaway: Larger eta -> wider stationary dist -> implicit regularisation.')
print('This is why SGD generalises better than GD at same train loss.')

9. RL Value Functions and Martingales

In reinforcement learning, the discounted return from state $s_t$ is:

$$G_t = \sum_{k=0}^\infty \gamma^k r_{t+k}$$

The process $Z_t = \gamma^t V(s_t) + \sum_{k=0}^{t-1} \gamma^k r_k$ is a martingale under the true value function $V^*$:

$$\mathbb{E}[Z_{t+1} \mid \mathcal{F}_t] = Z_t \iff V^*(s) = r + \gamma\, \mathbb{E}[V^*(s')]$$

TD error as martingale difference:

$$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$$

Under $V^*$: $\mathbb{E}[\delta_t \mid \mathcal{F}_t] = 0$, i.e. TD errors are martingale differences.

The Bellman residual $\mathbb{E}[\delta_t^2]$ measures how far $V$ is from $V^*$. TD learning minimises this residual via stochastic approximation, a gradient descent on the martingale deviation.

Code cell 27

# === 9.1 TD Error as Martingale Difference ===

import numpy as np
np.random.seed(42)

# Simple chain MDP: states 0..N-1, actions left/right
# True V(s) computed by Bellman equation
N = 5       # states: 0,1,2,3,4
gamma = 0.9
# Reward: +1 for reaching state N-1 (state 4), 0 otherwise
# Transition: from state s, go right with prob 0.6
p_right = 0.6

# Compute true V* by value iteration
V = np.zeros(N)
for _ in range(1000):
    V_new = np.zeros(N)
    for s in range(N):
        if s == N-1:  # terminal
            V_new[s] = 0.0
        else:
            r_right = 1.0 if (s+1 == N-1) else 0.0
            r_left  = 0.0
            s_right = min(s+1, N-1)
            s_left  = max(s-1, 0)
            V_new[s] = (p_right * (r_right + gamma*V[s_right]) +
                       (1-p_right) * (r_left + gamma*V[s_left]))
    V = V_new
print('True value function V*:')
for s in range(N):
    print(f'  V*({s}) = {V[s]:.4f}')

# Simulate a trajectory and check E[delta_t | F_t] ~= 0
def get_td_errors(V_est, n_episodes=5000):
    deltas = []
    for _ in range(n_episodes):
        s = np.random.randint(0, N-1)  # random start
        for _ in range(50):  # truncate episodes
            if s == N-1: break
            go_right = np.random.rand() < p_right
            s_next = min(s+1, N-1) if go_right else max(s-1, 0)
            r = 1.0 if go_right and (s == N-2) else 0.0
            delta = r + gamma * V_est[s_next] - V_est[s]
            deltas.append((s, delta))
            s = s_next
    return np.array(deltas)

td_data = get_td_errors(V)
print(f'\nWith V=V*: E[delta_t] = {td_data[:,1].mean():.6f} (expected ~0)')
print(f'Var[delta_t] = {td_data[:,1].var():.4f}')
assert abs(td_data[:,1].mean()) < 0.02, 'TD errors not mean-zero under V*'
print('PASS - TD errors are martingale differences under V*')

# With wrong V: TD errors have nonzero mean
V_wrong = np.zeros(N)
td_wrong = get_td_errors(V_wrong)
print(f'\nWith V=0: E[delta_t] = {td_wrong[:,1].mean():.6f} (nonzero — V is wrong)')

10. Diffusion Models as SDEs

Modern generative diffusion models can be formulated and trained as stochastic differential equations.

Variance-Preserving SDE (VP-SDE, DDPM):

$$dx = -\tfrac{1}{2}\beta(t)\,x\,dt + \sqrt{\beta(t)}\,dB_t$$

Variance-Exploding SDE (VE-SDE, SMLD):

$$dx = \sqrt{\frac{d[\sigma^2(t)]}{dt}}\,dB_t$$

Reverse SDE (Anderson, 1982; used for generation):

$$dx = \left[f(x,t) - g^2(t)\,\nabla_x \log p_t(x)\right]dt + g(t)\,d\bar{B}_t$$

The score function $\nabla_x \log p_t(x)$ is learned by the neural network. DDPM, DDIM, and Consistency Models are all discretisations of this reverse SDE.

Denoising score matching: Train $s_\theta(x_t, t)$ to predict $\nabla_{x_t} \log p_t(x_t)$ by minimising $\mathbb{E}\left[\lVert s_\theta(x_t,t) - \nabla_{x_t}\log p(x_t \mid x_0)\rVert^2\right]$.
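For 1D Gaussian data at a single noise level, the DSM objective has a closed-form check: the minimiser over linear models $s(x) = a\,x$ recovers the marginal score, which is $-x$ when $x_0 \sim \mathcal{N}(0,1)$. A minimal sketch (the Gaussian-data setup is illustrative, not the notebook's):

```python
import numpy as np

rng = np.random.default_rng(42)
n = 200_000
alpha_bar = 0.5

# Data x0 ~ N(0,1); noised sample and the DSM regression target
x0 = rng.standard_normal(n)
eps = rng.standard_normal(n)
x_t = np.sqrt(alpha_bar) * x0 + np.sqrt(1 - alpha_bar) * eps
target = -eps / np.sqrt(1 - alpha_bar)   # = grad_x log p(x_t | x0)

# Linear score model s(x) = a * x; least-squares fit of a
a_hat = (x_t @ target) / (x_t @ x_t)

# For x0 ~ N(0,1), the marginal p_t is N(0,1), so the true score is -x
print(f'fitted a = {a_hat:.4f}  (marginal-score theory: -1)')
```

Even though each regression target is a noisy conditional score, the least-squares minimiser converges to the marginal score, which is the core fact behind denoising score matching.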

Code cell 29

# === 10.1 Diffusion Model Forward/Reverse Process ===

import numpy as np
np.random.seed(42)

# DDPM forward process on 1D data
T_diff = 100
beta_min, beta_max = 0.01, 0.5
betas = np.linspace(beta_min, beta_max, T_diff)
alphas = 1 - betas
alpha_bars = np.cumprod(alphas)

# Forward: q(x_t | x_0) = N(sqrt(alpha_bar)*x_0, (1-alpha_bar)*I)
def forward_diffusion(x0, t, alpha_bars):
    ab = alpha_bars[t]
    eps = np.random.randn(*x0.shape)
    return np.sqrt(ab) * x0 + np.sqrt(1 - ab) * eps, eps

# Ground truth score for Gaussian: nabla log p(x_t|x_0) = -(x_t - sqrt(ab)*x0)/(1-ab)
def true_score(x_t, x0, t, alpha_bars):
    ab = alpha_bars[t]
    return -(x_t - np.sqrt(ab) * x0) / (1 - ab)

# 1D Gaussian data
x0 = np.array([2.0])
print('Forward process signal decay:')
for t in [0, 24, 49, 74, 99]:
    x_t, eps = forward_diffusion(x0, t, alpha_bars)
    snr = alpha_bars[t] / (1 - alpha_bars[t])
    print(f'  t={t:3d}: sqrt(ab)={np.sqrt(alpha_bars[t]):.4f}, '
          f'E[x_t]={np.sqrt(alpha_bars[t])*x0[0]:.4f}, SNR={snr:.4f}')

# Score function at t=50
t_test = 49
x_t, eps = forward_diffusion(x0, t_test, alpha_bars)
score = true_score(x_t, x0, t_test, alpha_bars)
score_from_eps = -eps / np.sqrt(1 - alpha_bars[t_test])
print(f'\nt={t_test}: x_t={x_t[0]:.4f}')
print(f'  Score via formula:  {score[0]:.4f}')
print(f'  Score via epsilon:  {score_from_eps[0]:.4f}')
print(f'  Match: {np.allclose(score, score_from_eps)}')
print('PASS - Score = -eps/sqrt(1-alpha_bar) (DDPM parameterization)')

# DDIM deterministic sampling (simplified 1D)
print('\nDDIM reverse process (1D, oracle score):')
x_T = np.random.randn(1)  # start from noise
x = x_T.copy()
for t in range(T_diff-1, 0, -10):
    # Oracle: predict x0 from x_t using true score (Tweedie formula)
    score_t = true_score(x, x0, t, alpha_bars)
    x0_pred = (x - np.sqrt(1 - alpha_bars[t]) * (-score_t * np.sqrt(1-alpha_bars[t]))) / np.sqrt(alpha_bars[t])
    ab_prev = alpha_bars[t-1] if t > 0 else 1.0
    x = np.sqrt(ab_prev) * x0_pred + np.sqrt(1-ab_prev) * (-score_t * np.sqrt(1-alpha_bars[t]))
print(f'  Final denoised: {x[0]:.4f}  (target: {x0[0]:.4f})')

11. Optional Stopping and Doob's Inequalities

Optional Stopping Theorem (OST): If $M_t$ is a martingale and $\tau$ is a stopping time with $\mathbb{E}[\tau] < \infty$ and $|M_{t \wedge \tau}|$ uniformly bounded, then:

$$\mathbb{E}[M_\tau] = \mathbb{E}[M_0]$$

Doob's maximal inequality: For a non-negative submartingale $M_t$:

$$P\!\left(\max_{0 \leq s \leq t} M_s \geq \lambda\right) \leq \frac{\mathbb{E}[M_t]}{\lambda}$$

Doob's $L^p$ inequality: For $p > 1$:

$$\mathbb{E}\!\left[\left(\max_{0 \leq s \leq t} M_s\right)^p\right] \leq \left(\frac{p}{p-1}\right)^p \mathbb{E}[M_t^p]$$

These are the stochastic process analogues of concentration inequalities — they bound worst-case deviations over a whole trajectory, not just at a fixed time.
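The $L^p$ case can be sanity-checked numerically for $p = 2$, where the constant is $(p/(p-1))^p = 4$. A minimal Monte Carlo sketch on $|S_t|$, which is a non-negative submartingale when $S_t$ is a simple random walk:

```python
import numpy as np

np.random.seed(0)
n_steps, n_paths = 200, 20000

# |S_t| is a non-negative submartingale for the simple random walk S_t
steps = np.random.choice([-1, 1], size=(n_paths, n_steps))
M = np.abs(np.cumsum(steps, axis=1))

lhs = (M.max(axis=1) ** 2).mean()   # E[(max_t M_t)^2]
rhs = 4 * (M[:, -1] ** 2).mean()    # (p/(p-1))^p * E[M_T^p] with p = 2
print(f'E[(max M)^2] = {lhs:.2f} <= 4*E[M_T^2] = {rhs:.2f}')
```

Here $\mathbb{E}[M_T^2] = \mathbb{E}[S_T^2] = T$, so the right-hand side is $4T$; the empirical left-hand side sits well below it.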

Code cell 31

# === 11.1 Doob's Maximal Inequality Verification ===

import numpy as np
np.random.seed(42)

# SRW martingale: M_t = S_t (sum of +/-1 steps)
n_steps = 100
N_mc = 50000
lambda_val = 5.0

# Simulate paths
steps = np.random.choice([-1, 1], size=(N_mc, n_steps))
paths = np.cumsum(steps, axis=1)
max_paths = np.abs(paths).max(axis=1)

# Doob maximal: P(max |S_t| >= lambda) <= E[|S_T|] / lambda
# For SRW: E[|S_T|] ~ sqrt(2T/pi)
E_abs_ST = np.abs(paths[:, -1]).mean()
doob_bound = E_abs_ST / lambda_val
empirical = (max_paths >= lambda_val).mean()

print(f'Doob maximal inequality (lambda={lambda_val}):')
print(f'  E[|S_T|]      = {E_abs_ST:.4f}')
print(f'  Doob bound    = E[|S_T|]/lambda = {doob_bound:.4f}')
print(f'  Empirical P   = {empirical:.4f}')
assert doob_bound >= empirical, 'Doob bound violated!'
print(f'  PASS - Doob bound holds ({doob_bound:.4f} >= {empirical:.4f})')

# Reflection principle: for SRW, P(max S_t >= k) ~= 2*P(S_T >= k)
from scipy.stats import norm
# Brownian approximation of the one-sided reflection identity
reflection_bound = 2 * norm.sf(lambda_val / np.sqrt(n_steps))
print(f'\nReflection principle bound: {reflection_bound:.4f}')
print(f'(Tighter than Doob for SRW due to symmetry)')
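The OST itself can be checked on the SRW stopped at its first exit from an interval $(-a, b)$ (gambler's ruin): $\mathbb{E}[S_\tau] = 0$ forces $P(\text{hit } +b) = a/(a+b)$. A minimal Monte Carlo sketch, with barrier values chosen arbitrarily:

```python
import numpy as np

rng = np.random.default_rng(0)
a, b = 3, 7          # absorbing barriers at -a and +b
n_paths = 20000

# Run each SRW path until it exits (-a, b)
S_tau = np.empty(n_paths)
for i in range(n_paths):
    s = 0
    while -a < s < b:
        s += 1 if rng.random() < 0.5 else -1
    S_tau[i] = s

p_hit_b = (S_tau == b).mean()
print(f'E[S_tau]   = {S_tau.mean():.4f}  (OST predicts 0)')
print(f'P(hit +{b}) = {p_hit_b:.4f}  (theory a/(a+b) = {a/(a+b):.4f})')
```

Solving $b \cdot p + (-a)(1-p) = 0$ gives $p = a/(a+b) = 0.3$ here; both statistics should match to Monte Carlo accuracy.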

12. Connections: Concentration Inequalities via Martingales

The Doob martingale is the bridge between stochastic processes and concentration:

Given $f(X_1,\ldots,X_n)$ with bounded differences $c_i$, define:

$$D_i = \mathbb{E}[f \mid X_1,\ldots,X_i] - \mathbb{E}[f \mid X_1,\ldots,X_{i-1}]$$

Then $M_k = \mathbb{E}[f \mid X_1,\ldots,X_k]$ is a martingale with differences $|D_i| \leq c_i$.

Azuma's inequality then gives:

$$P(f - \mathbb{E}[f] \geq t) \leq \exp\!\left(-\frac{2t^2}{\sum_i c_i^2}\right)$$

This IS McDiarmid's inequality — proven via the martingale route.

The hierarchy:

Azuma (martingale differences)
    ↕ (same inequality, different proof routes)
McDiarmid (bounded differences on function of iid)
    ↓ (special case: f = sum)
Hoeffding (sum of bounded iid)
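The Doob martingale above can be built explicitly for $f = $ empirical mean of $n$ Bernoulli(1/2) variables, where $M_k = \big(\sum_{i \le k} X_i + (n-k)/2\big)/n$. A quick sketch verifying the martingale property and the bounded-difference bound $|D_i| \le 1/n$:

```python
import numpy as np

np.random.seed(0)
n, N_mc = 20, 50000

X = (np.random.rand(N_mc, n) < 0.5).astype(float)   # Bernoulli(0.5) inputs
partial = np.cumsum(X, axis=1)
k = np.arange(1, n + 1)

# Doob martingale for f = mean: condition on the first k inputs,
# replace the unseen ones by their expectation 0.5
M = (partial + (n - k) * 0.5) / n
M0 = 0.5 * np.ones((N_mc, 1))                        # M_0 = E[f] = 1/2
M_full = np.hstack([M0, M])

D = np.diff(M_full, axis=1)                          # martingale differences
print(f'max |D_i| = {np.abs(D).max():.4f} (bound c_i = 1/n = {1/n})')
print(f'mean increment: {D.mean():.6f} (martingale => ~0)')
```

Each difference is $(X_i - 1/2)/n$, so $|D_i| = 1/(2n)$, comfortably inside the $c_i = 1/n$ bound used by Azuma/McDiarmid.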

Code cell 33

# === 12.1 Azuma/McDiarmid/Hoeffding Comparison ===

import numpy as np
np.random.seed(42)

# All three applied to: f = empirical mean of n Bernoulli(0.5)
n = 100
t = 0.1  # deviation from mean

# Hoeffding: sum of [0,1]-bounded, range c=1
hoeff = np.exp(-2 * n * t**2)  # equivalent: 2*exp(-2*n*t^2) two-sided

# McDiarmid: f = mean, each x_i in [0,1], c_i = 1/n
# sum c_i^2 = n*(1/n)^2 = 1/n
mcdiarmid = np.exp(-2 * t**2 / (1/n))  # = exp(-2*n*t^2)

# Azuma: martingale differences bounded by c_i = 1/n
azuma = np.exp(-2 * t**2 / (1/n))

print(f'For f=mean of n={n} Bernoulli(0.5), t={t}:')
print(f'  Hoeffding:   exp(-2nt^2) = {hoeff:.6f}')
print(f'  McDiarmid:   exp(-2t^2/(1/n)) = {mcdiarmid:.6f}')
print(f'  Azuma:       same formula = {azuma:.6f}')
print(f'  All equal: {np.allclose([hoeff, mcdiarmid, azuma], hoeff)}')
print('PASS - Three inequalities coincide for sum of iid bounded')

# Now: non-sum function where only McDiarmid applies
# f(x_1,...,x_n) = max(x_1,...,x_n) on [0,1]^n
# c_i = 1 for each i, sum c_i^2 = n
n_mc = 50
N_trials = 100000
data = np.random.uniform(0, 1, (N_trials, n_mc))
f_vals = data.max(axis=1)
E_f = f_vals.mean()  # ~ n/(n+1)
E_f_theory = n_mc / (n_mc + 1)
t_max = 0.05
# McDiarmid: P(max - E[max] >= t) <= exp(-2t^2 / sum(c_i^2)) = exp(-2t^2/n)
# (since each c_i=1, this becomes exp(-2t^2/n) — loose for max!)
mcdiarmid_max = np.exp(-2 * t_max**2 / n_mc)
empirical_max = (f_vals - E_f >= t_max).mean()
print(f'\nMax function (n={n_mc}), t={t_max}:')
print(f'  E[max] theory: {E_f_theory:.4f}, empirical: {E_f:.4f}')
print(f'  McDiarmid bound: {mcdiarmid_max:.4f}')
print(f'  Empirical tail: {empirical_max:.4f}')
assert mcdiarmid_max >= empirical_max
print('PASS - McDiarmid applies to max function (Hoeffding would not)')
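Azuma can also be checked directly on a martingale trajectory rather than through a function of iid inputs. For the SRW with difference bound $c_i = 1$, the direct one-sided form is $P(S_n \ge t) \le \exp(-t^2/(2\sum_i c_i^2))$; a quick sketch:

```python
import numpy as np

np.random.seed(0)
n, N_mc, t = 100, 100000, 20

# SRW: a martingale with |increment| <= c_i = 1
S_n = np.random.choice([-1, 1], size=(N_mc, n)).sum(axis=1)

azuma = np.exp(-t**2 / (2 * n))      # exp(-t^2 / (2 * sum c_i^2))
empirical = (S_n >= t).mean()
print(f'Azuma bound exp(-t^2/2n) = {azuma:.4f}, empirical = {empirical:.4f}')
```

With $t = 20$ and $n = 100$ the bound is $e^{-2} \approx 0.135$, while the true tail (roughly $P(Z \ge 2)$) is an order of magnitude smaller: the bound holds but is not tight.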

13. Attention as a Stochastic Process

Softmax attention can be viewed as a kernel smoothing operation:

$$\text{Attn}(q, K, V) = \sum_j \frac{e^{q^\top k_j/\sqrt{d}}}{Z}\, v_j$$

where $Z = \sum_j e^{q^\top k_j/\sqrt{d}}$ is a normalisation constant.

As a random process view:

  • Attention weights $\alpha_j = \operatorname{softmax}(q^\top k_j/\sqrt{d})$ form a probability distribution over positions
  • The output is $\mathbb{E}_{j \sim \alpha}[v_j]$ — an expectation of values under a learned measure
  • As context length $L \to \infty$: by the law of large numbers, the attention sum concentrates

RoPE and stationarity: With rotary embeddings, the attention score $\langle R_m q, R_n k \rangle = \operatorname{Re}\!\left(q\, e^{i(m-n)\theta}\, k^*\right)$ (in the complex pairing of each rotation pair) depends only on the relative position $m - n$ — a stationary kernel over position space.

Tiling and locality: FlashAttention 2/3 computes exact attention in tiles; the tiling is an IO/memory optimisation, not an approximation. Local and sliding-window attention variants, by contrast, do exploit decaying long-range dependencies (approximately Markovian for some tasks).
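The RoPE stationarity claim can be checked directly for a single 2-D rotation pair (the `rot` helper below is a hypothetical minimal version for one frequency, not a library API): the score for positions (m, n) depends only on m - n.

```python
import numpy as np

np.random.seed(0)

def rot(v, angle):
    # Rotate a 2-D vector (one RoPE frequency pair) by the given angle
    c, s = np.cos(angle), np.sin(angle)
    return np.array([c * v[0] - s * v[1], s * v[0] + c * v[1]])

theta = 0.1
q = np.random.randn(2)
k = np.random.randn(2)

# All pairs share m - n = 2, so all scores must coincide
pairs = [(3, 1), (10, 8), (50, 48)]
scores = [rot(q, m * theta) @ rot(k, n * theta) for m, n in pairs]
print('scores for m - n = 2:', np.round(scores, 6))
```

The identity behind it is $\langle R_m q, R_n k \rangle = q^\top R_m^\top R_n k = q^\top R_{(n-m)\theta}\, k$: the absolute positions cancel and only the offset survives.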

Code cell 35

# === 13.1 Attention as Kernel Smoothing + Concentration ===

import numpy as np
np.random.seed(42)

def softmax(x):
    e = np.exp(x - x.max())
    return e / e.sum()

def attention(q, K, V):
    d = K.shape[1]
    scores = K @ q / np.sqrt(d)
    weights = softmax(scores)
    return weights @ V, weights

# Test: attention concentrates on relevant keys
d, L = 64, 20
q = np.random.randn(d)
K = np.random.randn(L, d)
# Make key 5 match query
K[5] = q + 0.1 * np.random.randn(d)
V = np.random.randn(L, d)

out, weights = attention(q, K, V)
print(f'Attention weights (top 5 by weight):')
top5 = np.argsort(weights)[-5:][::-1]
for idx in top5:
    print(f'  key {idx:2d}: weight = {weights[idx]:.4f}')
print(f'  Key 5 (matched query) gets highest weight: {np.argmax(weights) == 5}')
assert np.argmax(weights) == 5
print('PASS - Attention concentrates on aligned key')

# Entropy of attention distribution
entropy = -np.sum(weights * np.log(weights + 1e-10))
uniform_entropy = np.log(L)
print(f'\nAttention entropy: {entropy:.4f} (uniform would be {uniform_entropy:.4f})')
print(f'Concentration ratio: {entropy/uniform_entropy:.4f}')

# Scaling: with a planted matched key, the matched score q.k/sqrt(d) ~ |q|^2/sqrt(d)
# grows like sqrt(d), so attention gets MORE peaked (lower entropy) as d grows
print('\nEffect of dimension d on attention entropy (matched key planted):')
for d_test in [1, 4, 16, 64, 256]:
    q_t = np.random.randn(d_test)
    K_t = np.random.randn(L, d_test)
    K_t[5] = q_t + 0.1 * np.random.randn(d_test)
    _, w = attention(q_t, K_t, np.zeros((L, 1)))
    ent = -np.sum(w * np.log(w + 1e-10))
    print(f'  d={d_test:4d}: entropy = {ent:.4f}')
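The law-of-large-numbers bullet can also be illustrated: with iid random keys and no planted match, the attention output concentrates around the plain mean of V as context length L grows. A self-contained sketch (it redefines a minimal softmax rather than reusing the cell above):

```python
import numpy as np

np.random.seed(0)

def softmax(x):
    e = np.exp(x - x.max())
    return e / e.sum()

# With iid keys unrelated to V, attention is a near-exchangeable weighted
# mean of V, and its gap to the plain mean shrinks roughly like 1/sqrt(L)
d = 16
errs = []
for L_test in [10, 100, 1000, 10000]:
    q = np.random.randn(d)
    K = np.random.randn(L_test, d)
    V = np.random.randn(L_test)
    w = softmax(K @ q / np.sqrt(d))
    err = abs(w @ V - V.mean())
    errs.append(err)
    print(f'  L={L_test:5d}: |attn_out - mean(V)| = {err:.4f}')
```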

14. Summary: Process Taxonomy and Key Properties

| Process | State space | Time | Markov? | Martingale? | Stationary? |
|---|---|---|---|---|---|
| SRW | Discrete | Discrete | Yes | Yes ($S_t$) | No |
| AR(1), $\lvert\phi\rvert<1$ | Continuous | Discrete | Yes | No | Yes |
| BM | Continuous | Continuous | Yes | Yes | No (variance grows; increments are stationary) |
| OU | Continuous | Continuous | Yes | No | Yes (at stationarity) |
| Poisson | Discrete | Continuous | Yes | $N_t - \lambda t$ | No |
| GP | Function | Any | No | Depends | If kernel is stationary |
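The AR(1) row can be verified numerically: for $|\phi| < 1$ the process has stationary standard deviation $\sigma/\sqrt{1-\phi^2}$ but is not a martingale, since $\mathbb{E}[X_{t+1} \mid X_t] = \phi X_t$ mean-reverts. A quick sketch:

```python
import numpy as np

np.random.seed(0)
phi, sigma, n = 0.9, 1.0, 200000

# Simulate AR(1): X_t = phi * X_{t-1} + sigma * eps_t
x = np.zeros(n)
eps = sigma * np.random.randn(n)
for t in range(1, n):
    x[t] = phi * x[t-1] + eps[t]

burn = x[1000:]                              # drop the transient
std_theory = sigma / np.sqrt(1 - phi**2)
print(f'stationary std: empirical {burn.std():.3f}, theory {std_theory:.3f}')

# Not a martingale: increments drift back toward 0 when X_t is large
incr = burn[1:] - burn[:-1]
cond_drift = incr[burn[:-1] > 1.0].mean()
print(f'mean increment given X_t > 1: {cond_drift:.3f} (negative => mean-reverting)')
```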

Key takeaways for AI/ML:

  1. Diffusion models = OU process forward + learned reverse SDE
  2. SGD = Langevin dynamics near minima → stationary dist $\propto e^{-L/\eta}$
  3. RL = value function satisfies Bellman = martingale optimality condition
  4. Attention = kernel smoothing with learned stationary (RoPE) kernel
  5. McDiarmid = Azuma applied to Doob martingale of a function

Code cell 37

# === 14.1 Summary Visualization — Process Comparison ===

import numpy as np
np.random.seed(42)

try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

n = 500
t_grid = np.arange(n)

# 1. SRW
srw = np.cumsum(np.random.choice([-1, 1], size=n))

# 2. BM
dt = 1/n
bm = np.cumsum(np.random.randn(n) * np.sqrt(dt))

# 3. OU (Euler-Maruyama with dt=1, i.e. an AR(1) with phi = 1 - theta_ou)
ou = np.zeros(n)
theta_ou, sigma_ou = 0.1, 0.5
for i in range(1, n):
    ou[i] = ou[i-1] - theta_ou*ou[i-1] + sigma_ou*np.random.randn()

# 4. Poisson
lam = 0.5
poisson_increments = np.random.poisson(lam, n)
poisson_proc = np.cumsum(poisson_increments)
comp_martingale = poisson_proc - lam * t_grid  # M_t = N_t - lambda*t

print('Process summary statistics:')
print(f'  SRW: mean={srw.mean():.2f}, std={srw.std():.2f}')
print(f'  BM:  mean={bm.mean():.4f}, std={bm.std():.4f}')
print(f'  OU:  mean={ou.mean():.4f}, std={ou.std():.4f} (theory: {0.5/np.sqrt(2*0.1):.4f})')
print(f'  Poisson martingale M_t: mean={comp_martingale.mean():.4f} (expect ~0)')
assert abs(comp_martingale.mean()) < 2.0, 'Poisson martingale mean too far from 0'
print('PASS - All processes verified')

if HAS_MPL:
    fig, axes = plt.subplots(2, 2, figsize=(12, 8))
    axes[0,0].plot(t_grid, srw, color='steelblue')
    axes[0,0].set_title('Simple Random Walk (SRW)')
    axes[0,0].set_xlabel('Step'); axes[0,0].set_ylabel('S_t')
    axes[0,1].plot(t_grid, bm, color='darkorange')
    axes[0,1].set_title('Brownian Motion')
    axes[0,1].set_xlabel('t'); axes[0,1].set_ylabel('B_t')
    axes[1,0].plot(t_grid, ou, color='green')
    axes[1,0].axhline(0, color='red', linestyle='--', alpha=0.5, label='mean=0')
    axes[1,0].set_title('Ornstein-Uhlenbeck Process')
    axes[1,0].set_xlabel('t'); axes[1,0].legend()
    axes[1,1].plot(t_grid, comp_martingale, color='purple', label='N_t - lambda*t')
    axes[1,1].axhline(0, color='red', linestyle='--', alpha=0.5)
    axes[1,1].set_title('Poisson Compensated Martingale')
    axes[1,1].set_xlabel('t'); axes[1,1].legend()
    plt.suptitle('Stochastic Process Taxonomy', fontsize=14, fontweight='bold')
    plt.tight_layout(); plt.show()
    print('\nSummary plots rendered.')

Summary

This notebook developed the key stochastic processes and their properties:

  1. SRW and filtrations — discrete Markov chains, stopping times, adapted processes
  2. Martingales — OST, Doob inequality, Doob martingale → McDiarmid
  3. Poisson process — thinning, superposition, compensated martingale
  4. Brownian motion — quadratic variation, non-differentiability, Donsker CLT
  5. OU process — stationary Gaussian Markov process, DDPM connection
  6. Gaussian processes — kernel posterior inference, Bayesian optimisation
  7. Stationarity — ACVF, Wiener-Khinchin, RoPE as stationary kernel
  8. SGD as Langevin — noise temperature, implicit regularisation
  9. RL martingales — TD error as martingale differences, Bellman equation
  10. Diffusion models — VP/VE-SDE, score matching, reverse SDE
  11. Attention — kernel smoothing, concentration, RoPE stationarity

Next: Markov Chains → ../07-Markov-Chains/theory.ipynb
