Babajaan's picture
Full Viral Images v1.0 implementation - all modules and configs
6ceaa94 verified
"""Recommendation engine — turns score weaknesses into actionable suggestions."""
import os
import yaml
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class Suggestion:
    """One improvement suggestion aimed at a single sub-score.

    Field order and defaults are part of the constructor signature: the
    first four fields are required, the two trigger fields are optional.
    """

    # Rank of the suggestion; the engine creates suggestions with 0 and
    # renumbers the ones it keeps as 1..N.
    priority: int
    # Name of the sub-score this suggestion is meant to raise.
    sub_score_target: str
    # Human-readable advice text taken from the matching rule.
    message: str
    # Estimated number of points the targeted sub-score could gain.
    projected_gain: float
    # Name of the rule's trigger feature (empty when the rule has none).
    trigger_feature: str = ""
    # Threshold the rule compares the trigger feature against.
    trigger_value: float = 0.0
class RecommendationEngine:
    """Generates ranked improvement suggestions from sub-scores and raw features.

    Rules are plain dicts loaded from a YAML file whose top-level mapping has
    a ``rules`` list. The keys this class reads from each rule are
    ``sub_score``, ``max_score``, ``trigger_feature``, ``trigger_value``,
    ``trigger_condition`` (``"lt"``, ``"gt"`` or ``"eq"``), ``message`` and
    ``max_gain``.
    """

    # Locations probed, in order, when ``rules_path`` is missing or absent on disk.
    _DEFAULT_RULE_PATHS = (
        'configs/suggestion_rules.yaml',
        'viral-images/configs/suggestion_rules.yaml',
    )

    # Prefixes tried, in order, when resolving a rule's trigger feature
    # against the keys of ``raw_features``.
    _FEATURE_PREFIXES = ("heuristic_", "saliency_", "ocr_", "quality_", "semantic_")

    def __init__(self, rules_path: Optional[str] = None):
        """Load the rule set and set the default target score and result cap.

        Args:
            rules_path: Optional path to a YAML rules file. When it is not
                given or does not exist, the default locations are tried.
        """
        self.rules = self._load_rules(rules_path)
        self.target_score = 75.0
        self.max_suggestions = 5

    def _load_rules(self, rules_path: Optional[str]) -> List[Dict[str, Any]]:
        """Load suggestion rules from YAML.

        The explicit ``rules_path`` is tried first, then the default paths.
        The first file that exists is used, even if it contains no rules.

        Returns:
            The list of rule dicts, or an empty list when no file is found.
        """
        candidates = [rules_path] if rules_path else []
        candidates.extend(self._DEFAULT_RULE_PATHS)
        for path in candidates:
            if os.path.exists(path):
                return self._read_rules_file(path)
        return []

    @staticmethod
    def _read_rules_file(path: str) -> List[Dict[str, Any]]:
        """Parse one YAML rules file and return its ``rules`` list."""
        with open(path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        # An empty file parses to None and a malformed one may parse to a
        # list or scalar; treat anything but a mapping as "no rules".
        if not isinstance(data, dict):
            return []
        rules = data.get("rules")
        # ``rules: null`` yields None; never hand a non-list to callers that
        # iterate over the rules.
        if not isinstance(rules, list):
            return []
        return [rule for rule in rules if isinstance(rule, dict)]

    def generate(self, sub_scores: Dict[str, float],
                 raw_features: Dict[str, Any],
                 use_case: str = "default") -> List["Suggestion"]:
        """Generate ranked improvement suggestions.

        Sub-scores are visited from weakest to strongest. For each one, at
        most the first two matching rules become suggestions. Suggestions are
        ordered by ``gap * projected_gain`` (gap = distance below
        ``self.target_score``), truncated to ``self.max_suggestions`` and
        numbered from 1.

        Args:
            sub_scores: Mapping of sub-score name to value. The
                ``improvement_potential`` entry is skipped.
            raw_features: Raw extracted features used to evaluate rule triggers.
            use_case: Current use case preset (not used by this method).

        Returns:
            List of Suggestion objects, ranked by priority (1 = first).
        """
        scored = []  # (impact, suggestion) pairs, in creation order
        for sub_name, sub_value in sorted(sub_scores.items(), key=lambda item: item[1]):
            # Skip improvement_potential (meta-score)
            if sub_name == "improvement_potential":
                continue
            # Distance below the target; zero once the sub-score reaches it.
            gap = max(0, self.target_score - sub_value)
            matching = self._find_matching_rules(sub_name, sub_value, raw_features)
            for rule in matching[:2]:  # max 2 per sub-score
                # A suggestion can never promise more than the remaining gap.
                gain = max(0, min(rule.get("max_gain", 15.0), gap))
                suggestion = Suggestion(
                    priority=0,  # assigned after ranking
                    sub_score_target=sub_name,
                    message=rule.get("message", ""),
                    projected_gain=gain,
                    trigger_feature=rule.get("trigger_feature", ""),
                    trigger_value=rule.get("trigger_value", 0.0),
                )
                scored.append((gap * gain, suggestion))

        # Most impactful first. The sort is stable, so ties keep creation
        # order (weakest sub-score first).
        scored.sort(key=lambda pair: pair[0], reverse=True)
        suggestions = [suggestion for _, suggestion in scored[:self.max_suggestions]]
        for rank, suggestion in enumerate(suggestions, 1):
            suggestion.priority = rank
        return suggestions

    def _find_matching_rules(self, sub_score_name: str, sub_score_value: float,
                             raw_features: Dict[str, Any]) -> List[Dict]:
        """Find rules that match this sub-score's weakness.

        A rule matches when its ``sub_score`` equals ``sub_score_name``, the
        sub-score does not exceed the rule's ``max_score`` (default 100), its
        trigger feature is present in ``raw_features`` and the trigger
        condition holds. Rules whose trigger feature cannot be found, or
        cannot be compared numerically, are skipped.

        Returns:
            Matching rule dicts in the order they appear in ``self.rules``.
        """
        matches = []
        for rule in self.rules:
            if rule.get("sub_score") != sub_score_name:
                continue
            if sub_score_value > rule.get("max_score", 100):
                continue

            trigger_feature = rule.get("trigger_feature", "")
            # Prefer a prefixed key (first prefix that exists wins), then fall
            # back to the bare feature name.
            feat_key = trigger_feature
            for prefix in self._FEATURE_PREFIXES:
                candidate = f"{prefix}{trigger_feature}"
                if candidate in raw_features:
                    feat_key = candidate
                    break
            feat_val = raw_features.get(feat_key)
            if feat_val is None:
                continue

            if self._condition_met(rule.get("trigger_condition", "lt"),
                                   feat_val,
                                   rule.get("trigger_value", 0.0)):
                matches.append(rule)
        return matches

    @staticmethod
    def _condition_met(condition: str, feat_val: Any, threshold: Any) -> bool:
        """Evaluate one trigger condition; non-comparable operands never match."""
        try:
            if condition == "lt":
                return bool(feat_val < threshold)
            if condition == "gt":
                return bool(feat_val > threshold)
            if condition == "eq":
                return bool(abs(feat_val - threshold) < 0.01)
        except TypeError:
            # raw_features is Dict[str, Any]: a string/list feature or a null
            # threshold cannot be compared, so the rule simply does not fire.
            return False
        # Unknown condition names match nothing.
        return False

    def estimate_improvement(self, sub_scores: Dict[str, float],
                             suggestions: List["Suggestion"],
                             use_case: str) -> float:
        """Estimate overall score improvement if all suggestions are followed.

        For every targeted sub-score the largest projected gain among its
        suggestions is applied (gains are not summed, since suggestions for
        the same sub-score overlap), capped at 100. The result is the
        unweighted mean of all entries of the projected mapping; this is an
        approximation, not the scoring engine's weighted aggregation.

        Args:
            sub_scores: Mapping of sub-score name to current value.
            suggestions: Suggestions whose gains should be applied.
            use_case: Current use case preset (not used by this method).

        Returns:
            Projected overall score rounded to one decimal. With no
            suggestions, ``sub_scores["overall_score"]`` if present, else 50.0.
        """
        if not suggestions:
            return sub_scores.get("overall_score", 50.0)

        # Aggregate gains per sub-score (take max, not sum).
        gains_by_sub: Dict[str, float] = {}
        for sugg in suggestions:
            target = sugg.sub_score_target
            gains_by_sub[target] = max(gains_by_sub.get(target, 0), sugg.projected_gain)

        projected = dict(sub_scores)
        for sub_name, gain in gains_by_sub.items():
            projected[sub_name] = min(100, projected.get(sub_name, 0) + gain)

        # ``projected`` holds at least every targeted sub-score, so it is
        # non-empty here and the division is safe even for empty sub_scores.
        projected_overall = sum(projected.values()) / len(projected)
        return round(min(100, projected_overall), 1)
def compute_improvement_potential(sub_scores: Dict[str, float]) -> float:
    """Compute improvement potential as meta-score.

    The potential is the mean of the three largest gaps between a fixed
    target of 75.0 and each sub-score; sub-scores at or above the target
    contribute a gap of 0. The ``improvement_potential`` entry itself is
    excluded so the meta-score does not feed back into its own value.

    Args:
        sub_scores: Mapping of sub-score name to value.

    Returns:
        Mean of the (up to) three largest gaps, rounded to one decimal;
        0.0 when there are no sub-scores to consider.
    """
    target = 75.0
    # Filter on the key: comparing the numeric values against the string
    # "improvement_potential" would never exclude anything.
    gaps = sorted(
        (
            max(0, target - score)
            for name, score in sub_scores.items()
            if name != "improvement_potential"
        ),
        reverse=True,
    )
    top3 = gaps[:3]
    return round(sum(top3) / len(top3), 1) if top3 else 0.0