# Source: hk-trading-platform/modules/portfolio_optimizer.py
# Uploaded by Humphreykowl ("Upload 7 files", commit 85fdeb5, verified)
"""
投资组合优化器
Portfolio Optimization Engine (FIN 555, FIN 557)
"""
import logging
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union, Callable

import cvxpy as cp
import numpy as np
import pandas as pd
from scipy import linalg
from scipy.optimize import minimize, differential_evolution
from sklearn.covariance import LedoitWolf, EmpiricalCovariance, OAS
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings('ignore')
class PortfolioOptimizer:
    """Portfolio optimization engine.

    Bundles several long-only allocation methods (mean-variance,
    Black-Litterman, risk parity, maximum diversification, robust and
    weighted multi-objective optimization) together with helpers that
    estimate expected returns and covariance matrices from return data.

    Every ``*_optimization`` method returns a dict whose ``'status'`` key is
    ``'success'``, ``'failed'`` (the solver finished without an optimum) or
    ``'error'`` (an exception was caught and logged). ``'weights'`` is
    ``None`` unless the status is ``'success'``.
    """

    def __init__(self, risk_aversion: float = 1.0, transaction_cost: float = 0.001):
        """Store the default risk aversion and transaction cost rate.

        Args:
            risk_aversion: Coefficient multiplying the variance term in the
                utility ``mu'w - risk_aversion * w'Sigma w``.
            transaction_cost: Stored on the instance; not read by any method
                of this class.
        """
        self.risk_aversion = risk_aversion
        self.transaction_cost = transaction_cost
        self.logger = logging.getLogger(__name__)
        # Result dicts of every successful mean-variance run, oldest first.
        self.optimization_history = []
        # Weights of the latest successful mean-variance run; the optional
        # 'turnover' constraint is measured against them.
        self.current_weights = None
        # Covariance estimators selectable by name.
        self.covariance_estimators = {
            'sample': EmpiricalCovariance(),
            'ledoit_wolf': LedoitWolf(),
            'oas': OAS()
        }

    def mean_variance_optimization(self, expected_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   constraints: Optional[Dict] = None,
                                   solver: str = 'ECOS',
                                   risk_aversion: Optional[float] = None) -> Dict[str, Any]:
        """Long-only mean-variance optimization.

        Maximizes ``mu'w - lambda * w'Sigma w`` subject to ``sum(w) == 1`` and
        ``w >= 0`` plus any extra constraints.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of returns.
            constraints: Optional constraint specification understood by
                ``_build_constraints``.
            solver: Name of the CVXPY solver to use.
            risk_aversion: Overrides ``self.risk_aversion`` for this call
                when given.

        Returns:
            Dict: On success the weights, expected return, volatility, Sharpe
            ratio, utility value and solver status.
        """
        lam = self.risk_aversion if risk_aversion is None else risk_aversion
        n_assets = len(expected_returns)

        weights = cp.Variable(n_assets)
        portfolio_return = expected_returns @ weights
        portfolio_risk = cp.quad_form(weights, cov_matrix)
        utility = portfolio_return - lam * portfolio_risk

        # Budget and long-only constraints always apply.
        all_constraints = [
            cp.sum(weights) == 1,
            weights >= 0
        ]
        if constraints:
            all_constraints.extend(self._build_constraints(weights, constraints))

        problem = cp.Problem(cp.Maximize(utility), all_constraints)
        try:
            problem.solve(solver=solver, verbose=False)
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }
            optimal_weights = weights.value
            portfolio_stats = self._calculate_portfolio_stats(
                optimal_weights, expected_returns, cov_matrix
            )
            result = {
                'status': 'success',
                'weights': optimal_weights,
                'expected_return': portfolio_stats['return'],
                'volatility': portfolio_stats['volatility'],
                'sharpe_ratio': portfolio_stats['sharpe_ratio'],
                'utility': utility.value,
                'solver_status': problem.status
            }
            self.current_weights = optimal_weights
            self.optimization_history.append(result)
            return result
        except Exception as e:
            self.logger.error(f"Optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def black_litterman_optimization(self, market_caps: np.ndarray,
                                     cov_matrix: np.ndarray,
                                     views: Optional[Dict] = None,
                                     risk_aversion: Optional[float] = None,
                                     tau: float = 0.025) -> Dict[str, Any]:
        """Black-Litterman allocation.

        Args:
            market_caps: Market capitalisations (normalised internally).
            cov_matrix: Covariance matrix of returns.
            views: Investor views ``{'P': pick matrix, 'Q': view returns,
                'Omega': view uncertainty}``. When missing or incomplete the
                implied equilibrium returns are used unchanged.
            risk_aversion: Risk aversion ``delta``; defaults to
                ``self.risk_aversion``.
            tau: Scaling of the prior uncertainty ``tau * Sigma``.

        Returns:
            Dict: The mean-variance result, extended on success with
            ``'type'``, ``'implied_returns'``, ``'bl_returns'`` and
            ``'market_weights'``.
        """
        if risk_aversion is None:
            risk_aversion = self.risk_aversion

        # Step 1: reverse optimisation, Pi = delta * Sigma * w_mkt.
        market_weights = market_caps / np.sum(market_caps)
        implied_returns = risk_aversion * np.dot(cov_matrix, market_weights)

        # Step 2: blend the prior with the views, if a complete set is given.
        if views and all(key in views for key in ('P', 'Q', 'Omega')):
            bl_returns, bl_cov = self._black_litterman_posterior(
                implied_returns, cov_matrix, views, tau
            )
        else:
            bl_returns = implied_returns
            bl_cov = cov_matrix

        # Step 3: mean-variance optimisation. The optimiser's utility has no
        # 1/2 in front of the variance term, so delta / 2 is the coefficient
        # under which Pi = delta * Sigma * w_mkt makes the market portfolio
        # optimal when there are no views.
        result = self.mean_variance_optimization(
            bl_returns, bl_cov, risk_aversion=risk_aversion / 2.0
        )
        if result['status'] == 'success':
            result.update({
                'type': 'black_litterman',
                'implied_returns': implied_returns,
                'bl_returns': bl_returns,
                'market_weights': market_weights
            })
        return result

    @staticmethod
    def _black_litterman_posterior(implied_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   views: Dict,
                                   tau: float) -> Tuple[np.ndarray, np.ndarray]:
        """Return the Black-Litterman posterior mean and return covariance.

        The posterior covariance of returns is ``Sigma + M`` where
        ``M = inv(inv(tau * Sigma) + P' inv(Omega) P)`` is the uncertainty of
        the mean estimate; ``M`` on its own is not a return covariance.
        """
        pick = np.atleast_2d(np.asarray(views['P'], dtype=float))
        view_returns = np.asarray(views['Q'], dtype=float).ravel()
        omega_inv = linalg.inv(np.atleast_2d(np.asarray(views['Omega'], dtype=float)))
        sigma = np.asarray(cov_matrix, dtype=float)

        prior_precision = linalg.inv(tau * sigma)
        mean_cov = linalg.inv(prior_precision + pick.T @ omega_inv @ pick)
        # Remove round-off asymmetry so the matrix is a valid quadratic form.
        mean_cov = 0.5 * (mean_cov + mean_cov.T)

        bl_returns = mean_cov @ (
            prior_precision @ np.asarray(implied_returns, dtype=float)
            + pick.T @ omega_inv @ view_returns
        )
        bl_cov = sigma + mean_cov
        return bl_returns, bl_cov

    @staticmethod
    def _risk_contributions(weights: np.ndarray, cov_matrix: np.ndarray) -> np.ndarray:
        """Return each asset's share of portfolio variance (shares sum to 1)."""
        portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
        marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol
        return weights * marginal_contrib / portfolio_vol

    def risk_parity_optimization(self, cov_matrix: np.ndarray,
                                 target_risk_contributions: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """Risk parity (risk budgeting) allocation.

        Minimises the squared distance between the realised and the target
        risk contribution shares with SLSQP.

        Args:
            cov_matrix: Covariance matrix of returns.
            target_risk_contributions: Target risk shares; equal shares when
                ``None``.

        Returns:
            Dict: On success the weights, realised and target risk
            contributions, portfolio volatility and the raw SciPy result.
        """
        n_assets = cov_matrix.shape[0]
        if target_risk_contributions is None:
            target_risk_contributions = np.ones(n_assets) / n_assets

        def risk_parity_objective(weights):
            """Squared deviation of risk shares from their targets."""
            risk_contrib = self._risk_contributions(weights, cov_matrix)
            return np.sum((risk_contrib - target_risk_contributions) ** 2)

        budget = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0},
        ]
        # Per-asset bounds of [0.001, 0.5], relaxed just enough that the
        # budget constraint stays feasible for a single asset or for more
        # than 1000 assets.
        equal_weight = 1.0 / n_assets
        lower = min(0.001, equal_weight)
        upper = max(0.5, equal_weight)
        bounds = [(lower, upper) for _ in range(n_assets)]
        initial_weights = np.ones(n_assets) / n_assets

        try:
            result = minimize(
                risk_parity_objective,
                initial_weights,
                method='SLSQP',
                bounds=bounds,
                constraints=budget,
                options={'ftol': 1e-9, 'disp': False}
            )
            if not result.success:
                return {
                    'status': 'failed',
                    'error': 'Optimization did not converge',
                    'weights': None
                }
            optimal_weights = result.x
            portfolio_vol = np.sqrt(
                np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))
            )
            return {
                'status': 'success',
                'weights': optimal_weights,
                'risk_contributions': self._risk_contributions(optimal_weights, cov_matrix),
                'target_risk_contributions': target_risk_contributions,
                'portfolio_volatility': portfolio_vol,
                'optimization_result': result
            }
        except Exception as e:
            self.logger.error(f"Risk parity optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def max_diversification_optimization(self, expected_returns: np.ndarray,
                                         cov_matrix: np.ndarray,
                                         solver: str = 'ECOS') -> Dict[str, Any]:
        """Maximum diversification allocation.

        The diversification ratio ``sigma'w / sqrt(w'Sigma w)`` does not
        change when ``w`` is rescaled, so it is maximised by solving
        ``min y'Sigma y  s.t.  sigma'y == 1, y >= 0`` and normalising
        ``w = y / sum(y)``.

        Args:
            expected_returns: Only its length (the number of assets) is used.
            cov_matrix: Covariance matrix of returns.
            solver: Name of the CVXPY solver to use.

        Returns:
            Dict: On success the weights, diversification ratio, portfolio
            volatility and weighted average asset volatility.
        """
        n_assets = len(expected_returns)
        individual_vols = np.sqrt(np.diag(cov_matrix))

        scaled_weights = cp.Variable(n_assets)
        objective = cp.Minimize(cp.quad_form(scaled_weights, cov_matrix))
        constraints = [
            scaled_weights >= 0,                    # long-only
            individual_vols @ scaled_weights == 1   # fixes the scale of y
        ]
        problem = cp.Problem(objective, constraints)
        try:
            problem.solve(solver=solver, verbose=False)
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }
            # Clip solver round-off below zero, then rescale to sum to one.
            raw = np.maximum(scaled_weights.value, 0.0)
            optimal_weights = raw / np.sum(raw)

            portfolio_vol_val = np.sqrt(
                np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))
            )
            weighted_avg_vol_val = np.dot(individual_vols, optimal_weights)
            diversification_ratio = weighted_avg_vol_val / portfolio_vol_val
            return {
                'status': 'success',
                'weights': optimal_weights,
                'diversification_ratio': diversification_ratio,
                'portfolio_volatility': portfolio_vol_val,
                'weighted_avg_volatility': weighted_avg_vol_val
            }
        except Exception as e:
            self.logger.error(f"Max diversification optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def robust_optimization(self, expected_returns: np.ndarray,
                            cov_matrix: np.ndarray,
                            uncertainty_set: str = 'ellipsoidal',
                            kappa: float = 0.1,
                            solver: str = 'ECOS') -> Dict[str, Any]:
        """Robust long-only optimization against return uncertainty.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of returns.
            uncertainty_set: ``'ellipsoidal'`` or ``'box'``.
            kappa: Robustness parameter (size of the uncertainty set).
            solver: Name of the CVXPY solver to use.

        Returns:
            Dict: On success the weights, the uncertainty set, ``kappa`` and
            the robust utility value.

        Raises:
            ValueError: If ``uncertainty_set`` is not supported.
        """
        # Validate before building any solver objects.
        if uncertainty_set not in ('ellipsoidal', 'box'):
            raise ValueError(f"Unsupported uncertainty set: {uncertainty_set}")

        n_assets = len(expected_returns)
        weights = cp.Variable(n_assets)
        portfolio_risk = cp.quad_form(weights, cov_matrix)

        if uncertainty_set == 'ellipsoidal':
            # max  mu'w - kappa * ||Sigma^(1/2) w||_2 - lambda * w'Sigma w
            sqrt_cov = linalg.sqrtm(cov_matrix).real
            uncertainty_penalty = cp.norm(sqrt_cov @ weights, 2)
            portfolio_return = expected_returns @ weights
            robust_utility = (portfolio_return - kappa * uncertainty_penalty
                              - self.risk_aversion * portfolio_risk)
        else:
            # Box set: each return lies in [mu - kappa*sigma, mu + kappa*sigma];
            # a long-only portfolio is worst off at the lower corner.
            return_std = np.sqrt(np.diag(cov_matrix))
            worst_case_returns = expected_returns - kappa * return_std
            portfolio_return = worst_case_returns @ weights
            robust_utility = portfolio_return - self.risk_aversion * portfolio_risk

        constraints = [
            cp.sum(weights) == 1,
            weights >= 0
        ]
        problem = cp.Problem(cp.Maximize(robust_utility), constraints)
        try:
            problem.solve(solver=solver, verbose=False)
            if problem.status != cp.OPTIMAL:
                return {
                    'status': 'failed',
                    'solver_status': problem.status,
                    'weights': None
                }
            return {
                'status': 'success',
                'weights': weights.value,
                'uncertainty_set': uncertainty_set,
                'kappa': kappa,
                'robust_utility': robust_utility.value
            }
        except Exception as e:
            self.logger.error(f"Robust optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def multi_objective_optimization(self, expected_returns: np.ndarray,
                                     cov_matrix: np.ndarray,
                                     objectives: Optional[List[str]] = None,
                                     weights_objectives: Optional[List[float]] = None) -> Dict[str, Any]:
        """Weighted-sum multi-objective optimization via differential evolution.

        Supported objective names are ``'return'`` (maximised), ``'risk'``
        (volatility, minimised), ``'diversification'`` (sum of squared
        weights, minimised) and ``'drawdown'`` (placeholder that currently
        equals the diversification term). Unknown names contribute nothing
        and are reported with a warning.

        Args:
            expected_returns: Expected return vector.
            cov_matrix: Covariance matrix of returns.
            objectives: Objective names; defaults to
                ``['return', 'risk', 'diversification']``.
            weights_objectives: One scalarisation weight per objective; equal
                weights when ``None``.

        Returns:
            Dict: On success the weights, the objectives, their values at the
            optimum and the scalarisation weights. A mismatch between the
            number of objectives and weights yields an ``'error'`` result.
        """
        if objectives is None:
            objectives = ['return', 'risk', 'diversification']
        n_assets = len(expected_returns)

        if len(objectives) == 0:
            message = 'At least one objective is required'
            self.logger.error(f"Multi-objective optimization failed: {message}")
            return {'status': 'error', 'error': message, 'weights': None}
        if weights_objectives is None:
            weights_objectives = [1.0 / len(objectives)] * len(objectives)
        if len(weights_objectives) != len(objectives):
            message = 'objectives and weights_objectives must have the same length'
            self.logger.error(f"Multi-objective optimization failed: {message}")
            return {'status': 'error', 'error': message, 'weights': None}

        known = ('return', 'risk', 'diversification', 'drawdown')
        ignored = [name for name in objectives if name not in known]
        if ignored:
            self.logger.warning("Ignoring unknown objectives: %s", ignored)

        def scalarized(portfolio_weights):
            """Weighted sum of the requested objectives for valid weights."""
            total_objective = 0.0
            for obj_weight, obj_type in zip(weights_objectives, objectives):
                if obj_type == 'return':
                    # Negated because the optimiser minimises.
                    obj_value = -np.dot(expected_returns, portfolio_weights)
                elif obj_type == 'risk':
                    obj_value = np.sqrt(
                        np.dot(portfolio_weights, np.dot(cov_matrix, portfolio_weights))
                    )
                elif obj_type in ('diversification', 'drawdown'):
                    # Herfindahl index; 'drawdown' is a placeholder because a
                    # real drawdown measure needs historical data.
                    obj_value = np.sum(portfolio_weights ** 2)
                else:
                    obj_value = 0.0
                total_objective += obj_weight * obj_value
            return total_objective

        def multi_objective_function(raw_weights):
            """Objective on the unit box, mapped onto the simplex.

            Searching over ``raw / sum(raw)`` enforces the budget constraint
            exactly; a stochastic search is unlikely to satisfy an explicit
            equality constraint to machine precision.
            """
            mass = np.sum(raw_weights)
            if mass <= 0:
                return 1e12  # all-zero candidate: not a portfolio
            return scalarized(raw_weights / mass)

        bounds = [(0.0, 1.0) for _ in range(n_assets)]
        try:
            result = differential_evolution(
                multi_objective_function,
                bounds,
                seed=42,
                maxiter=1000,
                atol=1e-6
            )
            if not result.success or np.sum(result.x) <= 0:
                return {
                    'status': 'failed',
                    'error': 'Multi-objective optimization did not converge',
                    'weights': None
                }
            optimal_weights = result.x / np.sum(result.x)

            objective_values = {}
            for obj_type in objectives:
                if obj_type == 'return':
                    objective_values[obj_type] = np.dot(expected_returns, optimal_weights)
                elif obj_type == 'risk':
                    objective_values[obj_type] = np.sqrt(
                        np.dot(optimal_weights, np.dot(cov_matrix, optimal_weights))
                    )
                elif obj_type == 'diversification':
                    # Reported as the effective number of assets.
                    objective_values[obj_type] = 1.0 / np.sum(optimal_weights ** 2)
            return {
                'status': 'success',
                'weights': optimal_weights,
                'objectives': objectives,
                'objective_values': objective_values,
                'weights_objectives': weights_objectives
            }
        except Exception as e:
            self.logger.error(f"Multi-objective optimization failed: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'weights': None
            }

    def _build_constraints(self, weights: "cp.Variable", constraints: Dict) -> List:
        """Translate a constraint specification into CVXPY constraints.

        Recognised keys: ``'max_weight'`` and ``'min_weight'`` (per-asset
        bounds), ``'group_constraints'`` (list of dicts with ``'indices'``
        and optional ``'max_weight'`` / ``'min_weight'``) and ``'turnover'``
        (L1 distance to ``self.current_weights``, applied only once a
        previous solution exists).
        """
        constraint_list = []
        if 'max_weight' in constraints:
            constraint_list.append(weights <= constraints['max_weight'])
        if 'min_weight' in constraints:
            constraint_list.append(weights >= constraints['min_weight'])
        if 'group_constraints' in constraints:
            for group_constraint in constraints['group_constraints']:
                indices = group_constraint['indices']
                max_weight = group_constraint.get('max_weight', 1.0)
                min_weight = group_constraint.get('min_weight', 0.0)
                group_weight = cp.sum([weights[i] for i in indices])
                constraint_list.append(group_weight <= max_weight)
                constraint_list.append(group_weight >= min_weight)
        if 'turnover' in constraints and self.current_weights is not None:
            max_turnover = constraints['turnover']
            turnover = cp.norm(weights - self.current_weights, 1)
            constraint_list.append(turnover <= max_turnover)
        return constraint_list

    def _calculate_portfolio_stats(self, weights: np.ndarray,
                                   expected_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   risk_free_rate: float = 0.02) -> Dict[str, float]:
        """Return the portfolio's return, volatility, variance and Sharpe ratio.

        The Sharpe ratio is 0 when the volatility is not positive.
        NOTE(review): ``risk_free_rate`` must be expressed per the same
        period as ``expected_returns`` -- confirm against callers.
        """
        portfolio_return = np.dot(weights, expected_returns)
        portfolio_variance = np.dot(weights, np.dot(cov_matrix, weights))
        # Round-off can push a near-zero variance slightly negative.
        portfolio_volatility = np.sqrt(max(portfolio_variance, 0.0))
        if portfolio_volatility > 0:
            sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility
        else:
            sharpe_ratio = 0
        return {
            'return': portfolio_return,
            'volatility': portfolio_volatility,
            'variance': portfolio_variance,
            'sharpe_ratio': sharpe_ratio
        }

    def estimate_covariance_matrix(self, returns: pd.DataFrame, method: str = 'ledoit_wolf') -> np.ndarray:
        """Estimate the covariance matrix of ``returns``.

        Args:
            returns: Return observations, one column per asset; rows with
                missing values are dropped.
            method: ``'sample'``, ``'ledoit_wolf'`` or ``'oas'``.

        Raises:
            ValueError: For an unknown method or fewer than two clean rows.
        """
        if method not in self.covariance_estimators:
            raise ValueError(f"Unknown covariance estimation method: {method}")
        estimator = self.covariance_estimators[method]

        returns_clean = returns.dropna()
        if len(returns_clean) < 2:
            raise ValueError("Insufficient data for covariance estimation")

        estimator.fit(returns_clean)
        return estimator.covariance_

    def estimate_expected_returns(self, returns: pd.DataFrame, method: str = 'historical_mean') -> np.ndarray:
        """Estimate expected returns per asset.

        Args:
            returns: Return observations, one column per asset; rows with
                missing values are dropped.
            method: ``'historical_mean'``, ``'ewma'`` (exponentially weighted
                mean with decay factor 0.94) or ``'shrinkage'`` (20% shrink
                towards the cross-sectional mean).

        Raises:
            ValueError: For an unknown method or when no clean rows remain.
        """
        returns_clean = returns.dropna()
        if len(returns_clean) == 0:
            raise ValueError("Insufficient data for expected return estimation")

        if method == 'historical_mean':
            expected_returns = returns_clean.mean().values
        elif method == 'ewma':
            # 0.94 is the weight kept on history. pandas' ``alpha`` is the
            # weight on the newest observation, hence 1 - decay.
            decay = 0.94
            expected_returns = returns_clean.ewm(alpha=1.0 - decay).mean().iloc[-1].values
        elif method == 'shrinkage':
            historical_mean = returns_clean.mean().values
            overall_mean = np.mean(historical_mean)
            shrinkage_factor = 0.2
            expected_returns = ((1 - shrinkage_factor) * historical_mean
                                + shrinkage_factor * overall_mean)
        else:
            raise ValueError(f"Unknown expected returns method: {method}")
        return expected_returns
class TransactionCostModel:
    """Estimate the cost of moving a portfolio from one set of weights to another.

    The total is the sum of commission, half the bid-ask spread, a
    square-root market impact term and a timing cost. All rates are
    fractions of traded value and default to the values listed in
    ``__init__``.
    """

    def __init__(self, commission_rate: float = 0.001,
                 bid_ask_spread: float = 0.002,
                 timing_cost_rate: float = 0.0005,
                 impact_coefficient: float = 0.1):
        """Configure the cost rates.

        Args:
            commission_rate: Commission as a fraction of traded value (0.1%).
            bid_ask_spread: Full bid-ask spread as a fraction of traded value
                (0.2%); half of it is charged per trade.
            timing_cost_rate: Timing cost as a fraction of traded value (0.05%).
            impact_coefficient: Multiplier of the square-root impact term.
        """
        # Callables registered through add_cost_component. They are stored
        # only; calculate_total_cost does not consult them.
        self.cost_components = {}
        self.commission_rate = commission_rate
        self.bid_ask_spread = bid_ask_spread
        self.timing_cost_rate = timing_cost_rate
        self.impact_coefficient = impact_coefficient

    def add_cost_component(self, name: str, cost_func: Callable):
        """Register ``cost_func`` under ``name``, replacing any previous entry."""
        self.cost_components[name] = cost_func

    def calculate_total_cost(self, current_weights: np.ndarray,
                             target_weights: np.ndarray,
                             portfolio_value: float,
                             prices: Optional[np.ndarray] = None) -> Dict[str, float]:
        """Break down the cost of trading from current to target weights.

        Args:
            current_weights: Weights currently held.
            target_weights: Weights to trade to.
            portfolio_value: Portfolio value in currency units.
            prices: Optional asset prices, used to convert traded value into
                traded quantity for the market impact term.

        Returns:
            Dict: ``'commission'``, ``'bid_ask_spread'``, ``'market_impact'``,
            ``'timing'`` and their sum under ``'total'``.
        """
        current = np.asarray(current_weights, dtype=float)
        target = np.asarray(target_weights, dtype=float)
        trade_values = np.abs(target - current) * portfolio_value
        traded_total = np.sum(trade_values)

        costs = {
            'commission': traded_total * self.commission_rate,
            # Crossing the spread costs half of its width.
            'bid_ask_spread': traded_total * self.bid_ask_spread * 0.5,
            'market_impact': self._calculate_market_impact(trade_values, prices),
            'timing': traded_total * self.timing_cost_rate,
        }
        total_cost = 0.0
        for component in ('commission', 'bid_ask_spread', 'market_impact', 'timing'):
            total_cost += costs[component]
        costs['total'] = total_cost
        return costs

    def _calculate_market_impact(self, trade_values: np.ndarray,
                                 prices: Optional[np.ndarray] = None) -> float:
        """Square-root market impact: coefficient times the sum of sqrt(volume).

        Volume is traded value divided by price; without prices the traded
        value itself is used (i.e. a price of 1 is assumed).
        """
        if prices is not None:
            volumes = trade_values / np.asarray(prices, dtype=float)
        else:
            volumes = trade_values
        return self.impact_coefficient * np.sum(np.sqrt(volumes))
class DynamicRebalancing:
    """Threshold-based rebalancing with a turnover cap and cost awareness."""

    def __init__(self, rebalance_threshold: float = 0.05,
                 max_turnover: float = 0.5,
                 transaction_cost_model: Optional["TransactionCostModel"] = None,
                 cost_rate: float = 0.003):
        """Configure the rebalancing policy.

        Args:
            rebalance_threshold: Largest tolerated absolute weight deviation.
            max_turnover: Cap on the L1 distance between old and new weights.
            transaction_cost_model: Cost model; a default
                ``TransactionCostModel`` is created when omitted.
            cost_rate: Linear cost per unit of turnover used inside the
                cost-aware optimisation (0.3%).
        """
        self.rebalance_threshold = rebalance_threshold
        self.max_turnover = max_turnover
        self.transaction_cost_model = transaction_cost_model or TransactionCostModel()
        self.cost_rate = cost_rate
        # One record per executed rebalance, oldest first.
        self.rebalance_history = []

    def should_rebalance(self, current_weights: np.ndarray,
                         target_weights: np.ndarray,
                         portfolio_value: float) -> Dict[str, Any]:
        """Decide whether a rebalance is worthwhile.

        A rebalance is recommended when the largest weight deviation exceeds
        the threshold and the estimated benefit exceeds the expected cost.

        NOTE(review): the benefit is a unitless score while the cost is in
        currency units, so large portfolios rarely pass the net-benefit
        test -- confirm the intended units.

        Returns:
            Dict: ``'should_rebalance'``, ``'max_deviation'``,
            ``'expected_costs'``, ``'rebalance_benefit'`` and ``'net_benefit'``.
        """
        max_deviation = np.max(np.abs(current_weights - target_weights))
        expected_costs = self.transaction_cost_model.calculate_total_cost(
            current_weights, target_weights, portfolio_value
        )
        rebalance_benefit = self._estimate_rebalance_benefit(
            current_weights, target_weights
        )
        net_benefit = rebalance_benefit - expected_costs['total']
        return {
            'should_rebalance': (max_deviation > self.rebalance_threshold and net_benefit > 0),
            'max_deviation': max_deviation,
            'expected_costs': expected_costs,
            'rebalance_benefit': rebalance_benefit,
            'net_benefit': net_benefit
        }

    def execute_rebalance(self, current_weights: np.ndarray,
                          target_weights: np.ndarray,
                          portfolio_value: float) -> Dict[str, Any]:
        """Compute cost-aware new weights, price the trade and record it.

        Returns:
            Dict: ``'new_weights'``, ``'costs'`` and the history ``'record'``.
        """
        adjusted_weights = self._cost_aware_rebalance(
            current_weights, target_weights, portfolio_value
        )
        actual_costs = self.transaction_cost_model.calculate_total_cost(
            current_weights, adjusted_weights, portfolio_value
        )
        rebalance_record = {
            'timestamp': pd.Timestamp.now(),
            'original_weights': current_weights.copy(),
            'target_weights': target_weights.copy(),
            'adjusted_weights': adjusted_weights.copy(),
            'costs': actual_costs,
            'portfolio_value': portfolio_value
        }
        self.rebalance_history.append(rebalance_record)
        return {
            'new_weights': adjusted_weights,
            'costs': actual_costs,
            'record': rebalance_record
        }

    def _estimate_rebalance_benefit(self, current_weights: np.ndarray,
                                    target_weights: np.ndarray) -> float:
        """Quadratic-loss score: 50 times the sum of squared weight deviations."""
        weight_deviations = current_weights - target_weights
        return 0.5 * np.sum(weight_deviations ** 2) * 100

    def _turnover_capped_fallback(self, current_weights: np.ndarray,
                                  target_weights: np.ndarray) -> np.ndarray:
        """Move from current towards target as far as ``max_turnover`` allows.

        The result is a blend of the two inputs, so it keeps their sum and
        non-negativity.
        """
        current = np.asarray(current_weights, dtype=float)
        target = np.asarray(target_weights, dtype=float)
        move = target - current
        turnover = np.sum(np.abs(move))
        if turnover <= self.max_turnover:
            return target.copy()
        return current + (self.max_turnover / turnover) * move

    def _cost_aware_rebalance(self, current_weights: np.ndarray,
                              target_weights: np.ndarray,
                              portfolio_value: float) -> np.ndarray:
        """Trade off tracking error against linear trading cost.

        Minimises ``||w - target||^2 + cost_rate * ||w - current||_1`` subject
        to full investment, long-only and the turnover cap. If the solver
        fails, a turnover-capped move towards the target is returned instead.
        ``portfolio_value`` does not affect the solution: the cost is
        expressed as a fraction of portfolio value.
        """
        n_assets = len(current_weights)
        weights = cp.Variable(n_assets)

        tracking_error = cp.sum_squares(weights - target_weights)
        turnover = cp.norm(weights - current_weights, 1)
        objective = cp.Minimize(tracking_error + self.cost_rate * turnover)
        constraints = [
            cp.sum(weights) == 1,
            weights >= 0,
            turnover <= self.max_turnover
        ]
        problem = cp.Problem(objective, constraints)
        try:
            problem.solve(solver='ECOS', verbose=False)
            if problem.status == cp.OPTIMAL:
                return weights.value
            logging.getLogger(__name__).warning(
                "Cost-aware rebalance not optimal (status: %s); using capped fallback",
                problem.status
            )
        except Exception as e:
            logging.getLogger(__name__).warning(
                "Cost-aware rebalance failed (%s); using capped fallback", e
            )
        return self._turnover_capped_fallback(current_weights, target_weights)
# Usage example and smoke test
def _run_portfolio_demo() -> None:
    """Run every optimizer once on synthetic data and print the results."""
    print("Testing Portfolio Optimizer...")

    # Synthetic daily returns for five correlated assets over one year.
    np.random.seed(42)
    n_assets = 5
    n_periods = 252
    asset_names = [f'Asset_{i}' for i in range(n_assets)]
    simulated = np.random.multivariate_normal(
        mean=[0.001] * n_assets,
        cov=np.eye(n_assets) * 0.0004 + np.ones((n_assets, n_assets)) * 0.0001,
        size=n_periods
    )
    returns = pd.DataFrame(simulated, columns=asset_names)

    optimizer = PortfolioOptimizer(risk_aversion=2.0, transaction_cost=0.001)

    # Inputs shared by all optimizers.
    expected_returns = optimizer.estimate_expected_returns(returns, method='historical_mean')
    cov_matrix = optimizer.estimate_covariance_matrix(returns, method='ledoit_wolf')
    print(f"Expected returns: {expected_returns}")
    print(f"Covariance matrix shape: {cov_matrix.shape}")

    # 1. Mean-variance
    print("\n1. Mean-Variance Optimization:")
    mv_result = optimizer.mean_variance_optimization(expected_returns, cov_matrix)
    mv_ok = mv_result['status'] == 'success'
    if mv_ok:
        print(f"Optimal weights: {mv_result['weights']}")
        print(f"Expected return: {mv_result['expected_return']:.4f}")
        print(f"Volatility: {mv_result['volatility']:.4f}")
        print(f"Sharpe ratio: {mv_result['sharpe_ratio']:.4f}")

    # 2. Risk parity
    print("\n2. Risk Parity Optimization:")
    rp_result = optimizer.risk_parity_optimization(cov_matrix)
    if rp_result['status'] == 'success':
        print(f"Risk parity weights: {rp_result['weights']}")
        print(f"Risk contributions: {rp_result['risk_contributions']}")
        print(f"Portfolio volatility: {rp_result['portfolio_volatility']:.4f}")

    # 3. Maximum diversification
    print("\n3. Maximum Diversification Optimization:")
    md_result = optimizer.max_diversification_optimization(expected_returns, cov_matrix)
    if md_result['status'] == 'success':
        print(f"Max diversification weights: {md_result['weights']}")
        print(f"Diversification ratio: {md_result['diversification_ratio']:.4f}")

    # 4. Robust
    print("\n4. Robust Optimization:")
    robust_result = optimizer.robust_optimization(expected_returns, cov_matrix, kappa=0.1)
    if robust_result['status'] == 'success':
        print(f"Robust weights: {robust_result['weights']}")
        print(f"Robust utility: {robust_result['robust_utility']:.4f}")

    # 5. Multi-objective
    print("\n5. Multi-Objective Optimization:")
    mo_result = optimizer.multi_objective_optimization(
        expected_returns, cov_matrix,
        objectives=['return', 'risk', 'diversification'],
        weights_objectives=[0.4, 0.3, 0.3]
    )
    if mo_result['status'] == 'success':
        print(f"Multi-objective weights: {mo_result['weights']}")
        print(f"Objective values: {mo_result['objective_values']}")

    # 6. Dynamic rebalancing towards the mean-variance weights
    #    (equal weights if that optimisation did not succeed).
    print("\n6. Dynamic Rebalancing:")
    rebalancer = DynamicRebalancing(rebalance_threshold=0.05)
    current_weights = np.array([0.3, 0.25, 0.2, 0.15, 0.1])
    if mv_ok:
        target_weights = mv_result['weights']
    else:
        target_weights = np.array([0.2, 0.2, 0.2, 0.2, 0.2])
    portfolio_value = 100000

    decision = rebalancer.should_rebalance(current_weights, target_weights, portfolio_value)
    print(f"Should rebalance: {decision['should_rebalance']}")
    print(f"Max deviation: {decision['max_deviation']:.4f}")
    print(f"Expected costs: ${decision['expected_costs']['total']:.2f}")
    if decision['should_rebalance']:
        rebalance_result = rebalancer.execute_rebalance(current_weights, target_weights, portfolio_value)
        print(f"New weights: {rebalance_result['new_weights']}")
        print(f"Actual costs: ${rebalance_result['costs']['total']:.2f}")

    print("\nPortfolio optimization tests completed!")


if __name__ == "__main__":
    _run_portfolio_demo()