Converted from theory.ipynb for web reading.

Markov Chains — Theory Notebook

"A sequence of experiments forms a simple Markov chain if the conditional distribution of each experiment, given all preceding ones, depends only on the immediately preceding experiment." — Andrei Markov, 1906

Interactive derivations: transition matrices, Chapman-Kolmogorov, state classification, stationary distributions, mixing times, MCMC, PageRank, and HMMs.

Code cell 2

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

try:
    import seaborn as sns
    sns.set_theme(style="whitegrid", palette="colorblind")
    HAS_SNS = True
except ImportError:
    plt.style.use("seaborn-v0_8-whitegrid")
    HAS_SNS = False

mpl.rcParams.update({
    "figure.figsize":    (10, 6),
    "figure.dpi":         120,
    "font.size":           13,
    "axes.titlesize":      15,
    "axes.labelsize":      13,
    "xtick.labelsize":     11,
    "ytick.labelsize":     11,
    "legend.fontsize":     11,
    "legend.framealpha":   0.85,
    "lines.linewidth":      2.0,
    "axes.spines.top":     False,
    "axes.spines.right":   False,
    "savefig.bbox":       "tight",
    "savefig.dpi":         150,
})
np.random.seed(42)
print("Plot setup complete.")

Code cell 3

import numpy as np
import numpy.linalg as la
from scipy import linalg as sla

try:
    import matplotlib.pyplot as plt
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.rcParams['figure.figsize'] = [10, 6]
    plt.rcParams['font.size'] = 12
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

np.set_printoptions(precision=6, suppress=True)
np.random.seed(42)

def is_stochastic(P, tol=1e-10):
    return np.all(P >= -tol) and np.allclose(P.sum(axis=1), 1, atol=tol)

def stationary_power(P, tol=1e-12, max_iter=10000):
    pi = np.ones(P.shape[0]) / P.shape[0]
    for _ in range(max_iter):
        pi_new = pi @ P
        if np.max(np.abs(pi_new - pi)) < tol:
            return pi_new
        pi = pi_new
    return pi

def tv_distance(mu, nu):
    return 0.5 * np.sum(np.abs(mu - nu))

print('Setup complete.')

1. Transition Matrices and Chapman-Kolmogorov

A stochastic matrix $P$ has $P_{ij} \geq 0$ and $\sum_j P_{ij} = 1$. The $(i, j)$ entry is the probability of transitioning from state $i$ to state $j$.

$n$-step transitions: $P^{(n)} = P^n$ — the $n$-step transition matrix. The distribution at step $n$ is $\mu^{(n)} = \mu^{(0)} P^n$.

Chapman-Kolmogorov: $P^{m+n} = P^m P^n$ — composing transitions is exactly matrix multiplication.

Code cell 5

# === 1.1 Transition Matrix Examples ===

# Two-state weather chain: Sunny/Rainy
P_weather = np.array([[0.8, 0.2],
                       [0.4, 0.6]])

print('Weather chain P:')
print(P_weather)
print(f'Row sums: {P_weather.sum(axis=1)} (should be [1, 1])')
print(f'Is stochastic: {is_stochastic(P_weather)}')

# Chapman-Kolmogorov: P^(m+n) = P^m @ P^n
m, n = 3, 4
Pm = la.matrix_power(P_weather, m)
Pn = la.matrix_power(P_weather, n)
Pmn = la.matrix_power(P_weather, m + n)
print(f'\nChapman-Kolmogorov P^{m+n} = P^{m} @ P^{n}:')
print(f'Direct P^{m+n}:')
print(Pmn)
print(f'P^{m} @ P^{n}:')
print(Pm @ Pn)
print(f'Equal: {np.allclose(Pmn, Pm @ Pn)}')

# Distribution evolution
mu0 = np.array([1.0, 0.0])  # start sunny
print('\nDistribution evolution from sunny start:')
for step in [0, 1, 5, 10, 20, 50]:
    mun = mu0 @ la.matrix_power(P_weather, step)
    print(f'  n={step:3d}: P(Sunny)={mun[0]:.6f}, P(Rainy)={mun[1]:.6f}')

2. State Classification

States are classified by their long-run behaviour:

  • Communicating: $i \leftrightarrow j$ if there exist paths $i \to j$ and $j \to i$
  • Recurrent: $f_i = P(\text{return to } i) = 1$; equivalently $\sum_n P^{(n)}_{ii} = \infty$
  • Transient: $f_i < 1$; the chain leaves state $i$ permanently with positive probability
  • Period: $d(i) = \gcd\{n : P^{(n)}_{ii} > 0\}$; a state is aperiodic if $d(i) = 1$
  • Absorbing: $P_{ii} = 1$; once entered, never left
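The code cell in this section estimates recurrence and periods but not the communicating classes themselves; those can be read off from boolean reachability. A minimal sketch (transitive closure via Warshall's algorithm), reusing this section's examples:

```python
import numpy as np

def communicating_classes(P, tol=1e-12):
    """i <-> j iff i reaches j AND j reaches i; classes are the equivalence sets."""
    N = P.shape[0]
    reach = (P > tol) | np.eye(N, dtype=bool)
    # Transitive closure (Warshall): i reaches j if i reaches k and k reaches j
    for k in range(N):
        reach |= np.outer(reach[:, k], reach[k, :])
    mutual = reach & reach.T
    classes, seen = [], set()
    for i in range(N):
        if i not in seen:
            cls = sorted(np.where(mutual[i])[0].tolist())
            classes.append(cls)
            seen.update(cls)
    return classes

# Irreducible 3-state chain: a single communicating class
P3 = np.array([[0.5, 0.5, 0.0],
               [0.3, 0.4, 0.3],
               [0.0, 0.5, 0.5]])
# Absorbing-state chain: states drain one way, so no two states communicate
P_abs = np.array([[0.5, 0.3, 0.2, 0.0],
                  [0.0, 0.6, 0.4, 0.0],
                  [0.0, 0.0, 0.4, 0.6],
                  [0.0, 0.0, 0.0, 1.0]])
print(communicating_classes(P3))    # one class containing all states
print(communicating_classes(P_abs)) # every state is its own class
```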

Code cell 7

# === 2.1 State Classification ===

import numpy as np
from math import gcd

def classify_states(P, n_check=200):
    """Classify states: transient vs recurrent (heuristic), plus period."""
    N = P.shape[0]
    # Recurrence heuristic: sum_{n <= n_check} P^n_ii diverges for recurrent states
    cumsum_return = np.zeros(N)
    Pn = np.eye(N)
    for n in range(1, n_check + 1):
        Pn = Pn @ P
        cumsum_return += np.diag(Pn)

    # Period: gcd of all n with P^n_ii > 0
    Pn = np.eye(N)
    periods = [0] * N
    for n in range(1, 51):
        Pn = Pn @ P
        for i in range(N):
            if Pn[i, i] > 1e-10:
                periods[i] = n if periods[i] == 0 else gcd(periods[i], n)
    return cumsum_return, periods

# Example 1: Simple 3-state chain
P3 = np.array([[0.5, 0.5, 0.0],
               [0.3, 0.4, 0.3],
               [0.0, 0.5, 0.5]])
returns3, periods3 = classify_states(P3)
print('3-state irreducible chain:')
for i in range(3):
    status = 'recurrent' if returns3[i] > 50 else 'transient'
    print(f'  State {i}: return_sum={returns3[i]:.1f}, period={periods3[i]}, status={status}')

# Example 2: Chain with absorbing state
P_abs = np.array([[0.5, 0.3, 0.2, 0.0],
                   [0.0, 0.6, 0.4, 0.0],
                   [0.0, 0.0, 0.4, 0.6],
                   [0.0, 0.0, 0.0, 1.0]])  # state 3 absorbing
returns_abs, periods_abs = classify_states(P_abs)
print('\nChain with absorbing state 3:')
for i in range(4):
    status = 'recurrent' if returns_abs[i] > 50 else 'transient'
    print(f'  State {i}: return_sum={returns_abs[i]:.1f}, status={status}')

# Example 3: Periodic chain (2-cycle)
P_period = np.array([[0.0, 1.0],
                      [1.0, 0.0]])  # period 2
_, periods_p = classify_states(P_period)
print(f'\nPeriodic chain (0<->1): periods = {periods_p} (expected [2, 2])')

3. Stationary Distributions and Perron-Frobenius

A distribution $\pi$ is stationary if $\pi P = \pi$ with $\pi \geq 0$ and $\|\pi\|_1 = 1$.

Perron-Frobenius theorem: for an irreducible, aperiodic (primitive) stochastic matrix:

  • There is a unique stationary distribution $\pi > 0$
  • All other eigenvalues satisfy $|\lambda_k| < 1$
  • $P^n \to \mathbf{1}\pi$ geometrically, at rate $|\lambda_2|^n$

Methods: (1) solve the linear system $\pi P = \pi$, $\|\pi\|_1 = 1$; (2) power iteration; (3) the left eigenvector of $P$ for eigenvalue 1.

Code cell 9

# === 3.1 Stationary Distribution Methods ===

import numpy as np
import numpy.linalg as la

def stationary_linear_system(P):
    """Solve pi P = pi, sum(pi)=1 via linear system."""
    N = P.shape[0]
    A = (P.T - np.eye(N))
    A[-1] = 1  # replace last equation with normalisation
    b = np.zeros(N)
    b[-1] = 1
    return la.solve(A, b)

def stationary_eigenvector(P):
    """Stationary dist via left eigenvector for eigenvalue 1."""
    vals, vecs = la.eig(P.T)
    idx = np.argmin(np.abs(vals - 1))
    pi = np.real(vecs[:, idx])
    return pi / pi.sum()

# Test on weather chain
P = np.array([[0.8, 0.2], [0.4, 0.6]])

pi_ls = stationary_linear_system(P)
pi_pi = stationary_power(P)
pi_ev = stationary_eigenvector(P)

print('Weather chain stationary distribution:')
print(f'  Linear system:   {pi_ls}')
print(f'  Power iteration: {pi_pi}')
print(f'  Eigenvector:     {pi_ev}')
print(f'  Theory (q/(p+q), p/(p+q)): ({0.4/0.6:.4f}, {0.2/0.6:.4f})')
assert np.allclose(pi_ls, pi_pi, atol=1e-6)
print('PASS - All methods agree')

# Verify stationary property: pi @ P = pi
residual = pi_ls @ P - pi_ls
print(f'\nVerify pi P = pi: max residual = {np.max(np.abs(residual)):.2e}')

# Eigenvalue spectrum
evals = np.sort(np.real(la.eigvals(P)))[::-1]
print(f'\nEigenvalues: {evals}')
print(f'Spectral gap: {1 - abs(evals[1]):.4f}')

4. Convergence to Stationarity

For an ergodic chain, $\|\mu^{(n)} - \pi\|_{\text{TV}} \to 0$ at rate $|\lambda_2|^n$.

TV distance: $\|\mu - \nu\|_{\text{TV}} = \frac{1}{2}\sum_i |\mu_i - \nu_i| = \max_A |\mu(A) - \nu(A)|$

Mixing time: $t_{\text{mix}}(\varepsilon) = \min\{t : \max_x \|P^t(x,\cdot) - \pi\|_{\text{TV}} \leq \varepsilon\}$

Spectral gap bound: $t_{\text{mix}}(\varepsilon) \leq \lceil \log(1/(\varepsilon \pi_{\min})) / \text{gap} \rceil$

Code cell 11

# === 4.1 Convergence to Stationarity ===

import numpy as np
import numpy.linalg as la

def convergence_profile(P, n_max=100):
    """Compute TV distance from each starting state to pi over time."""
    N = P.shape[0]
    pi = stationary_power(P)
    tv_per_state = np.zeros((N, n_max + 1))
    Pn = np.eye(N)
    for n in range(n_max + 1):
        for i in range(N):
            tv_per_state[i, n] = tv_distance(Pn[i], pi)
        if n < n_max:
            Pn = Pn @ P
    return tv_per_state, pi

# Weather chain
P = np.array([[0.8, 0.2], [0.4, 0.6]])
tv_profile, pi = convergence_profile(P, n_max=30)

print('Convergence to stationarity (TV distance from sunny start):')
evals = np.sort(np.abs(la.eigvals(P)))[::-1]
lambda2 = evals[1]
gap = 1 - lambda2
print(f'Spectral gap: {gap:.4f}, lambda_2: {lambda2:.4f}')
print(f'  n    TV_from_state0    TV_bound(lambda_2^n/sqrt(pi_min))')
pi_min = pi.min()
for n in [1, 5, 10, 20, 30]:
    tv = tv_profile[0, n]
    bound = 0.5 * lambda2**n / np.sqrt(pi_min)
    print(f'  {n:3d}  {tv:.8f}    {bound:.8f}')

# Mixing time computation
eps = 0.01
t_mix_empirical = next(n for n in range(1, tv_profile.shape[1]) if tv_profile.max(axis=0)[n] <= eps)
t_mix_theory = int(np.ceil(np.log(1/(eps * pi_min)) / gap))
print(f'\nMixing time to eps={eps}:')
print(f'  Empirical: {t_mix_empirical}')
print(f'  Spectral gap bound: {t_mix_theory}')

# Periodic chain: no convergence
P_period = np.array([[0., 1.], [1., 0.]])
tv_p, pi_p = convergence_profile(P_period, n_max=10)
print(f'\nPeriodic chain TV distance from state 0: {tv_p[0, :11]}')
print('TV oscillates -- no convergence for periodic chain')

5. Detailed Balance and Reversibility

A chain is reversible with respect to $\pi$ if detailed balance holds:

$\pi_i P_{ij} = \pi_j P_{ji} \quad \forall i, j$

Detailed balance $\Rightarrow$ stationarity. The reversed chain $\hat{P}_{ij} = \pi_j P_{ji} / \pi_i$ equals $P$ for reversible chains.

Birth-death chains are always reversible, with explicit stationary distribution

$\pi_n = \pi_0 \prod_{k=0}^{n-1} \frac{p_k}{q_{k+1}}$
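The code cell in this section verifies the constant-rate case; the product formula also covers state-dependent rates. A sketch with hypothetical non-constant birth/death probabilities (self-loops added so rows sum to 1, which preserves the stationary distribution):

```python
import numpy as np

# Hypothetical state-dependent rates: p[k] = birth prob from state k,
# q[k] = death prob from state k+1 (so the formula's q_{k+1} is q[k])
p = np.array([0.5, 0.4, 0.3, 0.2])
q = np.array([0.3, 0.3, 0.4, 0.5])
N = 5
P = np.zeros((N, N))
for k in range(N - 1):
    P[k, k + 1] = p[k]
    P[k + 1, k] = q[k]
P += np.diag(1 - P.sum(axis=1))  # holding probability fills each row to 1

# Product formula: pi_n = pi_0 * prod_{k=0}^{n-1} p_k / q_{k+1}
w = np.ones(N)
for n in range(1, N):
    w[n] = w[n - 1] * p[n - 1] / q[n - 1]
pi_theory = w / w.sum()

# Compare against power iteration
pi = np.ones(N) / N
for _ in range(20000):
    pi = pi @ P
print('pi (power iteration):', pi)
print('pi (product formula):', pi_theory)
print('Match:', np.allclose(pi, pi_theory, atol=1e-8))
```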

Code cell 13

# === 5.1 Detailed Balance Verification ===

import numpy as np

def check_detailed_balance(P, pi, tol=1e-10):
    """Check pi_i P_ij = pi_j P_ji for all i,j."""
    N = P.shape[0]
    max_violation = 0
    violations = []
    for i in range(N):
        for j in range(N):
            lhs = pi[i] * P[i, j]
            rhs = pi[j] * P[j, i]
            diff = abs(lhs - rhs)
            if diff > max_violation:
                max_violation = diff
            if diff > tol:
                violations.append((i, j, lhs, rhs, diff))
    return len(violations) == 0, violations, max_violation

# 1. Reversible: symmetric chain
P_sym = np.array([[0.5, 0.3, 0.2],
                   [0.3, 0.4, 0.3],
                   [0.2, 0.3, 0.5]])
pi_sym = stationary_power(P_sym)
ok_sym, _, max_v_sym = check_detailed_balance(P_sym, pi_sym)
print(f'Symmetric chain satisfies DB: {ok_sym} (max violation: {max_v_sym:.2e})')

# 2. Birth-death chain
N_bd = 5
p = 0.6  # birth probability
q = 0.4  # death probability
P_bd = np.zeros((N_bd, N_bd))
for i in range(N_bd):
    if i > 0: P_bd[i, i-1] = q
    if i < N_bd-1: P_bd[i, i+1] = p
P_bd[0, 0] = q  # reflect at 0
P_bd[-1, -1] = p  # reflect at N-1

pi_bd = stationary_power(P_bd)
# Analytical: pi_n = pi_0 * (p/q)^n
ratio = p / q
pi_bd_theory = np.array([ratio**k for k in range(N_bd)])
pi_bd_theory /= pi_bd_theory.sum()

print(f'\nBirth-death chain (p={p}, q={q}):')
print(f'  pi (power iteration): {pi_bd}')
print(f'  pi (theory, (p/q)^n): {pi_bd_theory}')
print(f'  Match: {np.allclose(pi_bd, pi_bd_theory, atol=1e-6)}')
ok_bd, _, max_v_bd = check_detailed_balance(P_bd, pi_bd)
print(f'  Detailed balance satisfied: {ok_bd}')

# 3. Non-reversible: cyclic chain
P_cyc = np.array([[0., 1., 0.],
                   [0., 0., 1.],
                   [1., 0., 0.]])
pi_cyc = stationary_power(P_cyc)
ok_cyc, violations_cyc, max_v_cyc = check_detailed_balance(P_cyc, pi_cyc)
print(f'\nCyclic chain satisfies DB: {ok_cyc} (max violation: {max_v_cyc:.4f})')
print('(Cyclic chain is NOT reversible despite having uniform stationary dist)')

6. Mixing Times and Spectral Gap

Spectral gap: $\text{gap} = 1 - |\lambda_2|$, where $\lambda_2$ is the second-largest eigenvalue in modulus.

Mixing time bound: $t_{\text{mix}}(\varepsilon) \leq \lceil \log(1/(\varepsilon \pi_{\min})) / \text{gap} \rceil$

Cheeger inequality: $h^2/2 \leq \text{gap} \leq 2h$, where $h$ is the conductance.

Lazy chain: $P' = (I + P)/2$ roughly doubles the mixing time but ensures non-negative eigenvalues.
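The Cheeger inequality can be checked by brute force on small chains: enumerate all subsets $S$ with $\pi(S) \leq 1/2$ and minimise the ratio of boundary flow to mass. A sketch on this section's symmetric 3-state chain (uniform $\pi$ by symmetry; brute force is only feasible for small state spaces):

```python
import numpy as np
from itertools import combinations

def conductance(P, pi):
    """h = min over S with pi(S) <= 1/2 of Q(S, S^c) / pi(S)."""
    N = P.shape[0]
    h = np.inf
    for r in range(1, N):
        for S in combinations(range(N), r):
            S = list(S)
            piS = pi[S].sum()
            if piS > 0.5 + 1e-12:
                continue
            Sc = [j for j in range(N) if j not in S]
            flow = sum(pi[i] * P[i, j] for i in S for j in Sc)
            h = min(h, flow / piS)
    return h

# Symmetric (doubly stochastic) chain -> uniform stationary distribution
P = np.array([[0.5, 0.3, 0.2],
              [0.3, 0.4, 0.3],
              [0.2, 0.3, 0.5]])
pi = np.ones(3) / 3
evals = np.sort(np.abs(np.linalg.eigvals(P)))[::-1]
gap = 1 - evals[1]
h = conductance(P, pi)
print(f'conductance h = {h:.3f}, spectral gap = {gap:.3f}')
print('Cheeger holds (h^2/2 <= gap <= 2h):', h**2 / 2 <= gap <= 2 * h)
```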

Code cell 15

# === 6.1 Spectral Gap and Mixing Time ===

import numpy as np
import numpy.linalg as la

def spectral_gap(P):
    """Spectral gap = 1 - |lambda_2| for reversible P."""
    evals = np.sort(np.abs(np.real(la.eigvals(P))))[::-1]
    return 1 - evals[1], evals

def mixing_time_bound(P, eps=0.01):
    gap, evals = spectral_gap(P)
    pi = stationary_power(P)
    pi_min = pi.min()
    return int(np.ceil(np.log(1/(eps * pi_min)) / gap))

# Compare chains with different mixing rates
print('Spectral gaps and mixing time bounds (eps=0.01):')
chains = {
    'Fast (p=0.5, q=0.5)': np.array([[0.5, 0.5], [0.5, 0.5]]),
    'Weather (p=0.2, q=0.4)': np.array([[0.8, 0.2], [0.4, 0.6]]),
    'Slow (p=0.05, q=0.05)': np.array([[0.95, 0.05], [0.05, 0.95]]),
}
for name, P in chains.items():
    gap, evals = spectral_gap(P)
    t_bound = mixing_time_bound(P)
    print(f'  {name}:')
    print(f'    lambda_2={evals[1]:.4f}, gap={gap:.4f}, t_mix_bound={t_bound}')

# Lazy chain comparison
P_slow = np.array([[0.95, 0.05], [0.05, 0.95]])
P_lazy = 0.5 * (np.eye(2) + P_slow)
gap_slow, _ = spectral_gap(P_slow)
gap_lazy, _ = spectral_gap(P_lazy)
print(f'\nLazy chain comparison:')
print(f'  Original gap: {gap_slow:.4f}')
print(f'  Lazy gap:     {gap_lazy:.4f}  (halved, as expected)')
print(f'  Lazy eigenvalues non-negative: {np.all(np.real(la.eigvals(P_lazy)) >= -1e-10)}')

7. Metropolis-Hastings Algorithm

Algorithm: given the current state $\theta$, target $\pi$, and proposal $q(\cdot \mid \theta)$:

  1. Sample a candidate $\theta' \sim q(\cdot \mid \theta)$
  2. Accept with probability $a = \min\bigl(1,\ \pi(\theta')\,q(\theta \mid \theta') \,/\, \pi(\theta)\,q(\theta' \mid \theta)\bigr)$
  3. Otherwise stay at $\theta$

Correctness: the MH kernel satisfies detailed balance, $\pi(\theta) K(\theta, \theta') = \pi(\theta') K(\theta', \theta)$, so $\pi$ is its stationary distribution.

Optimal acceptance rate: ~23% in high dimensions (Gaussian random-walk proposal).

Code cell 17

# === 7.1 Metropolis-Hastings ===

import numpy as np
np.random.seed(42)

def log_target(theta, kind='double_well'):
    if kind == 'double_well':
        return -theta**4/4 + theta**2/2  # bimodal: modes at ±1
    elif kind == 'gaussian':
        return -0.5 * theta**2

def metropolis_hastings(log_pi, n_steps=50000, sigma=0.5, x0=0.0, kind='double_well'):
    x = x0
    samples = []
    n_accept = 0
    for _ in range(n_steps):
        x_prime = x + sigma * np.random.randn()
        log_a = log_pi(x_prime, kind) - log_pi(x, kind)
        if np.log(np.random.rand()) < log_a:
            x = x_prime
            n_accept += 1
        samples.append(x)
    return np.array(samples), n_accept / n_steps

# Run MH on double-well target
samples, acc_rate = metropolis_hastings(log_target, n_steps=50000, sigma=0.5)
print(f'MH on double-well: acceptance rate = {acc_rate:.3f}')

# Discard burn-in
burn_in = 5000
post_samples = samples[burn_in:]

# Verify bimodal structure: samples near ±1
frac_positive = (post_samples > 0).mean()
frac_negative = (post_samples < 0).mean()
print(f'Fraction in positive mode (theta>0): {frac_positive:.3f}')
print(f'Fraction in negative mode (theta<0): {frac_negative:.3f}')
print(f'Should be ~50/50 by symmetry: symmetric={abs(frac_positive - 0.5) < 0.05}')

# Verify detailed balance numerically
print('\nDetailed balance check (discrete approximation):')
theta1, theta2 = 0.5, 1.0
log_pi1, log_pi2 = log_target(theta1), log_target(theta2)
sigma = 0.5
from scipy.stats import norm
log_q12 = norm.logpdf(theta2, theta1, sigma)
log_q21 = norm.logpdf(theta1, theta2, sigma)
# MH transition density: pi(theta1)*q(theta2|theta1)*a(theta1,theta2)
log_a12 = min(0.0, log_pi2 + log_q21 - log_pi1 - log_q12)
log_a21 = min(0.0, log_pi1 + log_q12 - log_pi2 - log_q21)
lhs = log_pi1 + log_q12 + log_a12  # log pi(t1)*q(t2|t1)*a(t1,t2)
rhs = log_pi2 + log_q21 + log_a21  # log pi(t2)*q(t1|t2)*a(t2,t1)
print(f'  log pi(t1)*K(t1->t2) = {lhs:.6f}')
print(f'  log pi(t2)*K(t2->t1) = {rhs:.6f}')
print(f'  Difference: {abs(lhs - rhs):.2e}  (should be ~0)')
print('PASS - MH satisfies detailed balance')

# Effect of sigma on acceptance and ESS
print('\nEffect of proposal sigma:')
for sigma_test in [0.1, 0.5, 1.0, 2.0, 5.0]:
    samps, acc = metropolis_hastings(log_target, n_steps=10000, sigma=sigma_test)
    # ESS approximation via autocorrelation
    samps = samps[1000:]  # discard burn-in
    acf = np.correlate(samps - samps.mean(), samps - samps.mean(), mode='full')
    acf = acf[len(acf)//2:] / acf[len(acf)//2]
    rho = acf[1:10].sum() * 2 + 1  # rough ESS denominator
    ess_ratio = 1 / max(rho, 1)
    print(f'  sigma={sigma_test:.1f}: acc={acc:.3f}, ESS/T~{ess_ratio:.3f}')

8. Gibbs Sampling

Gibbs sampling updates one coordinate at a time from its full conditional:

$\theta_k^{(t+1)} \sim \pi(\theta_k \mid \theta_{-k}^{\text{current}})$

  • Special case of MH with acceptance probability 1
  • Requires the ability to sample from the full conditionals
  • Works well when coordinates are nearly independent; struggles with high correlation

Ising model example: two spins $s_1, s_2 \in \{\pm 1\}$ with joint $\pi(s_1, s_2) \propto e^{\beta s_1 s_2}$. The full conditionals are Bernoulli distributions.

Code cell 19

# === 8.1 Gibbs Sampling: 2D Gaussian and Ising Model ===

import numpy as np
np.random.seed(42)

# === Gibbs for 2D Correlated Gaussian ===
# Target: N([0,0], [[1, rho],[rho, 1]])
rho = 0.9
n_steps = 30000

def gibbs_gaussian_2d(rho, n_steps):
    x1, x2 = 0.0, 0.0
    samples = []
    for _ in range(n_steps):
        # Full conditional: x1 | x2 ~ N(rho*x2, 1-rho^2)
        x1 = np.random.normal(rho * x2, np.sqrt(1 - rho**2))
        # Full conditional: x2 | x1 ~ N(rho*x1, 1-rho^2)
        x2 = np.random.normal(rho * x1, np.sqrt(1 - rho**2))
        samples.append((x1, x2))
    return np.array(samples)

samps = gibbs_gaussian_2d(rho, n_steps)
samps = samps[5000:]  # burn-in

emp_mean = samps.mean(axis=0)
emp_cov = np.cov(samps.T)
print(f'Gibbs on correlated Gaussian (rho={rho}):')
print(f'  Empirical mean: {emp_mean} (expected [0, 0])')
print(f'  Empirical cov:\n{emp_cov}')
print(f'  Expected cov: [[1, {rho}], [{rho}, 1]]')
assert np.allclose(emp_mean, [0, 0], atol=0.05)
assert np.allclose(emp_cov, [[1, rho], [rho, 1]], atol=0.05)
print('PASS - Gibbs converges to correct distribution')

# High correlation -> slow mixing
# Compute ESS via lag-1 autocorrelation
for rho_test in [0.0, 0.5, 0.9, 0.99]:
    s = gibbs_gaussian_2d(rho_test, 5000)[1000:]
    ac1 = np.corrcoef(s[:-1, 0], s[1:, 0])[0, 1]
    ess_ratio = (1 - ac1) / (1 + ac1)
    print(f'  rho={rho_test}: lag-1 autocorr={ac1:.3f}, ESS/T~{ess_ratio:.3f}')
print('Higher correlation -> worse Gibbs mixing (need more steps for same ESS)')
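The cell above covers the Gaussian case; the two-spin Ising example from the section text can be sketched the same way. Here $\beta = 1$ and the step count are arbitrary choices, and the sampler is checked against the exact agreement probability $P(s_1 = s_2) = \sigma(2\beta)$:

```python
import numpy as np
rng = np.random.default_rng(0)

# Gibbs for pi(s1, s2) ∝ exp(beta * s1 * s2), spins in {-1, +1}
beta = 1.0

def cond_prob_plus(other, beta):
    # Full conditional: P(s_k = +1 | s_other) = sigmoid(2 * beta * s_other)
    return 1.0 / (1.0 + np.exp(-2.0 * beta * other))

s1, s2 = 1, 1
agree = 0
n_steps = 200000
for _ in range(n_steps):
    s1 = 1 if rng.random() < cond_prob_plus(s2, beta) else -1
    s2 = 1 if rng.random() < cond_prob_plus(s1, beta) else -1
    agree += (s1 == s2)

# Exact: P(s1 = s2) = 2 e^beta / (2 e^beta + 2 e^-beta) = sigmoid(2*beta)
exact = 1.0 / (1.0 + np.exp(-2.0 * beta))
print(f'Gibbs P(s1=s2) = {agree / n_steps:.4f}, exact = {exact:.4f}')
```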

9. PageRank

PageRank models a random web surfer: with probability $\alpha$ follow a random outgoing link; with probability $1 - \alpha$ teleport to a uniformly random page.

$P_{ij} = \alpha \frac{A_{ij}}{d_i} + (1 - \alpha) \frac{1}{N}$

PageRank is the stationary distribution of this chain, computed by power iteration; the error contracts at rate $\alpha^k$ after $k$ iterations.

Code cell 21

# === 9.1 PageRank Power Iteration ===

import numpy as np

def pagerank(A, alpha=0.85, tol=1e-10, max_iter=1000):
    N = A.shape[0]
    out_degree = A.sum(axis=1)
    # Handle dangling nodes
    dangling = (out_degree == 0)
    out_degree[dangling] = 1
    P = A / out_degree[:, np.newaxis]  # row-stochastic
    P[dangling] = 1.0 / N  # dangling nodes teleport
    teleport = np.ones((N, N)) / N
    P_hat = alpha * P + (1 - alpha) * teleport
    # Power iteration
    pi = np.ones(N) / N
    history = [pi.copy()]
    for k in range(max_iter):
        pi_new = pi @ P_hat
        history.append(pi_new.copy())
        if np.max(np.abs(pi_new - pi)) < tol:
            break
        pi = pi_new
    return pi_new, np.array(history)

# 5-node web graph
A5 = np.array([
    [0, 1, 1, 0, 0],
    [0, 0, 1, 1, 0],
    [0, 0, 0, 1, 1],
    [1, 0, 0, 0, 1],
    [0, 1, 0, 0, 0]
], dtype=float)

pr, history = pagerank(A5, alpha=0.85)
print('PageRank for 5-node web graph (alpha=0.85):')
for i, p in enumerate(pr):
    print(f'  Page {i+1}: {p:.6f}')

# Verify: pi P = pi
N = A5.shape[0]
out_degree = A5.sum(axis=1)
P_hat = 0.85 * (A5 / out_degree[:, np.newaxis]) + 0.15 * np.ones((N,N)) / N
residual = np.max(np.abs(pr @ P_hat - pr))
print(f'\nVerify pi P = pi: max residual = {residual:.2e}')
assert residual < 1e-8
print('PASS - PageRank is stationary distribution')

# Convergence rate
tv_hist = [tv_distance(h, pr) for h in history]
print(f'\nConvergence (TV to final PR):')
for k in [1, 5, 10, 20, len(history)-1]:
    print(f'  iter {k:3d}: TV = {tv_hist[min(k, len(tv_hist)-1)]:.2e}')
print(f'Total iterations: {len(history)-1}')

# Impact of dangling node
A5_dangling = A5.copy()
A5_dangling[4] = 0  # make node 5 dangling
pr_dangle, _ = pagerank(A5_dangling, alpha=0.85)
print(f'\nWith dangling node 5: PageRank = {pr_dangle}')

10. Hidden Markov Models

An HMM has hidden states $Z_t$ (a Markov chain) and observations $X_t$ emitted from $Z_t$.

Forward algorithm: $\alpha_t(i) = P(X_1, \ldots, X_t,\ Z_t = i)$, with recursion

$\alpha_t(j) = e_j(X_t) \sum_i \alpha_{t-1}(i)\, T_{ij}$

Viterbi algorithm: $\delta_t(j) = \max_{z_{1:t-1}} P(X_1, \ldots, X_t,\ Z_t = j,\ z_{1:t-1})$, with recursion

$\delta_t(j) = e_j(X_t) \cdot \max_i \delta_{t-1}(i)\, T_{ij}$

Code cell 23

# === 10.1 HMM Forward Algorithm and Viterbi ===

import numpy as np

def hmm_forward(obs, T, E, pi0):
    """Forward algorithm. Returns alpha matrix and likelihood."""
    n_states = T.shape[0]
    n_obs = len(obs)
    alpha = np.zeros((n_obs, n_states))
    alpha[0] = pi0 * E[:, obs[0]]
    for t in range(1, n_obs):
        for j in range(n_states):
            alpha[t, j] = E[j, obs[t]] * np.dot(alpha[t-1], T[:, j])
    return alpha, alpha[-1].sum()

def hmm_viterbi(obs, T, E, pi0):
    """Viterbi algorithm. Returns most likely state sequence."""
    n_states = T.shape[0]
    n_obs = len(obs)
    delta = np.zeros((n_obs, n_states))
    psi = np.zeros((n_obs, n_states), dtype=int)
    delta[0] = pi0 * E[:, obs[0]]
    for t in range(1, n_obs):
        for j in range(n_states):
            trans = delta[t-1] * T[:, j]
            psi[t, j] = np.argmax(trans)
            delta[t, j] = E[j, obs[t]] * trans[psi[t, j]]
    # Backtrack
    path = np.zeros(n_obs, dtype=int)
    path[-1] = np.argmax(delta[-1])
    for t in range(n_obs-2, -1, -1):
        path[t] = psi[t+1, path[t+1]]
    return path, delta

# HMM: weather CpG model
# States: H (CpG island), L (non-CpG)
# Observations: 0=A, 1=C, 2=G, 3=T
T_hmm = np.array([[0.5, 0.5], [0.4, 0.6]])  # H->H=0.5, H->L=0.5
E_hmm = np.array([[0.1, 0.4, 0.4, 0.1],   # H: C,G enriched
                   [0.3, 0.2, 0.2, 0.3]])   # L: A,T enriched
pi0_hmm = np.array([0.5, 0.5])

# Observation: C,G,C,A,T (indices 1,2,1,0,3)
obs = [1, 2, 1, 0, 3]
alpha, likelihood = hmm_forward(obs, T_hmm, E_hmm, pi0_hmm)
print('HMM forward algorithm:')
print(f'  P(obs=(C,G,C,A,T)) = {likelihood:.6f}')
print(f'  Forward matrix alpha:')
for t, ob in enumerate(['C','G','C','A','T']):
    post = alpha[t] / alpha[t].sum()
    print(f'    t={t} ({ob}): alpha_H={alpha[t,0]:.4f}, alpha_L={alpha[t,1]:.4f} | P(H)={post[0]:.3f}')

# Verify: sum(alpha[t]) = P(X_1..X_t)
print(f'\nVerify sum(alpha[t]) = P(X_1..X_t):')
for t in range(len(obs)):
    print(f'  t={t}: sum(alpha[{t}])={alpha[t].sum():.6f}')

# Viterbi
viterbi_path, delta = hmm_viterbi(obs, T_hmm, E_hmm, pi0_hmm)
state_names = ['H', 'L']
path_str = '->'.join([state_names[s] for s in viterbi_path])
print(f'\nViterbi path: {path_str}')
print(f'Likelihood of Viterbi path: {delta[-1].max():.6f}')
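The forward recursion multiplies many small probabilities, so for long sequences it underflows; a standard remedy is to run it in log space with log-sum-exp. A sketch reusing this section's model, checked against the direct recursion:

```python
import numpy as np
from scipy.special import logsumexp

def hmm_forward_log(obs, T, E, pi0):
    """Forward algorithm in log space. Returns log P(X_1..X_n)."""
    logT, logE = np.log(T), np.log(E)
    log_alpha = np.log(pi0) + logE[:, obs[0]]
    for x in obs[1:]:
        # log alpha_t(j) = log e_j(x) + logsumexp_i(log alpha_{t-1}(i) + log T_ij)
        log_alpha = logE[:, x] + logsumexp(log_alpha[:, None] + logT, axis=0)
    return logsumexp(log_alpha)

T_hmm = np.array([[0.5, 0.5], [0.4, 0.6]])
E_hmm = np.array([[0.1, 0.4, 0.4, 0.1],
                  [0.3, 0.2, 0.2, 0.3]])
pi0 = np.array([0.5, 0.5])
obs = [1, 2, 1, 0, 3]
log_lik = hmm_forward_log(obs, T_hmm, E_hmm, pi0)
print(f'log P(obs) = {log_lik:.6f}, P(obs) = {np.exp(log_lik):.6f}')
```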

11. Continuous-Time Markov Chains

A CTMC holds in each state $i$ for an $\text{Exp}(q_i)$ time, then jumps. The generator matrix $Q$ has:

  • $Q_{ij} = q_{ij} \geq 0$ for $i \neq j$ (jump rates)
  • $Q_{ii} = -q_i = -\sum_{j \neq i} q_{ij}$ (each row sums to 0)

Transition matrix: $P(t) = e^{Qt}$

Stationary distribution: $\pi Q = \mathbf{0}$, $\sum_i \pi_i = 1$

M/M/1 queue: arrival rate $\lambda$, service rate $\mu$. If $\rho = \lambda/\mu < 1$, the stationary distribution is $\pi_n = (1 - \rho)\rho^n$.
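Uniformization gives a discrete-time view of the same generator: for any $\lambda \geq \max_i q_i$, the matrix $P = I + Q/\lambda$ is stochastic and has the same stationary distribution as $Q$ (since $\pi P = \pi \iff \pi Q = 0$). A quick check on this section's 3-state generator, with $\lambda$ chosen as the largest exit rate:

```python
import numpy as np

Q = np.array([[-1.5,  1.0,  0.5],
              [ 2.0, -2.5,  0.5],
              [ 1.0,  0.5, -1.5]])
lam = np.max(-np.diag(Q))        # uniformization rate >= every exit rate
P_unif = np.eye(3) + Q / lam     # stochastic matrix sharing Q's stationary pi
print('Row sums:', P_unif.sum(axis=1))

# Power-iterate the uniformized DTMC
pi = np.ones(3) / 3
for _ in range(5000):
    pi = pi @ P_unif
print('pi from uniformized chain:', pi)
print('pi Q ~ 0:', np.allclose(pi @ Q, 0, atol=1e-8))
```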

Code cell 25

# === 11.1 Continuous-Time Markov Chain and M/M/1 Queue ===

import numpy as np
from scipy.linalg import expm

# 3-state CTMC: rates q12=1, q13=0.5, q21=2, q23=0.5, q31=1, q32=0.5
Q = np.array([[-1.5,  1.0,  0.5],
              [ 2.0, -2.5,  0.5],
              [ 1.0,  0.5, -1.5]])
print('Generator matrix Q:')
print(Q)
print(f'Row sums (should be 0): {Q.sum(axis=1)}')

# Stationary: pi Q = 0
def ctmc_stationary(Q):
    N = Q.shape[0]
    A = Q.T.copy()
    A[-1] = 1  # normalisation
    b = np.zeros(N)
    b[-1] = 1
    return np.linalg.solve(A, b)

pi_ctmc = ctmc_stationary(Q)
print(f'\nStationary distribution: {pi_ctmc}')
print(f'Verify pi Q = 0: {pi_ctmc @ Q}  (should be ~0)')
assert np.allclose(pi_ctmc @ Q, 0, atol=1e-10)
print('PASS - CTMC stationary distribution verified')

# P(t) = exp(Q*t)
for t in [0.1, 1.0, 5.0, 10.0]:
    Pt = expm(Q * t)
    print(f'\nP({t}): max |P(t)[i,:] - pi| = {np.max(np.abs(Pt - pi_ctmc)):.6f}')

# M/M/1 Queue
print('\n--- M/M/1 Queue ---')
lam, mu = 0.6, 1.0
rho = lam / mu
N_states = 20  # truncate at 20 customers
Q_mm1 = np.zeros((N_states, N_states))
for i in range(N_states):
    if i < N_states - 1:
        Q_mm1[i, i+1] = lam  # arrival
    if i > 0:
        Q_mm1[i, i-1] = mu  # service
    Q_mm1[i, i] = -Q_mm1[i].sum()  # diagonal: row sums to 0 (only off-diagonals set so far)

pi_mm1 = ctmc_stationary(Q_mm1)
pi_mm1_theory = np.array([(1 - rho) * rho**n for n in range(N_states)])
pi_mm1_theory /= pi_mm1_theory.sum()  # normalise (truncated)

print(f'M/M/1 queue (lambda={lam}, mu={mu}, rho={rho}):')
print(f'  pi_0: sim={pi_mm1[0]:.4f}, theory={(1-rho):.4f}')
print(f'  pi_1: sim={pi_mm1[1]:.4f}, theory={(1-rho)*rho:.4f}')
print(f'  E[queue]: {np.dot(np.arange(N_states), pi_mm1):.3f} (theory {rho/(1-rho):.3f})')
assert np.allclose(pi_mm1[:10], pi_mm1_theory[:10], atol=0.01)
print('PASS - M/M/1 stationary distribution verified')

12. Markov Decision Processes and Reinforcement Learning

An MDP $(\mathcal{S}, \mathcal{A}, P, r, \gamma)$ extends Markov chains with actions. A fixed policy $\pi(a|s)$ induces a Markov chain $P^\pi(s'|s) = \sum_a \pi(a|s) P(s'|s, a)$.

Bellman equation: $V^\pi = r^\pi + \gamma P^\pi V^\pi$

Solving directly: $V^\pi = (I - \gamma P^\pi)^{-1} r^\pi$
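The code cell in this section evaluates one fixed policy; full policy iteration alternates that evaluation step with greedy improvement. A sketch on a small hypothetical two-action chain (deterministic left/right moves, reward 1 for entering the terminal state, all sizes and rewards chosen for illustration):

```python
import numpy as np

# Hypothetical 4-state, 2-action MDP: action 0 = left, action 1 = right
N, gamma = 4, 0.9
P = np.zeros((2, N, N))   # P[a, s, s']
r = np.zeros((2, N))      # r[a, s]
for s in range(N):
    if s == N - 1:
        P[:, s, s] = 1.0              # terminal: self-loop under both actions
    else:
        P[0, s, max(s - 1, 0)] = 1.0  # left
        P[1, s, s + 1] = 1.0          # right
        if s == N - 2:
            r[1, s] = 1.0             # reward for stepping into the goal

policy = np.zeros(N, dtype=int)       # start with "always left"
for _ in range(10):
    # Policy evaluation: V = (I - gamma P^pi)^{-1} r^pi
    Ppi = P[policy, np.arange(N)]
    rpi = r[policy, np.arange(N)]
    V = np.linalg.solve(np.eye(N) - gamma * Ppi, rpi)
    # Policy improvement: greedy w.r.t. Q(s, a) = r(s, a) + gamma * (P V)(s, a)
    Q = r + gamma * P @ V
    new_policy = Q.argmax(axis=0)
    if np.array_equal(new_policy, policy):
        break                         # policy stable -> optimal
    policy = new_policy

print('Greedy policy (0=left, 1=right):', policy)
print('Optimal values:', V)
```

Policy iteration terminates here in a handful of sweeps because each improvement step is strictly better until the greedy policy is a fixed point.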

Code cell 27

# === 12.1 MDP Value Function via Bellman Equation ===

import numpy as np

# 5-state chain MDP
N_states = 5
gamma = 0.9
p_right = 0.6

def make_chain_mdp(N, p_right):
    """Build transition matrix P^pi and reward r for uniform policy."""
    P = np.zeros((N, N))
    r = np.zeros(N)
    for s in range(N):
        if s == N-1:
            P[s, s] = 1.0  # terminal
        else:
            s_right = min(s+1, N-1)
            s_left = max(s-1, 0)
            P[s, s_right] += p_right
            P[s, s_left] += 1 - p_right
            if s == N-2:  # reward for reaching goal
                r[s] += p_right * 1.0
    return P, r

P_mdp, r_mdp = make_chain_mdp(N_states, p_right)

# Solve Bellman: V = (I - gamma*P)^{-1} r
A_bellman = np.eye(N_states) - gamma * P_mdp
V_star = np.linalg.solve(A_bellman, r_mdp)

print('Value function via Bellman equation:')
for s in range(N_states):
    print(f'  V*({s}) = {V_star[s]:.6f}')

# Verify via value iteration
V_vi = np.zeros(N_states)
for _ in range(2000):
    V_vi = r_mdp + gamma * P_mdp @ V_vi

print(f'\nValue iteration agrees: {np.allclose(V_star, V_vi, atol=1e-6)}')
assert np.allclose(V_star, V_vi, atol=1e-6)
print('PASS - Bellman and value iteration agree')

# TD(0) convergence
print('\nTD(0) learning:')
V_td = np.zeros(N_states)
visit_counts = np.ones(N_states)
mse_hist = []
for ep in range(5000):
    s = np.random.randint(0, N_states-1)
    for _ in range(50):
        if s == N_states-1: break
        go_right = np.random.rand() < p_right
        s_next = min(s+1, N_states-1) if go_right else max(s-1, 0)
        r = 1.0 if go_right and s == N_states-2 else 0.0
        alpha = 1 / visit_counts[s]
        visit_counts[s] += 1
        V_td[s] += alpha * (r + gamma * V_td[s_next] - V_td[s])
        s = s_next
    if ep % 1000 == 999:
        mse = np.mean((V_td[:N_states-1] - V_star[:N_states-1])**2)
        mse_hist.append(mse)
        print(f'  ep={ep+1:5d}: MSE = {mse:.8f}')
print(f'Final MSE: {mse_hist[-1]:.2e}')
print('PASS - TD(0) converges to V*')

13. Summary Visualizations

Comparing convergence profiles, Metropolis-Hastings traces, and PageRank distributions.

Code cell 29

# === 13.1 Summary Plots ===

import numpy as np
np.random.seed(42)

try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

# === Plot 1: TV distance convergence for different spectral gaps ===
n_steps = 50
gaps = [0.05, 0.2, 0.5, 0.8]
tv_vals = {}

for gap in gaps:
    lambda2 = 1 - gap
    # 2-state chain with this spectral gap
    # Eigenvalues 1 and lambda2; stationary = (0.5, 0.5)
    p = (1 - lambda2) / 2
    P_g = np.array([[1-p, p], [p, 1-p]])
    pi_g = np.array([0.5, 0.5])
    tv_g = []
    for n in range(n_steps+1):
        Pn = np.linalg.matrix_power(P_g, n)
        tv_g.append(0.5 * np.abs(Pn[0] - pi_g).sum())
    tv_vals[gap] = tv_g

if HAS_MPL:
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # TV convergence
    for gap, tv in tv_vals.items():
        axes[0].semilogy(range(len(tv)), tv, label=f'gap={gap}')
    axes[0].set_xlabel('Steps'); axes[0].set_ylabel('TV distance (log)')
    axes[0].set_title('Convergence Rate vs Spectral Gap')
    axes[0].legend()

    # MH trace plot
    def log_pi_dw(theta):
        return -theta**4/4 + theta**2/2
    theta = 0.0
    mh_trace = []
    for _ in range(3000):
        t_prime = theta + 0.5 * np.random.randn()
        if np.log(np.random.rand()) < log_pi_dw(t_prime) - log_pi_dw(theta):
            theta = t_prime
        mh_trace.append(theta)
    axes[1].plot(mh_trace, alpha=0.7, lw=0.5)
    axes[1].axhline(1, color='r', linestyle='--', alpha=0.5)
    axes[1].axhline(-1, color='r', linestyle='--', alpha=0.5)
    axes[1].set_title('MH Trace (Double Well)')
    axes[1].set_xlabel('Step')

    # PageRank distribution
    A_pr = np.array([[0,1,1,0,0],[0,0,1,1,0],[0,0,0,1,1],[1,0,0,0,1],[0,1,0,0,0]], dtype=float)
    out_deg = A_pr.sum(axis=1)
    P_pr = 0.85 * (A_pr / out_deg[:, None]) + 0.15 * np.ones((5,5)) / 5
    pi_pr = np.ones(5)/5
    for _ in range(100):
        pi_pr = pi_pr @ P_pr
    axes[2].bar(range(1, 6), pi_pr, color='steelblue')
    axes[2].set_xlabel('Page'); axes[2].set_ylabel('PageRank')
    axes[2].set_title('PageRank Distribution')

    plt.tight_layout(); plt.show()
    print('Summary plots rendered.')
else:
    print('Matplotlib not available; skipping plots.')

print('\nAll theory notebook cells completed.')

Summary

This notebook covered the core of Markov chain theory:

  1. Transition matrices — Chapman-Kolmogorov, $n$-step distributions
  2. State classification — communicating classes, recurrence, transience, periodicity
  3. Stationary distributions — Perron-Frobenius, power iteration, eigendecomposition
  4. Convergence — TV distance, mixing time, spectral gap bounds
  5. Detailed balance — reversibility, birth-death chains
  6. Mixing times — spectral gap, lazy chains
  7. Metropolis-Hastings — detailed balance proof, proposal tuning
  8. Gibbs sampling — full conditionals, mixing with correlated targets
  9. PageRank — power iteration, dangling nodes, convergence rate
  10. HMM — forward algorithm, Viterbi decoding
  11. CTMC — generator matrix, M/M/1 queue
  12. MDP/RL — Bellman equation, TD(0) convergence

Next: Chapter 7 Statistics → ../../07-Statistics/
