Exercises Notebook
Converted from `exercises.ipynb` for web reading.
Stochastic Processes — Exercises
10 exercises covering random walks, martingales, Poisson processes, Brownian motion, Gaussian processes, and ML applications.
| Format | Description |
|---|---|
| Problem | Markdown cell with task description |
| Your Solution | Code cell with scaffolding |
| Solution | Code cell with reference solution and checks |
Difficulty Levels
| Level | Exercises | Focus |
|---|---|---|
| ★ | 1-3 | Core mechanics |
| ★★ | 4-6 | Deeper theory |
| ★★★ | 7-10 | AI/ML applications |
Topic Map
| Topic | Exercise |
|---|---|
| Martingale verification | 1 |
| Optimal stopping (gambler's ruin) | 2 |
| Poisson process | 3 |
| Doob martingale + McDiarmid | 4 |
| Brownian motion + OU | 5 |
| Gaussian process posterior | 6 |
| SGD as Langevin dynamics | 7 |
| RL TD-error as martingale | 8 |
| AR(1) stationarity | 9 |
| Diffusion forward process | 10 |
Code cell 2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
try:
import seaborn as sns
sns.set_theme(style="whitegrid", palette="colorblind")
HAS_SNS = True
except ImportError:
plt.style.use("seaborn-v0_8-whitegrid")
HAS_SNS = False
mpl.rcParams.update({
"figure.figsize": (10, 6),
"figure.dpi": 120,
"font.size": 13,
"axes.titlesize": 15,
"axes.labelsize": 13,
"xtick.labelsize": 11,
"ytick.labelsize": 11,
"legend.fontsize": 11,
"legend.framealpha": 0.85,
"lines.linewidth": 2.0,
"axes.spines.top": False,
"axes.spines.right": False,
"savefig.bbox": "tight",
"savefig.dpi": 150,
})
np.random.seed(42)
print("Plot setup complete.")
Code cell 3
import numpy as np
import numpy.linalg as la
try:
import matplotlib.pyplot as plt
HAS_MPL = True
except ImportError:
HAS_MPL = False
try:
from scipy import stats
HAS_SCIPY = True
except ImportError:
HAS_SCIPY = False
np.set_printoptions(precision=6, suppress=True)
np.random.seed(42)
def header(title):
print('\n' + '=' * len(title))
print(title)
print('=' * len(title))
def check_close(name, got, expected, tol=1e-4):
ok = np.allclose(got, expected, atol=tol, rtol=tol)
print(f"{'PASS' if ok else 'FAIL'} — {name}")
if not ok:
print(f' expected: {expected}')
print(f' got: {got}')
return ok
def check_true(name, cond):
print(f"{'PASS' if cond else 'FAIL'} — {name}")
return cond
print('Setup complete.')
Exercise 1 ★ — Martingale Verification
Verify that three classical stochastic processes are martingales.
(a) Show that the simple random walk $S_n = \sum_{i=1}^n X_i$ with $X_i = \pm 1$ equally likely is a martingale. Simulate $n = 1000$ steps and verify $E[S_n] = 0$ and $E[S_{n+1} - S_n \mid \mathcal{F}_n] = 0$.
(b) The variance process $V_n = S_n^2 - n$ is also a martingale. Verify $E[V_n] = 0$ and $E[S_n^2] = n$ across Monte Carlo paths.
(c) The likelihood ratio martingale: for iid $X_i \sim P = \mathcal{N}(0, 1)$ vs $Q = \mathcal{N}(\mu_Q, 1)$, define $M_n = \prod_{i=1}^n q(X_i)/p(X_i)$. Take $\mu_Q = 0.5$, $n = 50$, and simulate many paths. Verify $E_P[M_n] = 1$ (Wald's identity).
(d) For the SRW, what is $E[S_{50}^2]$? What is $E[S_{50}^2 - S_{25}^2 \mid S_{25}]$? Explain using the martingale property of $V_n = S_n^2 - n$.
Code cell 5
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 6
# Solution
import numpy as np
from scipy.stats import norm
np.random.seed(42)
N_mc = 10_000
n_steps = 1000
steps = np.random.choice([-1, 1], size=(N_mc, n_steps))
S = np.cumsum(steps, axis=1)
header('Exercise 1: Martingale Verification')
# (a) SRW
E_Sn = S[:, -1].mean()
E_Shalf = S[:, n_steps//2].mean()
print(f'(a) E[S_n] = {E_Sn:.4f} (expected 0)')
check_true('SRW is zero-mean', abs(E_Sn) < 0.1)
# Conditional mean: E[S_{n+1} - S_n | S_n] = 0
# Each step is independent of past, so this holds by definition
step_means_given = steps.mean(axis=0) # ~0 for each step
check_true('Increments have zero mean', np.allclose(step_means_given, 0, atol=0.05))
# (b) Variance process
n_range = np.arange(1, n_steps+1)
V = S**2 - n_range
E_V = V.mean(axis=0) # should be ~0 for each t
print(f'\n(b) E[V_t] at t=500: {E_V[499]:.4f} (expected 0)')
print(f' E[S_n^2] at t=1000: {(S[:,-1]**2).mean():.2f} (expected {n_steps})')
check_true('Variance process mean ~0', abs(E_V[-1]) < 2.0)
check_close('E[S_n^2] = n', (S[:,-1]**2).mean(), n_steps, tol=5.0)
# (c) Likelihood ratio
n_lr = 50
N_lr = 50000
X = np.random.randn(N_lr, n_lr) # P = N(0,1)
# q(x)/p(x) = N(x;0.5,1)/N(x;0,1) = exp(0.5*x - 0.5*0.5^2)
mu_Q = 0.5
log_lr = mu_Q * X - 0.5 * mu_Q**2 # log q(x_i)/p(x_i)
M_n = np.exp(log_lr.sum(axis=1))
E_Mn = M_n.mean()
print(f'\n(c) E[M_n] = {E_Mn:.4f} (expected 1.0)')
check_close('Wald identity: E[M_n]=1', E_Mn, 1.0, tol=0.05)
# (d)
print(f'\n(d) E[S_50^2] = 50 (by martingale property of V_n = S_n^2 - n)')
print(f' E[S_50^2 - S_25^2 | S_25] = E[S_50^2 | S_25] - S_25^2')
print(f' Since V_n is martingale: E[V_50|F_25] = V_25')
print(f' => E[S_50^2 - 50 | F_25] = S_25^2 - 25')
print(f' => E[S_50^2 - S_25^2 | S_25] = 25')
print('\nTakeaway: Martingale property encodes conservation laws in stochastic systems.')
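The reference solution checks unconditional means. A small extra sketch (not part of the reference solution): verify the conditional martingale property $E[S_{n+1} \mid S_n = s] = s$ directly, by grouping Monte Carlo paths on the value of $S_{100}$.

```python
import numpy as np

# Condition on S_100 = s by masking paths, then average S_101 over that group.
np.random.seed(0)
steps = np.random.choice([-1, 1], size=(50_000, 101))
S = np.cumsum(steps, axis=1)
S_n, S_np1 = S[:, 99], S[:, 100]             # S_100 and S_101 on each path
for s in (-10, 0, 10):
    mask = S_n == s                          # paths currently at level s
    cond_mean = S_np1[mask].mean()           # estimates E[S_101 | S_100 = s]
    print(f"E[S_101 | S_100 = {s:+d}] ~ {cond_mean:+.3f} (should be {s:+d})")
```

Binning on the current value is a crude but direct way to see conditional expectations in simulation; each group's average lands on $s$ itself, not on 0.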
Exercise 2 ★ — Gambler's Ruin and the Optional Stopping Theorem
A gambler starts with \$$a$ and plays until her fortune reaches $N$ or drops to $0$ (ruin); each round she wins \$1 with probability $p$ and loses \$1 with probability $1-p$.
(a) When $p = 1/2$ (fair game), apply the Optional Stopping Theorem (OST) to the SRW martingale $S_t$ to derive:
$$P(\text{reach } N \mid S_0 = a) = \frac{a}{N}.$$
Implement ruin_prob_fair(a, N) and verify by simulation.
(b) Apply OST to the variance martingale $S_t^2 - t$ to find $E[\tau]$, the expected stopping time:
$$E[\tau] = a(N - a).$$
Implement expected_stopping_time(a, N) and verify.
(c) For a biased walk ($p \ne 1/2$), the process $M_t = r^{S_t}$ is a martingale, where $r = (1-p)/p$. Use OST to derive:
$$P(\text{ruin}) = \frac{r^N - r^a}{r^N - 1}.$$
Compute for $p = 0.4$, $a = 5$, $N = 10$.
(d) Simulate the gambler's ruin for $a = 10$, $N = 20$ over $20{,}000$ trials. Verify both the ruin probability and the expected stopping time.
Code cell 8
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 9
# Solution
import numpy as np
np.random.seed(42)
def ruin_prob_fair(a, N):
return a / N
def expected_stopping_time(a, N):
return a * (N - a)
def simulate_gamblers_ruin(a, N, p=0.5, n_trials=10000):
results = []
stop_times = []
for _ in range(n_trials):
pos = a
t = 0
while 0 < pos < N:
pos += 1 if np.random.rand() < p else -1
t += 1
results.append(pos == N) # 1 if reached N
stop_times.append(t)
return np.mean(results), np.mean(stop_times)
header('Exercise 2: Gambler\'s Ruin and OST')
# (a) Fair game
a, N = 5, 10
theory_prob = ruin_prob_fair(a, N)
sim_prob, sim_time = simulate_gamblers_ruin(a, N)
print(f'(a) P(reach {N} | start {a}):')
print(f' Theory: {theory_prob:.4f}, Sim: {sim_prob:.4f}')
check_close('Ruin probability (fair)', sim_prob, theory_prob, tol=0.03)
# (b) Expected stopping time
theory_time = expected_stopping_time(a, N)
print(f'\n(b) E[tau | a={a}, N={N}]:')
print(f' Theory: {theory_time:.1f}, Sim: {sim_time:.1f}')
check_close('Expected stopping time', sim_time, theory_time, tol=2.0)
# (c) Biased walk: P(ruin) via OST on (q/p)^S_t
p_b, a_b, N_b = 0.4, 5, 10
q_b = 1 - p_b
r = q_b / p_b
p_ruin_theory = (r**N_b - r**a_b) / (r**N_b - 1)
_, sim_prob_b = simulate_gamblers_ruin(a_b, N_b, p=p_b)
p_ruin_sim = 1 - sim_prob_b
print(f'\n(c) Biased (p={p_b}, a={a_b}, N={N_b}):')
print(f' P(ruin) theory: {p_ruin_theory:.4f}, sim: {p_ruin_sim:.4f}')
check_close('Biased ruin probability', p_ruin_sim, p_ruin_theory, tol=0.03)
# (d) Larger simulation
a_d, N_d = 10, 20
prob_d, time_d = simulate_gamblers_ruin(a_d, N_d, p=0.5, n_trials=20000)
print(f'\n(d) a={a_d}, N={N_d}, p=0.5:')
print(f' P(reach N): theory={ruin_prob_fair(a_d, N_d):.4f}, sim={prob_d:.4f}')
print(f' E[tau]: theory={expected_stopping_time(a_d, N_d)}, sim={time_d:.1f}')
check_close('Prob (large)', prob_d, ruin_prob_fair(a_d, N_d), tol=0.03)
check_close('Time (large)', time_d, expected_stopping_time(a_d, N_d), tol=5.0)
print('\nTakeaway: OST turns martingale conservation into exact formulas for random stopping times.')
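The trial-by-trial loop above is easy to read but slow for large $N$. An alternative sketch (not the reference implementation): advance all unfinished trials in parallel with NumPy masking.

```python
import numpy as np

# Vectorized gambler's ruin: one array of positions, updated until every
# trial has hit 0 or N.
def simulate_gamblers_ruin_vec(a, N, p=0.5, n_trials=10_000, seed=0):
    rng = np.random.default_rng(seed)
    pos = np.full(n_trials, a)
    t = np.zeros(n_trials)
    active = np.ones(n_trials, dtype=bool)
    while active.any():
        steps = np.where(rng.random(active.sum()) < p, 1, -1)
        pos[active] += steps
        t[active] += 1
        active = (pos > 0) & (pos < N)       # trials still in play
    return (pos == N).mean(), t.mean()

prob, tau = simulate_gamblers_ruin_vec(5, 10)
print(f"P(reach N) ~ {prob:.3f} (theory 0.5), E[tau] ~ {tau:.1f} (theory 25)")
```

The trade-off: the loop runs for as long as the slowest trial, but each iteration is a cheap array operation instead of a Python-level step.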
Exercise 3 ★ — Poisson Process: Superposition and Thinning
An LLM API server receives requests following a Poisson process with rate $\lambda = 10$/s. Each request independently requires either a short response (prob $p = 0.3$) or a long response (prob $1 - p = 0.7$).
(a) Implement simulate_poisson(rate, T) that generates arrival times up to time $T$. Verify: mean count $= \lambda T$, inter-arrivals $\sim \text{Exp}(\lambda)$.
(b) Thinning theorem: Show by simulation that short-response requests arrive as Poisson($p\lambda$) and long-response requests as Poisson($(1-p)\lambda$). Verify they are independent.
(c) Superposition: A second server receives requests at rate $\mu = 5$/s. Show that the combined process has rate $\lambda + \mu = 15$/s.
(d) The compensated process $M_t = N_t - \lambda t$ is a martingale. Verify $E[M_t] = 0$ and $\text{Var}(M_t) = \lambda t$.
(e) Compute the probability that no request arrives in a 200 ms window.
Code cell 11
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 12
# Solution
import numpy as np
np.random.seed(42)
def simulate_poisson(rate, T):
times = []
t = 0
while True:
inter = np.random.exponential(1 / rate)
t += inter
if t > T:
break
times.append(t)
return np.array(times)
header('Exercise 3: Poisson Process')
lam, T = 10.0, 5.0
N_mc = 10000
# (a) Mean and inter-arrival
counts = np.array([len(simulate_poisson(lam, T)) for _ in range(N_mc)])
print(f'(a) Mean count: {counts.mean():.2f} (expected {lam*T})')
check_close('Mean count', counts.mean(), lam*T, tol=0.5)
check_close('Variance (Poisson: mean=var)', counts.var(), lam*T, tol=1.0)
# Inter-arrival distribution
arrivals = simulate_poisson(lam, 100.0)
inter = np.diff(np.concatenate([[0], arrivals]))
print(f' Inter-arrival mean: {inter.mean():.4f} (expected {1/lam:.4f})')
check_close('Inter-arrival mean', inter.mean(), 1/lam, tol=0.01)
# (b) Thinning
p = 0.3
all_arrivals = simulate_poisson(lam, 50.0)
types = np.random.rand(len(all_arrivals)) < p
short_arr = all_arrivals[types]
long_arr = all_arrivals[~types]
# Theoretical rates
lam_short = lam * p
lam_long = lam * (1 - p)
# Sample rates
short_rate = len(short_arr) / 50.0
long_rate = len(long_arr) / 50.0
print(f'\n(b) Thinning (p={p}):')
print(f' Short rate: {short_rate:.2f} (theory {lam_short})')
print(f' Long rate: {long_rate:.2f} (theory {lam_long})')
check_close('Short rate', short_rate, lam_short, tol=0.5)
# (c) Superposition
mu = 5.0
a1 = simulate_poisson(lam, T)
a2 = simulate_poisson(mu, T)
combined = np.sort(np.concatenate([a1, a2]))
combined_rate = len(combined) / T
print(f'\n(c) Superposition: {combined_rate:.2f} (expected {lam+mu})')
check_close('Superposition rate', combined_rate, lam+mu, tol=2.0)
# (d) Compensated martingale
t_vals = np.linspace(0.1, T, 50)
M_vals = [np.array([len(simulate_poisson(lam, t)) - lam*t for _ in range(2000)]) for t in t_vals]
M_means = [M.mean() for M in M_vals]
M_vars = [M.var() for M in M_vals]
print(f'\n(d) M_T mean: {M_means[-1]:.4f} (expected 0)')
print(f' M_T var: {M_vars[-1]:.4f} (expected {lam*T:.1f})')
check_true('M_t is zero-mean', abs(M_means[-1]) < 0.2)
check_close('Var(M_t) = lambda*t', M_vars[-1], lam*T, tol=1.0)
# (e)
window = 0.2
p_empty = np.exp(-lam * window)
print(f'\n(e) P(no arrival in {window}s) = exp(-{lam}*{window}) = {p_empty:.4f}')
# Simulate
emp = np.mean([len(simulate_poisson(lam, window)) == 0 for _ in range(20000)])
print(f' Empirical: {emp:.4f}')
check_close('Empty window prob', emp, p_empty, tol=0.01)
print('\nTakeaway: Poisson processes are closed under thinning and superposition — natural models for queues.')
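Part (b) asks to verify independence of the thinned streams, which the solution above does not check explicitly. A sketch of one way to do it: count short and long arrivals in many disjoint windows and look at their correlation, which should be near zero for independent Poisson streams.

```python
import numpy as np

# For each window, draw the total Poisson count and binomially thin it;
# this is distributionally the same as thinning the arrival times.
rng = np.random.default_rng(0)
lam, p, T, n_windows = 10.0, 0.3, 1.0, 5000
short_counts, long_counts = [], []
for _ in range(n_windows):
    n = rng.poisson(lam * T)                 # total arrivals in the window
    k = rng.binomial(n, p)                   # those marked "short"
    short_counts.append(k)
    long_counts.append(n - k)
corr = np.corrcoef(short_counts, long_counts)[0, 1]
print(f"corr(short, long) = {corr:.4f} (~0 for independent streams)")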
Exercise 4 ★★ — Doob Martingale and McDiarmid's Inequality
The Doob martingale is the key link between stochastic processes and concentration inequalities.
(a) Let $f(X_1, \dots, X_n) = \frac{1}{n}\sum_{i=1}^n X_i$ with $X_i \sim \text{Uniform}(0,1)$ iid. Define the Doob martingale $M_k = E[f \mid X_1, \dots, X_k]$. Show that $M_k = \frac{1}{n}\left(\sum_{i \le k} X_i + (n-k)\mu\right)$ where $\mu = E[X_1] = 1/2$. Implement and verify: $M_0 = \mu$, $M_n = f(X)$.
(b) Compute the martingale differences $D_k = M_k - M_{k-1}$. Show $|D_k| \le 1/n$ (bounded differences). Verify numerically for $n = 10$, random $X$.
(c) Azuma's inequality with $c_k = 1/n$ gives $P(f - E[f] \ge t) \le \exp(-2nt^2)$. Implement azuma_bound(t, n) and verify it matches McDiarmid/Hoeffding at the same parameters.
(d) Now take $f(X) = \max_i X_i$ on $[0,1]^n$. Show $c_k = 1$ for all $k$ (changing one coordinate moves the max by at most 1). Apply Azuma to get $P(f - E[f] \ge t) \le \exp(-2t^2/n)$. Verify this holds empirically for $n = 30$, $t = 0.1$.
(e) In what sense are McDiarmid and Azuma the same inequality with different scope?
Code cell 14
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 15
# Solution
import numpy as np
np.random.seed(42)
def doob_martingale(X, mu):
n = len(X)
M = np.zeros(n + 1)
M[0] = mu # E[f] = mu
for k in range(1, n + 1):
M[k] = (X[:k].sum() + (n - k) * mu) / n
return M
def azuma_bound(t, n, c=None):
"""Azuma: P(f - E[f] >= t) <= exp(-2t^2 / sum(c_i^2)). c_i = 1/n by default."""
if c is None:
c = 1 / n
return np.exp(-2 * t**2 / (n * c**2))
header('Exercise 4: Doob Martingale + McDiarmid')
n = 10
mu = 0.5
X = np.random.uniform(0, 1, n)
# (a) Doob martingale
M = doob_martingale(X, mu)
print(f'(a) Doob martingale:')
print(f' M_0 = {M[0]:.4f} (expected mu={mu})')
print(f' M_n = {M[-1]:.4f} (expected f={X.mean():.4f})')
check_close('M_0 = mu', M[0], mu)
check_close('M_n = f(X)', M[-1], X.mean())
# (b) Differences |D_k| <= 1/n
D = np.diff(M)
print(f'\n(b) Differences |D_k| <= 1/n = {1/n:.4f}:')
for k, d in enumerate(D):
print(f' |D_{k+1}| = {abs(d):.4f}', '✓' if abs(d) <= 1/n + 1e-9 else '✗')
check_true('All |D_k| <= 1/n', np.all(np.abs(D) <= 1/n + 1e-9))
# (c) Azuma matches McDiarmid/Hoeffding
t, n_test = 0.1, 100
az = azuma_bound(t, n_test, c=1/n_test)
hoeff = np.exp(-2 * n_test * t**2) # standard Hoeffding
print(f'\n(c) Azuma(t={t}, n={n_test}) = {az:.6f}')
print(f' Hoeffding = {hoeff:.6f}')
check_close('Azuma = Hoeffding for mean', az, hoeff, tol=1e-10)
# (d) f = max(X), c_k = 1
n_max = 30
t_max = 0.1
N_mc = 100000
data = np.random.uniform(0, 1, (N_mc, n_max))
f_max = data.max(axis=1)
E_max = f_max.mean() # ~ n/(n+1)
E_max_theory = n_max / (n_max + 1)
# Azuma with c_k = 1: bound = exp(-2t^2 / n)
az_max = np.exp(-2 * t_max**2 / n_max)
emp_max = (f_max - E_max >= t_max).mean()
print(f'\n(d) f=max, n={n_max}, t={t_max}:')
print(f' E[max]: theory={E_max_theory:.4f}, sim={E_max:.4f}')
print(f' Azuma bound: {az_max:.4f}')
print(f' Empirical: {emp_max:.4f}')
check_true('Azuma holds for max function', az_max >= emp_max)
# (e) Explanation
print('\n(e) McDiarmid vs Azuma:')
print(' Same bound. McDiarmid states it for FUNCTIONS of independent RVs with')
print(' bounded differences; Azuma states it for MARTINGALES with bounded')
print(' increments. McDiarmid follows by applying Azuma to the Doob martingale,')
print(' so the user never has to exhibit a martingale — independence suffices.')
print('\nTakeaway: The Doob martingale bridges the two: f of iid -> Doob martingale -> Azuma = McDiarmid.')
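A complementary sketch: compare the empirical upper tail of $f(X) = \frac{1}{n}\sum X_i$ with $X_i \sim \text{Uniform}(0,1)$ against the bound $\exp(-2nt^2)$ over a range of $t$, to see how conservative the bound is.

```python
import numpy as np

# Tail probabilities of the sample mean vs the Azuma/McDiarmid bound.
rng = np.random.default_rng(0)
n, N_mc = 50, 100_000
f = rng.uniform(0, 1, (N_mc, n)).mean(axis=1)
emps, bounds = [], []
for t in (0.05, 0.10, 0.15):
    emps.append((f - 0.5 >= t).mean())       # empirical tail probability
    bounds.append(np.exp(-2 * n * t**2))     # concentration bound
    print(f"t={t:.2f}: empirical={emps[-1]:.5f}  bound={bounds[-1]:.5f}")
```

The bound always holds but is loose by a large factor: it pays for making no distributional assumption beyond boundedness.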
Exercise 5 ★★ — Brownian Motion and Ornstein-Uhlenbeck Process
(a) Simulate $n = 1000$ steps of Brownian motion with step size $dt = 0.001$. Verify: $E[B_T^2] = T$, and that the quadratic variation $\sum (\Delta B)^2 \to T$.
(b) Show numerically that BM is non-differentiable: compute $(B_{t+h} - B_t)/h$ for shrinking $h$. Show it diverges as $h \to 0$ (its variance grows like $1/h$).
(c) Simulate an OU process $dX_t = -\theta X_t\,dt + \sigma\,dB_t$ with $\theta = 2$, $\sigma = 1$, for $T = 5$. Verify the stationary variance $\sigma^2/(2\theta)$ is attained.
(d) DDPM connection: Implement the forward process $x_t = \sqrt{\bar\alpha_t}\,x_0 + \sqrt{1 - \bar\alpha_t}\,\epsilon$ with a linear $\beta$ schedule. Show $x_T \to \mathcal{N}(0, 1)$ as $\bar\alpha_T \to 0$.
(e) The stationary OU process has correlation $\text{Corr}(X_s, X_t) = e^{-\theta|t-s|}$. Verify this empirically for several lags.
Code cell 17
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 18
# Solution
import numpy as np
np.random.seed(42)
header('Exercise 5: Brownian Motion and OU')
# (a) BM
dt = 0.001
n_bm = 1000
T = n_bm * dt
N_mc = 5000
increments = np.random.randn(N_mc, n_bm) * np.sqrt(dt)
B = np.cumsum(increments, axis=1)
E_BT2 = (B[:, -1]**2).mean()
QV = (increments**2).sum(axis=1).mean()
print(f'(a) E[B_T^2] = {E_BT2:.4f} (expected {T:.2f})')
print(f' QV mean = {QV:.4f} (expected {T:.2f})')
check_close('E[B_T^2] = T', E_BT2, T, tol=0.04)
check_close('QV -> T', QV, T, tol=0.005)
# (b) Non-differentiability
print('\n(b) Derivative blow-up (std of (B_{t+h}-B_t)/h):')
B_path = B[0] # one path
for h_steps in [100, 10, 1]:
h = h_steps * dt
diffs = (B_path[h_steps:] - B_path[:-h_steps]) / h
std_deriv = diffs.std()
expected = 1 / np.sqrt(h)
print(f' h={h:.3f}: std(dB/h)={std_deriv:.2f} (expected ~{expected:.2f})')
print(' As h->0, std diverges -> BM is non-differentiable a.s.')
# (c) OU stationary distribution
theta, mu_ou, sigma_ou = 2.0, 0.0, 1.0
T_ou, n_ou = 5.0, 5000
dt_ou = T_ou / n_ou
N_stat = 2000
final_vals = []
for _ in range(N_stat):
x = 0.0
for _ in range(n_ou):
x += -theta * x * dt_ou + sigma_ou * np.random.randn() * np.sqrt(dt_ou)
final_vals.append(x)
stat_var_theory = sigma_ou**2 / (2 * theta)
stat_var_emp = np.var(final_vals)
print(f'\n(c) OU stationary variance:')
print(f' Theory: {stat_var_theory:.4f}, Empirical: {stat_var_emp:.4f}')
check_close('OU stationary variance', stat_var_emp, stat_var_theory, tol=0.03)
# (d) DDPM
T_diff = 100
betas = np.linspace(1e-3, 0.2, T_diff)
alpha_bars = np.cumprod(1 - betas)
x0 = np.ones(1000) * 3.0
x_T = np.sqrt(alpha_bars[-1]) * x0 + np.sqrt(1 - alpha_bars[-1]) * np.random.randn(1000)
print(f'\n(d) DDPM forward:')
print(f' x_T mean={x_T.mean():.4f} (expected ~0), std={x_T.std():.4f} (expected ~1)')
check_close('DDPM x_T is near N(0,1)', x_T.mean(), 0.0, tol=0.1)
# (e) OU correlation
print(f'\n(e) OU correlation Corr(X_s, X_t) = exp(-theta*|t-s|):')
lag_steps = [10, 50, 100]
all_paths = []
for _ in range(1000):
x = np.zeros(260)
for i in range(1, 260):
x[i] = x[i-1] - theta * x[i-1] * dt_ou + sigma_ou * np.random.randn() * np.sqrt(dt_ou)
all_paths.append(x)
all_paths = np.array(all_paths)
# Note: paths start at x=0 (not in stationarity), so the empirical
# correlations sit somewhat below the stationary theory at these early times.
corrs = []
for lag in lag_steps:
    corr_emp = np.corrcoef(all_paths[:, 100], all_paths[:, 100 + lag])[0, 1]
    corr_theory = np.exp(-theta * lag * dt_ou)
    corrs.append(corr_emp)
    print(f'    lag={lag}: theory={corr_theory:.4f}, emp={corr_emp:.4f}')
check_true('OU correlations are positive and decreasing',
           min(corrs) > 0 and corrs[0] > corrs[1] > corrs[2])
print('\nTakeaway: BM is the unique continuous martingale with QV=t; OU adds mean-reversion.')
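The solution uses an Euler-Maruyama scheme, which needs small steps. A sketch of an alternative: the OU transition density is Gaussian with known mean and variance, so the process can be sampled exactly at any step size, with no discretisation bias.

```python
import numpy as np

# Exact OU sampling: X_{t+dt} = e^{-theta*dt} X_t + conditional-std * noise.
rng = np.random.default_rng(0)
theta, sigma, dt, n_steps, n_paths = 2.0, 1.0, 0.5, 200, 20_000
a = np.exp(-theta * dt)                              # exact one-step decay
s = np.sqrt(sigma**2 * (1 - a**2) / (2 * theta))     # exact conditional std
x = np.zeros(n_paths)
for _ in range(n_steps):
    x = a * x + s * rng.standard_normal(n_paths)
print(f"Var(X_T) = {x.var():.4f} (stationary theory {sigma**2/(2*theta):.4f})")
```

Even with the very coarse step $dt = 0.5$ the stationary variance comes out right, because the update matches the true transition law rather than a first-order approximation of it.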
Exercise 6 ★★ — Gaussian Process Posterior
(a) Implement GP posterior inference with the RBF kernel $k(x, x') = \sigma_f^2 \exp\!\left(-\frac{(x-x')^2}{2\ell^2}\right)$. Observe 5 points from $f(x) = \sin(2x)$ with noise $\sigma_n = 0.1$. Compute and plot the posterior mean and 95% confidence band.
(b) Compare the RBF kernel ($\ell = 0.5$) vs the Matérn-5/2 kernel for the same data. How does the kernel choice affect posterior smoothness?
(c) Posterior predictive uncertainty: Show that GP uncertainty is smallest at training points and grows in unexplored regions. Verify: the posterior std at training inputs is $\le \sigma_n$.
(d) Bayesian optimisation setup: Implement the Upper Confidence Bound (UCB) acquisition function $\alpha(x) = \mu(x) + \kappa\,\sigma(x)$ for $\kappa = 2$. Show that UCB balances exploration (high uncertainty) vs exploitation (high mean).
(e) The log marginal likelihood is $\log p(y \mid X) = -\tfrac{1}{2} y^\top K_y^{-1} y - \tfrac{1}{2}\log|K_y| - \tfrac{n}{2}\log 2\pi$ with $K_y = K + \sigma_n^2 I$. Implement it and compare the log-likelihood for $\ell = 0.5$ vs $\ell = 2.0$.
Code cell 20
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 21
# Solution
import numpy as np
np.random.seed(42)
def rbf_kernel(x1, x2, l=1.0, sigma_f=1.0):
d = x1[:, None] - x2[None, :]
return sigma_f**2 * np.exp(-d**2 / (2 * l**2))
def matern52_kernel(x1, x2, l=1.0, sigma_f=1.0):
d = np.abs(x1[:, None] - x2[None, :])
r = np.sqrt(5) * d / l
return sigma_f**2 * (1 + r + r**2 / 3) * np.exp(-r)
def gp_posterior(X_train, y_train, X_test, kernel_fn=None, l=1.0, sigma_f=1.0, sigma_n=0.1):
if kernel_fn is None:
kernel_fn = rbf_kernel
K_XX = kernel_fn(X_train, X_train, l, sigma_f) + sigma_n**2 * np.eye(len(X_train))
K_xX = kernel_fn(X_test, X_train, l, sigma_f)
K_xx = kernel_fn(X_test, X_test, l, sigma_f)
L = np.linalg.cholesky(K_XX)
alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
mu = K_xX @ alpha
v = np.linalg.solve(L, K_xX.T)
cov = K_xx - v.T @ v
return mu, cov
def log_marginal_likelihood(X_train, y_train, l=1.0, sigma_f=1.0, sigma_n=0.1):
K = rbf_kernel(X_train, X_train, l, sigma_f) + sigma_n**2 * np.eye(len(X_train))
L = np.linalg.cholesky(K)
alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
n = len(y_train)
lml = -0.5 * y_train @ alpha - np.log(np.diag(L)).sum() - 0.5 * n * np.log(2 * np.pi)
return lml
header('Exercise 6: Gaussian Process Posterior')
X_train = np.array([-2.0, -1.0, 0.0, 1.0, 2.0])
y_train = np.sin(2 * X_train) + 0.1 * np.random.randn(5)
X_test = np.linspace(-3, 3, 100)
f_true = np.sin(2 * X_test)
# (a) GP posterior with RBF
mu_rbf, cov_rbf = gp_posterior(X_train, y_train, X_test, l=0.5, sigma_n=0.1)
std_rbf = np.sqrt(np.diag(cov_rbf))
coverage = np.mean(np.abs(f_true - mu_rbf) <= 2 * std_rbf)
print(f'(a) RBF (l=0.5): 95% coverage = {100*coverage:.1f}%')
check_true('Good coverage', coverage > 0.80)
# (b) Matérn comparison
mu_mat, cov_mat = gp_posterior(X_train, y_train, X_test, kernel_fn=matern52_kernel, l=0.5)
std_mat = np.sqrt(np.diag(cov_mat))
coverage_mat = np.mean(np.abs(f_true - mu_mat) <= 2 * std_mat)
print(f'\n(b) Matérn-5/2: coverage = {100*coverage_mat:.1f}%')
# (c) Uncertainty at training points
mu_train, cov_train = gp_posterior(X_train, y_train, X_train, l=0.5, sigma_n=0.1)
std_train = np.sqrt(np.diag(np.maximum(cov_train, 0)))
sigma_n = 0.1
print(f'\n(c) Posterior std at training inputs (should be <= sigma_n={sigma_n}):')
for xi, si in zip(X_train, std_train):
print(f' x={xi:+.1f}: std={si:.4f}')
check_true('All stds <= sigma_n', np.all(std_train <= sigma_n + 1e-6))
# (d) UCB acquisition
kappa = 2.0
ucb = mu_rbf + kappa * std_rbf
x_next = X_test[np.argmax(ucb)]
print(f'\n(d) UCB selects next point at x={x_next:.3f}')
print(f' (far from training data -> exploration)')
check_true('UCB picks unexplored region', abs(x_next) > 1.5)
# (e) Log marginal likelihood
lml_05 = log_marginal_likelihood(X_train, y_train, l=0.5)
lml_20 = log_marginal_likelihood(X_train, y_train, l=2.0)
print(f'\n(e) Log marginal likelihood:')
print(f' l=0.5: {lml_05:.4f}')
print(f' l=2.0: {lml_20:.4f}')
print(f' Better l: {"0.5" if lml_05 > lml_20 else "2.0"} (higher = better fit)')
print('\nTakeaway: GP posteriors provide calibrated uncertainty, key for Bayesian optimisation.')
Exercise 7 ★★★ — SGD as Langevin Dynamics
SGD with gradient noise approximates Langevin dynamics near a minimum:
$$d\theta_t = -\nabla L(\theta_t)\,dt + \sqrt{\eta}\,\sigma_g\,dB_t.$$
(a) Near a quadratic minimum $L(\theta) = \tfrac{1}{2}H\theta^2$, the stationary distribution is $\mathcal{N}\!\left(0, \frac{\eta\sigma_g^2}{2H}\right)$. Simulate SGD for $H = 2$, $\sigma_g^2 = 1$ and learning rates $\eta \in \{0.01, 0.05, 0.1, 0.2\}$. Verify the stationary variance formula.
(b) Implicit regularisation: Run SGD on a 2D quadratic with $H = \text{diag}(10, 1)$ (anisotropic). Show that the stationary spread is larger in the flat direction (small eigenvalue) — SGD explores flat directions more.
(c) Batch size and temperature: Halving the batch size $B$ doubles the gradient-noise variance ($\sigma_g^2 \propto 1/B$) and hence the noise temperature $T = \eta\sigma_g^2/2$, doubling the stationary variance. Implement and verify for $B \in \{8, 16, 32, 64\}$.
(d) The expected loss at stationarity is $E[L(\theta)] = \tfrac{H}{2}\text{Var}(\theta) = \frac{\eta\sigma_g^2}{4}$. Why does smaller $\eta$ lead to better generalisation (lower loss at stationarity)?
(e) Compare the SGD stationary distribution vs Adam. Sketch why Adam's adaptive per-parameter learning rate changes the effective noise temperature.
Code cell 23
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 24
# Solution
import numpy as np
np.random.seed(42)
H = 2.0
sigma_g2 = 1.0
n_burn = 10000
n_collect = 50000
def sgd_1d(H, sigma_g, eta, n_burn, n_collect):
theta = 0.0
for _ in range(n_burn):
theta -= eta * (H * theta + sigma_g * np.random.randn())
thetas = []
for _ in range(n_collect):
theta -= eta * (H * theta + sigma_g * np.random.randn())
thetas.append(theta)
return np.array(thetas)
header('Exercise 7: SGD as Langevin Dynamics')
# (a) Stationary variance
print('(a) Stationary variance (theory vs sim):')
print(f' Theory: Var = eta*sigma_g^2 / (2*H)')
for eta in [0.01, 0.05, 0.1, 0.2]:
thetas = sgd_1d(H, np.sqrt(sigma_g2), eta, n_burn, n_collect)
var_emp = thetas.var()
var_theory = eta * sigma_g2 / (2 * H)
ratio = var_emp / var_theory
print(f' eta={eta:.2f}: theory={var_theory:.5f}, sim={var_emp:.5f}, ratio={ratio:.3f}')
check_close(f'Var at eta={eta}', var_emp, var_theory, tol=max(0.001, 0.15*var_theory))
# (b) Anisotropic landscape
print('\n(b) Anisotropic landscape H=diag(10,1), eta=0.05:')
eta_b = 0.05
H_diag = np.array([10.0, 1.0])
sigma_g_2d = np.ones(2)
theta2d = np.zeros(2)
for _ in range(n_burn):
grad = H_diag * theta2d + np.random.randn(2)
theta2d -= eta_b * grad
vals = []
for _ in range(n_collect):
grad = H_diag * theta2d + np.random.randn(2)
theta2d -= eta_b * grad
vals.append(theta2d.copy())
vals = np.array(vals)
var_steep = vals[:, 0].var() # H=10 direction
var_flat = vals[:, 1].var() # H=1 direction
print(f' Var(steep dir H=10): {var_steep:.4f} (theory {eta_b*1/(2*10):.4f})')
print(f' Var(flat dir H=1): {var_flat:.4f} (theory {eta_b*1/(2*1):.4f})')
check_true('Flat dir has more variance', var_flat > var_steep)
# (c) Batch size and temperature
print('\n(c) Batch size effect on noise temperature:')
sigma_g_full = 1.0 # full-batch gradient noise
for B in [8, 16, 32, 64]:
# Gradient noise scales as 1/sqrt(B)
sigma_g_B = sigma_g_full / np.sqrt(B)
thetas_B = sgd_1d(H, sigma_g_B, 0.1, n_burn, n_collect)
var_B = thetas_B.var()
temp_theory = 0.1 * sigma_g_B**2 / (2 * H) # eta*sigma_g^2/2H
print(f' B={B:3d}: Var={var_B:.5f} (theory {temp_theory:.5f})')
# (d) Loss at stationarity
print('\n(d) Loss at stationarity E[L(theta*)] = H*Var/2 = eta*sigma_g^2/4:')
for eta in [0.01, 0.1]:
thetas = sgd_1d(H, np.sqrt(sigma_g2), eta, n_burn, n_collect)
loss_stat = 0.5 * H * thetas.var()
loss_theory = eta * sigma_g2 / 4
print(f' eta={eta}: E[L]_sim={loss_stat:.5f}, theory={loss_theory:.5f}')
print(' Smaller eta -> lower loss at stationarity -> better generalisation')
# (e) Adam
print('\n(e) Adam normalises gradients by sqrt(v_t) (gradient variance estimate).')
print(' This equalises the effective step size across parameters.')
print(' Noise temp in direction i becomes eta_eff * sigma_i^2 / (2*H_i)')
print(' where eta_eff ~ eta / sqrt(sigma_i^2) -> removes gradient variance dependence.')
print('\nTakeaway: SGD noise = regularisation; understanding the SDE gives principled tuning.')
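The ratios printed in (a) drift away from 1 as $\eta$ grows. A short sketch of why: the Langevin formula $\eta\sigma_g^2/(2H)$ is only the small-$\eta$ limit of the exact stationary variance of the discrete recursion $\theta' = (1 - \eta H)\theta - \eta\sigma_g\,\xi$, which solves $v = (1-\eta H)^2 v + \eta^2\sigma_g^2$, i.e. $v = \eta\sigma_g^2 / (H(2 - \eta H))$.

```python
import numpy as np

# Compare the SDE approximation with the exact discrete-time value.
H, sigma_g2 = 2.0, 1.0
rows = []
for eta in (0.01, 0.1, 0.4):
    v_sde = eta * sigma_g2 / (2 * H)                 # Langevin approximation
    v_exact = eta * sigma_g2 / (H * (2 - eta * H))   # exact stationary variance
    rows.append((eta, v_sde, v_exact))
    print(f"eta={eta:.2f}: SDE={v_sde:.5f}  exact={v_exact:.5f}  "
          f"rel.err={(v_exact - v_sde) / v_exact:.1%}")
```

The two agree when $\eta H \ll 1$; as $\eta H$ approaches 2 (the stability boundary), the discrete variance blows up while the SDE formula stays finite, so the continuous-time picture should only be trusted for small steps.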
Exercise 8 ★★★ — RL TD-Error as Martingale Difference
(a) Implement value iteration on the 5-state chain MDP (states $0, \dots, 4$; reward 1 on the transition into state 4; $\gamma = 0.9$; move right with probability $p = 0.6$). Show the TD-error $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ under the converged value function $V^*$ has $E[\delta_t \mid s_t] = 0$.
(b) TD(0) convergence: Implement TD(0) learning and show the value function converges to $V^*$ with a decreasing learning rate $\alpha_k = 1/k$.
(c) TD error variance as signal: Show $\text{Var}[\delta_t]$ is larger for a wrong value function (e.g. $V = 0$) than for $V^*$. Use this as a convergence diagnostic.
(d) Bellman residual martingale: Define $Z_t = \gamma^t V(s_t) + \sum_{k < t} \gamma^k r_k$. Under $V^*$: $E[Z_{t+1} \mid \mathcal{F}_t] = Z_t$ (martingale). Verify $E[Z_T - Z_0] = 0$ across many trajectories.
(e) Exploration and the martingale property: Under a random (uniform) policy, TD errors still satisfy $E[\delta_t \mid s_t] = 0$ under the correct on-policy value function $V^\pi$. Why does this matter for policy gradient methods?
Code cell 26
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 27
# Solution
import numpy as np
np.random.seed(42)
N = 5
gamma = 0.9
p_right = 0.6
def value_iteration(N, gamma, p_right, n_iter=2000):
V = np.zeros(N)
for _ in range(n_iter):
V_new = np.zeros(N)
for s in range(N):
if s == N-1:
V_new[s] = 0.0
else:
r_right = 1.0 if s == N-2 else 0.0
s_right = min(s+1, N-1)
s_left = max(s-1, 0)
V_new[s] = (p_right * (r_right + gamma*V[s_right]) +
(1-p_right) * gamma*V[s_left])
V = V_new
return V
def simulate_episode(V, N, gamma, p_right, max_steps=100):
deltas = []
s = np.random.randint(0, N-1)
for _ in range(max_steps):
if s == N-1: break
go_right = np.random.rand() < p_right
s_next = min(s+1, N-1) if go_right else max(s-1, 0)
r = 1.0 if go_right and s == N-2 else 0.0
delta = r + gamma * V[s_next] - V[s]
deltas.append((s, delta))
s = s_next
return deltas
header('Exercise 8: RL TD-Error as Martingale')
# (a) TD errors under V*
V_star = value_iteration(N, gamma, p_right)
print(f'(a) V* = {np.round(V_star, 4)}')
all_deltas = []
for _ in range(5000):
ep = simulate_episode(V_star, N, gamma, p_right)
all_deltas.extend([d for _, d in ep])
all_deltas = np.array(all_deltas)
print(f' E[delta_t] = {all_deltas.mean():.6f} (expected 0)')
print(f' Var[delta_t] = {all_deltas.var():.4f}')
check_true('TD errors zero-mean under V*', abs(all_deltas.mean()) < 0.01)
# (b) TD(0) convergence
print('\n(b) TD(0) learning:')
V_td = np.zeros(N)
visit_counts = np.ones(N)
N_episodes = 10000
mse_hist = []
for ep in range(N_episodes):
s = np.random.randint(0, N-1)
for _ in range(100):
if s == N-1: break
go_right = np.random.rand() < p_right
s_next = min(s+1, N-1) if go_right else max(s-1, 0)
r = 1.0 if go_right and s == N-2 else 0.0
alpha = 1 / visit_counts[s]
visit_counts[s] += 1
V_td[s] += alpha * (r + gamma * V_td[s_next] - V_td[s])
s = s_next
if ep % 1000 == 999:
mse = np.mean((V_td[:N-1] - V_star[:N-1])**2)
mse_hist.append(mse)
print(f' ep={ep+1:5d}: MSE(V_td, V*) = {mse:.6f}')
check_true('TD(0) converges', mse_hist[-1] < 0.01)
# (c) TD error variance as convergence diagnostic
deltas_V0 = []
for _ in range(3000):
ep = simulate_episode(np.zeros(N), N, gamma, p_right)
deltas_V0.extend([d for _, d in ep])
print(f'\n(c) Var[delta] under V=0: {np.var(deltas_V0):.4f}')
print(f' Var[delta] under V*: {all_deltas.var():.4f}')
check_true('V* has smaller TD variance', all_deltas.var() <= np.var(deltas_V0))
# (d) Z_t martingale
print('\n(d) Z_t = gamma^t V(s_t) + cumulative rewards:')
Z_diffs = []
for _ in range(5000):
s = np.random.randint(0, N-1)
Z0 = V_star[s]
cum_r = 0
for t in range(20):
if s == N-1: break
go_right = np.random.rand() < p_right
s_next = min(s+1, N-1) if go_right else max(s-1, 0)
r = 1.0 if go_right and s == N-2 else 0.0
cum_r += gamma**t * r
s = s_next
ZT = gamma**(t+1) * V_star[s] + cum_r
Z_diffs.append(ZT - Z0)
print(f' E[Z_T - Z_0] = {np.mean(Z_diffs):.6f} (expected 0)')
check_true('Z_t is a martingale', abs(np.mean(Z_diffs)) < 0.05)
# (e)
print('\n(e) Policy gradient and martingale property:')
print(' TD errors are martingale differences under the correct VALUE FUNCTION')
print(' for ANY policy (on-policy V). This means:')
print(' - Advantage A(s,a) = Q(s,a) - V(s) has E[A|s] = 0')
print(' - Policy gradient theorem: nabla J = E[nabla log pi * A]')
print(' - Using A instead of Q reduces variance (baseline subtraction)')
print(' - V plays the role of a control variate / martingale baseline')
print('\nTakeaway: Martingale structure of value functions is the foundation of actor-critic methods.')
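The control-variate claim in (e) can be checked numerically. A sketch (recomputing the chain's value function so the snippet is self-contained; parameters mirror the solution above): for Monte Carlo returns $G$ from random start states, subtracting the baseline $V(s_0)$ removes the across-state component of the variance.

```python
import numpy as np

rng = np.random.default_rng(0)
N, gamma, p_right = 5, 0.9, 0.6

# Iterative policy evaluation, as in the solution cell above.
V = np.zeros(N)
for _ in range(2000):
    V_new = np.zeros(N)
    for s in range(N - 1):
        r = 1.0 if s == N - 2 else 0.0
        V_new[s] = p_right * (r + gamma * V[min(s + 1, N - 1)]) \
                 + (1 - p_right) * gamma * V[max(s - 1, 0)]
    V = V_new

# Monte Carlo returns from uniformly random start states.
G_list, s0_list = [], []
for _ in range(20_000):
    s = rng.integers(0, N - 1)
    s0_list.append(s)
    G, t = 0.0, 0
    while s != N - 1 and t < 200:
        right = rng.random() < p_right
        G += gamma**t * (1.0 if right and s == N - 2 else 0.0)
        s = min(s + 1, N - 1) if right else max(s - 1, 0)
        t += 1
    G_list.append(G)
G = np.array(G_list)
base = V[np.array(s0_list)]
print(f"Var(G)         = {G.var():.4f}")
print(f"Var(G - V(s0)) = {(G - base).var():.4f}  (baseline helps)")
```

Since $E[G \mid s_0] = V(s_0)$, the law of total variance gives $\text{Var}(G - V(s_0)) = \text{Var}(G) - \text{Var}(V(s_0))$: exactly the variance reduction a policy-gradient baseline buys.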
Exercise 9 ★★★ — AR(1) Stationarity and Autocorrelation
Consider the process $X_t = \phi X_{t-1} + \epsilon_t$ with $\phi = 0.8$ and $\epsilon_t \sim \mathcal{N}(0, \sigma^2)$, $\sigma = 0.5$.
Part (a): Simulate a long AR(1) trajectory (discarding a burn-in).
Part (b): Verify the stationary variance $\sigma^2/(1 - \phi^2)$.
Part (c): Estimate the autocorrelation at lags 1 through 5 and compare with $\phi^k$.
Part (d): Explain why autocorrelation matters for MCMC and SGD diagnostics.
Code cell 29
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 30
# Solution
import numpy as np
np.random.seed(42)
phi, sigma = 0.8, 0.5
T = 50000
x = np.zeros(T)
noise = np.random.normal(0, sigma, T)
for t in range(1, T):
x[t] = phi * x[t-1] + noise[t]
xb = x[5000:]
var_theory = sigma**2 / (1 - phi**2)
def autocorr(series, lag):
a = series[:-lag] - series[:-lag].mean()
b = series[lag:] - series[lag:].mean()
return np.dot(a, b) / np.sqrt(np.dot(a, a) * np.dot(b, b))
header('Exercise 9: AR(1) Stationarity and Autocorrelation')
print(f'Empirical variance: {xb.var():.4f}, theory: {var_theory:.4f}')
check_close('Stationary variance matches', xb.var(), var_theory, tol=0.03)
for k in range(1, 6):
ac = autocorr(xb, k)
print(f'lag {k}: empirical ac={ac:.4f}, theory={phi**k:.4f}')
check_close(f'autocorr lag {k}', ac, phi**k, tol=0.03)
print('Takeaway: correlated samples carry less information than iid samples; diagnostics must account for autocorrelation.')
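The "less information" point can be made quantitative via the effective sample size (ESS). A sketch: for AR(1), $\sum_{k \ge 1} \rho_k = \phi/(1-\phi)$, so $\text{ESS} = N \cdot \frac{1-\phi}{1+\phi}$; the empirical autocorrelations recover the same number.

```python
import numpy as np

# ESS = N / (1 + 2 * sum of autocorrelations), closed form vs empirical.
rng = np.random.default_rng(0)
phi, sigma, N = 0.8, 0.5, 50_000
x = np.zeros(N)
for t in range(1, N):
    x[t] = phi * x[t - 1] + sigma * rng.standard_normal()
xc = x - x.mean()
# Empirical integrated autocorrelation, truncated at lag 50 (phi^50 ~ 1e-5).
acs = [np.dot(xc[:-k], xc[k:]) / np.dot(xc, xc) for k in range(1, 51)]
ess_emp = N / (1 + 2 * sum(acs))
ess_theory = N * (1 - phi) / (1 + phi)
print(f"ESS theory ~ {ess_theory:.0f}, ESS from empirical autocorr ~ {ess_emp:.0f}")
```

With $\phi = 0.8$, 50,000 correlated samples carry only about as much information as ~5,500 iid draws: the usual reason MCMC diagnostics report ESS instead of raw chain length.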
Exercise 10 ★★★ — Diffusion Forward Process and Signal-to-Noise Ratio
A simplified diffusion model defines $x_t = \sqrt{\bar\alpha_t}\,x_0 + \sqrt{1 - \bar\alpha_t}\,\epsilon$, where $\bar\alpha_t = \prod_{s \le t}(1 - \beta_s)$ and $\epsilon \sim \mathcal{N}(0, 1)$.
Part (a): Build a linear $\beta$ schedule and compute $\bar\alpha_t$.
Part (b): Track the signal-to-noise ratio $\text{SNR}(t) = \bar\alpha_t / (1 - \bar\alpha_t)$.
Part (c): Simulate $x_t$ for a non-Gaussian $x_0$ and verify late-time samples approach standard normal.
Part (d): Explain why the forward process is designed to destroy data structure gradually.
Code cell 32
# Your Solution
print("Write your solution here, then compare with the reference solution below.")
Code cell 33
# Solution
import numpy as np
from scipy import stats
np.random.seed(42)
T = 1000
beta = np.linspace(1e-4, 0.02, T)
alpha = 1 - beta
alpha_bar = np.cumprod(alpha)
snr = alpha_bar / (1 - alpha_bar)
n = 40000
x0 = np.random.choice([-2.0, 2.0], size=n) # visibly non-Gaussian data
def sample_xt(t_index):
ab = alpha_bar[t_index]
eps = np.random.randn(n)
return np.sqrt(ab) * x0 + np.sqrt(1 - ab) * eps
x_early = sample_xt(49)
x_late = sample_xt(999)
ks_late = stats.kstest(x_late, 'norm')
header('Exercise 10: Diffusion Forward Process')
print(f'SNR at t=50: {snr[49]:.4f}')
print(f'SNR at t=1000: {snr[-1]:.8f}')
print(f'Late mean={x_late.mean():.4f}, late std={x_late.std():.4f}, KS p={ks_late.pvalue:.4f}')
check_true('SNR decreases over time', snr[49] > snr[-1])
check_close('Late mean near 0', x_late.mean(), 0.0, tol=0.03)
check_close('Late std near 1', x_late.std(), 1.0, tol=0.03)
print('Takeaway: diffusion training learns to reverse a controlled stochastic process from noise back to data.')
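A sketch that goes beyond the exercise: compare the linear $\beta$ schedule above with the cosine $\bar\alpha$ schedule of Nichol & Dhariwal (2021), $\bar\alpha_t = f(t)/f(0)$ with $f(t) = \cos^2\!\big(\tfrac{t/T + s}{1 + s}\cdot\tfrac{\pi}{2}\big)$, $s = 0.008$. The cosine schedule destroys signal more gradually at early timesteps.

```python
import numpy as np

# Linear-beta alpha_bar vs cosine alpha_bar over the same number of steps.
T, s = 1000, 0.008
t = np.arange(T + 1)
f = np.cos(((t / T + s) / (1 + s)) * np.pi / 2) ** 2
ab_cos = f / f[0]                                # cosine alpha_bar schedule
beta = np.linspace(1e-4, 0.02, T)
ab_lin = np.cumprod(1 - beta)                    # linear-beta alpha_bar
for i in (49, 499, 999):
    print(f"t={i+1:4d}: alpha_bar linear={ab_lin[i]:.4f}  cosine={ab_cos[i+1]:.4f}")
```

Both schedules drive $\bar\alpha_T$ to (near) zero, but the shape of the decay controls how noise is distributed across timesteps, and hence how the training signal is distributed across the denoising network's tasks.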
What to Review After Finishing
- Martingale property $E[M_{n+1} \mid \mathcal{F}_n] = M_n$ — know 3 examples
- OST derivation for gambler's ruin (both prob and expected time)
- Poisson thinning/superposition theorems and their proofs
- Doob martingale construction: $M_k = E[f(X_1, \dots, X_n) \mid X_1, \dots, X_k]$
- BM quadratic variation and non-differentiability
- OU stationary distribution $\mathcal{N}(\mu, \sigma^2/(2\theta))$
- GP posterior formula: $\mu_* = K_{*X}(K_{XX} + \sigma_n^2 I)^{-1} y$
- SGD stationary variance: $\eta\sigma_g^2/(2H)$
- TD error as a martingale difference under $V^\pi$
- AR(1) stationarity — can you explain the probabilistic calculation and the ML relevance?
- Diffusion forward process — can you connect the computation to model behavior?
References
- Williams, D. — Probability with Martingales (Cambridge, 1991)
- Øksendal, B. — Stochastic Differential Equations (Springer, 6th ed.)
- Rasmussen & Williams — Gaussian Processes for Machine Learning (MIT Press, 2006)
- Song et al. — Score-Based Generative Modeling through SDEs (ICLR 2021)
- Mandt, Hoffman & Blei — Stochastic Gradient Descent as Approximate Bayesian Inference (JMLR 2017)