"""Recommendation engine — turns score weaknesses into actionable suggestions.""" import os import yaml from typing import Dict, Any, List, Optional from dataclasses import dataclass @dataclass class Suggestion: priority: int sub_score_target: str message: str projected_gain: float trigger_feature: str = "" trigger_value: float = 0.0 class RecommendationEngine: """Generates ranked improvement suggestions from sub-scores and raw features.""" def __init__(self, rules_path: Optional[str] = None): self.rules = self._load_rules(rules_path) self.target_score = 75.0 self.max_suggestions = 5 def _load_rules(self, rules_path: Optional[str]) -> List[Dict[str, Any]]: """Load suggestion rules from YAML.""" if rules_path and os.path.exists(rules_path): with open(rules_path, 'r') as f: data = yaml.safe_load(f) or {} return data.get("rules", []) # Try default paths for path in ['configs/suggestion_rules.yaml', 'viral-images/configs/suggestion_rules.yaml']: if os.path.exists(path): with open(path, 'r') as f: data = yaml.safe_load(f) or {} return data.get("rules", []) return [] def generate(self, sub_scores: Dict[str, float], raw_features: Dict[str, Any], use_case: str = "default") -> List[Suggestion]: """ Generate ranked improvement suggestions. Args: sub_scores: 8 sub-scores [0-100] raw_features: Raw extracted features use_case: Current use case preset Returns: List of Suggestion objects, ranked by priority. """ suggestions = [] # Sort sub-scores ascending to find weakest areas weakest = sorted(sub_scores.items(), key=lambda x: x[1]) # Try to match each weak sub-score with a specific rule for sub_name, sub_value in weakest: # Skip improvement_potential (meta-score) if sub_name == "improvement_potential": continue # Find matching rules for this sub-score matching = self._find_matching_rules(sub_name, sub_value, raw_features) # Take best matching rule for rule in matching[:2]: # max 2 per sub-score gain = min(rule.get("max_gain", 15.0), self.target_score - sub_value) gain = max(0, gain) sugg = Suggestion( priority=0, # assigned later sub_score_target=sub_name, message=rule.get("message", ""), projected_gain=gain, trigger_feature=rule.get("trigger_feature", ""), trigger_value=rule.get("trigger_value", 0.0) ) suggestions.append(sugg) # Rank by: (gap to target) × (max_gain weight) = most impactful first for sugg in suggestions: current = sub_scores.get(sugg.sub_score_target, 50) gap = max(0, self.target_score - current) # Priority score = gap × projected_gain sugg.priority = gap * sugg.projected_gain # Sort descending by priority, then take top N suggestions.sort(key=lambda x: -x.priority) suggestions = suggestions[:self.max_suggestions] # Renumber priorities 1..N for i, sugg in enumerate(suggestions, 1): sugg.priority = i return suggestions def _find_matching_rules(self, sub_score_name: str, sub_score_value: float, raw_features: Dict[str, Any]) -> List[Dict]: """Find rules that match this sub-score's weakness.""" matches = [] for rule in self.rules: # Check sub-score match if rule.get("sub_score") != sub_score_name: continue # Check max_score threshold if sub_score_value > rule.get("max_score", 100): continue # Check trigger feature condition trigger_feature = rule.get("trigger_feature", "") trigger_value = rule.get("trigger_value", 0.0) trigger_cond = rule.get("trigger_condition", "lt") # Look up trigger feature value in raw features feat_key = None # Try heuristic_, saliency_, ocr_, quality_, semantic_ prefixes for prefix in ["heuristic_", "saliency_", "ocr_", "quality_", "semantic_"]: candidate = f"{prefix}{trigger_feature}" if candidate in raw_features: feat_key = candidate break if feat_key is None: feat_key = trigger_feature # try raw key feat_val = raw_features.get(feat_key) if feat_val is None: continue # Evaluate condition matched = False if trigger_cond == "lt" and feat_val < trigger_value: matched = True elif trigger_cond == "gt" and feat_val > trigger_value: matched = True elif trigger_cond == "eq" and abs(feat_val - trigger_value) < 0.01: matched = True if matched: matches.append(rule) return matches def estimate_improvement(self, sub_scores: Dict[str, float], suggestions: List[Suggestion], use_case: str) -> float: """ Estimate overall score improvement if all suggestions are followed. Returns projected overall score. """ if not suggestions: return sub_scores.get("overall_score", 50.0) # Apply projected gains to each targeted sub-score projected = dict(sub_scores) # Aggregate gains per sub-score (take max, not sum — overlapping suggestions) gains_by_sub = {} for sugg in suggestions: gains_by_sub[sugg.sub_score_target] = max( gains_by_sub.get(sugg.sub_score_target, 0), sugg.projected_gain ) for sub_name, gain in gains_by_sub.items(): current = projected.get(sub_name, 0) projected[sub_name] = min(100, current + gain) # Re-aggregate (same weights as scoring engine) # For simplicity, approximate: mean of top 3 weakest improvements improved_subs = [projected[s] for s in gains_by_sub.keys()] if not improved_subs: return sub_scores.get("overall_score", 50.0) # Rough estimate: apply gains proportionally to their weights # This is approximate — in production, call ScoringEngine._aggregate() current_overall = sum(sub_scores.values()) / len(sub_scores) projected_overall = sum(projected.values()) / len(projected) return round(min(100, projected_overall), 1) def compute_improvement_potential(sub_scores: Dict[str, float]) -> float: """Compute improvement potential as meta-score.""" target = 75.0 gaps = sorted([max(0, target - s) for s in sub_scores.values() if s != "improvement_potential"], reverse=True) top3 = gaps[:3] return round(sum(top3) / len(top3) if top3 else 0.0, 1)