Source code for tensorquantlib.finance.heston

"""
Heston stochastic volatility model — analytical pricing and Monte Carlo simulation.

The Heston (1993) model augments Black-Scholes with a mean-reverting variance process:

    dS = (r - q) S dt + sqrt(V) S dW_S
    dV = kappa (theta - V) dt + xi sqrt(V) dW_V
    Corr(dW_S, dW_V) = rho * dt

This module provides:
    heston_price_mc   -- Monte Carlo price (reference implementation)
    heston_price      -- Semi-analytic price via COS/Carr-Madan characteristic function
    heston_greeks     -- Delta, Gamma, Vega, Theta via finite differences on heston_price
    HestonCalibrator  -- Calibrate Heston parameters to market IV surface

References:
    Heston, S.L. (1993). A Closed-Form Solution for Options with Stochastic Volatility
    with Applications to Bond and Currency Options. RFS 6(2), 327-343.

    Fang, F. & Oosterlee, C.W. (2008). A Novel Pricing Method for European Options Based
    on Fourier-Cosine Series Expansions. SIAM J. Sci. Comput., 31(2), 826-848.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Union

import numpy as np
from scipy.integrate import quad
from scipy.optimize import minimize
from scipy.stats import norm as _scipy_norm

# Local alias used in QE scheme (norm PPF for V sampling)
norm_ppf = _scipy_norm.ppf



# ------------------------------------------------------------------ #
# Parameter container
# ------------------------------------------------------------------ #


[docs] @dataclass class HestonParams: """Heston model parameters. Attributes: kappa: Mean-reversion speed (>0). theta: Long-run variance (>0). xi: Vol-of-vol (>0). rho: Correlation between asset and variance Brownian motions (-1 < rho < 1). v0: Initial variance (>0). """ kappa: float = 2.0 theta: float = 0.04 xi: float = 0.3 rho: float = -0.7 v0: float = 0.04
[docs] def feller_satisfied(self) -> bool: """Check the Feller condition: 2 * kappa * theta > xi^2.""" return 2.0 * self.kappa * self.theta > self.xi**2
def to_array(self) -> np.ndarray: return np.array([self.kappa, self.theta, self.xi, self.rho, self.v0]) @classmethod def from_array(cls, arr: np.ndarray) -> HestonParams: return cls(kappa=arr[0], theta=arr[1], xi=arr[2], rho=arr[3], v0=arr[4])
# ------------------------------------------------------------------ # # Characteristic function # ------------------------------------------------------------------ # def _heston_cf( phi: complex, S: float, K: float, T: float, r: float, q: float, params: HestonParams, ) -> complex: """Log-price characteristic function under Heston model (Heston 1993 formulation). Returns E[exp(i * phi * log(S_T))] under the risk-neutral measure. """ kappa, theta, xi, rho, v0 = params.kappa, params.theta, params.xi, params.rho, params.v0 x = np.log(S) + (r - q) * T # Standard Heston characteristic function (Albrecher et al. 2007 stable form) d = np.sqrt((rho * xi * 1j * phi - kappa) ** 2 + xi**2 * (1j * phi + phi**2)) g_num = kappa - rho * xi * 1j * phi - d g_den = kappa - rho * xi * 1j * phi + d g = g_num / g_den exp_dT = np.exp(-d * T) C = (r - q) * 1j * phi * T + (kappa * theta / xi**2) * ( g_num * T - 2.0 * np.log((1.0 - g * exp_dT) / (1.0 - g)) ) D = (g_num / xi**2) * (1.0 - exp_dT) / (1.0 - g * exp_dT) return np.exp(C + D * v0 + 1j * phi * x) # ------------------------------------------------------------------ # # Semi-analytic (Carr-Madan) pricing # ------------------------------------------------------------------ #
[docs] def heston_price( S: float, K: float, T: float, r: float, params: HestonParams, q: float = 0.0, option_type: str = "call", *, integration_limit: float = 200.0, n_points: int = 100, ) -> float: """Semi-analytic Heston call/put price via Gil-Pelaez inversion. Uses numerical integration of the characteristic function. Args: S: Spot price. K: Strike price. T: Time to expiry (years). r: Risk-free rate. params: HestonParams instance. q: Continuous dividend yield. option_type: 'call' or 'put'. integration_limit: Upper limit for numerical integration (default 200). n_points: Number of integration points (higher = more accurate). Returns: Option price. Example: >>> p = HestonParams(kappa=2.0, theta=0.04, xi=0.3, rho=-0.7, v0=0.04) >>> price = heston_price(100.0, 100.0, 1.0, 0.05, p) >>> 5.0 < price < 20.0 True """ log_K = np.log(K) # Precompute CF(-i) for normalisation of the P1 integrand (S-numeraire probability) cf_minus_i = _heston_cf(-1j, S, K, T, r, q, params) def integrand_p1(phi: float) -> float: # P1 = Prob(S_T > K) under the S-numeraire (forward measure equivalent). # Uses the Radon-Nikodym derivative: CF_S(phi) = CF_Q(phi - i) / CF_Q(-i) # where CF_Q(u) = E^Q[exp(i*u*ln(S_T))]. cf_val = _heston_cf(phi - 1j, S, K, T, r, q, params) num = np.exp(-1j * phi * log_K) * cf_val return float(np.real(num / (1j * phi * cf_minus_i))) def integrand_p2(phi: float) -> float: # P2 = Prob(S_T > K) under the risk-neutral measure Q. cf_val = _heston_cf(phi, S, K, T, r, q, params) num = np.exp(-1j * phi * log_K) * cf_val return float(np.real(num / (1j * phi))) # Gauss-Legendre quadrature (avoid singularity at phi=0) eps = 1e-6 p1, _ = quad(integrand_p1, eps, integration_limit, limit=n_points) p2, _ = quad(integrand_p2, eps, integration_limit, limit=n_points) P1 = 0.5 + p1 / np.pi P2 = 0.5 + p2 / np.pi call_price = S * np.exp(-q * T) * P1 - K * np.exp(-r * T) * P2 call_price = float(call_price) if option_type == "call": return max(call_price, 0.0) else: # Put via put-call parity return float(call_price - S * np.exp(-q * T) + K * np.exp(-r * T))
# ------------------------------------------------------------------ # # Monte Carlo reference implementation # ------------------------------------------------------------------ #
[docs] def heston_price_mc( S: float, K: float, T: float, r: float, params: HestonParams, q: float = 0.0, option_type: str = "call", *, n_paths: int = 100_000, n_steps: int = 252, seed: int | None = None, return_stderr: bool = False, scheme: str = "qe", ) -> Union[float, tuple[float, float]]: """Monte Carlo Heston price using the Quadratic-Exponential (QE) discretisation scheme. The QE scheme (Andersen 2008) provides high accuracy for the CIR variance process, virtually eliminating the discretisation bias of Euler-Maruyama. Args: S: Spot. K: Strike. T: Time to expiry (years). r: Risk-free rate. params: HestonParams. q: Dividend yield. option_type: 'call' or 'put'. n_paths: Number of simulation paths (default 100k). n_steps: Time steps (default 252 = daily). seed: Random seed. return_stderr: If True, returns (price, stderr) tuple. scheme: Discretisation scheme — 'qe' (default, Andersen 2008) or 'euler' (Euler full-truncation, faster but biased). Returns: Price, or (price, stderr) if return_stderr=True. References: Andersen, L.B.G. (2008). Efficient Simulation of the Heston Stochastic Volatility Model. Journal of Computational Finance, 11(3). Example: >>> p = HestonParams() >>> price, se = heston_price_mc(100, 100, 1.0, 0.05, p, return_stderr=True, seed=42) >>> 5.0 < price < 20.0 True """ rng = np.random.default_rng(seed) dt = T / n_steps sqrt_dt = np.sqrt(dt) kappa, theta, xi, rho, v0 = params.kappa, params.theta, params.xi, params.rho, params.v0 S_t = np.full(n_paths, S, dtype=float) V_t = np.full(n_paths, v0, dtype=float) if scheme == "euler": # Euler-Maruyama with full-truncation for V (fast but biased) for _ in range(n_steps): z1 = rng.standard_normal(n_paths) z2 = rng.standard_normal(n_paths) zv = rho * z1 + np.sqrt(1.0 - rho**2) * z2 V_t_pos = np.maximum(V_t, 0.0) sqrt_V = np.sqrt(V_t_pos) S_t = S_t * np.exp((r - q - 0.5 * V_t_pos) * dt + sqrt_V * z1 * sqrt_dt) V_t = V_t + kappa * (theta - V_t_pos) * dt + xi * sqrt_V * zv * sqrt_dt V_t = np.maximum(V_t, 0.0) else: # Quadratic-Exponential (QE) scheme (Andersen 2008) for variance # Precompute constants exp_kdt = np.exp(-kappa * dt) xi2 = xi**2 K0 = -rho * kappa * theta * dt / xi K1 = 0.5 * dt * (kappa * rho / xi - 0.5) - rho / xi K2 = 0.5 * dt * (kappa * rho / xi - 0.5) + rho / xi K3 = 0.5 * dt * (1.0 - rho**2) K4 = K3 # same by symmetry # QE critical ratio threshold (Andersen suggests psi_c = 1.5) psi_c = 1.5 for _ in range(n_steps): z1 = rng.standard_normal(n_paths) u_v = rng.uniform(0.0, 1.0, n_paths) # for V update # QE step for V_t+dt given V_t # Conditional mean and variance of V(t+dt)|V(t) for CIR process m = theta + (V_t - theta) * exp_kdt s2 = ( V_t * xi2 * exp_kdt / kappa * (1.0 - exp_kdt) + theta * xi2 / (2.0 * kappa) * (1.0 - exp_kdt) ** 2 ) s2 = np.maximum(s2, 0.0) psi = s2 / (m**2) # High-psi regime (exponential mixture): psi > psi_c # Low-psi regime (quadratic normal): psi <= psi_c hi = psi > psi_c # --- Low-psi branch: quadratic approximation --- inv_psi = 1.0 / np.where(hi, 1.0, psi) # safe denominator b2 = np.maximum( 2.0 * inv_psi - 1.0 + np.sqrt(2.0 * inv_psi) * np.sqrt(2.0 * inv_psi - 1.0), 0.0 ) b = np.sqrt(b2) a = m / (1.0 + b2) z_v = norm_ppf(u_v) # standard normal quantile V_lo = a * (b + z_v) ** 2 # --- High-psi branch: exponential distribution --- p_exp = (psi - 1.0) / (psi + 1.0) beta = (1.0 - p_exp) / np.where(hi, m, 1.0) # avoid /0 in lo branch # Inversion of mixed distribution # u ~ [0, p] -> V = 0; u ~ (p, 1] -> V = ln(...) / beta v_hi_safe = np.where( u_v > p_exp, np.log((1.0 - p_exp) / np.maximum(1.0 - u_v, 1e-300)) / np.where(beta > 0, beta, 1.0), 0.0, ) V_hi = np.where(u_v > p_exp, v_hi_safe, 0.0) V_next = np.where(hi, V_hi, V_lo) # Log-asset update using V_t and V_next (trapezoidal integral approximation) S_t = S_t * np.exp( (r - q) * dt + K0 + K1 * V_t + K2 * V_next + np.sqrt(K3 * V_t + K4 * V_next) * z1 ) V_t = V_next if option_type == "call": payoffs = np.maximum(S_t - K, 0.0) else: payoffs = np.maximum(K - S_t, 0.0) discount = np.exp(-r * T) price = discount * float(np.mean(payoffs)) stderr = discount * float(np.std(payoffs) / np.sqrt(n_paths)) if return_stderr: return price, stderr return price
# ------------------------------------------------------------------ # # Greeks (finite differences on the analytic price) # ------------------------------------------------------------------ #
[docs] def heston_greeks( S: float, K: float, T: float, r: float, params: HestonParams, q: float = 0.0, option_type: str = "call", *, dS: float = 0.5, dT: float = 1.0 / 365.0, dv0: float = 0.001, ) -> dict[str, float]: """Compute Heston Greeks via finite differences on the semi-analytic price. Args: S: Spot price. K, T, r, q, option_type: Option parameters. params: HestonParams instance. dS: Bump size for Delta/Gamma (default 0.5). dT: Bump size for Theta (default 1/365 year). dv0: Bump size for Vega (relative to v0, default 0.001). Returns: Dictionary with keys: 'delta', 'gamma', 'theta', 'vega'. Vega is reported per unit change in v0 (not in sigma). Example: >>> p = HestonParams() >>> g = heston_greeks(100.0, 100.0, 1.0, 0.05, p) >>> 0.0 < g['delta'] < 1.0 True """ price_0 = heston_price(S, K, T, r, params, q=q, option_type=option_type) price_up = heston_price(S + dS, K, T, r, params, q=q, option_type=option_type) price_dn = heston_price(S - dS, K, T, r, params, q=q, option_type=option_type) delta = (price_up - price_dn) / (2.0 * dS) gamma = (price_up - 2.0 * price_0 + price_dn) / (dS**2) price_T = heston_price(S, K, T - dT, r, params, q=q, option_type=option_type) theta = (price_T - price_0) / dT # per year, negative = time decay p_vega = HestonParams( kappa=params.kappa, theta=params.theta, xi=params.xi, rho=params.rho, v0=params.v0 + dv0, ) price_v = heston_price(S, K, T, r, p_vega, q=q, option_type=option_type) vega = (price_v - price_0) / dv0 # per unit v0 bump return {"delta": delta, "gamma": gamma, "theta": theta, "vega": vega}
# ------------------------------------------------------------------ # # Calibration # ------------------------------------------------------------------ #
[docs] @dataclass class HestonCalibrator: """Calibrate Heston model parameters to a market implied-volatility surface. Usage:: from tensorquantlib.finance.heston import HestonCalibrator, HestonParams import numpy as np cal = HestonCalibrator(S=100.0, r=0.05) K_grid = np.array([90., 95., 100., 105., 110.]) T_grid = np.array([0.5, 1.0, 1.5]) # Build synthetic market IV (20% flat) iv_mkt = np.full((len(K_grid), len(T_grid)), 0.20) cal.fit(iv_mkt, K_grid, T_grid) print(cal.params_) Attributes: S: Spot price (fixed during calibration). r: Risk-free rate. q: Dividend yield. params_: Calibrated HestonParams (set after fit()). rmse_: Root-mean-square error in IV after fit. """ S: float r: float q: float = 0.0 option_type: str = "call" params_: HestonParams = field(default_factory=HestonParams) rmse_: float = field(default=float("nan"), init=False, repr=False)
[docs] def fit( self, iv_market: np.ndarray, K_grid: np.ndarray, T_grid: np.ndarray, *, n_restarts: int = 3, maxiter: int = 500, verbose: bool = False, ) -> HestonCalibrator: """Fit Heston parameters to market IV surface. Args: iv_market: 2-D array of market implied vols, shape (len(K_grid), len(T_grid)). K_grid: 1-D array of strikes. T_grid: 1-D array of expiries. n_restarts: Number of random re-starts for the optimiser. maxiter: Maximum optimiser iterations per restart. verbose: Print progress. Returns: self """ from tensorquantlib.finance.implied_vol import implied_vol # avoid circular import def _objective(x: np.ndarray) -> float: kappa, theta, xi, rho, v0 = x params = HestonParams(kappa=kappa, theta=theta, xi=xi, rho=rho, v0=v0) sq_err = 0.0 for i, K in enumerate(K_grid): for j, T in enumerate(T_grid): try: model_price = heston_price( self.S, float(K), float(T), self.r, params, q=self.q, option_type=self.option_type, ) # Convert model price to IV model_iv = implied_vol( model_price, self.S, float(K), float(T), self.r, q=self.q, option_type=self.option_type, ) sq_err += (model_iv - iv_market[i, j]) ** 2 except Exception: sq_err += 1.0 return sq_err bounds = [ (0.1, 10.0), # kappa (0.001, 0.5), # theta (0.1, 2.0), # xi (-0.99, 0.99), # rho (0.001, 0.5), # v0 ] best_result = None rng = np.random.default_rng(42) for restart in range(n_restarts): if restart == 0: x0 = np.array([2.0, 0.04, 0.3, -0.5, 0.04]) else: # Random start within bounds x0 = np.array([rng.uniform(lo, hi) for lo, hi in bounds]) result = minimize( _objective, x0, method="L-BFGS-B", bounds=bounds, options={"maxiter": maxiter, "ftol": 1e-12, "gtol": 1e-8}, ) if best_result is None or result.fun < best_result.fun: best_result = result if verbose: print( f"Restart {restart + 1}/{n_restarts}: RMSE={np.sqrt(result.fun / (len(K_grid) * len(T_grid))):.6f}" ) if best_result is not None: self.params_ = HestonParams.from_array(best_result.x) n_obs = len(K_grid) * len(T_grid) self.rmse_ = float(np.sqrt(best_result.fun / n_obs)) return self
[docs] def implied_vol_surface( self, K_grid: np.ndarray, T_grid: np.ndarray, ) -> np.ndarray: """Compute the model IV surface for the calibrated parameters. Args: K_grid: 1-D array of strikes. T_grid: 1-D array of expiries. Returns: 2-D array of model-implied volatilities, shape (len(K_grid), len(T_grid)). """ from tensorquantlib.finance.implied_vol import implied_vol surface = np.full((len(K_grid), len(T_grid)), np.nan) for i, K in enumerate(K_grid): for j, T in enumerate(T_grid): try: price = heston_price( self.S, float(K), float(T), self.r, self.params_, q=self.q, option_type=self.option_type, ) surface[i, j] = implied_vol( price, self.S, float(K), float(T), self.r, q=self.q, option_type=self.option_type, ) except Exception: surface[i, j] = np.nan return surface