""" 投资组合优化器 Portfolio Optimization Engine (FIN 555, FIN 557) """ import numpy as np import pandas as pd import cvxpy as cp from typing import Dict, List, Optional, Tuple, Union, Callable from scipy.optimize import minimize, differential_evolution from scipy import linalg from sklearn.covariance import LedoitWolf, EmpiricalCovariance, OAS from sklearn.preprocessing import StandardScaler import logging import warnings warnings.filterwarnings('ignore') class PortfolioOptimizer: """投资组合优化器主类""" def __init__(self, risk_aversion: float = 1.0, transaction_cost: float = 0.001): self.risk_aversion = risk_aversion self.transaction_cost = transaction_cost self.logger = logging.getLogger(__name__) # 优化历史 self.optimization_history = [] self.current_weights = None # 协方差估计器 self.covariance_estimators = { 'sample': EmpiricalCovariance(), 'ledoit_wolf': LedoitWolf(), 'oas': OAS() } def mean_variance_optimization(self, expected_returns: np.ndarray, cov_matrix: np.ndarray, constraints: Dict = None, solver: str = 'ECOS') -> Dict[str, any]: """ 均值-方差投资组合优化 Args: expected_returns: 预期收益向量 cov_matrix: 协方差矩阵 constraints: 约束条件字典 solver: CVXPY求解器 Returns: Dict: 优化结果 """ n_assets = len(expected_returns) # 定义优化变量 weights = cp.Variable(n_assets) # 目标函数:最大化效用 = 预期收益 - 风险厌恶系数 * 方差 portfolio_return = expected_returns @ weights portfolio_risk = cp.quad_form(weights, cov_matrix) utility = portfolio_return - self.risk_aversion * portfolio_risk objective = cp.Maximize(utility) # 基本约束 base_constraints = [ cp.sum(weights) == 1, # 权重和为1 weights >= 0 # 不允许卖空(长仓限制) ] # 添加自定义约束 all_constraints = base_constraints if constraints: all_constraints.extend(self._build_constraints(weights, constraints)) # 构建并求解问题 problem = cp.Problem(objective, all_constraints) try: problem.solve(solver=solver, verbose=False) if problem.status == cp.OPTIMAL: optimal_weights = weights.value # 计算组合统计 portfolio_stats = self._calculate_portfolio_stats( optimal_weights, expected_returns, cov_matrix ) result = { 'status': 'success', 'weights': optimal_weights, 'expected_return': portfolio_stats['return'], 'volatility': portfolio_stats['volatility'], 'sharpe_ratio': portfolio_stats['sharpe_ratio'], 'utility': utility.value, 'solver_status': problem.status } self.current_weights = optimal_weights self.optimization_history.append(result) return result else: return { 'status': 'failed', 'solver_status': problem.status, 'weights': None } except Exception as e: self.logger.error(f"Optimization failed: {str(e)}") return { 'status': 'error', 'error': str(e), 'weights': None } def black_litterman_optimization(self, market_caps: np.ndarray, cov_matrix: np.ndarray, views: Dict = None, risk_aversion: Optional[float] = None) -> Dict[str, any]: """ Black-Litterman模型优化 Args: market_caps: 市场价值权重 cov_matrix: 协方差矩阵 views: 投资者观点 {'P': 选择矩阵, 'Q': 观点收益, 'Omega': 观点不确定性} risk_aversion: 风险厌恶系数 Returns: Dict: BL优化结果 """ if risk_aversion is None: risk_aversion = self.risk_aversion n_assets = len(market_caps) # 步骤1: 计算隐含平衡收益 market_weights = market_caps / np.sum(market_caps) implied_returns = risk_aversion * np.dot(cov_matrix, market_weights) # 步骤2: 整合投资者观点 if views and all(key in views for key in ['P', 'Q', 'Omega']): P = views['P'] # 选择矩阵 Q = views['Q'] # 观点收益向量 Omega = views['Omega'] # 观点不确定性矩阵 # 计算tau (通常设为较小值) tau = 0.025 # Black-Litterman公式 M1 = linalg.inv(tau * cov_matrix) M2 = np.dot(P.T, np.dot(linalg.inv(Omega), P)) M3 = np.dot(linalg.inv(tau * cov_matrix), implied_returns) M4 = np.dot(P.T, np.dot(linalg.inv(Omega), Q)) # 新的预期收益 bl_returns = np.dot(linalg.inv(M1 + M2), M3 + M4) # 新的协方差矩阵 bl_cov = linalg.inv(M1 + M2) else: # 如果没有观点,使用隐含收益 bl_returns = implied_returns bl_cov = cov_matrix # 步骤3: 均值-方差优化 result = self.mean_variance_optimization(bl_returns, bl_cov) if result['status'] == 'success': result.update({ 'type': 'black_litterman', 'implied_returns': implied_returns, 'bl_returns': bl_returns, 'market_weights': market_weights }) return result def risk_parity_optimization(self, cov_matrix: np.ndarray, target_risk_contributions: Optional[np.ndarray] = None) -> Dict[str, any]: """ 风险平价投资组合优化 Args: cov_matrix: 协方差矩阵 target_risk_contributions: 目标风险贡献(如果为None则等权重风险) Returns: Dict: 风险平价优化结果 """ n_assets = cov_matrix.shape[0] if target_risk_contributions is None: target_risk_contributions = np.ones(n_assets) / n_assets def risk_parity_objective(weights): """风险平价目标函数""" portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) # 计算每个资产的风险贡献 marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol risk_contrib = weights * marginal_contrib / portfolio_vol # 目标:最小化实际风险贡献与目标风险贡献的差异 return np.sum((risk_contrib - target_risk_contributions) ** 2) # 约束条件 constraints = [ {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, # 权重和为1 ] bounds = [(0.001, 0.5) for _ in range(n_assets)] # 每个权重的界限 # 初始化权重 initial_weights = np.ones(n_assets) / n_assets try: result = minimize( risk_parity_objective, initial_weights, method='SLSQP', bounds=bounds, constraints=constraints, options={'ftol': 1e-9, 'disp': False} ) if result.success: optimal_weights = result.x # 计算风险贡献 portfolio_vol = np.sqrt(np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))) marginal_contrib = np.dot(cov_matrix, optimal_weights) / portfolio_vol risk_contrib = optimal_weights * marginal_contrib / portfolio_vol return { 'status': 'success', 'weights': optimal_weights, 'risk_contributions': risk_contrib, 'target_risk_contributions': target_risk_contributions, 'portfolio_volatility': portfolio_vol, 'optimization_result': result } else: return { 'status': 'failed', 'error': 'Optimization did not converge', 'weights': None } except Exception as e: self.logger.error(f"Risk parity optimization failed: {str(e)}") return { 'status': 'error', 'error': str(e), 'weights': None } def max_diversification_optimization(self, expected_returns: np.ndarray, cov_matrix: np.ndarray) -> Dict[str, any]: """ 最大分散化投资组合优化 Args: expected_returns: 预期收益 cov_matrix: 协方差矩阵 Returns: Dict: 最大分散化优化结果 """ n_assets = len(expected_returns) # 定义优化变量 weights = cp.Variable(n_assets) # 个股波动率 individual_vols = np.sqrt(np.diag(cov_matrix)) # 分散化比率 = 加权平均个股波动率 / 组合波动率 # 最大化分散化比率等价于最小化组合波动率/加权平均个股波动率 weighted_avg_vol = individual_vols @ weights portfolio_vol = cp.norm(cp.sqrt(cov_matrix) @ weights, 2) # 目标函数:最小化 portfolio_vol / weighted_avg_vol # 等价于最小化 portfolio_vol^2 / weighted_avg_vol (避免除法) objective = cp.Minimize(cp.square(portfolio_vol)) # 约束条件 constraints = [ cp.sum(weights) == 1, # 权重和为1 weights >= 0, # 长仓限制 weighted_avg_vol == 1 # 标准化约束 ] problem = cp.Problem(objective, constraints) try: problem.solve(solver='ECOS', verbose=False) if problem.status == cp.OPTIMAL: optimal_weights = weights.value # 计算分散化比率 portfolio_vol_val = np.sqrt(np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))) weighted_avg_vol_val = np.dot(individual_vols, optimal_weights) diversification_ratio = weighted_avg_vol_val / portfolio_vol_val return { 'status': 'success', 'weights': optimal_weights, 'diversification_ratio': diversification_ratio, 'portfolio_volatility': portfolio_vol_val, 'weighted_avg_volatility': weighted_avg_vol_val } else: return { 'status': 'failed', 'solver_status': problem.status, 'weights': None } except Exception as e: self.logger.error(f"Max diversification optimization failed: {str(e)}") return { 'status': 'error', 'error': str(e), 'weights': None } def robust_optimization(self, expected_returns: np.ndarray, cov_matrix: np.ndarray, uncertainty_set: str = 'ellipsoidal', kappa: float = 0.1) -> Dict[str, any]: """ 鲁棒投资组合优化 Args: expected_returns: 预期收益 cov_matrix: 协方差矩阵 uncertainty_set: 不确定性集合类型 ('ellipsoidal', 'box', 'polyhedral') kappa: 鲁棒性参数 Returns: Dict: 鲁棒优化结果 """ n_assets = len(expected_returns) # 定义优化变量 weights = cp.Variable(n_assets) if uncertainty_set == 'ellipsoidal': # 椭球不确定性集合 # min w^T μ - κ * ||Σ^(1/2) w||_2 - λ/2 * w^T Σ w sqrt_cov = linalg.sqrtm(cov_matrix).real uncertainty_penalty = cp.norm(sqrt_cov @ weights, 2) portfolio_return = expected_returns @ weights portfolio_risk = cp.quad_form(weights, cov_matrix) robust_utility = portfolio_return - kappa * uncertainty_penalty - self.risk_aversion * portfolio_risk objective = cp.Maximize(robust_utility) elif uncertainty_set == 'box': # 盒式不确定性集合 # 假设收益率在 [μ - κ*σ, μ + κ*σ] 范围内 return_std = np.sqrt(np.diag(cov_matrix)) worst_case_returns = expected_returns - kappa * return_std portfolio_return = worst_case_returns @ weights portfolio_risk = cp.quad_form(weights, cov_matrix) robust_utility = portfolio_return - self.risk_aversion * portfolio_risk objective = cp.Maximize(robust_utility) else: raise ValueError(f"Unsupported uncertainty set: {uncertainty_set}") # 约束条件 constraints = [ cp.sum(weights) == 1, weights >= 0 ] problem = cp.Problem(objective, constraints) try: problem.solve(solver='ECOS', verbose=False) if problem.status == cp.OPTIMAL: optimal_weights = weights.value return { 'status': 'success', 'weights': optimal_weights, 'uncertainty_set': uncertainty_set, 'kappa': kappa, 'robust_utility': robust_utility.value } else: return { 'status': 'failed', 'solver_status': problem.status, 'weights': None } except Exception as e: self.logger.error(f"Robust optimization failed: {str(e)}") return { 'status': 'error', 'error': str(e), 'weights': None } def multi_objective_optimization(self, expected_returns: np.ndarray, cov_matrix: np.ndarray, objectives: List[str] = ['return', 'risk', 'diversification'], weights_objectives: Optional[List[float]] = None) -> Dict[str, any]: """ 多目标投资组合优化 Args: expected_returns: 预期收益 cov_matrix: 协方差矩阵 objectives: 目标函数列表 weights_objectives: 目标函数权重 Returns: Dict: 多目标优化结果 """ n_assets = len(expected_returns) if weights_objectives is None: weights_objectives = [1.0 / len(objectives)] * len(objectives) def multi_objective_function(portfolio_weights): """多目标函数""" total_objective = 0.0 for i, obj_type in enumerate(objectives): if obj_type == 'return': obj_value = -np.dot(expected_returns, portfolio_weights) # 负号因为要最大化 elif obj_type == 'risk': obj_value = np.sqrt(np.dot(portfolio_weights, np.dot(cov_matrix, portfolio_weights))) elif obj_type == 'diversification': # 最小化赫芬达尔指数(权重平方和) obj_value = np.sum(portfolio_weights ** 2) elif obj_type == 'drawdown': # 这里简化处理,实际需要历史数据 obj_value = np.sum(portfolio_weights ** 2) # 占位符 else: obj_value = 0.0 total_objective += weights_objectives[i] * obj_value return total_objective # 约束条件 constraints = [ {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, ] bounds = [(0.0, 1.0) for _ in range(n_assets)] initial_weights = np.ones(n_assets) / n_assets try: # 使用差分进化算法求解多目标优化 result = differential_evolution( multi_objective_function, bounds, constraints=constraints, seed=42, maxiter=1000, atol=1e-6 ) if result.success: optimal_weights = result.x # 计算各个目标的值 objective_values = {} for obj_type in objectives: if obj_type == 'return': objective_values[obj_type] = np.dot(expected_returns, optimal_weights) elif obj_type == 'risk': objective_values[obj_type] = np.sqrt(np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))) elif obj_type == 'diversification': objective_values[obj_type] = 1.0 / np.sum(optimal_weights ** 2) # 有效资产数 return { 'status': 'success', 'weights': optimal_weights, 'objectives': objectives, 'objective_values': objective_values, 'weights_objectives': weights_objectives } else: return { 'status': 'failed', 'error': 'Multi-objective optimization did not converge', 'weights': None } except Exception as e: self.logger.error(f"Multi-objective optimization failed: {str(e)}") return { 'status': 'error', 'error': str(e), 'weights': None } def _build_constraints(self, weights: cp.Variable, constraints: Dict) -> List: """构建约束条件""" constraint_list = [] if 'max_weight' in constraints: max_weight = constraints['max_weight'] constraint_list.append(weights <= max_weight) if 'min_weight' in constraints: min_weight = constraints['min_weight'] constraint_list.append(weights >= min_weight) if 'group_constraints' in constraints: for group_constraint in constraints['group_constraints']: indices = group_constraint['indices'] max_weight = group_constraint.get('max_weight', 1.0) min_weight = group_constraint.get('min_weight', 0.0) group_weight = cp.sum([weights[i] for i in indices]) constraint_list.append(group_weight <= max_weight) constraint_list.append(group_weight >= min_weight) if 'turnover' in constraints and self.current_weights is not None: max_turnover = constraints['turnover'] turnover = cp.norm(weights - self.current_weights, 1) constraint_list.append(turnover <= max_turnover) return constraint_list def _calculate_portfolio_stats(self, weights: np.ndarray, expected_returns: np.ndarray, cov_matrix: np.ndarray, risk_free_rate: float = 0.02) -> Dict[str, float]: """计算投资组合统计信息""" portfolio_return = np.dot(weights, expected_returns) portfolio_variance = np.dot(weights, np.dot(cov_matrix, weights)) portfolio_volatility = np.sqrt(portfolio_variance) sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility if portfolio_volatility > 0 else 0 return { 'return': portfolio_return, 'volatility': portfolio_volatility, 'variance': portfolio_variance, 'sharpe_ratio': sharpe_ratio } def estimate_covariance_matrix(self, returns: pd.DataFrame, method: str = 'ledoit_wolf') -> np.ndarray: """估计协方差矩阵""" if method not in self.covariance_estimators: raise ValueError(f"Unknown covariance estimation method: {method}") estimator = self.covariance_estimators[method] # 清理数据 returns_clean = returns.dropna() if len(returns_clean) < 2: raise ValueError("Insufficient data for covariance estimation") # 拟合并获取协方差矩阵 estimator.fit(returns_clean) cov_matrix = estimator.covariance_ return cov_matrix def estimate_expected_returns(self, returns: pd.DataFrame, method: str = 'historical_mean') -> np.ndarray: """估计预期收益""" returns_clean = returns.dropna() if method == 'historical_mean': expected_returns = returns_clean.mean().values elif method == 'ewma': # 指数加权移动平均 alpha = 0.94 expected_returns = returns_clean.ewm(alpha=alpha).mean().iloc[-1].values elif method == 'shrinkage': # 向整体均值收缩 historical_mean = returns_clean.mean().values overall_mean = np.mean(historical_mean) shrinkage_factor = 0.2 expected_returns = (1 - shrinkage_factor) * historical_mean + shrinkage_factor * overall_mean else: raise ValueError(f"Unknown expected returns method: {method}") return expected_returns class TransactionCostModel: """交易成本模型""" def __init__(self): self.cost_components = {} def add_cost_component(self, name: str, cost_func: Callable): """添加成本组件""" self.cost_components[name] = cost_func def calculate_total_cost(self, current_weights: np.ndarray, target_weights: np.ndarray, portfolio_value: float, prices: Optional[np.ndarray] = None) -> Dict[str, float]: """计算总交易成本""" weight_changes = np.abs(target_weights - current_weights) trade_values = weight_changes * portfolio_value costs = {} total_cost = 0.0 # 固定成本(佣金) commission_rate = 0.001 # 0.1% commission_cost = np.sum(trade_values) * commission_rate costs['commission'] = commission_cost total_cost += commission_cost # 买卖价差成本 bid_ask_spread = 0.002 # 0.2% 平均价差 spread_cost = np.sum(trade_values) * bid_ask_spread * 0.5 costs['bid_ask_spread'] = spread_cost total_cost += spread_cost # 市场冲击成本 impact_cost = self._calculate_market_impact(trade_values, prices) costs['market_impact'] = impact_cost total_cost += impact_cost # 时机成本 timing_cost = np.sum(trade_values) * 0.0005 # 0.05% costs['timing'] = timing_cost total_cost += timing_cost costs['total'] = total_cost return costs def _calculate_market_impact(self, trade_values: np.ndarray, prices: Optional[np.ndarray] = None) -> float: """计算市场冲击成本""" # 简化的方根法则:成本与交易量的平方根成正比 if prices is not None: volumes = trade_values / prices else: volumes = trade_values # 假设价格为1 # 市场冲击参数 impact_coefficient = 0.1 market_impact = impact_coefficient * np.sum(np.sqrt(volumes)) return market_impact class DynamicRebalancing: """动态再平衡策略""" def __init__(self, rebalance_threshold: float = 0.05, max_turnover: float = 0.5, transaction_cost_model: Optional[TransactionCostModel] = None): self.rebalance_threshold = rebalance_threshold self.max_turnover = max_turnover self.transaction_cost_model = transaction_cost_model or TransactionCostModel() self.rebalance_history = [] def should_rebalance(self, current_weights: np.ndarray, target_weights: np.ndarray, portfolio_value: float) -> Dict[str, any]: """判断是否需要再平衡""" weight_deviations = np.abs(current_weights - target_weights) max_deviation = np.max(weight_deviations) # 计算预期交易成本 expected_costs = self.transaction_cost_model.calculate_total_cost( current_weights, target_weights, portfolio_value ) # 成本-收益分析 rebalance_benefit = self._estimate_rebalance_benefit( current_weights, target_weights ) net_benefit = rebalance_benefit - expected_costs['total'] decision = { 'should_rebalance': (max_deviation > self.rebalance_threshold and net_benefit > 0), 'max_deviation': max_deviation, 'expected_costs': expected_costs, 'rebalance_benefit': rebalance_benefit, 'net_benefit': net_benefit } return decision def execute_rebalance(self, current_weights: np.ndarray, target_weights: np.ndarray, portfolio_value: float) -> Dict[str, any]: """执行再平衡""" # 考虑交易成本的最优再平衡 adjusted_weights = self._cost_aware_rebalance( current_weights, target_weights, portfolio_value ) # 计算实际交易成本 actual_costs = self.transaction_cost_model.calculate_total_cost( current_weights, adjusted_weights, portfolio_value ) # 记录再平衡历史 rebalance_record = { 'timestamp': pd.Timestamp.now(), 'original_weights': current_weights.copy(), 'target_weights': target_weights.copy(), 'adjusted_weights': adjusted_weights.copy(), 'costs': actual_costs, 'portfolio_value': portfolio_value } self.rebalance_history.append(rebalance_record) return { 'new_weights': adjusted_weights, 'costs': actual_costs, 'record': rebalance_record } def _estimate_rebalance_benefit(self, current_weights: np.ndarray, target_weights: np.ndarray) -> float: """估计再平衡收益""" # 简化估计:基于权重偏差的二次损失 weight_deviations = current_weights - target_weights rebalance_benefit = 0.5 * np.sum(weight_deviations ** 2) * 100 # 放大系数 return rebalance_benefit def _cost_aware_rebalance(self, current_weights: np.ndarray, target_weights: np.ndarray, portfolio_value: float) -> np.ndarray: """考虑成本的再平衡优化""" n_assets = len(current_weights) # 定义优化变量 weights = cp.Variable(n_assets) # 目标函数:最小化跟踪误差和交易成本 tracking_error = cp.sum_squares(weights - target_weights) # 简化的交易成本(线性近似) trade_amounts = cp.abs(weights - current_weights) * portfolio_value transaction_costs = cp.sum(trade_amounts) * 0.003 # 0.3% 总成本率 # 总目标函数 objective = cp.Minimize(tracking_error + transaction_costs / portfolio_value) # 约束条件 constraints = [ cp.sum(weights) == 1, weights >= 0, cp.norm(weights - current_weights, 1) <= self.max_turnover ] problem = cp.Problem(objective, constraints) try: problem.solve(solver='ECOS', verbose=False) if problem.status == cp.OPTIMAL: return weights.value else: # 如果优化失败,返回目标权重 return target_weights except Exception: return target_weights # 使用示例和测试 if __name__ == "__main__": print("Testing Portfolio Optimizer...") # 创建示例数据 np.random.seed(42) n_assets = 5 n_periods = 252 # 生成随机收益数据 returns = pd.DataFrame( np.random.multivariate_normal( mean=[0.001] * n_assets, cov=np.eye(n_assets) * 0.0004 + np.ones((n_assets, n_assets)) * 0.0001, size=n_periods ), columns=[f'Asset_{i}' for i in range(n_assets)] ) # 初始化优化器 optimizer = PortfolioOptimizer(risk_aversion=2.0, transaction_cost=0.001) # 估计预期收益和协方差矩阵 expected_returns = optimizer.estimate_expected_returns(returns, method='historical_mean') cov_matrix = optimizer.estimate_covariance_matrix(returns, method='ledoit_wolf') print(f"Expected returns: {expected_returns}") print(f"Covariance matrix shape: {cov_matrix.shape}") # 1. 均值-方差优化 print("\n1. Mean-Variance Optimization:") mv_result = optimizer.mean_variance_optimization(expected_returns, cov_matrix) if mv_result['status'] == 'success': print(f"Optimal weights: {mv_result['weights']}") print(f"Expected return: {mv_result['expected_return']:.4f}") print(f"Volatility: {mv_result['volatility']:.4f}") print(f"Sharpe ratio: {mv_result['sharpe_ratio']:.4f}") # 2. 风险平价优化 print("\n2. Risk Parity Optimization:") rp_result = optimizer.risk_parity_optimization(cov_matrix) if rp_result['status'] == 'success': print(f"Risk parity weights: {rp_result['weights']}") print(f"Risk contributions: {rp_result['risk_contributions']}") print(f"Portfolio volatility: {rp_result['portfolio_volatility']:.4f}") # 3. 最大分散化优化 print("\n3. Maximum Diversification Optimization:") md_result = optimizer.max_diversification_optimization(expected_returns, cov_matrix) if md_result['status'] == 'success': print(f"Max diversification weights: {md_result['weights']}") print(f"Diversification ratio: {md_result['diversification_ratio']:.4f}") # 4. 鲁棒优化 print("\n4. Robust Optimization:") robust_result = optimizer.robust_optimization(expected_returns, cov_matrix, kappa=0.1) if robust_result['status'] == 'success': print(f"Robust weights: {robust_result['weights']}") print(f"Robust utility: {robust_result['robust_utility']:.4f}") # 5. 多目标优化 print("\n5. Multi-Objective Optimization:") mo_result = optimizer.multi_objective_optimization( expected_returns, cov_matrix, objectives=['return', 'risk', 'diversification'], weights_objectives=[0.4, 0.3, 0.3] ) if mo_result['status'] == 'success': print(f"Multi-objective weights: {mo_result['weights']}") print(f"Objective values: {mo_result['objective_values']}") # 6. 测试动态再平衡 print("\n6. Dynamic Rebalancing:") rebalancer = DynamicRebalancing(rebalance_threshold=0.05) current_weights = np.array([0.3, 0.25, 0.2, 0.15, 0.1]) target_weights = mv_result['weights'] if mv_result['status'] == 'success' else np.array([0.2, 0.2, 0.2, 0.2, 0.2]) portfolio_value = 100000 should_rebalance = rebalancer.should_rebalance(current_weights, target_weights, portfolio_value) print(f"Should rebalance: {should_rebalance['should_rebalance']}") print(f"Max deviation: {should_rebalance['max_deviation']:.4f}") print(f"Expected costs: ${should_rebalance['expected_costs']['total']:.2f}") if should_rebalance['should_rebalance']: rebalance_result = rebalancer.execute_rebalance(current_weights, target_weights, portfolio_value) print(f"New weights: {rebalance_result['new_weights']}") print(f"Actual costs: ${rebalance_result['costs']['total']:.2f}") print("\nPortfolio optimization tests completed!")