Spaces:
Running
Running
| """Recommendation engine — turns score weaknesses into actionable suggestions.""" | |
| import os | |
| import yaml | |
| from typing import Dict, Any, List, Optional | |
| from dataclasses import dataclass | |
@dataclass
class Suggestion:
    """One actionable improvement suggestion targeting a single sub-score.

    Bug fix: the @dataclass decorator was missing. Without it the typed
    fields below are bare annotations, the class has no generated __init__,
    and RecommendationEngine.generate()'s keyword-argument construction
    (Suggestion(priority=..., ...)) raises TypeError.
    """
    priority: int               # rank assigned by generate(): 1 = most impactful
    sub_score_target: str       # name of the sub-score this suggestion improves
    message: str                # human-readable advice text from the rule
    projected_gain: float       # estimated sub-score gain if the advice is followed
    trigger_feature: str = ""   # raw feature name that triggered the matching rule
    trigger_value: float = 0.0  # threshold value from the matching rule
class RecommendationEngine:
    """Generates ranked improvement suggestions from sub-scores and raw features."""

    def __init__(self, rules_path: Optional[str] = None):
        """Load suggestion rules and set ranking defaults.

        Args:
            rules_path: Optional explicit path to a rules YAML file. When
                omitted (or missing on disk), default locations are probed.
        """
        self.rules = self._load_rules(rules_path)
        self.target_score = 75.0   # level we aim to bring weak sub-scores up to
        self.max_suggestions = 5   # cap on suggestions returned by generate()

    @staticmethod
    def _read_rules_file(path: str) -> List[Dict[str, Any]]:
        """Parse one YAML rules file; return its "rules" list ([] when absent)."""
        with open(path, 'r') as f:
            data = yaml.safe_load(f) or {}
        return data.get("rules", [])

    def _load_rules(self, rules_path: Optional[str]) -> List[Dict[str, Any]]:
        """Load suggestion rules from YAML.

        Tries the explicit path first, then known default locations, and
        returns an empty list when no file is found. (The original duplicated
        the open/parse logic in two branches; factored into _read_rules_file.)
        """
        candidates: List[str] = []
        if rules_path:
            candidates.append(rules_path)
        candidates += ['configs/suggestion_rules.yaml',
                       'viral-images/configs/suggestion_rules.yaml']
        for path in candidates:
            if os.path.exists(path):
                return self._read_rules_file(path)
        return []

    def generate(self, sub_scores: Dict[str, float],
                 raw_features: Dict[str, Any],
                 use_case: str = "default") -> List["Suggestion"]:
        """
        Generate ranked improvement suggestions.
        Args:
            sub_scores: 8 sub-scores [0-100]
            raw_features: Raw extracted features
            use_case: Current use case preset (accepted for API symmetry;
                not used in ranking here)
        Returns:
            List of Suggestion objects, ranked by priority (1 = most impactful).
        """
        suggestions: List["Suggestion"] = []
        # Walk sub-scores from weakest to strongest so the worst areas are
        # considered first.
        for sub_name, sub_value in sorted(sub_scores.items(), key=lambda kv: kv[1]):
            # improvement_potential is a meta-score, not an improvable area.
            if sub_name == "improvement_potential":
                continue
            # Cap at 2 rules per sub-score to keep the list diverse.
            for rule in self._find_matching_rules(sub_name, sub_value, raw_features)[:2]:
                # Projected gain cannot push the sub-score past the target.
                gain = max(0, min(rule.get("max_gain", 15.0),
                                  self.target_score - sub_value))
                suggestions.append(Suggestion(
                    priority=0,  # real rank assigned below
                    sub_score_target=sub_name,
                    message=rule.get("message", ""),
                    projected_gain=gain,
                    trigger_feature=rule.get("trigger_feature", ""),
                    trigger_value=rule.get("trigger_value", 0.0),
                ))
        # Impact ranking: (gap to target) x (projected gain) = most impactful first.
        for sugg in suggestions:
            current = sub_scores.get(sugg.sub_score_target, 50)
            sugg.priority = max(0, self.target_score - current) * sugg.projected_gain
        suggestions.sort(key=lambda s: s.priority, reverse=True)
        suggestions = suggestions[:self.max_suggestions]
        # Renumber priorities 1..N for presentation.
        for rank, sugg in enumerate(suggestions, 1):
            sugg.priority = rank
        return suggestions

    def _find_matching_rules(self, sub_score_name: str, sub_score_value: float,
                             raw_features: Dict[str, Any]) -> List[Dict]:
        """Find rules whose sub-score, ceiling and trigger condition all match.

        A rule matches when:
          * its "sub_score" equals sub_score_name,
          * sub_score_value is at or below its "max_score" ceiling, and
          * its trigger feature (looked up with known extractor prefixes,
            then as a raw key) satisfies the lt/gt/eq trigger condition.
        """
        matches: List[Dict] = []
        for rule in self.rules:
            if rule.get("sub_score") != sub_score_name:
                continue
            # Rule only applies while the sub-score is weak enough.
            if sub_score_value > rule.get("max_score", 100):
                continue
            trigger_feature = rule.get("trigger_feature", "")
            trigger_value = rule.get("trigger_value", 0.0)
            trigger_cond = rule.get("trigger_condition", "lt")
            # Rules name features without the extractor prefix; probe the
            # known prefixes first, then fall back to the raw key.
            feat_key = trigger_feature
            for prefix in ["heuristic_", "saliency_", "ocr_", "quality_", "semantic_"]:
                candidate = f"{prefix}{trigger_feature}"
                if candidate in raw_features:
                    feat_key = candidate
                    break
            feat_val = raw_features.get(feat_key)
            if feat_val is None:
                continue
            # Evaluate the rule's trigger condition against the feature value.
            if ((trigger_cond == "lt" and feat_val < trigger_value)
                    or (trigger_cond == "gt" and feat_val > trigger_value)
                    or (trigger_cond == "eq" and abs(feat_val - trigger_value) < 0.01)):
                matches.append(rule)
        return matches

    def estimate_improvement(self, sub_scores: Dict[str, float],
                             suggestions: List["Suggestion"],
                             use_case: str) -> float:
        """
        Estimate overall score improvement if all suggestions are followed.

        Applies each suggestion's projected gain to its target sub-score,
        taking the max gain per sub-score (not the sum) so overlapping
        suggestions are not double-counted, then approximates the overall
        score as the plain mean of the projected sub-scores.
        Returns the projected overall score.
        """
        if not suggestions:
            return sub_scores.get("overall_score", 50.0)
        # Aggregate gains per sub-score (max, not sum — overlapping suggestions).
        gains_by_sub: Dict[str, float] = {}
        for sugg in suggestions:
            gains_by_sub[sugg.sub_score_target] = max(
                gains_by_sub.get(sugg.sub_score_target, 0),
                sugg.projected_gain,
            )
        projected = dict(sub_scores)
        for sub_name, gain in gains_by_sub.items():
            projected[sub_name] = min(100, projected.get(sub_name, 0) + gain)
        # Approximation: mean of all projected sub-scores. In production,
        # call ScoringEngine._aggregate() with the real use-case weights.
        # (projected is never empty here: suggestions is non-empty, so
        # gains_by_sub contributed at least one key.)
        projected_overall = sum(projected.values()) / len(projected)
        return round(min(100, projected_overall), 1)
| def compute_improvement_potential(sub_scores: Dict[str, float]) -> float: | |
| """Compute improvement potential as meta-score.""" | |
| target = 75.0 | |
| gaps = sorted([max(0, target - s) for s in sub_scores.values() if s != "improvement_potential"], reverse=True) | |
| top3 = gaps[:3] | |
| return round(sum(top3) / len(top3) if top3 else 0.0, 1) | |