Theory Notebook
Converted from theory.ipynb for web reading.
Markov Chains — Theory Notebook
"A sequence of experiments forms a simple Markov chain if the conditional distribution of each experiment, given all preceding ones, depends only on the immediately preceding experiment." — Andrei Markov, 1906
Interactive derivations: transition matrices, Chapman-Kolmogorov, state classification, stationary distributions, mixing times, MCMC, PageRank, and HMMs.
Code cell 2
# Global plotting setup: prefer seaborn's theme when it is installed,
# otherwise fall back to matplotlib's bundled port of the same style.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
try:
    import seaborn as sns
    sns.set_theme(style="whitegrid", palette="colorblind")
    HAS_SNS = True
except ImportError:
    # seaborn not installed -- matplotlib ships a port of the same style
    plt.style.use("seaborn-v0_8-whitegrid")
    HAS_SNS = False
# Shared rcParams so every figure in the notebook renders consistently.
mpl.rcParams.update({
    "figure.figsize": (10, 6),
    "figure.dpi": 120,
    "font.size": 13,
    "axes.titlesize": 15,
    "axes.labelsize": 13,
    "xtick.labelsize": 11,
    "ytick.labelsize": 11,
    "legend.fontsize": 11,
    "legend.framealpha": 0.85,
    "lines.linewidth": 2.0,
    "axes.spines.top": False,
    "axes.spines.right": False,
    "savefig.bbox": "tight",
    "savefig.dpi": 150,
})
np.random.seed(42)  # fixed seed: every simulation below is reproducible
print("Plot setup complete.")
Code cell 3
# Numerical setup for the theory cells: linear-algebra imports, compact
# array printing, and a fixed RNG seed.
import numpy as np
import numpy.linalg as la
from scipy import linalg as sla
try:
    import matplotlib.pyplot as plt
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.rcParams['figure.figsize'] = [10, 6]
    plt.rcParams['font.size'] = 12
    HAS_MPL = True
except ImportError:
    HAS_MPL = False  # plotting cells check this flag and skip when False
np.set_printoptions(precision=6, suppress=True)  # readable float arrays
np.random.seed(42)
def is_stochastic(P, tol=1e-10):
    """Return True if ``P`` is row-stochastic within tolerance ``tol``:
    all entries non-negative and every row summing to 1."""
    entries_ok = np.all(P >= -tol)
    rows_ok = np.allclose(P.sum(axis=1), 1, atol=tol)
    return entries_ok and rows_ok
def stationary_power(P, tol=1e-12, max_iter=10000):
    """Approximate the stationary distribution of ``P`` by power iteration.

    Starts from the uniform distribution and applies ``pi @ P`` until two
    successive iterates differ by less than ``tol`` in sup-norm, or
    ``max_iter`` steps have elapsed (then the last iterate is returned).
    """
    n = P.shape[0]
    pi = np.full(n, 1.0 / n)  # uniform start
    for _ in range(max_iter):
        nxt = pi @ P
        if np.max(np.abs(nxt - pi)) < tol:
            return nxt
        pi = nxt
    return pi
def tv_distance(mu, nu):
    """Total-variation distance between two distributions:
    half the L1 distance between their probability vectors."""
    gap = np.abs(np.asarray(mu) - np.asarray(nu))
    return 0.5 * gap.sum()
print('Setup complete.')
1. Transition Matrices and Chapman-Kolmogorov
A stochastic matrix $P$ has $P_{ij} \ge 0$ and $\sum_j P_{ij} = 1$. The entry $P_{ij}$ is the probability of transitioning from state $i$ to state $j$.
$n$-step transitions: $P^{(n)} = P^n$ — the $n$-step transition matrix. The distribution at step $n$: $\mu_n = \mu_0 P^n$.
Chapman-Kolmogorov: $P^{(m+n)} = P^{(m)} P^{(n)}$ — matrix multiplication is the right operation.
Code cell 5
# === 1.1 Transition Matrix Examples ===
# Two-state weather chain: Sunny/Rainy
P_weather = np.array([[0.8, 0.2],
                      [0.4, 0.6]])
print('Weather chain P:')
print(P_weather)
print(f'Row sums: {P_weather.sum(axis=1)} (should be [1, 1])')
print(f'Is stochastic: {is_stochastic(P_weather)}')
# Chapman-Kolmogorov: P^(m+n) = P^m @ P^n
m, n = 3, 4
Pm = la.matrix_power(P_weather, m)
Pn = la.matrix_power(P_weather, n)
Pmn = la.matrix_power(P_weather, m + n)
print(f'\nChapman-Kolmogorov P^{m+n} = P^{m} @ P^{n}:')
print(f'Direct P^{m+n}:')
print(Pmn)
print(f'P^{m} @ P^{n}:')
print(Pm @ Pn)
print(f'Equal: {np.allclose(Pmn, Pm @ Pn)}')
# Distribution evolution
mu0 = np.array([1.0, 0.0]) # start sunny
print('\nDistribution evolution from sunny start:')
for step in [0, 1, 5, 10, 20, 50]:
    # mu_n = mu_0 P^n; the rows converge to the stationary distribution
    mun = mu0 @ la.matrix_power(P_weather, step)
    print(f' n={step:3d}: P(Sunny)={mun[0]:.6f}, P(Rainy)={mun[1]:.6f}')
2. State Classification
States are classified by their long-run behaviour:
- Communicating: $i \leftrightarrow j$ if there exist paths $i \to j$ and $j \to i$
- Recurrent: $\Pr_i(\text{return to } i) = 1$; equivalently $\sum_n P^n_{ii} = \infty$
- Transient: $\Pr_i(\text{return to } i) < 1$; chain leaves state permanently with positive probability
- Period: $d(i) = \gcd\{n \ge 1 : P^n_{ii} > 0\}$; state is aperiodic if $d(i) = 1$
- Absorbing: $P_{ii} = 1$; never leaves once entered
Code cell 7
# === 2.1 State Classification ===
import numpy as np
def classify_states(P, n_check=200):
    """Heuristically classify the states of transition matrix ``P``.

    Returns
    -------
    cumsum_return : ndarray
        ``sum_{n=1..n_check} (P^n)_{ii}`` per state — the expected number
        of returns within ``n_check`` steps.  It grows without bound for
        recurrent states and stays bounded for transient ones; the caller
        thresholds it.
    periods : list[int]
        gcd of all return times ``n <= 50`` with ``(P^n)_{ii} > 0``;
        0 if the state was never revisited within that horizon.

    Fix over the original: ``from math import gcd`` was executed inside
    the innermost loop; it is hoisted here.  The ``periods[i] == 0``
    special case is also gone, since ``gcd(0, n) == n``.
    """
    from math import gcd

    N = P.shape[0]
    # Expected return counts: accumulate the diagonal of P^n.
    cumsum_return = np.zeros(N)
    Pn = np.eye(N)
    for _ in range(n_check):
        Pn = Pn @ P
        cumsum_return += np.diag(Pn)
    # Period d(i) = gcd{ n : (P^n)_{ii} > 0 }, scanned up to n = 50.
    Pn = np.eye(N)
    periods = [0] * N
    for n in range(1, 51):
        Pn = Pn @ P
        for i in range(N):
            if Pn[i, i] > 1e-10:
                periods[i] = gcd(periods[i], n)
    return cumsum_return, periods
# Example 1: Simple 3-state chain (irreducible and aperiodic)
P3 = np.array([[0.5, 0.5, 0.0],
               [0.3, 0.4, 0.3],
               [0.0, 0.5, 0.5]])
returns3, periods3 = classify_states(P3)
print('3-state irreducible chain:')
for i in range(3):
    # Heuristic threshold: a diverging partial return sum signals recurrence.
    status = 'recurrent' if returns3[i] > 50 else 'transient'
    print(f' State {i}: return_sum={returns3[i]:.1f}, period={periods3[i]}, status={status}')
# Example 2: Chain with absorbing state
P_abs = np.array([[0.5, 0.3, 0.2, 0.0],
                  [0.0, 0.6, 0.4, 0.0],
                  [0.0, 0.0, 0.4, 0.6],
                  [0.0, 0.0, 0.0, 1.0]]) # state 3 absorbing
returns_abs, periods_abs = classify_states(P_abs)
print('\nChain with absorbing state 3:')
for i in range(4):
    status = 'recurrent' if returns_abs[i] > 50 else 'transient'
    print(f' State {i}: return_sum={returns_abs[i]:.1f}, status={status}')
# Example 3: Periodic chain (2-cycle)
P_period = np.array([[0.0, 1.0],
                     [1.0, 0.0]]) # period 2
_, periods_p = classify_states(P_period)
print(f'\nPeriodic chain (0<->1): periods = {periods_p} (expected [2, 2])')
3. Stationary Distributions and Perron-Frobenius
A distribution $\pi$ is stationary if $\pi P = \pi$, $\pi_i \ge 0$, $\sum_i \pi_i = 1$.
Perron-Frobenius theorem: For an irreducible (primitive) stochastic matrix:
- Unique stationary distribution $\pi$ with $\pi_i > 0$ for all $i$
- All other eigenvalues satisfy $|\lambda| < 1$
- $\mu_0 P^n \to \pi$ geometrically at rate $|\lambda_2|^n$
Methods: (1) solve $\pi (P - I) = 0$ with $\sum_i \pi_i = 1$; (2) power iteration; (3) left eigenvector of $P$ for eigenvalue $1$.
Code cell 9
# === 3.1 Stationary Distribution Methods ===
import numpy as np
import numpy.linalg as la
def stationary_linear_system(P):
    """Stationary distribution by solving the linear system pi P = pi.

    Transposes to ``(P^T - I) pi = 0`` and overwrites the last (redundant)
    equation with the normalisation ``sum(pi) = 1``.
    """
    n = P.shape[0]
    coeffs = P.T - np.eye(n)
    coeffs[-1, :] = 1  # normalisation row replaces the redundant equation
    rhs = np.zeros(n)
    rhs[-1] = 1
    return la.solve(coeffs, rhs)
def stationary_eigenvector(P):
    """Stationary distribution as the left eigenvector of ``P`` for
    eigenvalue 1, renormalised to sum to one."""
    eigvals, eigvecs = la.eig(P.T)
    # The eigenvalue nearest 1 (it may carry numerical noise).
    which = np.argmin(np.abs(eigvals - 1))
    vec = np.real(eigvecs[:, which])
    return vec / vec.sum()
# Test on weather chain: all three solvers should agree.
P = np.array([[0.8, 0.2], [0.4, 0.6]])
pi_ls = stationary_linear_system(P)
pi_pi = stationary_power(P)
pi_ev = stationary_eigenvector(P)
print('Weather chain stationary distribution:')
print(f' Linear system: {pi_ls}')
print(f' Power iteration: {pi_pi}')
print(f' Eigenvector: {pi_ev}')
# Closed form for a 2-state chain with off-diagonals p, q: pi = (q, p)/(p+q).
print(f' Theory (q/(p+q), p/(p+q)): ({0.4/0.6:.4f}, {0.2/0.6:.4f})')
assert np.allclose(pi_ls, pi_pi, atol=1e-6)
print('PASS - All methods agree')
# Verify stationary property: pi @ P = pi
residual = pi_ls @ P - pi_ls
print(f'\nVerify pi P = pi: max residual = {np.max(np.abs(residual)):.2e}')
# Eigenvalue spectrum (real for this reversible 2-state chain)
evals = np.sort(np.real(la.eigvals(P)))[::-1]
print(f'\nEigenvalues: {evals}')
print(f'Spectral gap: {1 - abs(evals[1]):.4f}')
4. Convergence to Stationarity
For an ergodic chain, $\mu_0 P^n \to \pi$ at rate $|\lambda_2|^n$.
TV distance: $\|\mu - \nu\|_{TV} = \tfrac{1}{2} \sum_x |\mu(x) - \nu(x)|$
Mixing time: $t_{\mathrm{mix}}(\varepsilon) = \min\{n : \max_x \|P^n(x, \cdot) - \pi\|_{TV} \le \varepsilon\}$
Spectral gap bound: $t_{\mathrm{mix}}(\varepsilon) \le \frac{1}{\gamma} \log\frac{1}{\varepsilon\, \pi_{\min}}$ with $\gamma = 1 - |\lambda_2|$
Code cell 11
# === 4.1 Convergence to Stationarity ===
import numpy as np
import numpy.linalg as la
def convergence_profile(P, n_max=100):
    """TV distance to stationarity from every start state for n = 0..n_max.

    Returns ``(tv_per_state, pi)`` where ``tv_per_state[i, n]`` is the TV
    distance between row ``i`` of ``P^n`` and the stationary distribution.
    """
    num_states = P.shape[0]
    pi = stationary_power(P)
    tv_per_state = np.zeros((num_states, n_max + 1))
    Pn = np.eye(num_states)
    for step in range(n_max + 1):
        for state in range(num_states):
            tv_per_state[state, step] = tv_distance(Pn[state], pi)
        if step < n_max:  # skip the final, unused multiplication
            Pn = Pn @ P
    return tv_per_state, pi
# Weather chain: empirical TV decay vs the spectral-gap bound.
P = np.array([[0.8, 0.2], [0.4, 0.6]])
tv_profile, pi = convergence_profile(P, n_max=30)
print('Convergence to stationarity (TV distance from sunny start):')
evals = np.sort(np.abs(la.eigvals(P)))[::-1]
lambda2 = evals[1]
gap = 1 - lambda2
print(f'Spectral gap: {gap:.4f}, lambda_2: {lambda2:.4f}')
print(f' n TV_from_state0 TV_bound(lambda_2^n/sqrt(pi_min))')
pi_min = pi.min()
for n in [1, 5, 10, 20, 30]:
    tv = tv_profile[0, n]
    # Reversible-chain bound: TV <= 0.5 * lambda_2^n / sqrt(pi_min)
    bound = 0.5 * lambda2**n / np.sqrt(pi_min)
    print(f' {n:3d} {tv:.8f} {bound:.8f}')
# Mixing time computation
eps = 0.01
# First n at which the worst-case (over start states) TV drops below eps.
t_mix_empirical = next(n for n in range(1, 100) if tv_profile.max(axis=0)[n] <= eps)
t_mix_theory = int(np.ceil(np.log(1/(eps * pi_min)) / gap))
print(f'\nMixing time to eps={eps}:')
print(f' Empirical: {t_mix_empirical}')
print(f' Spectral gap bound: {t_mix_theory}')
# Periodic chain: no convergence
P_period = np.array([[0., 1.], [1., 0.]])
tv_p, pi_p = convergence_profile(P_period, n_max=10)
print(f'\nPeriodic chain TV distance from state 0: {tv_p[0, :11]}')
print('TV oscillates -- no convergence for periodic chain')
5. Detailed Balance and Reversibility
A chain is reversible with respect to $\pi$ if: $\pi_i P_{ij} = \pi_j P_{ji}$ for all $i, j$
Detailed balance implies stationarity. The reversed chain equals the forward chain for reversible chains.
Birth-death chains are always reversible with explicit stationary distribution: $\pi_n \propto \prod_{k=1}^{n} \frac{p_{k-1}}{q_k}$
Code cell 13
# === 5.1 Detailed Balance Verification ===
import numpy as np
def check_detailed_balance(P, pi, tol=1e-10):
    """Test the detailed-balance condition pi_i P_ij = pi_j P_ji.

    Returns ``(ok, violations, max_violation)`` where ``violations`` is a
    list of ``(i, j, lhs, rhs, diff)`` tuples with ``diff > tol`` and
    ``max_violation`` is the largest imbalance over all pairs.
    """
    n = P.shape[0]
    violations = []
    max_violation = 0
    for a in range(n):
        for b in range(n):
            flow_ab = pi[a] * P[a, b]  # probability flow a -> b
            flow_ba = pi[b] * P[b, a]  # probability flow b -> a
            imbalance = abs(flow_ab - flow_ba)
            if imbalance > max_violation:
                max_violation = imbalance
            if imbalance > tol:
                violations.append((a, b, flow_ab, flow_ba, imbalance))
    return len(violations) == 0, violations, max_violation
# 1. Reversible: symmetric chain (symmetric P has uniform pi and satisfies DB)
P_sym = np.array([[0.5, 0.3, 0.2],
                  [0.3, 0.4, 0.3],
                  [0.2, 0.3, 0.5]])
pi_sym = stationary_power(P_sym)
ok_sym, _, max_v_sym = check_detailed_balance(P_sym, pi_sym)
print(f'Symmetric chain satisfies DB: {ok_sym} (max violation: {max_v_sym:.2e})')
# 2. Birth-death chain
N_bd = 5
p = 0.6 # birth probability
q = 0.4 # death probability
P_bd = np.zeros((N_bd, N_bd))
for i in range(N_bd):
    if i > 0: P_bd[i, i-1] = q
    if i < N_bd-1: P_bd[i, i+1] = p
P_bd[0, 0] = q # reflect at 0
P_bd[-1, -1] = p # reflect at N-1
pi_bd = stationary_power(P_bd)
# Analytical: pi_n = pi_0 * (p/q)^n
ratio = p / q
pi_bd_theory = np.array([ratio**k for k in range(N_bd)])
pi_bd_theory /= pi_bd_theory.sum()
print(f'\nBirth-death chain (p={p}, q={q}):')
print(f' pi (power iteration): {pi_bd}')
print(f' pi (theory, (p/q)^n): {pi_bd_theory}')
print(f' Match: {np.allclose(pi_bd, pi_bd_theory, atol=1e-6)}')
ok_bd, _, max_v_bd = check_detailed_balance(P_bd, pi_bd)
print(f' Detailed balance satisfied: {ok_bd}')
# 3. Non-reversible: cyclic chain (uniform pi, but all flow goes one way)
P_cyc = np.array([[0., 1., 0.],
                  [0., 0., 1.],
                  [1., 0., 0.]])
pi_cyc = stationary_power(P_cyc)
ok_cyc, violations_cyc, max_v_cyc = check_detailed_balance(P_cyc, pi_cyc)
print(f'\nCyclic chain satisfies DB: {ok_cyc} (max violation: {max_v_cyc:.4f})')
print('(Cyclic chain is NOT reversible despite having uniform stationary dist)')
6. Mixing Times and Spectral Gap
Spectral gap: $\gamma = 1 - |\lambda_2|$ for the second-largest eigenvalue $\lambda_2$.
Mixing time bound: $t_{\mathrm{mix}}(\varepsilon) \le \frac{1}{\gamma} \log\frac{1}{\varepsilon\, \pi_{\min}}$
Cheeger inequality: $\frac{\Phi^2}{2} \le \gamma \le 2\Phi$, where $\Phi$ is the conductance.
Lazy chain: $P_{\mathrm{lazy}} = \tfrac{1}{2}(I + P)$ roughly doubles mixing time but ensures non-negative eigenvalues.
Code cell 15
# === 6.1 Spectral Gap and Mixing Time ===
import numpy as np
import numpy.linalg as la
def spectral_gap(P):
    """Spectral gap ``1 - |lambda_2|`` of a transition matrix.

    Returns ``(gap, evals)`` where ``evals`` holds the eigenvalue moduli
    sorted in decreasing order (``evals[0] == 1`` for a stochastic P).

    Fix over the original: it computed ``abs(real(lambda))``, discarding
    imaginary parts before taking the magnitude, which understates
    ``|lambda_2|`` for non-reversible chains with complex eigenvalues.
    Using the complex modulus is correct in both cases (and identical for
    reversible chains, whose spectrum is real).
    """
    evals = np.sort(np.abs(la.eigvals(P)))[::-1]
    return 1 - evals[1], evals
def mixing_time_bound(P, eps=0.01):
    """Spectral-gap upper bound on the eps-mixing time:
    ``ceil( log(1 / (eps * pi_min)) / gap )``."""
    gap = spectral_gap(P)[0]
    pi_min = stationary_power(P).min()
    bound = np.log(1 / (eps * pi_min)) / gap
    return int(np.ceil(bound))
# Compare chains with different mixing rates
print('Spectral gaps and mixing time bounds (eps=0.01):')
chains = {
    'Fast (p=0.5, q=0.5)': np.array([[0.5, 0.5], [0.5, 0.5]]),
    'Weather (p=0.2, q=0.4)': np.array([[0.8, 0.2], [0.4, 0.6]]),
    'Slow (p=0.05, q=0.05)': np.array([[0.95, 0.05], [0.05, 0.95]]),
}
for name, P in chains.items():
    gap, evals = spectral_gap(P)
    t_bound = mixing_time_bound(P)
    print(f' {name}:')
    print(f' lambda_2={evals[1]:.4f}, gap={gap:.4f}, t_mix_bound={t_bound}')
# Lazy chain: (I + P)/2 maps each eigenvalue lambda to (1 + lambda)/2.
P_slow = np.array([[0.95, 0.05], [0.05, 0.95]])
P_lazy = 0.5 * (np.eye(2) + P_slow)
gap_slow, _ = spectral_gap(P_slow)
gap_lazy, _ = spectral_gap(P_lazy)
print(f'\nLazy chain comparison:')
print(f' Original gap: {gap_slow:.4f}')
print(f' Lazy gap: {gap_lazy:.4f} (halved, as expected)')
print(f' Lazy eigenvalues non-negative: {np.all(np.real(la.eigvals(P_lazy)) >= -1e-10)}')
7. Metropolis-Hastings Algorithm
Algorithm: Given current state $x$, target $\pi$, proposal $q(x' \mid x)$:
- Sample candidate $x' \sim q(\cdot \mid x)$
- Accept with probability $a(x, x') = \min\left(1, \frac{\pi(x')\, q(x \mid x')}{\pi(x)\, q(x' \mid x)}\right)$
- Else stay at $x$
Correctness: MH satisfies detailed balance — $\pi(x) K(x, x') = \pi(x') K(x', x)$ — so $\pi$ is the stationary distribution.
Optimal acceptance rate: ~23% in high dimensions (Gaussian random walk proposal).
Code cell 17
# === 7.1 Metropolis-Hastings ===
import numpy as np
np.random.seed(42)  # fixed seed so the MCMC runs below are reproducible
def log_target(theta, kind='double_well'):
    """Log-density (up to an additive constant) of the sampling target.

    ``kind='double_well'``: log pi = -theta^4/4 + theta^2/2, bimodal with
    modes at theta = +-1.  ``kind='gaussian'``: standard normal.

    Raises ValueError for an unknown ``kind`` — the original silently
    returned None, which would only crash later inside the sampler.
    """
    if kind == 'double_well':
        return -theta**4/4 + theta**2/2 # bimodal: modes at ±1
    elif kind == 'gaussian':
        return -0.5 * theta**2
    raise ValueError(f'unknown target kind: {kind!r}')
def metropolis_hastings(log_pi, n_steps=50000, sigma=0.5, x0=0.0, kind='double_well'):
    """Random-walk Metropolis sampler for the 1-D target exp(log_pi).

    Proposes ``x' = x + sigma * N(0, 1)`` and accepts with probability
    ``min(1, pi(x') / pi(x))`` (symmetric proposal, so no q-correction).
    Returns ``(samples, acceptance_rate)``.
    """
    current = x0
    chain = []
    accepted = 0
    for _ in range(n_steps):
        proposal = current + sigma * np.random.randn()
        log_ratio = log_pi(proposal, kind) - log_pi(current, kind)
        # Accept iff U < a  <=>  log U < log a, with a = min(1, ratio).
        if np.log(np.random.rand()) < log_ratio:
            current = proposal
            accepted += 1
        chain.append(current)
    return np.array(chain), accepted / n_steps
# Run MH on double-well target
samples, acc_rate = metropolis_hastings(log_target, n_steps=50000, sigma=0.5)
print(f'MH on double-well: acceptance rate = {acc_rate:.3f}')
# Discard burn-in
burn_in = 5000
post_samples = samples[burn_in:]
# Verify bimodal structure: samples near ±1
frac_positive = (post_samples > 0).mean()
frac_negative = (post_samples < 0).mean()
print(f'Fraction in positive mode (theta>0): {frac_positive:.3f}')
print(f'Fraction in negative mode (theta<0): {frac_negative:.3f}')
print(f'Should be ~50/50 by symmetry: symmetric={abs(frac_positive - 0.5) < 0.05}')
# Verify detailed balance numerically at one pair of points
print('\nDetailed balance check (discrete approximation):')
theta1, theta2 = 0.5, 1.0
log_pi1, log_pi2 = log_target(theta1), log_target(theta2)
sigma = 0.5
from scipy.stats import norm
log_q12 = norm.logpdf(theta2, theta1, sigma)  # log q(theta2 | theta1)
log_q21 = norm.logpdf(theta1, theta2, sigma)  # log q(theta1 | theta2)
# MH transition density: pi(theta1)*q(theta2|theta1)*a(theta1,theta2)
log_a12 = min(0.0, log_pi2 + log_q21 - log_pi1 - log_q12)
log_a21 = min(0.0, log_pi1 + log_q12 - log_pi2 - log_q21)
lhs = log_pi1 + log_q12 + log_a12 # log pi(t1)*q(t2|t1)*a(t1,t2)
rhs = log_pi2 + log_q21 + log_a21 # log pi(t2)*q(t1|t2)*a(t2,t1)
print(f' log pi(t1)*K(t1->t2) = {lhs:.6f}')
print(f' log pi(t2)*K(t2->t1) = {rhs:.6f}')
print(f' Difference: {abs(lhs - rhs):.2e} (should be ~0)')
print('PASS - MH satisfies detailed balance')
# Effect of sigma on acceptance and ESS
print('\nEffect of proposal sigma:')
for sigma_test in [0.1, 0.5, 1.0, 2.0, 5.0]:
    samps, acc = metropolis_hastings(log_target, n_steps=10000, sigma=sigma_test)
    # ESS approximation via autocorrelation
    samps = samps[1000:] # discard burn-in
    acf = np.correlate(samps - samps.mean(), samps - samps.mean(), mode='full')
    acf = acf[len(acf)//2:] / acf[len(acf)//2]  # one-sided, normalised ACF
    rho = acf[1:10].sum() * 2 + 1 # rough ESS denominator
    ess_ratio = 1 / max(rho, 1)
    print(f' sigma={sigma_test:.1f}: acc={acc:.3f}, ESS/T~{ess_ratio:.3f}')
8. Gibbs Sampling
Gibbs sampling updates one coordinate at a time from its full conditional: $x_i \sim \pi(x_i \mid x_{-i})$
- Special case of MH with acceptance ratio = 1
- Requires ability to sample from full conditionals
- Works well when coordinates are nearly independent; struggles with high correlation
Ising model example: 2 spins $s_1, s_2 \in \{-1, +1\}$, joint $\pi(s_1, s_2) \propto e^{\beta s_1 s_2}$. Full conditionals are Bernoulli distributions.
Code cell 19
# === 8.1 Gibbs Sampling: 2D Gaussian and Ising Model ===
import numpy as np
np.random.seed(42)  # reproducible Gibbs runs
# === Gibbs for 2D Correlated Gaussian ===
# Target: N([0,0], [[1, rho],[rho, 1]])
rho = 0.9          # correlation of the target Gaussian
n_steps = 30000    # number of Gibbs sweeps
def gibbs_gaussian_2d(rho, n_steps):
    """Gibbs sampler for the bivariate N(0, [[1, rho], [rho, 1]]) target.

    Alternates draws from the two full conditionals
    ``x1 | x2 ~ N(rho*x2, 1-rho^2)`` and ``x2 | x1 ~ N(rho*x1, 1-rho^2)``.
    Returns an ``(n_steps, 2)`` array of (x1, x2) pairs.
    """
    cond_sd = np.sqrt(1 - rho**2)  # loop-invariant conditional std-dev
    x1 = 0.0
    x2 = 0.0
    draws = []
    for _ in range(n_steps):
        x1 = np.random.normal(rho * x2, cond_sd)
        x2 = np.random.normal(rho * x1, cond_sd)
        draws.append((x1, x2))
    return np.array(draws)
samps = gibbs_gaussian_2d(rho, n_steps)
samps = samps[5000:] # burn-in
# Compare empirical moments against the exact target moments.
emp_mean = samps.mean(axis=0)
emp_cov = np.cov(samps.T)
print(f'Gibbs on correlated Gaussian (rho={rho}):')
print(f' Empirical mean: {emp_mean} (expected [0, 0])')
print(f' Empirical cov:\n{emp_cov}')
print(f' Expected cov: [[1, {rho}], [{rho}, 1]]')
assert np.allclose(emp_mean, [0, 0], atol=0.05)
assert np.allclose(emp_cov, [[1, rho], [rho, 1]], atol=0.05)
print('PASS - Gibbs converges to correct distribution')
# High correlation -> slow mixing
# Compute ESS via lag-1 autocorrelation
for rho_test in [0.0, 0.5, 0.9, 0.99]:
    s = gibbs_gaussian_2d(rho_test, 5000)[1000:]
    ac1 = np.corrcoef(s[:-1, 0], s[1:, 0])[0, 1]  # lag-1 autocorr of x1
    ess_ratio = (1 - ac1) / (1 + ac1)  # ESS/T under an AR(1) approximation
    print(f' rho={rho_test}: lag-1 autocorr={ac1:.3f}, ESS/T~{ess_ratio:.3f}')
print('Higher correlation -> worse Gibbs mixing (need more steps for same ESS)')
9. PageRank
PageRank models a random web surfer: with probability $\alpha$ follow a random outgoing link, with probability $1 - \alpha$ teleport to a uniformly random page.
PageRank = stationary distribution of this chain. Computed by power iteration; convergence rate $O(\alpha^k)$.
Code cell 21
# === 9.1 PageRank Power Iteration ===
import numpy as np
def pagerank(A, alpha=0.85, tol=1e-10, max_iter=1000):
    """PageRank of adjacency matrix ``A`` via power iteration.

    Builds the Google matrix ``P_hat = alpha * P + (1 - alpha) / N``,
    where ``P`` is the row-normalised link matrix and dangling nodes
    (no out-links) teleport uniformly, then iterates ``pi <- pi @ P_hat``
    until the sup-norm change drops below ``tol``.

    Parameters
    ----------
    A : (N, N) array; ``A[i, j] != 0`` means page i links to page j.
    alpha : damping factor (probability of following a link).
    tol : sup-norm convergence tolerance.
    max_iter : iteration cap.

    Returns ``(pi, history)``: the PageRank vector and an array of every
    iterate (``history[0]`` is the uniform start).

    Fix over the original: with ``max_iter=0`` the original raised
    UnboundLocalError on ``pi_new``; this version returns the start vector.
    """
    N = A.shape[0]
    out_degree = A.sum(axis=1)
    # Handle dangling nodes: avoid division by zero, then overwrite rows.
    dangling = (out_degree == 0)
    out_degree[dangling] = 1
    P = A / out_degree[:, np.newaxis]  # row-stochastic for linked nodes
    P[dangling] = 1.0 / N              # dangling nodes teleport uniformly
    teleport = np.ones((N, N)) / N
    P_hat = alpha * P + (1 - alpha) * teleport
    # Power iteration from the uniform distribution.
    pi = np.ones(N) / N
    history = [pi.copy()]
    for _ in range(max_iter):
        pi_next = pi @ P_hat
        history.append(pi_next.copy())
        converged = np.max(np.abs(pi_next - pi)) < tol
        pi = pi_next
        if converged:
            break
    return pi, np.array(history)
# 5-node web graph (A[i, j] = 1 means page i links to page j)
A5 = np.array([
    [0, 1, 1, 0, 0],
    [0, 0, 1, 1, 0],
    [0, 0, 0, 1, 1],
    [1, 0, 0, 0, 1],
    [0, 1, 0, 0, 0]
], dtype=float)
pr, history = pagerank(A5, alpha=0.85)
print('PageRank for 5-node web graph (alpha=0.85):')
for i, p in enumerate(pr):
    print(f' Page {i+1}: {p:.6f}')
# Verify: pi P = pi (rebuild the Google matrix independently)
N = A5.shape[0]
out_degree = A5.sum(axis=1)
P_hat = 0.85 * (A5 / out_degree[:, np.newaxis]) + 0.15 * np.ones((N,N)) / N
residual = np.max(np.abs(pr @ P_hat - pr))
print(f'\nVerify pi P = pi: max residual = {residual:.2e}')
assert residual < 1e-8
print('PASS - PageRank is stationary distribution')
# Convergence rate
tv_hist = [tv_distance(h, pr) for h in history]
print(f'\nConvergence (TV to final PR):')
for k in [1, 5, 10, 20, len(history)-1]:
    print(f' iter {k:3d}: TV = {tv_hist[min(k, len(tv_hist)-1)]:.2e}')
print(f'Total iterations: {len(history)-1}')
# Impact of dangling node
A5_dangling = A5.copy()
A5_dangling[4] = 0 # make node 5 dangling
pr_dangle, _ = pagerank(A5_dangling, alpha=0.85)
print(f'\nWith dangling node 5: PageRank = {pr_dangle}')
10. Hidden Markov Models
An HMM has hidden states $z_t$ (Markov chain) and observations $x_t$ emitted from $z_t$.
Forward algorithm: $\alpha_t(j) = e_j(x_t) \sum_i \alpha_{t-1}(i)\, T_{ij}$, with $P(x_{1:T}) = \sum_j \alpha_T(j)$
Viterbi algorithm: $\delta_t(j) = e_j(x_t) \max_i \delta_{t-1}(i)\, T_{ij}$, with backpointers to recover the most likely path
Code cell 23
# === 10.1 HMM Forward Algorithm and Viterbi ===
import numpy as np
def hmm_forward(obs, T, E, pi0):
    """Forward algorithm for a discrete HMM.

    ``obs``: observation indices; ``T``: state transition matrix;
    ``E``: emission matrix ``E[state, symbol]``; ``pi0``: initial
    state distribution.  Returns ``(alpha, likelihood)`` with
    ``alpha[t, j] = P(x_1..x_t, z_t = j)`` and
    ``likelihood = P(x_1..x_T) = sum_j alpha[-1, j]``.
    """
    n_states = T.shape[0]
    n_obs = len(obs)
    alpha = np.zeros((n_obs, n_states))
    alpha[0] = pi0 * E[:, obs[0]]  # initialisation
    for t in range(1, n_obs):
        # Recursion, vectorised over j: alpha_t = e(x_t) * (alpha_{t-1} @ T)
        alpha[t] = E[:, obs[t]] * (alpha[t - 1] @ T)
    return alpha, alpha[-1].sum()
def hmm_viterbi(obs, T, E, pi0):
    """Viterbi decoding: most likely hidden-state path for ``obs``.

    Returns ``(path, delta)`` where ``delta[t, j]`` is the probability of
    the best path ending in state j at time t, and ``path`` is the argmax
    state sequence recovered by backtracking through ``psi``.
    """
    n_states = T.shape[0]
    n_obs = len(obs)
    delta = np.zeros((n_obs, n_states))
    psi = np.zeros((n_obs, n_states), dtype=int)  # backpointers
    delta[0] = pi0 * E[:, obs[0]]
    for t in range(1, n_obs):
        # scores[i, j] = delta_{t-1}(i) * T_ij; maximise over predecessor i.
        scores = delta[t - 1][:, None] * T
        psi[t] = np.argmax(scores, axis=0)
        delta[t] = E[:, obs[t]] * np.max(scores, axis=0)
    # Backtrack from the best terminal state.
    path = np.zeros(n_obs, dtype=int)
    path[-1] = np.argmax(delta[-1])
    for t in range(n_obs - 2, -1, -1):
        path[t] = psi[t + 1, path[t + 1]]
    return path, delta
# HMM: weather CpG model
# States: H (CpG island), L (non-CpG)
# Observations: 0=A, 1=C, 2=G, 3=T
T_hmm = np.array([[0.5, 0.5], [0.4, 0.6]]) # H->H=0.5, H->L=0.5
E_hmm = np.array([[0.1, 0.4, 0.4, 0.1], # H: C,G enriched
                  [0.3, 0.2, 0.2, 0.3]]) # L: A,T enriched
pi0_hmm = np.array([0.5, 0.5])
# Observation: C,G,C,A,T (indices 1,2,1,0,3)
obs = [1, 2, 1, 0, 3]
alpha, likelihood = hmm_forward(obs, T_hmm, E_hmm, pi0_hmm)
print('HMM forward algorithm:')
print(f' P(obs=(C,G,C,A,T)) = {likelihood:.6f}')
print(f' Forward matrix alpha:')
for t, ob in enumerate(['C','G','C','A','T']):
    # Normalised alpha row = filtering posterior P(z_t | x_1..x_t)
    post = alpha[t] / alpha[t].sum()
    print(f' t={t} ({ob}): alpha_H={alpha[t,0]:.4f}, alpha_L={alpha[t,1]:.4f} | P(H)={post[0]:.3f}')
# Verify: sum(alpha[t]) = P(X_1..X_t)
print(f'\nVerify sum(alpha[t]) = P(X_1..X_t):')
for t in range(len(obs)):
    print(f' t={t}: sum(alpha[{t}])={alpha[t].sum():.6f}')
# Viterbi
viterbi_path, delta = hmm_viterbi(obs, T_hmm, E_hmm, pi0_hmm)
state_names = ['H', 'L']
path_str = '->'.join([state_names[s] for s in viterbi_path])
print(f'\nViterbi path: {path_str}')
print(f'Likelihood of Viterbi path: {delta[-1].max():.6f}')
11. Continuous-Time Markov Chains
A CTMC holds in each state $i$ for an $\mathrm{Exp}(-q_{ii})$ time, then jumps. The generator matrix $Q$ has:
- $q_{ij} \ge 0$ for $i \ne j$ (jump rates)
- $q_{ii} = -\sum_{j \ne i} q_{ij}$ (row sums to 0)
Transition matrix: $P(t) = e^{Qt}$
Stationary distribution: $\pi Q = 0$, $\sum_i \pi_i = 1$
M/M/1 queue: arrival rate $\lambda$, service rate $\mu$. Stationary: $\pi_n = (1-\rho)\rho^n$ with $\rho = \lambda/\mu$ if $\rho < 1$.
Code cell 25
# === 11.1 Continuous-Time Markov Chain and M/M/1 Queue ===
import numpy as np
from scipy.linalg import expm
# 3-state CTMC: rates q12=1, q13=0.5, q21=2, q23=0.5, q31=1, q32=0.5
# Diagonals are minus the off-diagonal row sums, so each row sums to 0.
Q = np.array([[-1.5, 1.0, 0.5],
              [ 2.0, -2.5, 0.5],
              [ 1.0, 0.5, -1.5]])
print('Generator matrix Q:')
print(Q)
print(f'Row sums (should be 0): {Q.sum(axis=1)}')
def ctmc_stationary(Q):
    """Stationary distribution of a CTMC generator: solve pi Q = 0 with
    sum(pi) = 1.

    Transposes to ``Q^T pi = 0`` and overwrites the last (redundant)
    equation with the normalisation constraint.
    """
    n = Q.shape[0]
    system = Q.T.copy()
    system[-1, :] = 1  # replace last row by sum(pi) = 1
    rhs = np.zeros(n)
    rhs[-1] = 1
    return np.linalg.solve(system, rhs)
pi_ctmc = ctmc_stationary(Q)
print(f'\nStationary distribution: {pi_ctmc}')
print(f'Verify pi Q = 0: {pi_ctmc @ Q} (should be ~0)')
assert np.allclose(pi_ctmc @ Q, 0, atol=1e-10)
print('PASS - CTMC stationary distribution verified')
# P(t) = exp(Q*t): every row converges to pi as t grows
for t in [0.1, 1.0, 5.0, 10.0]:
    Pt = expm(Q * t)
    print(f'\nP({t}): max |P(t)[i,:] - pi| = {np.max(np.abs(Pt - pi_ctmc)):.6f}')
# M/M/1 Queue
print('\n--- M/M/1 Queue ---')
lam, mu = 0.6, 1.0
rho = lam / mu  # traffic intensity; the queue is stable since rho < 1
N_states = 20 # truncate at 20 customers
Q_mm1 = np.zeros((N_states, N_states))
for i in range(N_states):
    if i < N_states - 1:
        Q_mm1[i, i+1] = lam # arrival
    if i > 0:
        Q_mm1[i, i-1] = mu # service
    Q_mm1[i, i] = -Q_mm1[i].sum() - Q_mm1[i, i] # fix diagonal
pi_mm1 = ctmc_stationary(Q_mm1)
pi_mm1_theory = np.array([(1 - rho) * rho**n for n in range(N_states)])
pi_mm1_theory /= pi_mm1_theory.sum() # normalise (truncated)
print(f'M/M/1 queue (lambda={lam}, mu={mu}, rho={rho}):')
print(f' pi_0: sim={pi_mm1[0]:.4f}, theory={(1-rho):.4f}')
print(f' pi_1: sim={pi_mm1[1]:.4f}, theory={(1-rho)*rho:.4f}')
print(f' E[queue]: {np.dot(np.arange(N_states), pi_mm1):.3f} (theory {rho/(1-rho):.3f})')
assert np.allclose(pi_mm1[:10], pi_mm1_theory[:10], atol=0.01)
print('PASS - M/M/1 stationary distribution verified')
12. Markov Decision Processes and Reinforcement Learning
An MDP extends Markov chains with actions. A fixed policy $\pi$ induces a Markov chain $P^\pi$.
Bellman equation: $V^\pi = r^\pi + \gamma P^\pi V^\pi$
Solving: $V^\pi = (I - \gamma P^\pi)^{-1} r^\pi$ (direct), or by iterating $V \leftarrow r^\pi + \gamma P^\pi V$
Code cell 27
# === 12.1 MDP Value Function via Bellman Equation ===
import numpy as np
# 5-state chain MDP
N_states = 5    # states 0..4; state 4 is terminal
gamma = 0.9     # discount factor
p_right = 0.6   # probability the fixed policy moves right
def make_chain_mdp(N, p_right):
    """Chain MDP on states 0..N-1 under a fixed stochastic policy.

    State N-1 is absorbing (terminal self-loop).  From every other state
    the agent moves right with probability ``p_right`` and left otherwise,
    reflecting at state 0.  A reward of 1 is earned on the transition from
    state N-2 into the goal, so its expected one-step reward is p_right.

    Returns ``(P, r)``: induced transition matrix and expected rewards.
    """
    P = np.zeros((N, N))
    r = np.zeros(N)
    for state in range(N):
        if state == N - 1:
            P[state, state] = 1.0  # terminal: absorb forever
            continue
        right = min(state + 1, N - 1)
        left = max(state - 1, 0)
        P[state, right] += p_right
        P[state, left] += 1 - p_right
        if state == N - 2:
            # Expected reward: 1 earned only when the right move fires.
            r[state] += p_right * 1.0
    return P, r
P_mdp, r_mdp = make_chain_mdp(N_states, p_right)
# Solve Bellman: V = (I - gamma*P)^{-1} r
A_bellman = np.eye(N_states) - gamma * P_mdp
V_star = np.linalg.solve(A_bellman, r_mdp)
print('Value function via Bellman equation:')
for s in range(N_states):
    print(f' V*({s}) = {V_star[s]:.6f}')
# Verify via value iteration (a gamma-contraction, so it converges)
V_vi = np.zeros(N_states)
for _ in range(2000):
    V_vi = r_mdp + gamma * P_mdp @ V_vi
print(f'\nValue iteration agrees: {np.allclose(V_star, V_vi, atol=1e-6)}')
assert np.allclose(V_star, V_vi, atol=1e-6)
print('PASS - Bellman and value iteration agree')
# TD(0) convergence
print('\nTD(0) learning:')
V_td = np.zeros(N_states)
visit_counts = np.ones(N_states)
mse_hist = []
for ep in range(5000):
    s = np.random.randint(0, N_states-1)  # random non-terminal start
    for _ in range(50):
        if s == N_states-1: break
        go_right = np.random.rand() < p_right
        s_next = min(s+1, N_states-1) if go_right else max(s-1, 0)
        r = 1.0 if go_right and s == N_states-2 else 0.0
        alpha = 1 / visit_counts[s]  # decaying per-state step size
        visit_counts[s] += 1
        # TD(0) update: V(s) += alpha * (r + gamma V(s') - V(s))
        V_td[s] += alpha * (r + gamma * V_td[s_next] - V_td[s])
        s = s_next
    if ep % 1000 == 999:
        # Track error against the exact solution on non-terminal states.
        mse = np.mean((V_td[:N_states-1] - V_star[:N_states-1])**2)
        mse_hist.append(mse)
        print(f' ep={ep+1:5d}: MSE = {mse:.8f}')
print(f'Final MSE: {mse_hist[-1]:.2e}')
print('PASS - TD(0) converges to V*')
13. Summary Visualizations
Comparing convergence profiles, Metropolis-Hastings traces, and PageRank distributions.
Code cell 29
# === 13.1 Summary Plots ===
import numpy as np
np.random.seed(42)
try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False  # fall through to the text-only branch below
# === Plot 1: TV distance convergence for different spectral gaps ===
n_steps = 50
gaps = [0.05, 0.2, 0.5, 0.8]
pi_vals = {}
tv_vals = {}
for gap in gaps:
    lambda2 = 1 - gap
    # 2-state chain with this spectral gap
    # Eigenvalues 1 and lambda2; stationary = (0.5, 0.5)
    p = (1 - lambda2) / 2
    P_g = np.array([[1-p, p], [p, 1-p]])
    pi_g = np.array([0.5, 0.5])
    tv_g = []
    for n in range(n_steps+1):
        Pn = np.linalg.matrix_power(P_g, n)
        tv_g.append(0.5 * np.abs(Pn[0] - pi_g).sum())
    tv_vals[gap] = tv_g
if HAS_MPL:
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    # TV convergence
    for gap, tv in tv_vals.items():
        axes[0].semilogy(range(len(tv)), tv, label=f'gap={gap}')
    axes[0].set_xlabel('Steps'); axes[0].set_ylabel('TV distance (log)')
    axes[0].set_title('Convergence Rate vs Spectral Gap')
    axes[0].legend()
    # MH trace plot
    def log_pi_dw(theta):
        # Double-well log-density, same target as section 7
        return -theta**4/4 + theta**2/2
    theta = 0.0
    mh_trace = []
    for _ in range(3000):
        t_prime = theta + 0.5 * np.random.randn()
        if np.log(np.random.rand()) < log_pi_dw(t_prime) - log_pi_dw(theta):
            theta = t_prime
        mh_trace.append(theta)
    axes[1].plot(mh_trace, alpha=0.7, lw=0.5)
    axes[1].axhline(1, color='r', linestyle='--', alpha=0.5)
    axes[1].axhline(-1, color='r', linestyle='--', alpha=0.5)
    axes[1].set_title('MH Trace (Double Well)')
    axes[1].set_xlabel('Step')
    # PageRank distribution (same 5-node graph as section 9)
    A_pr = np.array([[0,1,1,0,0],[0,0,1,1,0],[0,0,0,1,1],[1,0,0,0,1],[0,1,0,0,0]], dtype=float)
    out_deg = A_pr.sum(axis=1)
    P_pr = 0.85 * (A_pr / out_deg[:, None]) + 0.15 * np.ones((5,5)) / 5
    pi_pr = np.ones(5)/5
    for _ in range(100):
        pi_pr = pi_pr @ P_pr
    axes[2].bar(range(1, 6), pi_pr, color='steelblue')
    axes[2].set_xlabel('Page'); axes[2].set_ylabel('PageRank')
    axes[2].set_title('PageRank Distribution')
    plt.tight_layout(); plt.show()
    print('Summary plots rendered.')
else:
    print('Matplotlib not available; skipping plots.')
print('\nAll theory notebook cells completed.')
Summary
This notebook covered the complete Markov chain theory:
- Transition matrices — Chapman-Kolmogorov, $n$-step distributions
- State classification — communicating classes, recurrence, transience, periodicity
- Stationary distributions — Perron-Frobenius, power iteration, eigendecomposition
- Convergence — TV distance, mixing time, spectral gap bounds
- Detailed balance — reversibility, birth-death chains
- Mixing times — spectral gap, lazy chains
- Metropolis-Hastings — detailed balance proof, proposal tuning
- Gibbs sampling — full conditionals, mixing with correlated targets
- PageRank — power iteration, dangling nodes, convergence rate
- HMM — forward algorithm, Viterbi decoding
- CTMC — generator matrix, M/M/1 queue
- MDP/RL — Bellman equation, TD(0) convergence
Next: Chapter 7 Statistics → ../../07-Statistics/