Spaces:
Sleeping
Sleeping
| """ | |
| Result Classifier - Classifies MCP server responses for reliability analysis. | |
| Classification categories: | |
| - SUCCESS: Valid response with expected data | |
| - PARTIAL: Response OK but missing some fields | |
| - FALLBACK: Primary source failed, secondary succeeded | |
| - TRANSIENT: Temporary error (rate limit, timeout) | |
| - PERSISTENT: Repeated failures | |
| - HARD_FAILURE: Unrecoverable error | |
| """ | |
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
class ResultCategory(Enum):
    """Classification buckets for MCP server call outcomes.

    Values are assigned by ResultClassifier; UNKNOWN is a reserved default
    that the classifier code in this module never assigns.
    """
    SUCCESS = "success"              # valid response, completeness >= 0.5, no fallback
    PARTIAL = "partial"              # valid response but data_completeness < 0.5
    FALLBACK = "fallback"            # primary source failed, secondary source used
    TRANSIENT = "transient"          # retryable error, fewer than 3 consecutive failures
    PERSISTENT = "persistent"        # error repeated 3+ consecutive times for same server:ticker
    HARD_FAILURE = "hard_failure"    # unrecoverable: 4xx, missing response, or embedded error
    RATE_LIMITED = "rate_limited"    # error text contains "429" or "rate limit"
    TIMEOUT = "timeout"              # error text contains "timeout" / "timed out"
    HF_DEPENDENCY = "hf_dependency"  # error text mentions "huggingface" or "hf.space"
    COLD_START = "cold_start"        # error text contains "cold start"
    UNKNOWN = "unknown"              # reserved; not assigned by this module
@dataclass
class ClassificationResult:
    """Result of classifying an MCP response.

    FIX: the class previously lacked the ``@dataclass`` decorator even though
    it uses bare field annotations and ``field(default_factory=...)``, so the
    keyword-argument construction used throughout this module raised
    TypeError. The decorator restores the intended constructor.
    """

    category: "ResultCategory"           # classification bucket (string forward ref)
    server: str                          # MCP server name
    ticker: str                          # stock ticker tested
    latency_ms: float                    # request latency in milliseconds
    data_completeness: float             # 0.0 to 1.0
    fallback_used: bool = False
    primary_source: Optional[str] = None
    fallback_source: Optional[str] = None
    error_message: Optional[str] = None
    raw_response: Optional[Dict] = None
    # Timezone-aware UTC creation time (datetime.utcnow is deprecated).
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict:
        """Convert to dictionary for logging/serialization."""
        return {
            # Aware-UTC isoformat ends in "+00:00"; normalize to a "Z" suffix
            # (matches the string the old naive-utcnow + "Z" code produced).
            "timestamp": self.timestamp.isoformat().replace("+00:00", "Z"),
            "category": self.category.value,
            "server": self.server,
            "ticker": self.ticker,
            "latency_ms": self.latency_ms,
            "data_completeness": self.data_completeness,
            "fallback_used": self.fallback_used,
            "primary_source": self.primary_source,
            "fallback_source": self.fallback_source,
            "error_message": self.error_message
        }

    def to_json(self) -> str:
        """Convert to JSON string for logging."""
        return json.dumps(self.to_dict())
class ResultClassifier:
    """Classifies MCP server responses based on content and error patterns.

    Stateful: consecutive failures per "server:ticker" key are counted in
    ``attempt_counts`` so repeated errors can escalate TRANSIENT -> PERSISTENT.
    """

    # Expected fields per server for completeness calculation
    EXPECTED_FIELDS = {
        "fundamentals-basket": {
            "required": ["ticker", "financials"],
            "optional": ["debt", "cash_flow", "swot_category"]
        },
        "valuation-basket": {
            "required": ["metrics"],
            "optional": ["overall_signal", "swot_category"]
        },
        "volatility-basket": {
            "required": ["metrics"],
            "optional": ["swot_category", "interpretation"]
        },
        "macro-basket": {
            "required": ["metrics"],
            "optional": ["swot_category", "interpretation"]
        },
        "news-basket": {
            "required": ["results"],
            "optional": ["query", "source"]
        },
        "sentiment-basket": {
            "required": ["composite_score"],
            "optional": ["finnhub_score", "reddit_score", "overall_swot_category"]
        }
    }

    # Fallback detection patterns
    FALLBACK_INDICATORS = {
        "fundamentals-basket": {
            "field": "source",
            "fallback_values": ["yahoo_fallback", "yfinance"]
        },
        "volatility-basket": {
            "field": "vix_source",
            "fallback_values": ["yahoo", "yfinance"]
        },
        "news-basket": {
            "primary_field": "tavily_results",
            "fallback_field": "nyt_results"
        },
        "sentiment-basket": {
            "field": "finnhub_score",
            "fallback_indicator": None  # null means fallback to reddit
        }
    }

    def __init__(self):
        # Consecutive failure count keyed by "server:ticker".
        self.attempt_counts: Dict[str, int] = {}

    def classify(
        self,
        server: str,
        ticker: str,
        response: Optional[Dict],
        error: Optional[Exception],
        latency_ms: float
    ) -> "ClassificationResult":
        """Classify an MCP server response.

        Args:
            server: MCP server name
            ticker: Stock ticker tested
            response: Response dict (if successful)
            error: Exception (if failed)
            latency_ms: Request latency

        Returns:
            ClassificationResult with category and metadata
        """
        key = f"{server}:{ticker}"

        # Transport-level errors take precedence over response inspection.
        if error:
            return self._classify_error(server, ticker, error, latency_ms)

        # No error but also no payload: unrecoverable.
        if response is None:
            return ClassificationResult(
                category=ResultCategory.HARD_FAILURE,
                server=server,
                ticker=ticker,
                latency_ms=latency_ms,
                data_completeness=0.0,
                error_message="No response received"
            )

        # Application-level error embedded in an otherwise-OK response.
        if isinstance(response, dict) and "error" in response:
            return self._classify_response_error(server, ticker, response, latency_ms)

        # Successful response - check completeness and fallback usage.
        completeness = self._calculate_completeness(server, response)
        fallback_info = self._detect_fallback(server, response)

        # Reset failure counter on success.
        self.attempt_counts[key] = 0

        if fallback_info["used"]:
            return ClassificationResult(
                category=ResultCategory.FALLBACK,
                server=server,
                ticker=ticker,
                latency_ms=latency_ms,
                data_completeness=completeness,
                fallback_used=True,
                primary_source=fallback_info.get("primary"),
                fallback_source=fallback_info.get("fallback"),
                raw_response=response
            )
        if completeness < 0.5:
            return ClassificationResult(
                category=ResultCategory.PARTIAL,
                server=server,
                ticker=ticker,
                latency_ms=latency_ms,
                data_completeness=completeness,
                raw_response=response
            )
        return ClassificationResult(
            category=ResultCategory.SUCCESS,
            server=server,
            ticker=ticker,
            latency_ms=latency_ms,
            data_completeness=completeness,
            raw_response=response
        )

    def _classify_error(
        self,
        server: str,
        ticker: str,
        error: Exception,
        latency_ms: float
    ) -> "ClassificationResult":
        """Map an exception to a category via substring heuristics.

        Generic and 5xx errors escalate TRANSIENT -> PERSISTENT after 3
        consecutive failures for the same server:ticker pair.
        """
        key = f"{server}:{ticker}"
        error_str = str(error).lower()

        # Increment the consecutive-failure counter for this pair.
        self.attempt_counts[key] = self.attempt_counts.get(key, 0) + 1
        attempts = self.attempt_counts[key]

        # NOTE: substring matching on the stringified exception is heuristic;
        # e.g. "429" anywhere in the message triggers RATE_LIMITED.
        if "429" in error_str or "rate limit" in error_str:
            category = ResultCategory.RATE_LIMITED
        elif "timeout" in error_str or "timed out" in error_str:
            category = ResultCategory.TIMEOUT
        elif "huggingface" in error_str or "hf.space" in error_str:
            category = ResultCategory.HF_DEPENDENCY
        elif "cold start" in error_str:
            category = ResultCategory.COLD_START
        elif "503" in error_str or "502" in error_str or "500" in error_str:
            # Server-side errors: transient until they repeat.
            category = ResultCategory.TRANSIENT if attempts < 3 else ResultCategory.PERSISTENT
        elif "400" in error_str or "401" in error_str or "403" in error_str or "404" in error_str:
            # Client-side errors will not fix themselves on retry.
            category = ResultCategory.HARD_FAILURE
        else:
            category = ResultCategory.TRANSIENT if attempts < 3 else ResultCategory.PERSISTENT

        return ClassificationResult(
            category=category,
            server=server,
            ticker=ticker,
            latency_ms=latency_ms,
            data_completeness=0.0,
            error_message=str(error)
        )

    def _classify_response_error(
        self,
        server: str,
        ticker: str,
        response: Dict,
        latency_ms: float
    ) -> "ClassificationResult":
        """Classify an error embedded in a response (always HARD_FAILURE)."""
        error_msg = response.get("error", "Unknown error")
        return ClassificationResult(
            category=ResultCategory.HARD_FAILURE,
            server=server,
            ticker=ticker,
            latency_ms=latency_ms,
            data_completeness=0.0,
            error_message=error_msg,
            raw_response=response
        )

    @staticmethod
    def _is_populated(value: Any) -> bool:
        """Return True if *value* carries real data.

        None and empty strings/containers count as missing; numeric zero and
        False count as present. (The previous bare-truthiness check wrongly
        dropped legitimate values such as a 0.0 composite_score.)
        """
        if value is None:
            return False
        if isinstance(value, (str, bytes, list, tuple, dict, set)):
            return len(value) > 0
        return True

    def _calculate_completeness(self, server: str, response: Dict) -> float:
        """Score field coverage: required fields weigh 70%, optional 30%.

        A field counts only when present AND populated (see _is_populated).
        Unknown servers score 1.0.
        """
        schema = self.EXPECTED_FIELDS.get(server, {"required": [], "optional": []})
        required = schema["required"]
        optional = schema["optional"]

        if not required and not optional:
            return 1.0  # Unknown server, assume complete

        required_present = sum(
            1 for f in required if f in response and self._is_populated(response[f])
        )
        optional_present = sum(
            1 for f in optional if f in response and self._is_populated(response[f])
        )

        if not required:
            return 1.0 if not optional else optional_present / len(optional)

        # Weight: required fields = 70%, optional = 30%
        required_score = required_present / len(required)
        optional_score = optional_present / len(optional) if optional else 1.0
        return 0.7 * required_score + 0.3 * optional_score

    def _detect_fallback(self, server: str, response: Dict) -> Dict:
        """Detect if a fallback data source was used in the response.

        Returns a dict with "used" (bool) plus "primary"/"fallback" labels
        when a fallback was detected.
        """
        indicators = self.FALLBACK_INDICATORS.get(server)
        if not indicators:
            return {"used": False}

        # Simple field-based detection. Local is named field_name to avoid
        # shadowing dataclasses.field imported at module level.
        if "field" in indicators:
            field_name = indicators["field"]
            value = response.get(field_name)

            if "fallback_values" in indicators and value in indicators["fallback_values"]:
                return {
                    "used": True,
                    "primary": f"primary_{server}",
                    "fallback": value
                }

            # Sentinel match (None => primary source unavailable, fell back).
            if "fallback_indicator" in indicators and value is indicators["fallback_indicator"]:
                return {
                    "used": True,
                    "primary": field_name,
                    "fallback": "alternative"
                }

        # News-basket: primary result list empty but fallback list has data.
        if "primary_field" in indicators and "fallback_field" in indicators:
            primary = response.get(indicators["primary_field"], [])
            fallback = response.get(indicators["fallback_field"], [])
            if not primary and fallback:
                return {
                    "used": True,
                    "primary": indicators["primary_field"],
                    "fallback": indicators["fallback_field"]
                }

        return {"used": False}

    def reset_counters(self):
        """Reset all attempt counters."""
        self.attempt_counts.clear()
class ResultAggregator:
    """Aggregates classification results for analysis.

    Accumulates per-category and per-server counts plus raw latencies, and
    renders them into a summary dict on demand.
    """

    def __init__(self):
        self.results: List["ClassificationResult"] = []
        self.counts: Dict["ResultCategory", int] = {cat: 0 for cat in ResultCategory}
        self.by_server: Dict[str, Dict["ResultCategory", int]] = {}
        self.latencies: List[float] = []

    def add(self, result: "ClassificationResult") -> None:
        """Record a single classification result in all aggregates."""
        self.results.append(result)
        self.counts[result.category] += 1
        self.latencies.append(result.latency_ms)

        if result.server not in self.by_server:
            self.by_server[result.server] = {cat: 0 for cat in ResultCategory}
        self.by_server[result.server][result.category] += 1

    @staticmethod
    def _percentile(sorted_latencies: List[float], q: float) -> float:
        """Nearest-rank percentile over a pre-sorted list (0 when empty)."""
        if not sorted_latencies:
            return 0
        index = min(int(len(sorted_latencies) * q), len(sorted_latencies) - 1)
        return sorted_latencies[index]

    def summary(self) -> Dict:
        """Generate summary statistics.

        success_rate counts SUCCESS + PARTIAL + FALLBACK; fallback_rate is
        FALLBACK alone; failure_rate is HARD_FAILURE + PERSISTENT. Latency
        percentiles use nearest-rank over the observed latencies.
        """
        total = len(self.results)
        if total == 0:
            return {"total": 0, "success_rate": 0.0}

        success_count = self.counts[ResultCategory.SUCCESS] + self.counts[ResultCategory.PARTIAL]
        fallback_count = self.counts[ResultCategory.FALLBACK]
        failure_count = (
            self.counts[ResultCategory.HARD_FAILURE] + self.counts[ResultCategory.PERSISTENT]
        )
        # FIX: sort once instead of re-sorting the latency list for each of
        # the three percentiles.
        ordered = sorted(self.latencies)

        return {
            "total": total,
            "success_rate": (success_count + fallback_count) / total,
            "fallback_rate": fallback_count / total,
            "failure_rate": failure_count / total,
            "by_category": {cat.value: count for cat, count in self.counts.items()},
            "by_server": {
                server: {cat.value: count for cat, count in cats.items()}
                for server, cats in self.by_server.items()
            },
            "latency_p50": self._percentile(ordered, 0.50),
            "latency_p95": self._percentile(ordered, 0.95),
            "latency_p99": self._percentile(ordered, 0.99)
        }
if __name__ == "__main__":
    # Demo: run a handful of canned scenarios through the classifier, print
    # each call's category, then dump the aggregate summary as JSON.
    classifier = ResultClassifier()
    aggregator = ResultAggregator()

    # Each scenario is (server, ticker, response, error, latency_ms).
    test_cases = [
        ("fundamentals-basket", "AAPL", {"ticker": "AAPL", "financials": {"revenue": 1000}}, None, 250),
        ("fundamentals-basket", "MSFT", {"ticker": "MSFT", "financials": {"revenue": 2000}, "source": "yahoo_fallback"}, None, 500),
        ("valuation-basket", "GOOGL", {"metrics": {"pe_ratio": 25}}, None, 150),
        ("news-basket", "TSLA", None, Exception("429 Rate limit exceeded"), 0),
        ("sentiment-basket", "NVDA", {"error": "Finnhub API key invalid"}, None, 100),
    ]

    for case in test_cases:
        server, ticker, response, error, latency = case
        result = classifier.classify(server, ticker, response, error, latency)
        aggregator.add(result)
        print(f"{ticker} via {server}: {result.category.value}")

    print("\nSummary:")
    print(json.dumps(aggregator.summary(), indent=2))