|
|
| """
|
| 智能分析系统(股票) - 股票市场数据分析系统
|
| 开发者:熊猫大侠
|
| 版本:v2.1.0
|
| 许可证:MIT License
|
| """
|
|
|
| import pandas as pd
|
| import numpy as np
|
| from datetime import datetime, timedelta
|
|
|
class RiskMonitor:
    """Score the risk of single stocks and of weighted stock portfolios.

    All market data comes from the injected ``analyzer`` object. This class
    calls ``analyzer.get_stock_data(stock_code, market_type)``,
    ``analyzer.calculate_indicators(df)`` and
    ``analyzer.get_stock_info(stock_code)``.

    The indicator frame is read through the columns ``close``, ``volume``,
    ``Volatility``, ``MA5``, ``MA20``, ``MA60``, ``RSI``, ``MACD`` and
    ``Signal``; its last row is treated as the most recent observation.
    """

    # (minimum score, label) pairs for the five-tier overall rating, checked
    # from the highest tier down. Scores below every minimum get the lowest
    # label.
    _OVERALL_LEVELS = ((80, "极高"), (60, "高"), (40, "中等"), (20, "低"))
    _LOWEST_OVERALL_LEVEL = "极低"

    # A component score at or above this value produces an alert.
    _ALERT_THRESHOLD = 70

    # A stock whose total score reaches this value counts as high risk.
    _HIGH_RISK_THRESHOLD = 60

    def __init__(self, analyzer):
        """Store the analyzer used to fetch price data and stock metadata."""
        self.analyzer = analyzer

    @classmethod
    def _overall_level(cls, score):
        """Map a 0-100 composite score to the five-tier risk label."""
        for minimum, label in cls._OVERALL_LEVELS:
            if score >= minimum:
                return label
        return cls._LOWEST_OVERALL_LEVEL

    @staticmethod
    def _component_level(score):
        """Map a single component score to the three-tier risk label."""
        if score >= 60:
            return "高"
        if score >= 30:
            return "中"
        return "低"

    def analyze_stock_risk(self, stock_code, market_type='A'):
        """Analyze the risk of a single stock.

        Args:
            stock_code: Identifier passed through to the analyzer.
            market_type: Market selector passed through to the analyzer.

        Returns:
            On success, a dict with ``total_risk_score`` (weighted sum of the
            four component scores), ``risk_level``, the four component
            result dicts and a list of ``alerts``. On any failure, a dict
            holding only an ``error`` message; exceptions are not propagated.
        """
        try:
            df = self.analyzer.get_stock_data(stock_code, market_type)
            df = self.analyzer.calculate_indicators(df)

            volatility_risk = self._analyze_volatility_risk(df)
            trend_risk = self._analyze_trend_risk(df)
            reversal_risk = self._analyze_reversal_risk(df)
            volume_risk = self._analyze_volume_risk(df)

            # Component weights sum to 1.0, so the total stays on the same
            # 0-100 scale as the individual scores.
            total_risk_score = (
                volatility_risk['score'] * 0.3 +
                trend_risk['score'] * 0.3 +
                reversal_risk['score'] * 0.25 +
                volume_risk['score'] * 0.15
            )

            alerts = []

            if volatility_risk['score'] >= self._ALERT_THRESHOLD:
                alerts.append({
                    "type": "volatility",
                    "level": "高",
                    "message": f"波动率风险较高 ({volatility_risk['value']:.2f}%),可能面临大幅波动"
                })

            # The trend score only reaches the alert threshold inside the
            # downtrend branch, which is why the message warns of a decline.
            if trend_risk['score'] >= self._ALERT_THRESHOLD:
                alerts.append({
                    "type": "trend",
                    "level": "高",
                    "message": f"趋势风险较高,当前处于{trend_risk['trend']}趋势,可能面临加速下跌"
                })

            if reversal_risk['score'] >= self._ALERT_THRESHOLD:
                alerts.append({
                    "type": "reversal",
                    "level": "高",
                    "message": f"趋势反转风险较高,技术指标显示可能{reversal_risk['direction']}反转"
                })

            if volume_risk['score'] >= self._ALERT_THRESHOLD:
                alerts.append({
                    "type": "volume",
                    "level": "高",
                    "message": f"成交量异常,{volume_risk['pattern']},可能预示价格波动"
                })

            return {
                "total_risk_score": total_risk_score,
                "risk_level": self._overall_level(total_risk_score),
                "volatility_risk": volatility_risk,
                "trend_risk": trend_risk,
                "reversal_risk": reversal_risk,
                "volume_risk": volume_risk,
                "alerts": alerts
            }

        except Exception as e:
            # Deliberate best-effort boundary: callers receive an error dict
            # instead of an exception.
            print(f"分析股票风险出错: {e}")
            return {
                "error": f"分析风险时出错: {e}"
            }

    def _analyze_volatility_risk(self, df):
        """Score volatility risk from the latest ``Volatility`` value.

        The latest value is compared against fixed thresholds and against
        the mean of the whole ``Volatility`` column.

        Returns:
            Dict with ``score``, ``value`` (latest volatility), ``change``
            (percentage deviation from the column mean) and ``risk_level``.
        """
        recent_volatility = df.iloc[-1]['Volatility']
        avg_volatility = df['Volatility'].mean()

        # A zero or missing average makes the relative change undefined;
        # report "no change" instead of letting inf/NaN leak into the result.
        if pd.notna(avg_volatility) and avg_volatility != 0:
            volatility_change = recent_volatility / avg_volatility - 1
        else:
            volatility_change = 0.0

        if recent_volatility > 5 and volatility_change > 0.5:
            score = 90
        elif recent_volatility > 4 and volatility_change > 0.3:
            score = 75
        elif recent_volatility > 3 and volatility_change > 0.1:
            score = 60
        elif recent_volatility > 2:
            score = 40
        elif recent_volatility > 1:
            score = 20
        else:
            score = 0

        return {
            "score": score,
            "value": recent_volatility,
            "change": volatility_change * 100,
            "risk_level": self._component_level(score)
        }

    def _analyze_trend_risk(self, df):
        """Score trend risk from the ordering of the latest MA5/MA20/MA60.

        Returns:
            Dict with ``score``, ``trend`` ("下降", "上升" or "盘整") and
            ``risk_level``.
        """
        latest = df.iloc[-1]
        ma5 = latest['MA5']
        ma20 = latest['MA20']
        ma60 = latest['MA60']

        if ma5 < ma20 < ma60:
            trend = "下降"
            # How far MA5 has dropped below MA20, in percent of MA20.
            ma5_ma20_gap = (ma20 - ma5) / ma20 * 100
            if ma5_ma20_gap > 5:
                score = 90
            elif ma5_ma20_gap > 3:
                score = 75
            elif ma5_ma20_gap > 1:
                score = 60
            else:
                score = 50
        elif ma5 > ma20 > ma60:
            trend = "上升"
            score = 20
        else:
            # Any other ordering (including NaN averages, for which both
            # comparisons are false) is treated as consolidation.
            trend = "盘整"
            score = 40

        return {
            "score": score,
            "trend": trend,
            "risk_level": self._component_level(score)
        }

    def _analyze_reversal_risk(self, df):
        """Score reversal risk by counting up to three reversal signals.

        The signals are: an RSI extreme (above 75 or below 25), a MACD/Signal
        crossover between the last two rows, and the close deviating more
        than 10% from MA20. When signals disagree on direction, the one
        evaluated last determines the reported ``direction``.

        Returns:
            Dict with ``score``, ``reversal_signals`` (count), ``direction``
            and ``risk_level``.
        """
        latest = df.iloc[-1]
        rsi = latest['RSI']
        macd = latest['MACD']
        signal = latest['Signal']
        price = latest['close']
        ma20 = latest['MA20']

        reversal_signals = 0

        if rsi > 75:
            reversal_signals += 1
            direction = "向下"
        elif rsi < 25:
            reversal_signals += 1
            direction = "向上"
        else:
            direction = "无明确方向"

        # A crossover needs the previous row as well; with a single row there
        # is nothing to compare against, so the check is skipped rather than
        # raising IndexError.
        if len(df) >= 2:
            previous = df.iloc[-2]
            if macd > signal and previous['MACD'] <= previous['Signal']:
                reversal_signals += 1
                direction = "向上"
            elif macd < signal and previous['MACD'] >= previous['Signal']:
                reversal_signals += 1
                direction = "向下"

        if price > ma20 * 1.1:
            reversal_signals += 1
            direction = "向下"
        elif price < ma20 * 0.9:
            reversal_signals += 1
            direction = "向上"

        if reversal_signals >= 3:
            score = 90
        elif reversal_signals == 2:
            score = 70
        elif reversal_signals == 1:
            score = 40
        else:
            score = 10

        return {
            "score": score,
            "reversal_signals": reversal_signals,
            "direction": direction,
            "risk_level": self._component_level(score)
        }

    def _analyze_volume_risk(self, df):
        """Score volume risk from the volume ratio and price/volume moves.

        The latest volume is compared with the mean of the last 20 rows
        (fewer when the history is shorter). The price and volume change
        since the row five positions from the end (or the oldest row when
        fewer exist) can then raise the score for specific price/volume
        combinations.

        Returns:
            Dict with ``score``, ``volume_ratio``, ``pattern`` and
            ``risk_level``.
        """
        recent_volume = df.iloc[-1]['volume']

        # min_periods=1 keeps the average defined for histories shorter than
        # 20 rows; otherwise the ratio would be NaN and every comparison
        # below would silently fall through to "normal".
        avg_volume = df['volume'].rolling(window=20, min_periods=1).mean().iloc[-1]
        volume_ratio = recent_volume / avg_volume

        if volume_ratio > 3:
            pattern = "成交量暴增"
            score = 90
        elif volume_ratio > 2:
            pattern = "成交量显著放大"
            score = 70
        elif volume_ratio > 1.5:
            pattern = "成交量温和放大"
            score = 50
        elif volume_ratio < 0.5:
            pattern = "成交量萎缩"
            score = 40
        else:
            pattern = "成交量正常"
            score = 20

        # Clamp the look-back so short frames do not raise IndexError.
        lookback = min(5, len(df))
        base = df.iloc[-lookback]
        price_change = (df.iloc[-1]['close'] - base['close']) / base['close']
        volume_change = (recent_volume - base['volume']) / base['volume']

        if price_change > 0.05 and volume_change < -0.3:
            pattern = "价量背离(价格上涨但量能萎缩)"
            score = max(score, 80)
        elif price_change < -0.05 and volume_change < -0.3:
            pattern = "价量同向(价格下跌且量能萎缩)"
            score = max(score, 70)
        elif price_change < -0.05 and volume_change > 0.5:
            pattern = "价量同向(价格下跌且量能放大)"
            score = max(score, 85)

        return {
            "score": score,
            "volume_ratio": volume_ratio,
            "pattern": pattern,
            "risk_level": self._component_level(score)
        }

    def analyze_portfolio_risk(self, portfolio):
        """Analyze the overall risk of a portfolio.

        Args:
            portfolio: Iterable of dicts. Each entry is read through the
                keys ``stock_code``, ``weight`` (default 1) and
                ``market_type`` (default 'A'). Entries without a
                ``stock_code`` are ignored.

        Returns:
            On success, a dict with the weight-averaged
            ``portfolio_risk_score``, ``risk_level``, ``high_risk_stocks``,
            the merged ``alerts`` (each tagged with its ``stock_code``),
            ``risk_concentration`` and the per-stock results in
            ``stock_risks``. For an empty portfolio or on any failure, a
            dict holding only an ``error`` message.
        """
        try:
            if not portfolio:
                return {"error": "投资组合为空"}

            stock_risks = {}
            total_weight = 0
            weighted_risk_score = 0

            for stock in portfolio:
                stock_code = stock.get('stock_code')
                if not stock_code:
                    continue

                weight = stock.get('weight', 1)
                market_type = stock.get('market_type', 'A')

                risk = self.analyze_stock_risk(stock_code, market_type)
                stock_risks[stock_code] = risk

                total_weight += weight
                # A stock whose analysis failed has no total score; it is
                # counted with a placeholder score of 50.
                weighted_risk_score += risk.get('total_risk_score', 50) * weight

            if total_weight > 0:
                portfolio_risk_score = weighted_risk_score / total_weight
            else:
                portfolio_risk_score = 0

            high_risk_stocks = [
                {
                    "stock_code": code,
                    "risk_score": risk.get('total_risk_score', 0),
                    "risk_level": risk.get('risk_level', '未知')
                }
                for code, risk in stock_risks.items()
                if risk.get('total_risk_score', 0) >= self._HIGH_RISK_THRESHOLD
            ]

            all_alerts = [
                {"stock_code": code, **alert}
                for code, risk in stock_risks.items()
                for alert in risk.get('alerts', [])
            ]

            risk_concentration = self._analyze_risk_concentration(portfolio, stock_risks)

            return {
                "portfolio_risk_score": portfolio_risk_score,
                "risk_level": self._overall_level(portfolio_risk_score),
                "high_risk_stocks": high_risk_stocks,
                "alerts": all_alerts,
                "risk_concentration": risk_concentration,
                "stock_risks": stock_risks
            }

        except Exception as e:
            # Deliberate best-effort boundary: callers receive an error dict
            # instead of an exception.
            print(f"分析投资组合风险出错: {e}")
            return {
                "error": f"分析投资组合风险时出错: {e}"
            }

    def _analyze_risk_concentration(self, portfolio, stock_risks):
        """Summarize how portfolio weight concentrates by industry and risk.

        Args:
            portfolio: Same entries as given to ``analyze_portfolio_risk``.
            stock_risks: Mapping of stock code to its risk result dict.

        Returns:
            Dict with ``max_industry`` (industry holding the most weight),
            ``max_industry_weight`` and ``high_risk_weight`` (summed weight
            of stocks whose total score is at least 60).
        """
        industries = {}
        high_risk_weight = 0

        for stock in portfolio:
            stock_code = stock.get('stock_code')
            # Skip entries without a code, matching analyze_portfolio_risk,
            # so the analyzer is never queried with a missing code.
            if not stock_code:
                continue

            weight = stock.get('weight', 1)

            stock_info = self.analyzer.get_stock_info(stock_code)
            # Missing metadata falls into the "unknown" industry bucket
            # instead of aborting the whole portfolio analysis.
            if stock_info is None:
                industry = '未知'
            else:
                industry = stock_info.get('行业', '未知')
            industries[industry] = industries.get(industry, 0) + weight

            risk = stock_risks.get(stock_code)
            if risk is not None and risk.get('total_risk_score', 0) >= self._HIGH_RISK_THRESHOLD:
                high_risk_weight += weight

        if industries:
            max_industry = max(industries.items(), key=lambda item: item[1])
        else:
            max_industry = ('未知', 0)

        return {
            "max_industry": max_industry[0],
            "max_industry_weight": max_industry[1],
            "high_risk_weight": high_risk_weight
        }