# NOTE: "Spaces: Sleeping" is a Hugging Face Spaces status banner captured with the
# source when it was scraped — it is not part of the program.
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| 升级版 app.py — 高精度数值 / 高性能统计 + 精细化 LLM 策略输出 | |
| 功能亮点: | |
| - Crank–Nicolson PDE(Black–Scholes) | |
| - Monte Carlo:Antithetic + Control variates(使用 BS 解析作为控制变量) | |
| - GARCH(1,1) 使用 arch (若可用)或 MLE minimize 回退 | |
| - Johansen 协整检验(statsmodels 若可用) | |
| - 组合优化使用 cvxpy(若可用)或 SciPy 回退 | |
| - LLM 生成结构化 JSON 策略(策略说明、信号、伪代码、回测/风险提示) | |
| - 保持之前的几何/Whitney/Noise/Gradient 模块兼容 | |
| """ | |
| import os | |
| import json | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Any, Dict, Optional, Tuple, List | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| # torch used for embedding / potential LSTM | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from torch.optim import Adam | |
| # statsmodels optional | |
| try: | |
| import statsmodels.api as sm | |
| from statsmodels.tsa.vector_ar.vecm import coint_johansen | |
| from statsmodels.tsa.api import VAR | |
| STATS_MODELS_AVAILABLE = True | |
| except Exception: | |
| STATS_MODELS_AVAILABLE = False | |
| # arch package (GARCH) optional | |
| try: | |
| from arch import arch_model | |
| ARCH_AVAILABLE = True | |
| except Exception: | |
| ARCH_AVAILABLE = False | |
| # cvxpy for portfolio optimization optional | |
| try: | |
| import cvxpy as cp | |
| CVXPY_AVAILABLE = True | |
| except Exception: | |
| CVXPY_AVAILABLE = False | |
| # scipy fallback utilities | |
| from scipy import stats | |
| from scipy.optimize import minimize | |
| from scipy.linalg import toeplitz | |
| # HTTP for LLM | |
| import requests | |
| # Gradio UI | |
| import gradio as gr | |
| # Logging | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("quant_upgraded") | |
| # base dir | |
| BASE_DIR = Path("/tmp/quant_upgraded") | |
| BASE_DIR.mkdir(parents=True, exist_ok=True) | |
| # --------------------- | |
| # Configuration | |
| # --------------------- | |
class Config:
    """Runtime configuration resolved once at import time."""

    def __init__(self):
        # Prefer GPU when torch can see one; everything still runs on CPU.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        # Hugging Face Inference API token; empty string means anonymous calls.
        self.hf_token = os.getenv("HF_API_TOKEN", "")
        self.hf_default_model = "Qwen/Qwen2.5-1.5B-Instruct"
        self.mc_default_paths = 20000
        # Record which optional backends were importable at module load.
        self.cv_solver = "cvxpy" if CVXPY_AVAILABLE else "scipy"
        self.statsmodels = STATS_MODELS_AVAILABLE
        self.arch = ARCH_AVAILABLE


config = Config()
| # --------------------- | |
| # Geometry / existing modules (compact) | |
| # --------------------- | |
class FiberBundleTheory:
    """Toy fiber-bundle view of a volatility state space.

    A high-dimensional "fiber" sample is projected down to a 2-d base point
    (implied-variance proxy, realized-vol proxy).
    """

    def __init__(self, fiber_dim=16):
        self.fiber_dim = fiber_dim
        # Whitney embedding bound: a d-manifold embeds in R^(2d).
        self.whitney_factor = 2 * fiber_dim

    def project_to_base(self, x: np.ndarray) -> np.ndarray:
        """Map a fiber sample to the base point [vix2, rv]."""
        flat = np.asarray(x).ravel()
        deficit = self.fiber_dim - len(flat)
        if deficit > 0:
            # Zero-pad short samples up to the fiber dimension.
            flat = np.pad(flat, (0, deficit))
        half = self.fiber_dim // 2
        # Mean square of the first half ~ implied-variance proxy.
        vix2 = float((flat[:half] ** 2).sum() / (half + 1e-12))
        # Std of the remainder ~ realized-vol proxy.
        rv = float(np.std(flat[half:]))
        return np.array([vix2, rv])

    def compute_vrp(self, base_point: np.ndarray) -> float:
        """Variance risk premium: implied-variance proxy minus realized-vol proxy."""
        vix2, rv = base_point
        return vix2 - rv
class NoiseExplorer:
    """Regress an implied-variance series on realized vol and study the residual noise."""

    def regress_vix2_vs_rv(self, vix2: np.ndarray, rv: np.ndarray):
        """OLS fit vix2 ~ a * rv + b; returns slope, intercept, fitted values and residuals."""
        X = np.vstack([rv, np.ones_like(rv)]).T
        coef, *_ = np.linalg.lstsq(X, vix2, rcond=None)
        a, b = float(coef[0]), float(coef[1])
        preds = a * rv + b
        resid = vix2 - preds
        return {'a': a, 'b': b, 'preds': preds, 'resid': resid}

    def resid_stats(self, resid: np.ndarray):
        """Residual diagnostics: mean, variance, lag-1 autocorrelation, dominant FFT frequency."""
        resid = np.asarray(resid)
        mean = float(np.mean(resid))
        var = float(np.var(resid))
        # Lag-1 autocorrelation needs at least 3 observations.
        ac1 = float(np.corrcoef(resid[:-1], resid[1:])[0, 1]) if len(resid) > 2 else 0.0
        fft = np.fft.rfft(resid - mean)
        freqs = np.fft.rfftfreq(len(resid))
        power = np.abs(fft) ** 2
        # Skip the DC bin when locating the dominant cycle.
        dominant_freq = float(freqs[np.argmax(power[1:]) + 1]) if len(power) > 1 else 0.0
        return {'mean': mean, 'var': var, 'ac1': ac1, 'dominant_freq': dominant_freq}

    def explore(self, df: pd.DataFrame, vix2_col: Optional[str] = None, rv_col: Optional[str] = None):
        """Run the regression and residual diagnostics on two numeric columns of *df*.

        Defaults to the first two numeric columns (first column twice if only one
        exists). Returns None when the frame has no numeric columns.
        """
        numcols = df.select_dtypes(include=[np.number]).columns.tolist()
        if not numcols:
            return None
        if vix2_col is None or rv_col is None:
            vix2_col = numcols[0]
            rv_col = numcols[1] if len(numcols) > 1 else numcols[0]
        # .ffill() replaces the deprecated fillna(method='ffill') (removed in pandas 3).
        vix2 = df[vix2_col].ffill().values
        rv = df[rv_col].ffill().values
        reg = self.regress_vix2_vs_rv(vix2, rv)
        st = self.resid_stats(reg['resid'])
        vrp = vix2 - reg['preds']
        return {
            'vix2_col': vix2_col,
            'rv_col': rv_col,
            'reg': {'a': reg['a'], 'b': reg['b']},
            'resid_stats': st,
            'vrp_mean': float(np.mean(vrp)),
            'vrp_std': float(np.std(vrp)),
            'vrp_series': vrp.tolist(),
            'residuals': reg['resid'].tolist()
        }
| # --------------------- | |
| # Quant modules (upgraded) | |
| # --------------------- | |
class StochasticModels:
    """High-precision stochastic processes and pricing helpers.

    All methods are stateless; they are declared @staticmethod (the previous
    definitions lacked both `self` and the decorator, so they only worked when
    called on the class and broke on instances).
    """

    @staticmethod
    def bs_price(S: float, K: float, r: float, q: float, sigma: float, T: float, option_type: str = 'call') -> float:
        """Black-Scholes closed-form price with continuous dividend yield q.

        Degenerate inputs (T <= 0 or sigma <= 0) fall back to intrinsic value.
        """
        S, K, r, q, sigma, T = map(float, (S, K, r, q, sigma, T))
        if T <= 0 or sigma <= 0:
            return float(max(S - K, 0.0) if option_type == 'call' else max(K - S, 0.0))
        d1 = (np.log(S / K) + (r - q + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
        d2 = d1 - sigma * np.sqrt(T)
        if option_type == 'call':
            price = S * np.exp(-q * T) * stats.norm.cdf(d1) - K * np.exp(-r * T) * stats.norm.cdf(d2)
        else:
            price = K * np.exp(-r * T) * stats.norm.cdf(-d2) - S * np.exp(-q * T) * stats.norm.cdf(-d1)
        return float(price)

    @staticmethod
    def heston_simulate(S0: float, v0: float, r: float, kappa: float, theta: float, xi: float, rho: float, T: float,
                        n_steps: int = 252, n_paths: int = 2000, seed: Optional[int] = None):
        """Euler-Maruyama Heston simulator with full truncation of the variance.

        The CIR-like variance is floored at 0 before each update and at 1e-8
        after it, which keeps the scheme stable. Returns (S, v), each of shape
        (n_paths, n_steps + 1). Keep path counts moderate on CPU.
        """
        if seed is not None:
            np.random.seed(seed)
        dt = T / n_steps
        S = np.zeros((n_paths, n_steps + 1))
        v = np.zeros((n_paths, n_steps + 1))
        S[:, 0] = S0
        v[:, 0] = v0
        for t in range(n_steps):
            z1 = np.random.randn(n_paths)
            z2 = np.random.randn(n_paths)
            w1 = z1
            # Correlate the two Brownian increments with coefficient rho.
            w2 = rho * z1 + np.sqrt(max(0.0, 1 - rho**2)) * z2
            v_prev = np.maximum(v[:, t], 0.0)
            # Full-truncation Euler step for the variance.
            dv = kappa * (theta - v_prev) * dt + xi * np.sqrt(v_prev * dt) * w2
            v_new = np.maximum(v_prev + dv, 1e-8)
            dS = r * S[:, t] * dt + np.sqrt(v_prev * dt) * S[:, t] * w1
            S[:, t + 1] = S[:, t] + dS
            v[:, t + 1] = v_new
        return S, v

    @staticmethod
    def merton_jump_diffusion(S0: float, mu: float, sigma: float, lamb: float, mu_j: float, sigma_j: float,
                              T: float, n_steps: int = 252, n_paths: int = 2000, seed: Optional[int] = None):
        """Vectorized Merton jump-diffusion simulator.

        Jumps arrive with Poisson intensity lamb; jump sizes are lognormal with
        parameters (mu_j, sigma_j). Prices are floored at 1e-8. Returns an
        (n_paths, n_steps + 1) array of simulated prices.
        """
        if seed is not None:
            np.random.seed(seed)
        dt = T / n_steps
        S = np.full((n_paths, n_steps + 1), S0, dtype=float)
        for t in range(n_steps):
            z = np.random.randn(n_paths)
            pois = np.random.poisson(lamb * dt, size=n_paths)
            jumps = np.exp(mu_j + sigma_j * np.random.randn(n_paths)) - 1.0
            S[:, t + 1] = S[:, t] * (1 + mu * dt + sigma * np.sqrt(dt) * z) + S[:, t] * (jumps * pois)
            S[:, t + 1] = np.maximum(S[:, t + 1], 1e-8)
        return S
class NumericalMethods:
    """Crank-Nicolson PDE pricing and Monte Carlo with variance reduction (stateless)."""

    @staticmethod
    def bs_crank_nicolson(S0: float, K: float, r: float, q: float, sigma: float, T: float,
                          Smax_mult: float = 3.0, M: int = 400, N: int = 400, option_type: str = 'call') -> float:
        """Crank-Nicolson finite-difference solver for the Black-Scholes PDE.

        M is the number of asset-price steps, N the number of time steps.

        Bug fix: row k of the interior tridiagonal system corresponds to grid
        node j = k + 1, i.e. coefficient index k. The previous assembly used
        index k + 1, which both shifted every coefficient by one node and read
        past the end of the coefficient arrays (IndexError at k = M - 2).
        """
        Smax = S0 * Smax_mult
        dS = Smax / M
        dt = T / N
        grid = np.zeros((M + 1, N + 1))
        Svals = np.linspace(0, Smax, M + 1)
        tvals = np.linspace(0, T, N + 1)
        # Terminal payoff at t = T.
        if option_type == 'call':
            grid[:, -1] = np.maximum(Svals - K, 0)
        else:
            grid[:, -1] = np.maximum(K - Svals, 0)
        # Dirichlet boundaries at S = 0 and S = Smax for all times.
        grid[0, :] = 0.0 if option_type == 'call' else K * np.exp(-r * (T - tvals))
        grid[-1, :] = (Smax - K * np.exp(-r * (T - tvals))) if option_type == 'call' else 0.0
        # Crank-Nicolson coefficients for interior nodes j = 1 .. M-1.
        j = np.arange(1, M)
        a = 0.25 * dt * (sigma**2 * j**2 - (r - q) * j)
        b = -0.5 * dt * (sigma**2 * j**2 + r)
        c = 0.25 * dt * (sigma**2 * j**2 + (r - q) * j)
        # A w^n = B w^{n+1} + boundary terms; row k <-> node j = k+1 <-> index k.
        A = np.diag(1 - b) + np.diag(-a[1:], -1) + np.diag(-c[:-1], 1)
        B = np.diag(1 + b) + np.diag(a[1:], -1) + np.diag(c[:-1], 1)
        # March backward in time from T to 0.
        for n in reversed(range(N)):
            rhs = B.dot(grid[1:M, n + 1])
            # Boundary contributions for the first and last interior rows.
            rhs[0] += a[0] * (grid[0, n] + grid[0, n + 1])
            rhs[-1] += c[M - 2] * (grid[M, n] + grid[M, n + 1])
            grid[1:M, n] = np.linalg.solve(A, rhs)
        # Linear interpolation of the t = 0 solution at S0.
        i = int(S0 / dS)
        if i >= M:
            return float(grid[-1, 0])
        w = (S0 - i * dS) / dS
        return float((1 - w) * grid[i, 0] + w * grid[i + 1, 0])

    @staticmethod
    def mc_price_bs_cv(S0: float, K: float, r: float, q: float, sigma: float, T: float,
                       option_type: str = 'call', n_paths: int = 20000, antithetic: bool = True, seed: Optional[int] = None):
        """Monte Carlo GBM pricer with antithetic variates and an ST control variate.

        The control variable is the terminal price ST, whose risk-neutral
        expectation S0 * exp((r - q) T) is known in closed form; the optimal
        coefficient beta is estimated from the sample covariance. (A dead call
        computing the analytic BS price into an unused local was removed.)
        """
        if seed is not None:
            np.random.seed(seed)
        n = n_paths
        half = n // 2 if antithetic else n
        Z = np.random.randn(half)
        if antithetic:
            # Pair every draw with its mirror image to cancel odd-moment error.
            Z = np.concatenate([Z, -Z])
        ST = S0 * np.exp((r - q - 0.5 * sigma**2) * T + sigma * np.sqrt(T) * Z)
        if option_type == 'call':
            payoff = np.maximum(ST - K, 0)
        else:
            payoff = np.maximum(K - ST, 0)
        # Control variate: ST, with E[ST] = S0 * exp((r - q) T) under Q.
        control = ST
        control_mean = S0 * np.exp((r - q) * T)
        cov_pc = np.cov(payoff, control, ddof=1)[0, 1]
        var_c = np.var(control, ddof=1)
        beta = cov_pc / var_c if var_c > 0 else 0.0
        adj_payoff = payoff - beta * (control - control_mean)
        return float(np.exp(-r * T) * np.mean(adj_payoff))
class Econometrics:
    """GARCH(1,1) via the arch package (preferred) or an MLE fallback; Johansen via statsmodels."""

    @staticmethod
    def garch_11_fit(returns: np.ndarray):
        """Fit a GARCH(1,1) model to a return series (demeaned internally).

        Returns {'method', 'params', 'cond_var'}. Uses the arch package when
        available; otherwise a SciPy MLE with stationarity bounds
        (alpha + beta < 1).
        """
        r = np.asarray(returns).astype(float)
        r = r - np.mean(r)
        if config.arch:
            try:
                # Scale to percent: arch converges more reliably on O(1) data.
                am = arch_model(r * 100.0, vol='Garch', p=1, q=1, dist='normal')
                res = am.fit(disp='off')
                params = res.params.to_dict()
                cond_var = res.conditional_volatility / 100.0
                return {'method': 'arch', 'params': params, 'cond_var': cond_var.tolist()}
            except Exception as e:
                logger.warning(f"arch fit failed: {e}; falling back to MLE.")
        # MLE fallback: minimize the Gaussian negative log-likelihood.
        T = len(r)

        def neglog(params):
            omega, alpha, beta = params
            # Reject non-positive omega and explosive (alpha + beta >= 1) regions.
            if omega <= 0 or alpha < 0 or beta < 0 or alpha + beta >= 0.9999:
                return 1e12
            h = np.zeros(T)
            h[0] = np.var(r)
            for t in range(1, T):
                h[t] = omega + alpha * r[t - 1]**2 + beta * h[t - 1]
            # Likelihood computed once after h is fully built. The previous
            # code evaluated the full-vector likelihood inside the recursion
            # loop, making every objective evaluation O(T^2).
            ll = 0.5 * (np.log(2 * np.pi) + np.log(h) + (r**2) / h)
            return np.sum(ll)

        init = np.array([1e-6, 0.05, 0.9])
        bnds = [(1e-12, None), (0, 0.9999), (0, 0.9999)]
        res = minimize(neglog, x0=init, bounds=bnds)
        if not res.success:
            logger.warning("GARCH MLE did not converge; returning fallback params")
            omega, alpha, beta = init
        else:
            omega, alpha, beta = res.x
        # Rebuild the conditional-variance path at the chosen parameters.
        h = np.zeros(T)
        h[0] = np.var(r)
        for t in range(1, T):
            h[t] = omega + alpha * r[t - 1]**2 + beta * h[t - 1]
        return {'method': 'mle', 'params': {'omega': float(omega), 'alpha': float(alpha), 'beta': float(beta)}, 'cond_var': h.tolist()}

    @staticmethod
    def johansen_test(data: np.ndarray, det_order: int = 0, k_ar_diff: int = 1):
        """Johansen cointegration test; returns None when statsmodels is unavailable or fails."""
        if config.statsmodels:
            try:
                res = coint_johansen(data, det_order, k_ar_diff)
                return {'eig': res.eig.tolist(), 'lr1': res.lr1.tolist(), 'cvm': res.cvt.tolist()}
            except Exception as e:
                logger.warning(f"Johansen failed: {e}")
                return None
        else:
            return None
class PortfolioOptimization:
    """Markowitz-style optimizers using cvxpy when available, analytic fallback otherwise."""

    @staticmethod
    def gmv_weights(returns: np.ndarray):
        """Global-minimum-variance weights for an (obs x assets) return matrix.

        Weights sum to 1; shorting is allowed (no sign constraint).
        """
        R = np.asarray(returns)
        cov = np.cov(R.T)
        n = cov.shape[0]
        if CVXPY_AVAILABLE:
            w = cp.Variable(n)
            prob = cp.Problem(cp.Minimize(cp.quad_form(w, cov)),
                              [cp.sum(w) == 1])
            prob.solve(solver=cp.SCS, verbose=False)
            return np.array(w.value).ravel()
        # Analytic GMV: cov^-1 1 / (1' cov^-1 1); pinv guards singular covariances.
        invcov = np.linalg.pinv(cov)
        ones = np.ones((n,))
        w = invcov.dot(ones)
        return w / (ones.dot(invcov).dot(ones))

    @staticmethod
    def mean_variance_opt(returns: np.ndarray, target_return: Optional[float] = None):
        """Minimum-variance weights, optionally subject to a target mean return.

        Uses the closed-form two-constraint Lagrangian solution when cvxpy is
        not available; with no target it degenerates to the GMV portfolio.
        """
        R = np.asarray(returns)
        mu = np.mean(R, axis=0)
        cov = np.cov(R.T)
        n = len(mu)
        if CVXPY_AVAILABLE:
            w = cp.Variable(n)
            constraints = [cp.sum(w) == 1]
            if target_return is not None:
                constraints.append(mu @ w >= target_return)
            prob = cp.Problem(cp.Minimize(cp.quad_form(w, cov)), constraints)
            prob.solve(solver=cp.SCS, verbose=False)
            return np.array(w.value).ravel()
        if target_return is None:
            return PortfolioOptimization.gmv_weights(R)
        # Closed form for: min w' cov w  s.t.  1'w = 1, mu'w = target_return.
        invcov = np.linalg.pinv(cov)
        ones = np.ones(n)
        A = ones.T.dot(invcov).dot(ones)
        B = ones.T.dot(invcov).dot(mu)
        C = mu.T.dot(invcov).dot(mu)
        denom = A * C - B**2
        lam = (C - target_return * B) / denom
        gamma = (target_return * A - B) / denom
        return invcov.dot(lam * ones + gamma * mu)
| # --------------------- | |
| # ML for Finance helpers | |
| # --------------------- | |
class MLForFinance:
    """Feature engineering and feature-selection helpers for price series."""

    @staticmethod
    def compute_basic_features(price: np.ndarray, mom_window: int = 20, vol_window: int = 20):
        """Build an (n, 4) feature matrix: log return, momentum, rolling vol, SMA.

        Warm-up windows are back-filled so every row is defined; the first log
        return is pinned to 0.
        """
        p = np.asarray(price).ravel()
        # 1e-12 guards log(0) for non-positive prices.
        ret = np.concatenate([[0], np.diff(np.log(p + 1e-12))])
        mom = pd.Series(p).pct_change(mom_window).fillna(0).values
        # .bfill() replaces the deprecated fillna(method='bfill') (removed in pandas 3).
        rv = pd.Series(ret).rolling(vol_window).std().bfill().values
        sma = pd.Series(p).rolling(mom_window).mean().bfill().values
        return np.vstack([ret, mom, rv, sma]).T

    @staticmethod
    def lasso_select(X: np.ndarray, y: np.ndarray):
        """LASSO-CV feature selection; returns None when sklearn is missing or fitting fails."""
        try:
            from sklearn.linear_model import LassoCV
            model = LassoCV(cv=5, n_jobs=1).fit(X, y.ravel())
            coef = model.coef_
            selected = list(np.where(np.abs(coef) > 1e-6)[0])
            return {'coef': coef.tolist(), 'selected': selected, 'alpha': float(model.alpha_)}
        except Exception as e:
            logger.warning(f"LASSO selection failed: {e}")
            return None
| # --------------------- | |
| # LLM interface (detailed prompt + structured JSON output) | |
| # --------------------- | |
class LLMInterface:
    """Thin client for the Hugging Face Inference API that turns numerical
    analysis into a structured JSON trading-strategy note.

    All failures are reported as strings / fallback dicts; nothing raises.
    """

    def __init__(self, model_name: str = None, hf_token: Optional[str] = None):
        # Fall back to the module-level config for both the model and the token.
        self.model_name = model_name or config.hf_default_model
        self.api_url = f"https://api-inference.huggingface.co/models/{self.model_name}"
        self.hf_token = hf_token or config.hf_token

    def _call_api(self, prompt: str, max_length: int = 700) -> str:
        """POST *prompt* to the HF text-generation endpoint.

        Returns the generated text on success, or an "API_ERROR_<status>: ..." /
        "API_EXCEPTION: ..." marker string on failure (never raises).
        """
        headers = {"Authorization": f"Bearer {self.hf_token}"} if self.hf_token else {"Content-Type": "application/json"}
        payload = {"inputs": prompt, "parameters": {"max_new_tokens": max_length, "temperature": 0.2}}
        try:
            r = requests.post(self.api_url, headers=headers, json=payload, timeout=40)
            if r.status_code == 200:
                res = r.json()
                # Text-generation endpoints typically return [{"generated_text": ...}].
                if isinstance(res, list) and isinstance(res[0], dict):
                    return res[0].get("generated_text", str(res[0]))
                return str(res)
            else:
                return f"API_ERROR_{r.status_code}: {r.text[:200]}"
        except Exception as e:
            return f"API_EXCEPTION: {e}"

    def generate_structured_strategy(self,
                                     analysis: Dict[str, Any],
                                     market_snapshot: str,
                                     requirements: Dict[str, Any]) -> Dict[str, Any]:
        """
        Produce structured JSON with keys:
        - strategy_summary
        - signals (list of rules)
        - risk_management
        - pseudocode (string)
        - backtest_guidance

        The analysis and snapshot are truncated (4000/2000 chars) to bound the
        prompt size. If the model's reply contains no parseable JSON object, a
        deterministic fallback strategy dict is returned instead.
        """
        instr = (
            "You are a quantitative researcher writing a concise Quant Research Note. "
            "Produce structured JSON only, with keys: strategy_summary, signals, risk_management, pseudocode, backtest_guidance, notes.\n\n"
            "Requirements: "
            f"{json.dumps(requirements)}\n\n"
            "Analysis (numerical results):\n"
            f"{json.dumps(analysis, indent=2, ensure_ascii=False)[:4000]}\n\n"
            "Market snapshot:\n"
            f"{market_snapshot[:2000]}\n\n"
            "Be specific: signals should include exact mathematical conditions (e.g. vrp > vrp_sma_short AND rsi < 30). "
            "Pseudocode should include function signatures: compute_features(data), generate_signal(features), risk_manage(position), execute(signal). "
            "Backtest guidance should specify data frequency, in-sample/out-of-sample split, sample length, and slippage/commission assumptions. "
            "Keep outputs compact but precise."
        )
        raw = self._call_api(instr, max_length=800)
        # Try to parse JSON from raw; if it fails, fall back to the template below.
        try:
            # Sometimes HF returns text with JSON embedded in it — extract the
            # outermost {...} span and attempt to parse just that.
            start = raw.find("{")
            end = raw.rfind("}")
            if start != -1 and end != -1:
                candidate = raw[start:end+1]
                data = json.loads(candidate)
                return data
        except Exception as e:
            logger.warning(f"LLM did not return pure JSON: {e}")
        # Fallback: deterministic template (also reached when no "{" was found).
        fallback = {
            "strategy_summary": "Fallback strategy: VRP mean-reversion with momentum filter.",
            "signals": [
                "entry: vrp < vrp_sma_short and momentum > 0.5",
                "exit: vrp > vrp_sma_long or price crosses stop loss"
            ],
            "risk_management": "max position risk 0.5% NAV; use stop-loss and time-based exit",
            "pseudocode": (
                "def compute_features(data):\n"
                "    features = {...} # vrp, sma, momentum\n"
                "def generate_signal(features):\n"
                "    if features['vrp'] < features['vrp_sma_short'] and features['mom'] > 0:\n"
                "        return 1\n"
                "    return 0\n"
                "def risk_manage(pos):\n"
                "    # apply stop loss / position sizing\n"
            ),
            "backtest_guidance": "Use 1-minute bars, in-sample 2 years, OOS 6 months, slippage 0.02%, commission 0.0005 per trade",
            "notes": "LLM API failed or returned non-JSON; this is a deterministic fallback."
        }
        return fallback
| # --------------------- | |
| # Integrative Trainer / Platform | |
| # --------------------- | |
class QuantPlatform:
    """Facade wiring data ingestion, pricing, econometrics, portfolio
    optimization and LLM strategy generation together for the Gradio UI.

    UI callbacks return display strings (or figures) instead of raising, so any
    failure surfaces in the corresponding output widget.
    """

    def __init__(self):
        self.fiber = FiberBundleTheory()
        self.noise = NoiseExplorer()
        self.trainer_ml = None  # placeholder: no ML trainer is attached yet
        self.llm = LLMInterface()
        self.current_data = None  # last uploaded DataFrame (set by upload_and_analyze)
        self.analysis_results = {}  # cached noise/GARCH summaries reused by generate_strategy

    # Data ingestion & basic analysis
    def upload_and_analyze(self, file):
        """Load a CSV/Excel upload, run noise exploration and a quick GARCH fit.

        Returns a (data summary, noise summary, GARCH summary) tuple of strings.
        """
        if file is None:
            return "请上传 CSV / Excel 文件", None, None
        fname = file.name
        try:
            # Anything without a .csv suffix is handed to the Excel reader.
            if fname.endswith('.csv'):
                df = pd.read_csv(fname)
            else:
                df = pd.read_excel(fname)
        except Exception as e:
            return f"读取失败: {e}", None, None
        self.current_data = df
        numeric = df.select_dtypes(include=[np.number]).columns.tolist()
        summary = f"Rows: {len(df)}, Cols: {len(df.columns)}, Numeric: {numeric}"
        # Noise exploration (first two numeric columns).
        try:
            noise_res = self.noise.explore(df)
            noise_summary = f"VRP mean {noise_res['vrp_mean']:.6f}, vrp std {noise_res['vrp_std']:.6f}, resid ac1 {noise_res['resid_stats']['ac1']:.4f}"
        except Exception as e:
            noise_summary = f"噪声分析失败: {e}"
            noise_res = None
        # Quick GARCH fit on the first numeric column's returns (if plausible).
        garch_summary = "GARCH not run"
        if numeric:
            series = df[numeric[0]].pct_change().dropna().values
            if len(series) > 30:
                try:
                    garch_res = Econometrics.garch_11_fit(series)
                    garch_summary = f"GARCH method: {garch_res.get('method','?')}, params keys: {list(garch_res.get('params',{}).keys()) if 'params' in garch_res else 'n/a'}"
                except Exception as e:
                    garch_summary = f"GARCH失败: {e}"
        self.analysis_results = {'noise': noise_res, 'garch': garch_summary}
        return summary, noise_summary, garch_summary

    # Pricing / PDE / MC wrappers
    def price_bs_cn(self, S, K, r, q, sigma, T, Smax_mult=3.0, M=400, N=400, option_type='call'):
        """UI wrapper around the Crank-Nicolson PDE pricer; returns a display string."""
        try:
            p = NumericalMethods.bs_crank_nicolson(float(S), float(K), float(r), float(q), float(sigma), float(T),
                                                   Smax_mult=float(Smax_mult), M=int(M), N=int(N), option_type=option_type)
            return f"Crank–Nicolson price: {p:.6f}"
        except Exception as e:
            return f"PDE pricing failed: {e}"

    def price_bs_mc(self, S, K, r, q, sigma, T, option_type='call', n_paths=20000, antithetic=True):
        """UI wrapper around the control-variate Monte Carlo pricer; returns a display string."""
        try:
            p = NumericalMethods.mc_price_bs_cv(float(S), float(K), float(r), float(q), float(sigma), float(T),
                                                option_type=option_type, n_paths=int(n_paths), antithetic=bool(antithetic))
            return f"MC price (CV): {p:.6f}"
        except Exception as e:
            return f"MC pricing failed: {e}"

    def simulate_heston(self, S0, v0, r, kappa, theta, xi, rho, T, n_steps=252, n_paths=2000):
        """Run a Heston simulation and plot the first few paths; returns (status, figure or None)."""
        try:
            S, v = StochasticModels.heston_simulate(float(S0), float(v0), float(r), float(kappa), float(theta), float(xi), float(rho), float(T), int(n_steps), int(n_paths))
            # Return a minimal summary and a small plot (first 3 paths).
            fig, ax = plt.subplots()
            for i in range(min(3, S.shape[0])):
                ax.plot(S[i,:], label=f'path{i}')
            ax.set_title("Heston sample paths (first few)")
            ax.legend()
            return "Heston simulation success", fig
        except Exception as e:
            return f"Heston simulation failed: {e}", None

    # Econometrics wrappers
    def garch_fit(self):
        """Fit GARCH(1,1) on the first numeric column's returns; returns JSON or an error string."""
        if self.current_data is None:
            return "请先上传数据"
        numeric = self.current_data.select_dtypes(include=[np.number]).columns.tolist()
        if not numeric:
            return "数据无数值列"
        series = self.current_data[numeric[0]].pct_change().dropna().values
        if len(series) < 30:
            return "样本过短,至少需要30个观测用于GARCH拟合"
        try:
            res = Econometrics.garch_11_fit(series)
            return json.dumps({'method': res.get('method','mle'), 'params': res.get('params') if 'params' in res else 'omega/alpha/beta', 'cond_var_mean': float(np.mean(res.get('cond_var',[])) if res.get('cond_var') else np.nan)}, indent=2)
        except Exception as e:
            return f"GARCH拟合失败: {e}"

    def johansen(self):
        """Johansen cointegration test over all numeric columns; needs >= 50 rows and >= 2 columns."""
        if self.current_data is None:
            return "请先上传数据"
        data = self.current_data.select_dtypes(include=[np.number]).dropna().values
        if data.shape[0] < 50 or data.shape[1] < 2:
            return "数据不足以做 Johansen 协整检验(至少 50 行,2 列)"
        try:
            res = Econometrics.johansen_test(data)
            if res is None:
                return "Johansen 不可用(statsmodels 未安装或出错)"
            return json.dumps({'eig_top5': res['eig'][:5], 'lr1_top5': res['lr1'][:5]}, indent=2)
        except Exception as e:
            return f"Johansen 失败: {e}"

    # Portfolio & Risk
    def compute_gmv(self):
        """Global-minimum-variance weights from the numeric columns' returns."""
        if self.current_data is None:
            return "请先上传数据"
        df = self.current_data.select_dtypes(include=[np.number]).dropna()
        if df.shape[0] < 10 or df.shape[1] < 1:
            return "数据不足"
        returns = df.pct_change().dropna().values
        w = PortfolioOptimization.gmv_weights(returns)
        return f"GMV weights (len {len(w)}): {np.round(w,4).tolist()}"

    def mean_var_opt(self, target_return: Optional[float]=None):
        """Mean-variance optimal weights, optionally targeting a mean return."""
        if self.current_data is None:
            return "请先上传数据"
        df = self.current_data.select_dtypes(include=[np.number]).dropna()
        returns = df.pct_change().dropna().values
        try:
            w = PortfolioOptimization.mean_variance_opt(returns, target_return=float(target_return) if target_return is not None else None)
            return f"Optimized weights (len {len(w)}): {np.round(w,4).tolist()}"
        except Exception as e:
            return f"Mean-Variance optimization failed: {e}"

    # ML
    def lasso_select(self):
        """LASSO feature selection: first numeric column's returns vs the remaining columns."""
        if self.current_data is None:
            return "请先上传数据"
        df = self.current_data.select_dtypes(include=[np.number]).dropna()
        if df.shape[1] < 2 or df.shape[0] < 30:
            return "数据不足以做 LASSO"
        y = df.iloc[:,0].pct_change().dropna().values
        X = df.iloc[:,1:].pct_change().dropna().values
        # Align lengths by keeping the common trailing window.
        minlen = min(len(y), len(X))
        if minlen <= 10:
            return "数据对齐后样本太短"
        y = y[-minlen:]
        X = X[-minlen:]
        res = MLForFinance.lasso_select(X, y)
        if res is None:
            return "LASSO 失败"
        return f"Selected indices: {res['selected']}, alpha: {res['alpha']:.6g}"

    # LLM strategy (structured)
    def generate_strategy(self, user_prompt: str, intraday: bool=True, model_name: Optional[str]=None) -> str:
        """Assemble cached analysis plus a market snapshot, ask the LLM for a
        structured strategy, and return pretty-printed JSON (or an error JSON)."""
        if self.current_data is None:
            return json.dumps({'error': '请先上传数据'}, ensure_ascii=False)
        # Build the analysis dict from cached results.
        analysis = {}
        if self.analysis_results.get('noise'):
            analysis['noise'] = self.analysis_results['noise']
        # Include the GARCH summary when it can be parsed as JSON.
        try:
            g = self.garch_fit()
            analysis['garch_summary'] = json.loads(g) if g and g.startswith("{") else g
        except Exception:
            analysis['garch_summary'] = "GARCH无法解析"
        # Market snapshot: describe() of the last 50 numeric rows.
        do_numeric = self.current_data.select_dtypes(include=[np.number]).tail(50).describe().to_string()
        requirements = {'intraday': intraday, 'pseudocode': True, 'user_prompt': user_prompt}
        if model_name:
            # Rebind the LLM client when the user picked a different model.
            self.llm = LLMInterface(model_name=model_name)
        result = self.llm.generate_structured_strategy(analysis, do_numeric, requirements)
        # Return pretty-printed JSON for the textbox.
        return json.dumps(result, ensure_ascii=False, indent=2)
| # --------------------- | |
| # Gradio UI | |
| # --------------------- | |
def create_ui():
    """Build the Gradio Blocks UI and wire each tab to a QuantPlatform method."""
    platform = QuantPlatform()
    with gr.Blocks(title="Quant Upgraded Platform") as demo:
        gr.Markdown("# Quant Upgraded Platform — 高精度/高性能 + 精细化 LLM 策略")
        with gr.Tabs():
            with gr.TabItem("📁 数据上传 & 基础分析"):
                with gr.Row():
                    file_input = gr.File(label="上传 CSV / Excel")
                    upload_btn = gr.Button("上传并分析")
                summary = gr.Textbox(label="数据摘要", lines=2)
                noise = gr.Textbox(label="噪声探索摘要", lines=2)
                garch = gr.Textbox(label="GARCH 摘要", lines=2)
                upload_btn.click(platform.upload_and_analyze, inputs=[file_input], outputs=[summary, noise, garch])
            with gr.TabItem("📊 Pricing / PDE / MC"):
                with gr.Row():
                    S = gr.Number(value=100.0, label="Spot S")
                    K = gr.Number(value=100.0, label="Strike K")
                    r = gr.Number(value=0.01, label="r")
                    q = gr.Number(value=0.0, label="q")
                    sigma = gr.Number(value=0.2, label="sigma")
                    T = gr.Number(value=0.5, label="T (yrs)")
                with gr.Row():
                    bs_cn_btn = gr.Button("Crank–Nicolson BS PDE 价格")
                    bs_cn_out = gr.Textbox(label="PDE Price", lines=1)
                # NOTE(review): components constructed inline inside inputs=[...]
                # (Smax multiplier, grid sliders, option-type dropdown) — confirm
                # this renders as intended in the targeted Gradio version.
                bs_cn_btn.click(platform.price_bs_cn, inputs=[S,K,r,q,sigma,T, gr.Number(value=3.0), gr.Slider(100,800,value=400), gr.Slider(100,800,value=400), gr.Dropdown(['call','put'], value='call')], outputs=[bs_cn_out])
                with gr.Row():
                    mc_btn = gr.Button("Monte Carlo (Antithetic + Control Var)")
                    mc_out = gr.Textbox(label="MC Price (CV)", lines=1)
                mc_btn.click(platform.price_bs_mc, inputs=[S,K,r,q,sigma,T, gr.Dropdown(['call','put'], value='call'), gr.Number(value=config.mc_default_paths), gr.Checkbox(value=True, label="Antithetic")], outputs=[mc_out])
            with gr.TabItem("🔢 Econometrics"):
                garch_btn = gr.Button("GARCH(1,1) 拟合")
                garch_out = gr.Textbox(label="GARCH 结果", lines=8)
                garch_btn.click(platform.garch_fit, inputs=None, outputs=[garch_out])
                joh_btn = gr.Button("Johansen 协整检验")
                joh_out = gr.Textbox(label="Johansen 结果", lines=6)
                joh_btn.click(platform.johansen, inputs=None, outputs=[joh_out])
            with gr.TabItem("📈 Portfolio & Risk"):
                gmv_btn = gr.Button("计算 GMV 权重")
                gmv_out = gr.Textbox(label="GMV 权重", lines=3)
                gmv_btn.click(platform.compute_gmv, inputs=None, outputs=[gmv_out])
                mv_btn = gr.Button("均值-方差 优化 (可选目标收益)")
                target = gr.Number(label="目标收益 (可空)", value=None)
                mv_out = gr.Textbox(label="MV 结果", lines=3)
                mv_btn.click(platform.mean_var_opt, inputs=[target], outputs=[mv_out])
            with gr.TabItem("🤖 LLM 策略生成 (结构化)"):
                user_q = gr.Textbox(label="你的问题(策略 / 日内 / 回测)", lines=3, value="基于当前数据,给出日内量化策略并生成伪代码")
                intraday = gr.Checkbox(label="日内策略", value=True)
                model_sel = gr.Dropdown(label="LLM 模型 (若无Token或模型不可用会回退)", choices=[config.hf_default_model], value=config.hf_default_model)
                strat_out = gr.Textbox(label="结构化策略输出 (JSON)", lines=20)
                strat_btn = gr.Button("生成策略")
                strat_btn.click(platform.generate_strategy, inputs=[user_q, intraday, model_sel], outputs=[strat_out])
            with gr.TabItem("🔬 Dynamics & Geometry (原有)"):
                noise_btn = gr.Button("运行噪声探索")
                noise_text = gr.Textbox(label="Noise summary", lines=3)
                def run_noise():
                    # Closure over `platform`: re-run noise exploration on demand.
                    if platform.current_data is None:
                        return "请先上传数据"
                    res = platform.noise.explore(platform.current_data)
                    return f"VRP mean {res['vrp_mean']:.6f}, resid ac1 {res['resid_stats']['ac1']:.4f}"
                noise_btn.click(run_noise, inputs=None, outputs=[noise_text])
                sim_vix2 = gr.Number(value=1.0, label="start VIX^2")
                sim_rv = gr.Number(value=0.8, label="start RV")
                T_sim = gr.Number(value=1.0, label="T")
                dt_sim = gr.Number(value=0.01, label="dt")
                sim_btn = gr.Button("模拟梯度动力学")
                sim_out = gr.Plot(label="Dynamics path")
                def run_sim(vix2, rv, T, dt):
                    # Lightweight simulation using gradient dynamics (reuses the
                    # GradientDynamicsLite helper defined later in this module).
                    gradient = GradientDynamicsLite()
                    path = gradient.simulate_flow([vix2, rv], T=float(T), dt=float(dt))
                    fig, ax = plt.subplots()
                    ax.plot(path[:,0], label='VIX^2')
                    ax.plot(path[:,1], label='RV')
                    ax.legend()
                    ax.set_title("Gradient dynamics (VIX^2 & RV)")
                    return fig
                sim_btn.click(run_sim, inputs=[sim_vix2, sim_rv, T_sim, dt_sim], outputs=[sim_out])
        gr.Markdown("注:本系统为研究用途,不构成投资建议。部分功能依赖外部库(statsmodels, arch, cvxpy)。")
    return demo
| # --------------------- | |
| # Small helper: GradientDynamicsLite (used only in UI simulation) | |
| # --------------------- | |
class GradientDynamicsLite:
    """Minimal stochastic gradient flow on the (VIX^2, RV) plane.

    The potential U(b) = 0.5 * (vix2 - rv)^2 is descended with explicit Euler
    steps plus isotropic Gaussian noise (Euler-Maruyama).
    """

    def __init__(self, eta=0.5, sigma=0.02):
        self.eta = eta      # descent rate
        self.sigma = sigma  # noise amplitude

    def U_vrp(self, b):
        """Potential: half the squared variance risk premium."""
        premium = b[..., 0] - b[..., 1]
        return 0.5 * premium ** 2

    def grad_U(self, b):
        """Analytic gradient of U at a 2-point b = (vix2, rv).

        dU/dvix2 = (vix2 - rv), dU/drv = -(vix2 - rv).
        """
        premium = b[0] - b[1]
        return np.array([premium, -premium], dtype=float)

    def simulate_flow(self, b0, T=1.0, dt=0.01, seed=None):
        """Simulate the noisy gradient flow from b0; returns an (n_steps+1, 2) path."""
        if seed is not None:
            np.random.seed(seed)
        n_steps = int(T / dt)
        path = np.zeros((n_steps + 1, 2))
        path[0] = np.array(b0, dtype=float)
        for step in range(n_steps):
            current = path[step]
            drift = -self.eta * self.grad_U(current)
            diffusion = self.sigma * np.sqrt(dt) * np.random.randn(2)
            path[step + 1] = current + drift * dt + diffusion
        return path
| # --------------------- | |
| # Entrypoint | |
| # --------------------- | |
if __name__ == "__main__":
    # Build the Gradio app and serve it on all interfaces, port 7860,
    # without creating a public share link.
    app = create_ui()
    # Launch locally
    app.launch(server_name="0.0.0.0", server_port=7860, share=False)