Spaces:
Configuration error
Configuration error
"""
Portfolio optimizer.
Portfolio Optimization Engine (FIN 555, FIN 557)
"""
import logging
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import cvxpy as cp
import numpy as np
import pandas as pd
from scipy import linalg
from scipy.optimize import differential_evolution, minimize
from sklearn.covariance import OAS, EmpiricalCovariance, LedoitWolf
from sklearn.preprocessing import StandardScaler
# Suppress every warning category for the whole process from this point on.
# NOTE(review): this also hides solver / numerical warnings raised by the
# libraries imported above - confirm that blanket suppression is intended.
warnings.filterwarnings('ignore')
class PortfolioOptimizer:
    """Portfolio optimization engine bundling several allocation methods.

    Every ``*_optimization`` method returns a dict whose ``'status'`` key is
    ``'success'``, ``'failed'`` (the solver finished without an optimal
    solution) or ``'error'`` (an exception was raised and logged).
    ``'weights'`` is ``None`` unless the status is ``'success'``.
    """

    def __init__(self, risk_aversion: float = 1.0, transaction_cost: float = 0.001):
        """Store the optimizer settings and create the covariance estimators.

        Args:
            risk_aversion: Multiplier applied to portfolio variance in the
                utility functions of this class.
            transaction_cost: Stored on the instance; not read by any method
                of this class.
        """
        self.risk_aversion = risk_aversion
        self.transaction_cost = transaction_cost
        self.logger = logging.getLogger(__name__)

        # Results of successful mean-variance runs, oldest first.
        self.optimization_history = []
        # Weights of the latest successful mean-variance run (used by the
        # optional turnover constraint).
        self.current_weights = None

        # Covariance estimators selectable by name in
        # estimate_covariance_matrix().
        self.covariance_estimators = {
            'sample': EmpiricalCovariance(),
            'ledoit_wolf': LedoitWolf(),
            'oas': OAS()
        }

    def _solve(self, problem: "cp.Problem", solver: Optional[str] = 'ECOS') -> None:
        """Solve ``problem``, falling back to CVXPY's default solver.

        The requested solver is used when CVXPY reports it as installed;
        otherwise a warning is logged and CVXPY chooses the solver itself, so
        a missing optional solver package does not turn every optimization
        into an error result.
        """
        if solver is not None and solver not in cp.installed_solvers():
            self.logger.warning(
                "Solver %s is not installed; falling back to the CVXPY default",
                solver,
            )
            solver = None
        problem.solve(solver=solver, verbose=False)

    def mean_variance_optimization(self, expected_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   constraints: Optional[Dict] = None,
                                   solver: str = 'ECOS') -> Dict[str, Any]:
        """Long-only mean-variance optimization.

        Maximizes ``mu'w - risk_aversion * w'Sigma w`` subject to the weights
        summing to one and being non-negative, plus any extra constraints.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of asset returns.
            constraints: Optional constraint dict understood by
                ``_build_constraints``.
            solver: Name of the CVXPY solver to try first.

        Returns:
            Result dict. On success it holds the weights, expected return,
            volatility, Sharpe ratio, utility value and solver status.
        """
        expected_returns = np.asarray(expected_returns, dtype=float)
        cov_matrix = np.asarray(cov_matrix, dtype=float)
        n_assets = len(expected_returns)

        weights = cp.Variable(n_assets)

        # Utility = expected return - risk aversion * variance.
        portfolio_return = expected_returns @ weights
        portfolio_risk = cp.quad_form(weights, cov_matrix)
        utility = portfolio_return - self.risk_aversion * portfolio_risk

        all_constraints = [
            cp.sum(weights) == 1,  # fully invested
            weights >= 0,          # no short selling
        ]
        if constraints:
            all_constraints.extend(self._build_constraints(weights, constraints))

        problem = cp.Problem(cp.Maximize(utility), all_constraints)
        try:
            self._solve(problem, solver)
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }

            optimal_weights = weights.value
            portfolio_stats = self._calculate_portfolio_stats(
                optimal_weights, expected_returns, cov_matrix
            )
            result = {
                'status': 'success',
                'weights': optimal_weights,
                'expected_return': portfolio_stats['return'],
                'volatility': portfolio_stats['volatility'],
                'sharpe_ratio': portfolio_stats['sharpe_ratio'],
                'utility': utility.value,
                'solver_status': problem.status
            }
            self.current_weights = optimal_weights
            self.optimization_history.append(result)
            return result
        except Exception as e:
            self.logger.error(f"Optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def black_litterman_optimization(self, market_caps: np.ndarray,
                                     cov_matrix: np.ndarray,
                                     views: Optional[Dict] = None,
                                     risk_aversion: Optional[float] = None) -> Dict[str, Any]:
        """Black-Litterman optimization.

        Args:
            market_caps: Market capitalizations (normalized to weights here).
            cov_matrix: Covariance matrix of asset returns.
            views: Investor views as ``{'P': pick matrix, 'Q': view returns,
                'Omega': view uncertainty matrix}``. Views are ignored unless
                all three keys are present.
            risk_aversion: Risk-aversion coefficient used for the implied
                returns; defaults to ``self.risk_aversion``.

        Returns:
            The mean-variance result computed on the blended returns. On
            success it additionally carries ``'type'``, ``'implied_returns'``,
            ``'bl_returns'`` and ``'market_weights'``.
        """
        if risk_aversion is None:
            risk_aversion = self.risk_aversion

        cov_matrix = np.asarray(cov_matrix, dtype=float)
        market_caps = np.asarray(market_caps, dtype=float)

        # Step 1: equilibrium returns implied by the market portfolio.
        market_weights = market_caps / np.sum(market_caps)
        implied_returns = risk_aversion * np.dot(cov_matrix, market_weights)

        # Step 2: blend in the investor views.
        if views and all(key in views for key in ('P', 'Q', 'Omega')):
            P = np.atleast_2d(np.asarray(views['P'], dtype=float))
            # Flatten so a column-vector Q cannot broadcast into a matrix.
            Q = np.asarray(views['Q'], dtype=float).ravel()
            Omega = np.atleast_2d(np.asarray(views['Omega'], dtype=float))

            tau = 0.025  # scaling of the prior uncertainty

            prior_precision = linalg.inv(tau * cov_matrix)
            view_precision = np.dot(P.T, linalg.solve(Omega, P))
            posterior_mean_cov = linalg.inv(prior_precision + view_precision)

            bl_returns = np.dot(
                posterior_mean_cov,
                np.dot(prior_precision, implied_returns)
                + np.dot(P.T, linalg.solve(Omega, Q)),
            )
            # The risk matrix for the allocation step is the return
            # covariance plus the uncertainty of the estimated mean; the
            # latter alone would understate risk by roughly a factor of tau.
            bl_cov = cov_matrix + posterior_mean_cov
            # Remove round-off asymmetry before handing it to quad_form.
            bl_cov = 0.5 * (bl_cov + bl_cov.T)
        else:
            # Without views the implied returns are used unchanged.
            bl_returns = implied_returns
            bl_cov = cov_matrix

        # Step 3: mean-variance optimization on the blended inputs.
        result = self.mean_variance_optimization(bl_returns, bl_cov)
        if result['status'] == 'success':
            result.update({
                'type': 'black_litterman',
                'implied_returns': implied_returns,
                'bl_returns': bl_returns,
                'market_weights': market_weights
            })
        return result

    def risk_parity_optimization(self, cov_matrix: np.ndarray,
                                 target_risk_contributions: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """Risk-parity (risk-budgeting) optimization solved with SLSQP.

        Args:
            cov_matrix: Covariance matrix of asset returns.
            target_risk_contributions: Target fractional risk contribution per
                asset; equal contributions when ``None``.

        Returns:
            Result dict. On success it holds the weights, the achieved and
            target risk contributions, the portfolio volatility and the raw
            SciPy result.
        """
        cov_matrix = np.asarray(cov_matrix, dtype=float)
        n_assets = cov_matrix.shape[0]
        if target_risk_contributions is None:
            target_risk_contributions = np.ones(n_assets) / n_assets

        def fractional_risk_contributions(w):
            """Return (volatility, per-asset share of total variance)."""
            vol = np.sqrt(np.dot(w, np.dot(cov_matrix, w)))
            marginal = np.dot(cov_matrix, w) / vol
            return vol, w * marginal / vol

        def risk_parity_objective(w):
            """Squared distance between achieved and target contributions."""
            _, contrib = fractional_risk_contributions(w)
            return np.sum((contrib - target_risk_contributions) ** 2)

        constraints = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0},  # fully invested
        ]
        # Per-asset cap of 50%, relaxed to 1/n so that a single-asset
        # universe stays feasible under the budget constraint.
        upper_bound = max(0.5, 1.0 / n_assets)
        bounds = [(0.001, upper_bound) for _ in range(n_assets)]
        initial_weights = np.ones(n_assets) / n_assets

        try:
            result = minimize(
                risk_parity_objective,
                initial_weights,
                method='SLSQP',
                bounds=bounds,
                constraints=constraints,
                options={'ftol': 1e-9, 'disp': False}
            )
            if not result.success:
                return {
                    'status': 'failed',
                    'error': 'Optimization did not converge',
                    'weights': None
                }

            optimal_weights = result.x
            portfolio_vol, risk_contrib = fractional_risk_contributions(optimal_weights)
            return {
                'status': 'success',
                'weights': optimal_weights,
                'risk_contributions': risk_contrib,
                'target_risk_contributions': target_risk_contributions,
                'portfolio_volatility': portfolio_vol,
                'optimization_result': result
            }
        except Exception as e:
            self.logger.error(f"Risk parity optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def max_diversification_optimization(self, expected_returns: np.ndarray,
                                         cov_matrix: np.ndarray) -> Dict[str, Any]:
        """Maximum-diversification portfolio (long-only).

        The diversification ratio ``(sigma'w) / sqrt(w'Sigma w)`` does not
        change when ``w`` is rescaled, so it is maximized by minimizing
        ``y'Sigma y`` subject to ``sigma'y = 1`` and ``y >= 0`` and then
        rescaling ``y`` to sum to one.

        Args:
            expected_returns: Only its length (the number of assets) is used.
            cov_matrix: Covariance matrix of asset returns.

        Returns:
            Result dict. On success it holds the weights, diversification
            ratio, portfolio volatility and weighted-average volatility.
        """
        cov_matrix = np.asarray(cov_matrix, dtype=float)
        n_assets = len(expected_returns)

        # Stand-alone volatility of each asset.
        individual_vols = np.sqrt(np.diag(cov_matrix))

        scaled_weights = cp.Variable(n_assets)
        objective = cp.Minimize(cp.quad_form(scaled_weights, cov_matrix))
        constraints = [
            scaled_weights >= 0,                    # long-only
            individual_vols @ scaled_weights == 1,  # fixes the free scale
        ]
        problem = cp.Problem(objective, constraints)

        try:
            self._solve(problem, 'ECOS')
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }

            # Clip tiny negative solver noise, then restore the budget.
            raw = np.clip(scaled_weights.value, 0.0, None)
            total = raw.sum()
            if total <= 0:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }
            optimal_weights = raw / total

            portfolio_vol_val = np.sqrt(
                np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))
            )
            weighted_avg_vol_val = np.dot(individual_vols, optimal_weights)
            diversification_ratio = weighted_avg_vol_val / portfolio_vol_val
            return {
                'status': 'success',
                'weights': optimal_weights,
                'diversification_ratio': diversification_ratio,
                'portfolio_volatility': portfolio_vol_val,
                'weighted_avg_volatility': weighted_avg_vol_val
            }
        except Exception as e:
            self.logger.error(f"Max diversification optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def robust_optimization(self, expected_returns: np.ndarray,
                            cov_matrix: np.ndarray,
                            uncertainty_set: str = 'ellipsoidal',
                            kappa: float = 0.1) -> Dict[str, Any]:
        """Robust long-only optimization under return uncertainty.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of asset returns.
            uncertainty_set: ``'ellipsoidal'`` or ``'box'``.
            kappa: Robustness parameter (size of the uncertainty set).

        Returns:
            Result dict. On success it holds the weights, the uncertainty set
            name, ``kappa`` and the robust utility value.

        Raises:
            ValueError: If ``uncertainty_set`` is not one of the two
                supported names.
        """
        expected_returns = np.asarray(expected_returns, dtype=float)
        cov_matrix = np.asarray(cov_matrix, dtype=float)
        n_assets = len(expected_returns)

        weights = cp.Variable(n_assets)
        portfolio_risk = cp.quad_form(weights, cov_matrix)

        if uncertainty_set == 'ellipsoidal':
            # max  w'mu - kappa * ||Sigma^(1/2) w||_2 - lambda * w'Sigma w
            sqrt_cov = linalg.sqrtm(cov_matrix).real
            uncertainty_penalty = cp.norm(sqrt_cov @ weights, 2)
            portfolio_return = expected_returns @ weights
            robust_utility = (portfolio_return
                              - kappa * uncertainty_penalty
                              - self.risk_aversion * portfolio_risk)
        elif uncertainty_set == 'box':
            # Each return is assumed to lie in [mu - kappa*sigma,
            # mu + kappa*sigma]; a long-only portfolio's worst case is the
            # lower end of every interval.
            return_std = np.sqrt(np.diag(cov_matrix))
            worst_case_returns = expected_returns - kappa * return_std
            portfolio_return = worst_case_returns @ weights
            robust_utility = portfolio_return - self.risk_aversion * portfolio_risk
        else:
            raise ValueError(f"Unsupported uncertainty set: {uncertainty_set}")

        constraints = [
            cp.sum(weights) == 1,
            weights >= 0
        ]
        problem = cp.Problem(cp.Maximize(robust_utility), constraints)

        try:
            self._solve(problem, 'ECOS')
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }
            return {
                'status': 'success',
                'weights': weights.value,
                'uncertainty_set': uncertainty_set,
                'kappa': kappa,
                'robust_utility': robust_utility.value
            }
        except Exception as e:
            self.logger.error(f"Robust optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def multi_objective_optimization(self, expected_returns: np.ndarray,
                                     cov_matrix: np.ndarray,
                                     objectives: Optional[List[str]] = None,
                                     weights_objectives: Optional[List[float]] = None) -> Dict[str, Any]:
        """Weighted-sum multi-objective optimization via differential evolution.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of asset returns.
            objectives: Objective names; defaults to
                ``['return', 'risk', 'diversification']``. ``'drawdown'`` is
                accepted but is a placeholder equal to the diversification
                term. Unknown names contribute nothing.
            weights_objectives: One scalar weight per objective; equal
                weights when ``None``.

        Returns:
            Result dict. On success it holds the weights, the objective
            names, the value of each ``'return'`` / ``'risk'`` /
            ``'diversification'`` objective and the objective weights.
        """
        if objectives is None:
            objectives = ['return', 'risk', 'diversification']
        expected_returns = np.asarray(expected_returns, dtype=float)
        cov_matrix = np.asarray(cov_matrix, dtype=float)
        n_assets = len(expected_returns)
        if weights_objectives is None:
            weights_objectives = [1.0 / len(objectives)] * len(objectives)

        def to_weights(raw):
            """Map a raw candidate in [0, 1]^n to weights that sum to one."""
            total = np.sum(raw)
            if total <= 0:
                return np.full(n_assets, 1.0 / n_assets)
            return raw / total

        def multi_objective_function(raw):
            """Weighted sum of the selected objectives (to be minimized)."""
            portfolio_weights = to_weights(raw)
            total_objective = 0.0
            for i, obj_type in enumerate(objectives):
                if obj_type == 'return':
                    # Negated because the solver minimizes.
                    obj_value = -np.dot(expected_returns, portfolio_weights)
                elif obj_type == 'risk':
                    obj_value = np.sqrt(
                        np.dot(portfolio_weights, np.dot(cov_matrix, portfolio_weights))
                    )
                elif obj_type in ('diversification', 'drawdown'):
                    # Herfindahl index (sum of squared weights); 'drawdown'
                    # is a placeholder because it would need return history.
                    obj_value = np.sum(portfolio_weights ** 2)
                else:
                    obj_value = 0.0
                total_objective += weights_objectives[i] * obj_value
            return total_objective

        bounds = [(0.0, 1.0) for _ in range(n_assets)]

        try:
            # differential_evolution does not accept SLSQP-style dict
            # constraints, so the budget constraint is enforced by
            # normalizing every candidate inside the objective instead.
            result = differential_evolution(
                multi_objective_function,
                bounds,
                seed=42,
                maxiter=1000,
                atol=1e-6
            )
            if not result.success:
                return {
                    'status': 'failed',
                    'error': 'Multi-objective optimization did not converge',
                    'weights': None
                }

            optimal_weights = to_weights(result.x)

            objective_values = {}
            for obj_type in objectives:
                if obj_type == 'return':
                    objective_values[obj_type] = np.dot(expected_returns, optimal_weights)
                elif obj_type == 'risk':
                    objective_values[obj_type] = np.sqrt(
                        np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))
                    )
                elif obj_type == 'diversification':
                    # Effective number of assets.
                    objective_values[obj_type] = 1.0 / np.sum(optimal_weights ** 2)
            return {
                'status': 'success',
                'weights': optimal_weights,
                'objectives': objectives,
                'objective_values': objective_values,
                'weights_objectives': weights_objectives
            }
        except Exception as e:
            self.logger.error(f"Multi-objective optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def _build_constraints(self, weights: "cp.Variable", constraints: Dict) -> List:
        """Translate a constraint dict into a list of CVXPY constraints.

        Recognized keys: ``'max_weight'`` and ``'min_weight'`` (per-asset
        bounds), ``'group_constraints'`` (list of dicts with ``'indices'`` and
        optional ``'max_weight'`` / ``'min_weight'``), and ``'turnover'``
        (L1 distance to ``self.current_weights``; skipped while no previous
        weights exist).
        """
        constraint_list = []
        if 'max_weight' in constraints:
            constraint_list.append(weights <= constraints['max_weight'])
        if 'min_weight' in constraints:
            constraint_list.append(weights >= constraints['min_weight'])
        if 'group_constraints' in constraints:
            for group_constraint in constraints['group_constraints']:
                indices = list(group_constraint['indices'])
                max_weight = group_constraint.get('max_weight', 1.0)
                min_weight = group_constraint.get('min_weight', 0.0)
                group_weight = cp.sum(weights[indices])
                constraint_list.append(group_weight <= max_weight)
                constraint_list.append(group_weight >= min_weight)
        if 'turnover' in constraints and self.current_weights is not None:
            turnover = cp.norm(weights - self.current_weights, 1)
            constraint_list.append(turnover <= constraints['turnover'])
        return constraint_list

    def _calculate_portfolio_stats(self, weights: np.ndarray,
                                   expected_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   risk_free_rate: float = 0.02) -> Dict[str, float]:
        """Return the portfolio's return, volatility, variance and Sharpe ratio.

        The Sharpe ratio is reported as 0 when the volatility is not positive.
        """
        portfolio_return = np.dot(weights, expected_returns)
        portfolio_variance = np.dot(weights, np.dot(cov_matrix, weights))
        portfolio_volatility = np.sqrt(portfolio_variance)
        if portfolio_volatility > 0:
            sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility
        else:
            sharpe_ratio = 0
        return {
            'return': portfolio_return,
            'volatility': portfolio_volatility,
            'variance': portfolio_variance,
            'sharpe_ratio': sharpe_ratio
        }

    def estimate_covariance_matrix(self, returns: pd.DataFrame, method: str = 'ledoit_wolf') -> np.ndarray:
        """Estimate the covariance matrix of ``returns``.

        Args:
            returns: Return observations, one column per asset. Rows that
                contain NaN are dropped first.
            method: Key into ``self.covariance_estimators``.

        Raises:
            ValueError: If ``method`` is unknown or fewer than two complete
                rows remain.
        """
        if method not in self.covariance_estimators:
            raise ValueError(f"Unknown covariance estimation method: {method}")
        estimator = self.covariance_estimators[method]

        returns_clean = returns.dropna()
        if len(returns_clean) < 2:
            raise ValueError("Insufficient data for covariance estimation")

        estimator.fit(returns_clean)
        return estimator.covariance_

    def estimate_expected_returns(self, returns: pd.DataFrame, method: str = 'historical_mean') -> np.ndarray:
        """Estimate expected returns per asset.

        Args:
            returns: Return observations, one column per asset. Rows that
                contain NaN are dropped first.
            method: ``'historical_mean'``, ``'ewma'`` (exponentially weighted
                mean with decay factor 0.94) or ``'shrinkage'`` (20% pull
                toward the cross-sectional mean).

        Raises:
            ValueError: If ``method`` is unknown or no complete row remains.
        """
        if method not in ('historical_mean', 'ewma', 'shrinkage'):
            raise ValueError(f"Unknown expected returns method: {method}")

        returns_clean = returns.dropna()
        if returns_clean.empty:
            raise ValueError("Insufficient data for expected return estimation")

        if method == 'historical_mean':
            return returns_clean.mean().values

        if method == 'ewma':
            # pandas' ``alpha`` is the weight given to the newest
            # observation, i.e. one minus the decay factor.
            decay = 0.94
            return returns_clean.ewm(alpha=1.0 - decay).mean().iloc[-1].values

        # 'shrinkage': shrink each asset's mean toward the overall mean.
        historical_mean = returns_clean.mean().values
        overall_mean = np.mean(historical_mean)
        shrinkage_factor = 0.2
        return (1 - shrinkage_factor) * historical_mean + shrinkage_factor * overall_mean
class TransactionCostModel:
    """Transaction cost model: commission, spread, market impact and timing.

    All rates are fractions of traded value. The defaults reproduce the
    previously hard-coded figures.
    """

    def __init__(self, commission_rate: float = 0.001,
                 bid_ask_spread: float = 0.002,
                 timing_cost_rate: float = 0.0005,
                 impact_coefficient: float = 0.1):
        """Configure the cost rates.

        Args:
            commission_rate: Commission per unit of traded value (0.1%).
            bid_ask_spread: Full bid-ask spread; half of it is charged per
                trade (0.2%).
            timing_cost_rate: Timing cost per unit of traded value (0.05%).
            impact_coefficient: Multiplier of the square-root impact term.
        """
        self.commission_rate = commission_rate
        self.bid_ask_spread = bid_ask_spread
        self.timing_cost_rate = timing_cost_rate
        self.impact_coefficient = impact_coefficient
        # Registry filled by add_cost_component().
        # NOTE(review): nothing in this class calls the registered functions;
        # confirm whether calculate_total_cost() is meant to apply them.
        self.cost_components = {}

    def add_cost_component(self, name: str, cost_func: Callable):
        """Register ``cost_func`` under ``name`` in ``self.cost_components``."""
        self.cost_components[name] = cost_func

    def calculate_total_cost(self, current_weights: np.ndarray,
                             target_weights: np.ndarray,
                             portfolio_value: float,
                             prices: Optional[np.ndarray] = None) -> Dict[str, float]:
        """Estimate the cost of trading from current to target weights.

        Args:
            current_weights: Weights held now.
            target_weights: Weights to trade toward.
            portfolio_value: Portfolio value in currency units.
            prices: Optional per-asset prices, used only by the market-impact
                term.

        Returns:
            Dict with ``'commission'``, ``'bid_ask_spread'``,
            ``'market_impact'``, ``'timing'`` and their sum ``'total'``.
        """
        current = np.asarray(current_weights, dtype=float)
        target = np.asarray(target_weights, dtype=float)

        # Absolute traded value per asset and in total.
        trade_values = np.abs(target - current) * portfolio_value
        traded_total = float(np.sum(trade_values))

        costs = {
            'commission': traded_total * self.commission_rate,
            # Only half of the spread is paid when crossing it once.
            'bid_ask_spread': traded_total * self.bid_ask_spread * 0.5,
            'market_impact': self._calculate_market_impact(trade_values, prices),
            'timing': traded_total * self.timing_cost_rate,
        }
        costs['total'] = (costs['commission'] + costs['bid_ask_spread']
                          + costs['market_impact'] + costs['timing'])
        return costs

    def _calculate_market_impact(self, trade_values: np.ndarray,
                                 prices: Optional[np.ndarray] = None) -> float:
        """Square-root market impact: coefficient * sum(sqrt(volume)).

        Volume is traded value divided by price; when no prices are given the
        traded value itself is used (i.e. a price of 1 is assumed).
        """
        trade_values = np.asarray(trade_values, dtype=float)
        if prices is not None:
            volumes = trade_values / np.asarray(prices, dtype=float)
        else:
            volumes = trade_values
        return float(self.impact_coefficient * np.sum(np.sqrt(volumes)))
class DynamicRebalancing:
    """Threshold-based rebalancing with a cost-aware trade optimizer."""

    def __init__(self, rebalance_threshold: float = 0.05,
                 max_turnover: float = 0.5,
                 transaction_cost_model: Optional["TransactionCostModel"] = None):
        """Configure the rebalancing policy.

        Args:
            rebalance_threshold: Largest tolerated absolute weight deviation.
            max_turnover: Cap on the L1 distance between the current and the
                rebalanced weights.
            transaction_cost_model: Cost model; a default
                ``TransactionCostModel`` is created when omitted.
        """
        self.rebalance_threshold = rebalance_threshold
        self.max_turnover = max_turnover
        self.transaction_cost_model = transaction_cost_model or TransactionCostModel()
        # One record per execute_rebalance() call, oldest first.
        self.rebalance_history = []

    def should_rebalance(self, current_weights: np.ndarray,
                         target_weights: np.ndarray,
                         portfolio_value: float) -> Dict[str, Any]:
        """Decide whether rebalancing is worthwhile.

        Rebalancing is recommended when the largest weight deviation exceeds
        the threshold and the estimated benefit exceeds the expected cost.

        NOTE(review): the benefit is a unit-free score while the cost is in
        currency units, so the comparison depends on ``portfolio_value`` -
        confirm the intended scaling.

        Returns:
            Dict with ``'should_rebalance'``, ``'max_deviation'``,
            ``'expected_costs'``, ``'rebalance_benefit'`` and
            ``'net_benefit'``.
        """
        current = np.asarray(current_weights, dtype=float)
        target = np.asarray(target_weights, dtype=float)
        max_deviation = np.max(np.abs(current - target))

        expected_costs = self.transaction_cost_model.calculate_total_cost(
            current, target, portfolio_value
        )
        rebalance_benefit = self._estimate_rebalance_benefit(current, target)
        net_benefit = rebalance_benefit - expected_costs['total']

        return {
            'should_rebalance': bool(
                max_deviation > self.rebalance_threshold and net_benefit > 0
            ),
            'max_deviation': max_deviation,
            'expected_costs': expected_costs,
            'rebalance_benefit': rebalance_benefit,
            'net_benefit': net_benefit
        }

    def execute_rebalance(self, current_weights: np.ndarray,
                          target_weights: np.ndarray,
                          portfolio_value: float) -> Dict[str, Any]:
        """Compute cost-aware weights, price the trade and record it.

        Returns:
            Dict with ``'new_weights'``, ``'costs'`` and the history
            ``'record'`` that was appended to ``self.rebalance_history``.
        """
        current = np.asarray(current_weights, dtype=float)
        target = np.asarray(target_weights, dtype=float)

        adjusted_weights = self._cost_aware_rebalance(current, target, portfolio_value)
        actual_costs = self.transaction_cost_model.calculate_total_cost(
            current, adjusted_weights, portfolio_value
        )

        rebalance_record = {
            'timestamp': pd.Timestamp.now(),
            'original_weights': current.copy(),
            'target_weights': target.copy(),
            'adjusted_weights': adjusted_weights.copy(),
            'costs': actual_costs,
            'portfolio_value': portfolio_value
        }
        self.rebalance_history.append(rebalance_record)
        return {
            'new_weights': adjusted_weights,
            'costs': actual_costs,
            'record': rebalance_record
        }

    def _estimate_rebalance_benefit(self, current_weights: np.ndarray,
                                    target_weights: np.ndarray) -> float:
        """Heuristic benefit: half the squared weight deviation, scaled by 100."""
        deviations = (np.asarray(current_weights, dtype=float)
                      - np.asarray(target_weights, dtype=float))
        return 0.5 * np.sum(deviations ** 2) * 100

    def _cost_aware_rebalance(self, current_weights: np.ndarray,
                              target_weights: np.ndarray,
                              portfolio_value: float) -> np.ndarray:
        """Trade toward the target while penalizing and capping turnover.

        Minimizes ``||w - target||^2 + 0.003 * ||w - current||_1`` over
        long-only, fully invested weights whose turnover stays within
        ``self.max_turnover``. The linear cost is expressed per unit of
        portfolio value, so ``portfolio_value`` cancels out and a zero value
        cannot cause a division by zero.

        Returns:
            The optimized weights, or ``target_weights`` unchanged when the
            solver fails (the fallback is logged and may exceed the turnover
            cap).
        """
        logger = logging.getLogger(__name__)
        n_assets = len(current_weights)
        weights = cp.Variable(n_assets)

        tracking_error = cp.sum_squares(weights - target_weights)
        turnover = cp.norm(weights - current_weights, 1)
        # 0.3% all-in linear cost per unit of turnover.
        objective = cp.Minimize(tracking_error + 0.003 * turnover)
        constraints = [
            cp.sum(weights) == 1,
            weights >= 0,
            turnover <= self.max_turnover
        ]
        problem = cp.Problem(objective, constraints)

        try:
            # Prefer ECOS; let CVXPY choose when it is not installed.
            solver = 'ECOS' if 'ECOS' in cp.installed_solvers() else None
            problem.solve(solver=solver, verbose=False)
        except Exception as e:
            logger.warning(
                "Cost-aware rebalance failed (%s); using target weights", e
            )
            return target_weights

        if problem.status == cp.OPTIMAL:
            return weights.value
        logger.warning(
            "Cost-aware rebalance ended with status %s; using target weights",
            problem.status,
        )
        return target_weights
# Usage example and smoke test
if __name__ == "__main__":
    print("Testing Portfolio Optimizer...")

    # Build a reproducible synthetic return history.
    np.random.seed(42)
    n_assets = 5
    n_periods = 252
    asset_names = [f'Asset_{i}' for i in range(n_assets)]
    true_cov = np.eye(n_assets) * 0.0004 + np.ones((n_assets, n_assets)) * 0.0001
    simulated = np.random.multivariate_normal(
        mean=[0.001] * n_assets,
        cov=true_cov,
        size=n_periods
    )
    returns = pd.DataFrame(simulated, columns=asset_names)

    # Optimizer plus its two inputs, both estimated from the sample.
    optimizer = PortfolioOptimizer(risk_aversion=2.0, transaction_cost=0.001)
    expected_returns = optimizer.estimate_expected_returns(returns, method='historical_mean')
    cov_matrix = optimizer.estimate_covariance_matrix(returns, method='ledoit_wolf')
    print(f"Expected returns: {expected_returns}")
    print(f"Covariance matrix shape: {cov_matrix.shape}")

    # 1. Mean-variance optimization
    print("\n1. Mean-Variance Optimization:")
    mv_result = optimizer.mean_variance_optimization(expected_returns, cov_matrix)
    mv_ok = mv_result['status'] == 'success'
    if mv_ok:
        print(f"Optimal weights: {mv_result['weights']}")
        print(f"Expected return: {mv_result['expected_return']:.4f}")
        print(f"Volatility: {mv_result['volatility']:.4f}")
        print(f"Sharpe ratio: {mv_result['sharpe_ratio']:.4f}")

    # 2. Risk-parity optimization
    print("\n2. Risk Parity Optimization:")
    rp_result = optimizer.risk_parity_optimization(cov_matrix)
    if rp_result['status'] == 'success':
        print(f"Risk parity weights: {rp_result['weights']}")
        print(f"Risk contributions: {rp_result['risk_contributions']}")
        print(f"Portfolio volatility: {rp_result['portfolio_volatility']:.4f}")

    # 3. Maximum-diversification optimization
    print("\n3. Maximum Diversification Optimization:")
    md_result = optimizer.max_diversification_optimization(expected_returns, cov_matrix)
    if md_result['status'] == 'success':
        print(f"Max diversification weights: {md_result['weights']}")
        print(f"Diversification ratio: {md_result['diversification_ratio']:.4f}")

    # 4. Robust optimization
    print("\n4. Robust Optimization:")
    robust_result = optimizer.robust_optimization(expected_returns, cov_matrix, kappa=0.1)
    if robust_result['status'] == 'success':
        print(f"Robust weights: {robust_result['weights']}")
        print(f"Robust utility: {robust_result['robust_utility']:.4f}")

    # 5. Multi-objective optimization
    print("\n5. Multi-Objective Optimization:")
    mo_result = optimizer.multi_objective_optimization(
        expected_returns,
        cov_matrix,
        objectives=['return', 'risk', 'diversification'],
        weights_objectives=[0.4, 0.3, 0.3]
    )
    if mo_result['status'] == 'success':
        print(f"Multi-objective weights: {mo_result['weights']}")
        print(f"Objective values: {mo_result['objective_values']}")

    # 6. Dynamic rebalancing toward the mean-variance weights
    #    (equal weights stand in when that optimization did not succeed).
    print("\n6. Dynamic Rebalancing:")
    rebalancer = DynamicRebalancing(rebalance_threshold=0.05)
    current_weights = np.array([0.3, 0.25, 0.2, 0.15, 0.1])
    if mv_ok:
        target_weights = mv_result['weights']
    else:
        target_weights = np.array([0.2, 0.2, 0.2, 0.2, 0.2])
    portfolio_value = 100000

    decision = rebalancer.should_rebalance(current_weights, target_weights, portfolio_value)
    print(f"Should rebalance: {decision['should_rebalance']}")
    print(f"Max deviation: {decision['max_deviation']:.4f}")
    print(f"Expected costs: ${decision['expected_costs']['total']:.2f}")
    if decision['should_rebalance']:
        rebalance_result = rebalancer.execute_rebalance(
            current_weights, target_weights, portfolio_value
        )
        print(f"New weights: {rebalance_result['new_weights']}")
        print(f"Actual costs: ${rebalance_result['costs']['total']:.2f}")

    print("\nPortfolio optimization tests completed!")