Source code for tensorquantlib.finance.volatility

"""Volatility surface models: SABR and SVI parameterizations.

Implements industry-standard volatility surface parameterizations:
- SABR (Hagan et al., 2002): stochastic alpha-beta-rho model
- SVI (Gatheral, 2004): stochastic volatility inspired parameterization
"""

from __future__ import annotations

import numpy as np
from scipy.optimize import minimize

# ---------------------------------------------------------------------------
# SABR model — Hagan (2002) approximation
# ---------------------------------------------------------------------------


[docs] def sabr_implied_vol( F: float, K: float | np.ndarray, T: float, alpha: float, beta: float, rho: float, nu: float, ) -> float | np.ndarray: """Compute SABR implied volatility using the Hagan (2002) approximation. Parameters ---------- F : float Forward price. K : float or array Strike price(s). T : float Time to expiry in years. alpha : float Initial volatility level (α > 0). beta : float CEV exponent (0 ≤ β ≤ 1). rho : float Correlation between forward and vol (-1 < ρ < 1). nu : float Vol-of-vol (ν ≥ 0). Returns ------- float or array SABR implied Black volatility. """ K = np.asarray(K, dtype=float) scalar_input = K.ndim == 0 K = np.atleast_1d(K) # Avoid division by zero for ATM eps = 1e-12 log_FK = np.log(F / K) atm_mask = np.abs(log_FK) < eps sigma = np.empty_like(K, dtype=float) # --- ATM formula --- if np.any(atm_mask): Fm = F ** (1.0 - beta) term1 = alpha / Fm corr = ( 1.0 + ( ((1.0 - beta) ** 2 / 24.0) * alpha**2 / (F ** (2.0 - 2.0 * beta)) + 0.25 * rho * beta * nu * alpha / (F ** (1.0 - beta)) + (2.0 - 3.0 * rho**2) / 24.0 * nu**2 ) * T ) sigma[atm_mask] = term1 * corr # --- Non-ATM formula --- non_atm = ~atm_mask if np.any(non_atm): K_na = K[non_atm] FK_na = F * K_na FK_mid_na = np.sqrt(FK_na) FK_beta_na = FK_mid_na ** (1.0 - beta) log_FK_na = np.log(F / K_na) z = (nu / alpha) * FK_beta_na * log_FK_na x = np.log((np.sqrt(1.0 - 2.0 * rho * z + z**2) + z - rho) / (1.0 - rho)) # Protect against x = 0 (use safe division to avoid RuntimeWarning) safe_x = np.where(np.abs(x) < eps, 1.0, x) ratio = np.where(np.abs(x) < eps, 1.0, z / safe_x) # Denominator corrections term1 = (1.0 - beta) ** 2 / 24.0 * log_FK_na**2 term2 = (1.0 - beta) ** 4 / 1920.0 * log_FK_na**4 denom = FK_beta_na * (1.0 + term1 + term2) # Numerator correction corr = ( 1.0 + ( ((1.0 - beta) ** 2 / 24.0) * alpha**2 / (FK_mid_na ** (2.0 - 2.0 * beta)) + 0.25 * rho * beta * nu * alpha / FK_beta_na + (2.0 - 3.0 * rho**2) / 24.0 * nu**2 ) * T ) sigma[non_atm] = (alpha / denom) * ratio * corr return float(sigma[0]) if scalar_input else sigma
[docs] def sabr_calibrate( market_vols: np.ndarray, F: float, strikes: np.ndarray, T: float, beta: float = 0.5, initial_guess: tuple[float, float, float] | None = None, ) -> dict[str, float]: """Calibrate SABR parameters (alpha, rho, nu) to market implied vols. Parameters ---------- market_vols : array Market implied volatilities. F : float Forward price. strikes : array Strike prices corresponding to market_vols. T : float Time to expiry. beta : float Fixed CEV exponent (default 0.5). initial_guess : tuple, optional Initial (alpha, rho, nu). Defaults to (0.2, -0.3, 0.4). Returns ------- dict {'alpha': ..., 'beta': ..., 'rho': ..., 'nu': ..., 'rmse': ...} """ market_vols = np.asarray(market_vols, dtype=float) strikes = np.asarray(strikes, dtype=float) if initial_guess is None: initial_guess = (0.2, -0.3, 0.4) def objective(params): alpha, rho, nu = params if alpha <= 0 or nu < 0 or rho <= -1 or rho >= 1: return 1e10 try: model_vols = sabr_implied_vol(F, strikes, T, alpha, beta, rho, nu) return np.sum((model_vols - market_vols) ** 2) except (ValueError, RuntimeWarning): return 1e10 result = minimize( objective, x0=initial_guess, method="Nelder-Mead", options={"maxiter": 5000, "xatol": 1e-10, "fatol": 1e-10}, ) alpha, rho, nu = result.x model_vols = sabr_implied_vol(F, strikes, T, alpha, beta, rho, nu) rmse = float(np.sqrt(np.mean((model_vols - market_vols) ** 2))) return { "alpha": float(alpha), "beta": float(beta), "rho": float(rho), "nu": float(nu), "rmse": rmse, }
# --------------------------------------------------------------------------- # SVI model — Gatheral (2004) raw parameterization # ---------------------------------------------------------------------------
[docs] def svi_raw( k: float | np.ndarray, a: float, b: float, rho: float, m: float, sigma: float, ) -> float | np.ndarray: """SVI raw parameterization for total implied variance. w(k) = a + b * (rho * (k - m) + sqrt((k - m)^2 + sigma^2)) Parameters ---------- k : float or array Log-moneyness ln(K/F). a : float Overall level of variance. b : float Slope parameter (b ≥ 0). rho : float Rotation parameter (-1 ≤ ρ ≤ 1). m : float Translation parameter. sigma : float Smoothing parameter (σ > 0). Returns ------- float or array Total implied variance w(k). """ k = np.asarray(k, dtype=float) dm = k - m return a + b * (rho * dm + np.sqrt(dm**2 + sigma**2))
[docs] def svi_implied_vol( k: float | np.ndarray, T: float, a: float, b: float, rho: float, m: float, sigma: float, ) -> float | np.ndarray: """Convert SVI total variance to Black implied volatility. Parameters ---------- k : float or array Log-moneyness ln(K/F). T : float Time to expiry. a, b, rho, m, sigma : float SVI raw parameters. Returns ------- float or array Implied volatility (annualized). """ w = svi_raw(k, a, b, rho, m, sigma) # Total variance -> annualized vol return np.sqrt(np.maximum(w, 0.0) / T)
[docs] def svi_calibrate( market_vols: np.ndarray, strikes: np.ndarray, F: float, T: float, initial_guess: tuple[float, float, float, float, float] | None = None, ) -> dict[str, float]: """Calibrate SVI raw parameters to market implied vols. Parameters ---------- market_vols : array Market implied volatilities. strikes : array Strike prices. F : float Forward price. T : float Time to expiry. initial_guess : tuple, optional Initial (a, b, rho, m, sigma). Returns ------- dict {'a': ..., 'b': ..., 'rho': ..., 'm': ..., 'sigma': ..., 'rmse': ...} """ market_vols = np.asarray(market_vols, dtype=float) strikes = np.asarray(strikes, dtype=float) k = np.log(strikes / F) market_total_var = market_vols**2 * T if initial_guess is None: initial_guess = ( float(np.mean(market_total_var)), 0.1, -0.1, 0.0, 0.1, ) def objective(params): a, b, rho, m, sigma_p = params if b < 0 or sigma_p <= 0 or rho < -1 or rho > 1: return 1e10 if a + b * sigma_p * np.sqrt(1.0 - rho**2) < 0: return 1e10 try: w = svi_raw(k, a, b, rho, m, sigma_p) if np.any(w < 0): return 1e10 return float(np.sum((w - market_total_var) ** 2)) except (ValueError, RuntimeWarning): return 1e10 result = minimize( objective, x0=initial_guess, method="Nelder-Mead", options={"maxiter": 10000, "xatol": 1e-12, "fatol": 1e-12}, ) a, b, rho, m, sigma_p = result.x model_vols = svi_implied_vol(k, T, a, b, rho, m, sigma_p) rmse = float(np.sqrt(np.mean((model_vols - market_vols) ** 2))) return { "a": float(a), "b": float(b), "rho": float(rho), "m": float(m), "sigma": float(sigma_p), "rmse": rmse, }
[docs] def svi_surface( strikes: np.ndarray, expiries: np.ndarray, F: float | np.ndarray, params_per_expiry: list[dict[str, float]], ) -> np.ndarray: """Build an implied volatility surface from SVI parameters per expiry. Parameters ---------- strikes : array, shape (n_strikes,) Strike prices. expiries : array, shape (n_expiries,) Expiry times. F : float or array of shape (n_expiries,) Forward price(s) per expiry. params_per_expiry : list of dicts Each dict must have keys 'a', 'b', 'rho', 'm', 'sigma'. Returns ------- array, shape (n_expiries, n_strikes) Implied volatility surface. """ strikes = np.asarray(strikes, dtype=float) expiries = np.asarray(expiries, dtype=float) F_arr = np.broadcast_to(np.asarray(F, dtype=float), expiries.shape) surface = np.empty((len(expiries), len(strikes))) for i, (T, f, params) in enumerate(zip(expiries, F_arr, params_per_expiry)): k = np.log(strikes / f) surface[i] = svi_implied_vol( k, T, params["a"], params["b"], params["rho"], params["m"], params["sigma"] ) return surface