| """ |
| Workflow helper functions for the multi-role AI workflow. |
| |
| Contains: |
| - WorkflowConfig: configuration flags (strict_mode, allow_persona_roles, etc.) |
| - Output format intent detection |
| - Structured QA result parsing |
| - Role relevance metadata and selection |
| - Targeted revision logic |
| - Final answer compression / noise stripping |
| - PlannerState management |
| """ |
|
|
| import json |
| import re |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class WorkflowConfig:
    """Runtime config flags for the multi-role workflow."""
    allow_persona_roles: bool = False   # let comedic persona roles pass score_roles' filter
    max_specialists_per_task: int = 3   # cap used by select_relevant_roles
    strict_mode: bool = True            # skip zero-score roles once one role is already selected
    always_include_qa: bool = True      # NOTE(review): not read in this module — confirm usage elsewhere
    always_include_research_for_factual_tasks: bool = True  # force "research" onto evidence-heavy tasks
    require_evidence_for_factual_claims: bool = True  # NOTE(review): not read in this module — confirm usage elsewhere
|
|
| |
# Persona roles: comedic/stylistic perspectives. score_roles() marks these as
# filtered unless WorkflowConfig.allow_persona_roles is True.
PERSONA_ROLE_KEYS: Tuple[str, ...] = (
    "mad_professor", "accountant", "artist", "lazy_slacker",
    "black_metal_fundamentalist", "doris", "maga_appointee",
    "chairman_of_board",
)

# Core specialist roles that do the substantive work.
CORE_ROLE_KEYS: Tuple[str, ...] = (
    "creative", "technical", "research", "security",
    "data_analyst", "labour_union_rep", "ux_designer", "lawyer",
)

# Shared default configuration instance.
DEFAULT_CONFIG = WorkflowConfig()
|
|
|
|
| |
| |
| |
|
|
# Known task categories. classify_task() returns one of these, or "other"
# when no category pattern matches.
TASK_CATEGORIES = [
    "factual_question",
    "comparison",
    "coding_task",
    "creative_writing",
    "opinion_discussion",
    "summarization",
    "analysis",
    "planning",
]
|
|
# (category, regex list) pairs scored by classify_task(): the category with the
# most matching patterns wins, and ties go to the earlier entry in this list.
_TASK_CATEGORY_PATTERNS = [
    ("coding_task", [
        r"\bwrite\s+(python|code|javascript|typescript|rust|java|c\+\+|go|bash|sql)\b",
        r"\bcode\s+(to|for|that)\b", r"\bimplement\b",
        r"\bscript\s+(to|for|that)\b", r"\bparse\s+a?\s*\w+\s+(file|data)\b",
        r"\bdebug\b", r"\brefactor\b", r"\bfix\s+(the|this|my)\s+(code|bug)\b",
    ]),
    ("creative_writing", [
        r"\bwrite\s+a\b.*\b(poem|story|essay|blog|article|song|haiku)\b",
        r"\bcreative\s+writing\b", r"\bbrainstorm\b", r"\bimagine\b",
        r"\bfiction\b", r"\bnarrative\b",
    ]),
    ("factual_question", [
        r"\bwhat\s+(is|are|was|were)\b", r"\bwho\s+(is|was|are|were)\b",
        r"\bwhen\s+(did|was|is)\b", r"\bwhere\s+(is|was|are)\b",
        r"\bhow\s+many\b", r"\bhow\s+much\b",
        r"\bnews\b", r"\brecent\b", r"\blatest\b", r"\bcurrent\b",
        r"\bfact\b", r"\btrue\s+or\s+false\b",
    ]),
    ("comparison", [
        r"\bcompar(e|ison|ing)\b", r"\bvs\.?\b", r"\bversus\b",
        r"\bdifference\s+between\b", r"\bbetter\s+than\b",
        r"\bwhich\s+is\s+(better|faster|cheaper)\b",
        r"\bpros?\s+and\s+cons?\b", r"\btrade[\s-]?offs?\b",
    ]),
    ("summarization", [
        r"\bsummar(y|ize|ise)\b", r"\btl;?dr\b", r"\bsynopsis\b",
        r"\boverview\b", r"\brecap\b",
    ]),
    ("analysis", [
        r"\banaly(sis|se|ze)\b", r"\bevaluat(e|ion)\b",
        r"\bassess(ment)?\b", r"\breview\b",
        r"\bexamin(e|ation)\b", r"\binvestigat(e|ion)\b",
    ]),
    ("planning", [
        r"\bplan\b", r"\bstrateg(y|ic)\b", r"\broadmap\b",
        r"\baction\s+items?\b", r"\bsteps?\s+to\b",
    ]),
    ("opinion_discussion", [
        r"\bdiscuss\b", r"\bopinion\b", r"\bperspective\b",
        r"\bpoint\s+of\s+view\b", r"\bargue\b", r"\bdebate\b",
        r"\brole\s+of\b",
    ]),
]
|
|
|
|
def classify_task(user_request: str) -> str:
    """Classify the user's request into a task category.

    Returns one of: factual_question, comparison, coding_task, creative_writing,
    opinion_discussion, summarization, analysis, planning, other.
    """
    text = user_request.lower()
    winner = "other"
    top_hits = 0
    # Strict '>' keeps the earliest category on ties, so the order of
    # _TASK_CATEGORY_PATTERNS doubles as a tie-break priority.
    for category, patterns in _TASK_CATEGORY_PATTERNS:
        hits = sum(1 for pat in patterns if re.search(pat, text))
        if hits > top_hits:
            winner, top_hits = category, hits
    return winner
|
|
|
|
def task_needs_evidence(task_category: str) -> bool:
    """Whether this task category benefits from tool-backed evidence retrieval."""
    evidence_categories = {"factual_question", "comparison", "analysis", "summarization"}
    return task_category in evidence_categories
|
|
|
|
| |
| |
| |
|
|
| |
# (format, regex list) pairs checked in order by detect_output_format(); the
# FIRST format with any matching pattern wins, so list order encodes priority
# (e.g. "single_choice" outranks "code").
_FORMAT_PATTERNS = [
    ("single_choice", [
        r"\bpick\s+one\b", r"\bchoose\s+one\b", r"\bagree\s+on\s+one\b",
        r"\bselect\s+one\b", r"\bjust\s+one\b", r"\bone\s+choice\b",
        r"\bwhich\s+one\b", r"\bone\s+word\b",
    ]),
    ("code", [
        r"\bwrite\s+(python|code|javascript|typescript|rust|java|c\+\+|go|bash|sql)\b",
        r"\bcode\s+(to|for|that)\b", r"\bimplement\b.*\b(function|class|method|script)\b",
        r"\bgive\s+me\s+(the\s+)?code\b", r"\bscript\s+(to|for|that)\b",
    ]),
    ("table", [
        r"\bmake\s+a\s+table\b", r"\bcreate\s+a\s+table\b", r"\btable\s+comparing\b",
        r"\bcomparison\s+table\b", r"\btabular\b", r"\bin\s+table\s+form\b",
    ]),
    ("bullet_list", [
        r"\bbullet\s*(ed)?\s*(point|list)\b", r"\blist\s+(the|all|some)\b",
        r"\bgive\s+me\s+a\s+list\b",
    ]),
    ("short_answer", [
        r"\bshort\s+answer\b", r"\bbrief(ly)?\b", r"\bconcise(ly)?\b",
        r"\bin\s+one\s+sentence\b", r"\byes\s+or\s+no\b", r"\bquick\s+answer\b",
        r"\banswer\s+briefly\b", r"\bkeep\s+it\s+short\b",
    ]),
    ("report", [
        r"\breport\b", r"\banalysis\b", r"\bin[\s-]depth\b", r"\bdetailed\b",
        r"\bcomprehensive\b", r"\btrade[\s-]?offs?\b", r"\bpros?\s+and\s+cons?\b",
    ]),
    ("paragraph", [
        r"\bexplain\b", r"\bdescribe\b", r"\bparagraph\b",
    ]),
]
|
|
|
|
def detect_output_format(user_request: str) -> str:
    """Classify the expected output format from the user's request text.

    Returns one of: single_choice, short_answer, paragraph, bullet_list,
    table, report, code, other.
    """
    text = user_request.lower()
    # First format whose pattern list hits wins — _FORMAT_PATTERNS order
    # therefore encodes priority.
    return next(
        (fmt for fmt, patterns in _FORMAT_PATTERNS
         if any(re.search(pat, text) for pat in patterns)),
        "other",
    )
|
|
|
|
def detect_brevity_requirement(user_request: str) -> str:
    """Detect how brief the answer should be.

    Returns: 'minimal', 'short', 'normal', or 'verbose'.
    """
    text = user_request.lower()

    # Checked in priority order: an explicit "just one"-style cue beats a
    # generic "short" cue, which in turn beats a "detailed" cue.
    signal_table = (
        ("minimal", (
            r"\bjust\s+(one|the)\b", r"\bone\s+word\b", r"\byes\s+or\s+no\b",
            r"\bpick\s+one\b", r"\bchoose\s+one\b", r"\bagree\s+on\s+one\b",
        )),
        ("short", (
            r"\bshort\b", r"\bbrief(ly)?\b", r"\bconcise(ly)?\b",
            r"\bquick\b", r"\bsimple\b", r"\bkeep\s+it\s+short\b",
        )),
        ("verbose", (
            r"\bdetailed\b", r"\bin[\s-]depth\b", r"\bcomprehensive\b",
            r"\bthorough(ly)?\b", r"\bfull\s+report\b",
        )),
    )
    for level, patterns in signal_table:
        if any(re.search(pat, text) for pat in patterns):
            return level
    return "normal"
|
|
|
|
| |
| |
| |
|
|
@dataclass
class QAIssue:
    """A single problem flagged by QA, attributed to a responsible role."""
    type: str     # issue category; visible code uses "other", "format", "brevity", "directness"
    message: str  # human-readable description of the problem
    owner: str    # role label responsible for the fix (e.g. "Synthesizer", "Planner")
|
|
|
|
@dataclass
class QAResult:
    """Structured verdict produced by the QA stage."""
    status: str
    reason: str = ""
    issues: List[QAIssue] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    correction_instruction: str = ""

    @property
    def passed(self) -> bool:
        """True for clean passes and passes with warnings alike."""
        return self.status in ("PASS", "PASS_WITH_WARNINGS")

    @property
    def passed_with_warnings(self) -> bool:
        """True only when QA passed but flagged warnings."""
        return self.status == "PASS_WITH_WARNINGS"

    def owners(self) -> List[str]:
        """Return unique owner labels from issues."""
        seen: List[str] = []
        for issue in self.issues:
            if issue.owner not in seen:
                seen.append(issue.owner)
        return seen

    def to_dict(self) -> dict:
        """Serialize to a plain dict (e.g. for logging or JSON output)."""
        issue_dicts = [
            {"type": i.type, "message": i.message, "owner": i.owner}
            for i in self.issues
        ]
        return {
            "status": self.status,
            "reason": self.reason,
            "issues": issue_dicts,
            "warnings": self.warnings,
            "correction_instruction": self.correction_instruction,
        }
|
|
|
|
def parse_structured_qa(qa_text: str) -> QAResult:
    """Parse QA output into a structured QAResult.

    Tries JSON first (if QA produced structured output),
    then falls back to the legacy text format.
    """
    # Locate a candidate JSON object containing a "status" key.
    json_match = re.search(r'\{[\s\S]*"status"\s*:', qa_text)
    if json_match:
        # Balance braces from the opening '{' to find the object's end.
        # NOTE(review): braces inside JSON string values would confuse this
        # counter — acceptable because a parse failure just falls through.
        start = json_match.start()
        brace_count = 0
        end = start
        for i, ch in enumerate(qa_text[start:], start):
            if ch == '{':
                brace_count += 1
            elif ch == '}':
                brace_count -= 1
                if brace_count == 0:
                    end = i + 1
                    break
        try:
            data = json.loads(qa_text[start:end])
            issues = []
            for item in data.get("issues", []):
                issues.append(QAIssue(
                    type=item.get("type", "other"),
                    message=item.get("message", ""),
                    owner=item.get("owner", "Synthesizer"),
                ))
            return QAResult(
                status=data.get("status", "FAIL"),
                reason=data.get("reason", ""),
                issues=issues,
                warnings=[str(w) for w in data.get("warnings", [])],
                correction_instruction=data.get("correction_instruction", ""),
            )
        except (json.JSONDecodeError, KeyError):
            # Malformed JSON — fall back to the legacy text format below.
            pass

    # Legacy text format. Status line: check the longer marker first so
    # "result: pass_with_warnings" is not misread as a plain pass.
    status = "FAIL"
    lower = qa_text.lower()
    if "result: pass_with_warnings" in lower:
        status = "PASS_WITH_WARNINGS"
    elif "result: pass" in lower:
        status = "PASS"

    # Reason = the "ISSUES FOUND:" section, truncated at the first known
    # follow-on header (checked in tuple order, not document order).
    reason = ""
    if "ISSUES FOUND:" in qa_text:
        section = qa_text.split("ISSUES FOUND:", 1)[1]
        for header in ("ROLE-SPECIFIC FEEDBACK:", "RESULT:", "RECOMMENDED FIXES:"):
            if header in section:
                section = section.split(header, 1)[0]
                break
        reason = section.strip()

    correction = ""
    if "RECOMMENDED FIXES:" in qa_text:
        correction = qa_text.split("RECOMMENDED FIXES:", 1)[1].strip()

    # Per-role feedback bullets → one QAIssue per non-"satisfactory" line.
    issues = []
    if "ROLE-SPECIFIC FEEDBACK:" in qa_text:
        fb_section = qa_text.split("ROLE-SPECIFIC FEEDBACK:", 1)[1]
        for header in ("RESULT:", "RECOMMENDED FIXES:"):
            if header in fb_section:
                fb_section = fb_section.split(header, 1)[0]
                break
        for line in fb_section.strip().splitlines():
            line = line.strip().lstrip("•-* ")
            if ":" not in line:
                continue
            role_label, _, feedback = line.partition(":")
            feedback = feedback.strip()
            if feedback and feedback.lower() not in ("satisfactory", "n/a", "none"):
                issues.append(QAIssue(
                    type="other",
                    message=feedback,
                    owner=role_label.strip(),
                ))

    # If nothing attributable was found, attach the overall reason to the
    # Synthesizer so a FAIL always carries at least one actionable issue.
    if not issues and reason and reason.lower() not in ("none", "n/a", "none."):
        issues.append(QAIssue(type="other", message=reason, owner="Synthesizer"))

    return QAResult(
        status=status,
        reason=reason,
        issues=issues,
        correction_instruction=correction,
    )
|
|
|
|
| |
| |
| |
|
|
| |
# Relevance metadata for every known role.
#
# Fields read by the visible selection logic (see score_roles):
#   task_types  — task categories that earn the role a +2 scoring bonus
#   description — human-readable label used in trace output
#   is_persona  — marks comedic persona roles (filtered unless allowed)
# The "keywords", "domains", and "role_type" fields are descriptive metadata;
# they are not referenced by code visible in this module.
ROLE_RELEVANCE: Dict[str, Dict[str, Any]] = {
    "creative": {
        "keywords": ["brainstorm", "ideas", "creative", "naming", "slogan", "marketing",
                     "framing", "wording", "concept", "design", "brand"],
        "domains": ["marketing", "content", "writing", "communication"],
        "description": "Ideas, framing, wording, brainstorming",
        "role_type": "creative",
        "task_types": ["creative_writing", "opinion_discussion"],
    },
    "technical": {
        "keywords": ["code", "implement", "build", "architecture", "api", "database",
                     "debug", "software", "programming", "algorithm", "system", "deploy",
                     "python", "javascript", "rust", "java", "react", "vue", "svelte",
                     "framework", "library", "performance", "faster"],
        "domains": ["engineering", "development", "devops", "infrastructure"],
        "description": "Code, architecture, implementation, technical solutions",
        "role_type": "factual",
        "task_types": ["coding_task", "analysis"],
    },
    "research": {
        "keywords": ["research", "study", "evidence", "literature", "paper", "facts",
                     "history", "compare", "comparison", "analysis", "data", "statistics",
                     "science", "scientific", "information"],
        "domains": ["academia", "science", "fact-finding"],
        "description": "Information gathering, literature review, fact-finding",
        "role_type": "factual",
        "task_types": ["factual_question", "comparison", "analysis", "summarization"],
    },
    "security": {
        "keywords": ["security", "vulnerability", "attack", "encryption", "auth",
                     "password", "exploit", "firewall", "compliance", "gdpr", "privacy"],
        "domains": ["cybersecurity", "infosec", "compliance"],
        "description": "Security analysis, vulnerability checks, best practices",
        "role_type": "safety",
        "task_types": ["analysis"],
    },
    "data_analyst": {
        "keywords": ["data", "analytics", "statistics", "pattern", "trend", "metric",
                     "dashboard", "visualization", "dataset", "csv", "spreadsheet"],
        "domains": ["analytics", "business intelligence"],
        "description": "Data analysis, statistics, pattern recognition, insights",
        "role_type": "analytical",
        "task_types": ["analysis", "comparison", "factual_question"],
    },
    "labour_union_rep": {
        "keywords": ["worker", "wages", "union", "labor", "labour", "employment",
                     "rights", "workplace", "collective", "bargaining", "fair"],
        "domains": ["labor relations", "HR", "workplace policy"],
        "description": "Worker rights, fair wages, job security",
        "role_type": "analytical",
        "task_types": ["opinion_discussion", "analysis"],
    },
    "ux_designer": {
        "keywords": ["user", "usability", "accessibility", "interface", "ux", "ui",
                     "design", "wireframe", "prototype", "user experience", "user-friendly"],
        "domains": ["design", "product", "UX"],
        "description": "User needs, usability, accessibility",
        "role_type": "analytical",
        "task_types": ["analysis", "planning"],
    },
    "lawyer": {
        "keywords": ["legal", "law", "contract", "liability", "compliance", "regulation",
                     "patent", "copyright", "trademark", "lawsuit", "litigation"],
        "domains": ["law", "compliance", "governance"],
        "description": "Legal compliance, liability, contracts, risk management",
        "role_type": "analytical",
        "task_types": ["analysis"],
    },

    # --- persona roles (is_persona=True; filtered unless allow_persona_roles) ---
    "mad_professor": {
        "keywords": ["crazy", "radical", "hypothesis", "experiment", "breakthrough"],
        "domains": ["speculation"],
        "description": "Radical scientific hypotheses, extreme speculation",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "accountant": {
        "keywords": ["cost", "budget", "expense", "cheap", "price", "financial"],
        "domains": ["finance"],
        "description": "Cost scrutiny, budget optimization",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "artist": {
        "keywords": ["art", "inspiration", "vision", "aesthetic", "beauty"],
        "domains": ["art"],
        "description": "Unhinged creative vision, cosmic vibes",
        "role_type": "persona",
        "task_types": ["creative_writing"],
        "is_persona": True,
    },
    "lazy_slacker": {
        "keywords": ["lazy", "shortcut", "easy", "simple", "quick"],
        "domains": [],
        "description": "Minimum viable effort, shortcuts",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "black_metal_fundamentalist": {
        "keywords": ["metal", "kvlt", "underground", "nihilism"],
        "domains": [],
        "description": "Nihilistic kvlt critique",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "doris": {
        "keywords": [],
        "domains": [],
        "description": "Well-meaning but clueless observations",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "chairman_of_board": {
        "keywords": ["shareholder", "board", "governance", "strategic", "corporate"],
        "domains": ["corporate governance"],
        "description": "Corporate governance, shareholder value",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
    "maga_appointee": {
        "keywords": ["america", "patriot", "deregulation"],
        "domains": [],
        "description": "America First perspective",
        "role_type": "persona",
        "task_types": [],
        "is_persona": True,
    },
}
|
|
|
|
| |
| |
| |
|
|
| |
# Capability tags per role, matched against extract_task_features() output in
# score_roles(): each tag that overlaps the task's features adds one point.
ROLE_CAPABILITIES: Dict[str, List[str]] = {
    "creative": ["creative", "design", "ideas", "writing", "brainstorm", "opinion"],
    "technical": ["technical", "analysis", "engineering", "calculation", "code", "implementation"],
    "research": ["research", "facts", "evidence", "information", "comparison", "history"],
    "security": ["risk", "safety", "compliance", "security", "vulnerability"],
    "data_analyst": ["data", "statistics", "analysis", "metrics", "patterns"],
    "labour_union_rep": ["labor", "policy", "workplace", "rights", "fairness"],
    "ux_designer": ["design", "usability", "interface", "user_experience", "accessibility"],
    "lawyer": ["legal", "compliance", "contracts", "liability", "regulation"],
    "mad_professor": ["persona", "speculation", "radical", "humor"],
    "accountant": ["persona", "cost", "budget", "financial"],
    "artist": ["persona", "creative", "aesthetic", "vision"],
    "lazy_slacker": ["persona", "simple_answer", "minimal"],
    "black_metal_fundamentalist": ["persona", "stylistic", "humor", "music"],
    "doris": ["persona", "humor"],
    "chairman_of_board": ["persona", "strategy", "corporate", "governance"],
    "maga_appointee": ["persona", "political", "deregulation"],
}
|
|
| |
# Feature tag → trigger substrings. extract_task_features() assigns a tag when
# ANY of its substrings occurs in the lowercased request; stems like "analy"
# deliberately match analyse/analyze/analysis.
TASK_FEATURE_KEYWORDS: Dict[str, List[str]] = {
    "analysis": ["analy", "evaluate", "assess", "review", "examine", "investigate"],
    "creative": ["creative", "brainstorm", "ideas", "imagine", "invent", "story", "write a"],
    "design": ["design", "wireframe", "prototype", "layout", "visual", "ui", "ux", "interface", "usability",
               "user experience", "accessibility", "login page", "user interface"],
    "technical": ["code", "implement", "build", "architecture", "api", "debug", "software", "program",
                  "algorithm", "system", "deploy", "python", "javascript", "rust", "java", "react",
                  "framework", "performance"],
    "research": ["research", "study", "evidence", "literature", "paper", "facts", "history",
                 "information", "find out", "look up"],
    "policy": ["policy", "regulation", "law", "compliance", "legal", "rights", "labor", "labour",
               "union", "worker", "employment", "workplace"],
    "simple_answer": ["yes or no", "pick one", "choose", "which one", "red or blue", "agree on one"],
    "opinion": ["opinion", "perspective", "viewpoint", "discuss", "debate", "pros and cons",
                "should i", "what do you think", "agree", "disagree"],
    "comparison": ["compare", "comparison", "versus", "vs", "difference", "better"],
    "data": ["data", "statistics", "metric", "trend", "pattern", "chart", "dashboard", "csv",
             "spreadsheet", "dataset"],
    "security": ["security", "vulnerability", "attack", "encryption", "password", "exploit",
                 "firewall", "gdpr", "privacy"],
    "cost": ["cost", "budget", "expense", "cheap", "price", "financial", "roi"],
    "humor": ["funny", "joke", "humorous", "kvlt", "metal", "nihil"],
    "music": ["music", "metal", "band", "song", "album", "guitar"],
}
|
|
| |
| _GENERALIST_ROLES = ("creative", "technical", "research") |
|
|
|
|
def extract_task_features(user_request: str, task_category: str = "other") -> List[str]:
    """Derive task features from the user request and task category.

    Returns a deduplicated list of feature tags like ["design", "opinion"].
    """
    text = user_request.lower()

    # Keyword-driven features, in TASK_FEATURE_KEYWORDS declaration order.
    features = [
        feature
        for feature, keywords in TASK_FEATURE_KEYWORDS.items()
        if any(kw in text for kw in keywords)
    ]

    # Guarantee a baseline feature set implied by the task category.
    implied: Dict[str, List[str]] = {
        "coding_task": ["technical", "code"],
        "creative_writing": ["creative"],
        "factual_question": ["research"],
        "comparison": ["comparison", "research"],
        "analysis": ["analysis"],
        "summarization": ["research"],
        "opinion_discussion": ["opinion"],
        "planning": ["analysis"],
    }
    for tag in implied.get(task_category, []):
        if tag not in features:
            features.append(tag)

    return features
|
|
|
|
@dataclass
class RoleScore:
    """Scoring details for a single role — used for transparent logging."""
    role_key: str                    # internal role identifier, e.g. "research"
    role_label: str                  # human-readable description of the role
    score: int                       # capability overlap count + task-category bonus
    matched_capabilities: List[str]  # capabilities that overlapped the task features
    is_persona: bool                 # True for comedic persona roles
    filtered_reason: str = ""        # non-empty when the role was excluded from selection
|
|
|
|
def score_roles(
    task_features: List[str],
    active_role_keys: List[str],
    config: WorkflowConfig,
    task_category: str = "other",
) -> List[RoleScore]:
    """Score each active role by capability overlap with task features.

    Returns all RoleScore objects (including filtered ones) for transparency.
    """
    wanted = set(task_features)
    scored: List[RoleScore] = []

    for key in active_role_keys:
        meta = ROLE_RELEVANCE.get(key, {})

        # One point per capability that overlaps the task's features.
        matched = [cap for cap in ROLE_CAPABILITIES.get(key, []) if cap in wanted]
        points = len(matched)

        # Flat bonus when the role explicitly lists this task category.
        if task_category in meta.get("task_types", []):
            points += 2

        entry = RoleScore(
            role_key=key,
            role_label=meta.get("description", key),
            score=points,
            matched_capabilities=matched,
            is_persona=meta.get("is_persona", False),
        )

        # Personas are kept in the result (for trace output) but marked
        # as filtered unless the config explicitly allows them.
        if entry.is_persona and not config.allow_persona_roles:
            entry.filtered_reason = "persona role not allowed"

        scored.append(entry)

    return scored
|
|
|
|
def select_relevant_roles(
    user_request: str,
    active_role_keys: List[str],
    config: WorkflowConfig,
    task_category: str = "other",
) -> List[str]:
    """Select the most relevant specialist roles for a given request.

    Uses capability-based scoring: extracts task features from the request,
    scores each active role by capability overlap, and returns the top roles
    up to ``config.max_specialists_per_task``.

    Returns a ``_SelectionResult`` (list subclass) so callers can also access
    ``.scoring_info`` for transparent trace logging.
    """
    task_features = extract_task_features(user_request, task_category)
    role_scores = score_roles(task_features, active_role_keys, config, task_category)

    # Rank eligible (non-filtered) roles: highest score first, ties broken by
    # the caller's original ordering of active_role_keys.
    eligible = [rs for rs in role_scores if not rs.filtered_reason]
    eligible.sort(key=lambda rs: (-rs.score, active_role_keys.index(rs.role_key)))

    selected: List[str] = []
    for rs in eligible:
        if len(selected) >= config.max_specialists_per_task:
            break
        # In strict mode, zero-score roles are skipped once anything is selected.
        if config.strict_mode and rs.score == 0 and selected:
            continue
        selected.append(rs.role_key)

    # Safety net: always pick at least the top-ranked eligible role.
    if not selected and eligible:
        selected.append(eligible[0].role_key)

    # Last resort: fall back to the first available generalist role.
    if not selected:
        for rk in _GENERALIST_ROLES:
            if rk in active_role_keys:
                selected.append(rk)
                break

    # Evidence-heavy tasks always get the research role when it is available.
    if (config.always_include_research_for_factual_tasks
            and task_needs_evidence(task_category)
            and "research" in active_role_keys
            and "research" not in selected):
        selected.append("research")

    return _SelectionResult(selected, role_scores, task_features)
|
|
|
|
class _SelectionResult(list):
    """A list of role keys with attached scoring metadata.

    Behaves exactly like a ``list[str]`` so existing code continues to work,
    but also carries ``scoring_info`` and ``task_features`` for trace logging.
    """

    def __init__(
        self,
        selected: List[str],
        scoring_info: List[RoleScore],
        task_features: List[str],
    ):
        super().__init__(selected)
        self.scoring_info = scoring_info
        self.task_features = task_features

    def format_trace(self, role_labels: Optional[Dict[str, str]] = None) -> str:
        """Return a human-readable ROLE SCORING trace block."""
        labels = role_labels or {}
        lines = ["── ROLE SCORING ──", f"Task features: {self.task_features}"]
        # Highest score first, filtered roles included for transparency.
        for entry in sorted(self.scoring_info, key=lambda r: -r.score):
            if entry.filtered_reason:
                status = f"FILTERED ({entry.filtered_reason})"
            elif entry.role_key in self:
                status = "SELECTED"
            else:
                status = "skipped"
            shown = ", ".join(entry.matched_capabilities) if entry.matched_capabilities else "none"
            name = labels.get(entry.role_key, entry.role_key)
            lines.append(
                f" {name}: score={entry.score} caps=[{shown}] → {status}"
            )
        lines.append("──────────────────")
        return "\n".join(lines)
|
|
|
|
| |
| |
| |
|
|
def identify_revision_targets(
    qa_result: QAResult,
    role_label_to_key: Dict[str, str],
) -> List[str]:
    """Given a QAResult, return the list of role keys that need rerunning.

    Rules:
    - Format/brevity issues → Synthesizer only (returned as "synthesizer")
    - Issues owned by a specific specialist → that specialist key
    - Issues owned by Planner → "planner"
    - If no clear owner → "synthesizer" (default)
    """
    targets: List[str] = []

    def _add(key: str) -> None:
        # Preserve first-seen order while deduplicating.
        if key not in targets:
            targets.append(key)

    for issue in qa_result.issues:
        owner = issue.owner.strip()
        owner_lower = owner.lower()
        if owner_lower in ("synthesizer", "synthesis"):
            _add("synthesizer")
        elif owner_lower == "planner":
            _add("planner")
        else:
            key = role_label_to_key.get(owner)
            if key:
                # BUGFIX: previously a second issue from an already-targeted
                # specialist fell through to the unknown-owner branch and
                # spuriously appended "synthesizer".
                _add(key)
            else:
                # Unknown owner label — route to the synthesizer by default.
                _add("synthesizer")

    # Cosmetic issue types are always fixable at the synthesis stage.
    if any(issue.type in ("format", "brevity", "directness") for issue in qa_result.issues):
        _add("synthesizer")

    # Guarantee at least one target so the caller always has work to dispatch.
    if not targets:
        _add("synthesizer")

    return targets
|
|
|
|
| |
| |
| |
|
|
| |
# Section headers emitted by internal workflow stages (planner, QA, and the
# various specialist/persona roles). strip_internal_noise() drops any line
# starting with one of these, plus that section's bullet continuation lines.
_INTERNAL_NOISE_HEADERS = [
    "TASK BREAKDOWN:", "ROLE TO CALL:", "SUCCESS CRITERIA:",
    "GUIDANCE FOR SPECIALIST:", "PERSPECTIVES SUMMARY:",
    "COMMON GROUND:", "TENSIONS AND TRADE-OFFS:",
    "REQUIREMENTS CHECKED:", "ISSUES FOUND:",
    "ROLE-SPECIFIC FEEDBACK:", "RESULT:", "RECOMMENDED FIXES:",
    "DECISION: APPROVED", "DECISION: REVISE",
    "REVISED INSTRUCTIONS:", "REVISION",
    "SOURCES CONSULTED:", "SECURITY ANALYSIS:",
    "VULNERABILITIES FOUND:", "COST ANALYSIS:",
    "COST-CUTTING MEASURES:", "CHEAPEST VIABLE APPROACH:",
    "KVLT VERDICT:", "WHAT THE MAINSTREAM GETS WRONG:",
    "COSMIC VISION:", "DO WE EVEN NEED TO DO THIS:",
    "WORKER IMPACT:", "UNION CONCERNS:",
    "BOARD PERSPECTIVE:", "STRATEGIC CONCERNS:",
    "AMERICA FIRST ANALYSIS:", "DEEP STATE CONCERNS:",
    "LEGAL ANALYSIS:", "LIABILITIES AND RISKS:",
    "WHAT DORIS THINKS IS HAPPENING:",
]
|
|
|
|
def strip_internal_noise(text: str) -> str:
    """Remove internal workflow headers/sections from text intended for the user."""
    lines = text.split("\n")
    cleaned = []
    skip_until_next = False  # True while inside a noise section being dropped

    for line in lines:
        stripped = line.strip()

        # A line starting with any known internal header begins a noise section.
        is_noise = False
        for header in _INTERNAL_NOISE_HEADERS:
            if stripped.startswith(header):
                is_noise = True
                skip_until_next = True
                break

        if is_noise:
            continue

        # While skipping: blank lines and bullet lines belong to the noise
        # section; an ALL-CAPS "HEADER:" line or any other non-bullet line
        # ends the section (and, being non-noise, is kept below).
        if skip_until_next:
            if stripped == "":
                continue
            if stripped.endswith(":") and stripped == stripped.upper():
                skip_until_next = False
            elif not stripped.startswith("•") and not stripped.startswith("-") and not stripped.startswith("*"):
                skip_until_next = False

        if not skip_until_next:
            cleaned.append(line)

    # If stripping removed everything, return the original rather than nothing.
    result = "\n".join(cleaned).strip()
    return result if result else text
|
|
|
|
def compress_final_answer(
    draft: str,
    output_format: str,
    brevity: str,
    user_request: str,
) -> str:
    """Apply rule-based compression to the final answer.

    This does NOT call the LLM — it applies deterministic rules to trim
    the answer. The LLM-based compression happens in the synthesizer.
    """
    answer = strip_internal_noise(draft)

    if output_format == "single_choice" and brevity == "minimal":
        # Jump past any synthesis preamble, then keep only the first paragraph.
        for marker in ("UNIFIED RECOMMENDATION:", "RECOMMENDED DRAFT:", "FINAL ANSWER:"):
            if marker not in answer:
                continue
            answer = answer.split(marker, 1)[1].strip()
            chunks = answer.split("\n\n")
            if chunks:
                answer = chunks[0].strip()
            break

    if output_format == "short_answer" or brevity in ("minimal", "short"):
        if len(answer) > 500 and brevity == "minimal":
            # A minimal answer should never need more than one paragraph.
            non_empty = [p.strip() for p in answer.split("\n\n") if p.strip()]
            if non_empty:
                answer = non_empty[0]

    return answer
|
|
|
|
def postprocess_format_fixes(text: str) -> str:
    """Apply lightweight format fixes before QA evaluation.

    Converts common formatting artefacts so QA can focus on content quality
    rather than failing for cosmetic issues.
    """
    # Strip markdown headers (#, ##, ###, ####) at line start.
    text = re.sub(r'^#{1,4}\s+', '', text, flags=re.MULTILINE)

    def _bullets_to_sentences(m: re.Match) -> str:
        """Fold a run of bullet lines into one line of sentences."""
        block = m.group(0)
        sentences = []
        for line in block.strip().splitlines():
            cleaned = re.sub(r'^\s*[-•*]\s+', '', line).strip()
            if cleaned:
                # Ensure each bullet reads as a full sentence.
                if cleaned[-1] not in '.!?':
                    cleaned += '.'
                sentences.append(cleaned)
        joined = ' '.join(sentences)
        # BUGFIX: the pattern consumes the final bullet's trailing newline, so
        # without restoring it the replacement fused onto the following line.
        return joined + '\n' if block.endswith('\n') else joined

    # Collapse any run of two or more bullet lines into prose.
    text = re.sub(
        r'(?:^\s*[-•*]\s+.+\n?){2,}',
        _bullets_to_sentences,
        text,
        flags=re.MULTILINE,
    )

    # Squeeze runs of blank lines down to a single blank line.
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Drop leaked flat JSON status fragments from earlier stages.
    text = re.sub(r'\{[^{}]*"status"\s*:[^{}]*\}', '', text)

    return text.strip()
|
|
|
|
| |
| |
| |
|
|
@dataclass
class FailureRecord:
    """Record of a single failure in the workflow."""
    revision: int    # revision counter at the time of the failure
    owner: str       # role label blamed for the issue
    issue_type: str  # QA issue category
    message: str     # issue description (truncated by the caller)
    correction: str  # correction instruction from QA (truncated by the caller)

    def to_dict(self) -> dict:
        """Serialize to a plain dict for logging/JSON output."""
        field_names = ("revision", "owner", "issue_type", "message", "correction")
        return {name: getattr(self, name) for name in field_names}
|
|
|
|
| @dataclass |
| class PlannerState: |
| """Persistent state object that tracks the planner's decisions through revisions. |
| |
| This is the central working memory for the workflow. |
| All stages read from and write to this shared state. |
| """ |
| user_request: str = "" |
| task_summary: str = "" |
| task_category: str = "other" |
| success_criteria: List[str] = field(default_factory=list) |
| output_format: str = "other" |
| brevity_requirement: str = "normal" |
| selected_roles: List[str] = field(default_factory=list) |
| specialist_outputs: Dict[str, str] = field(default_factory=dict) |
| evidence: Optional[Dict] = None |
| task_assumptions: Dict[str, str] = field(default_factory=dict) |
| current_draft: str = "" |
| qa_result: Optional[QAResult] = None |
| revision_count: int = 0 |
| max_revisions: int = 3 |
| revision_instruction: str = "" |
| failure_history: List[FailureRecord] = field(default_factory=list) |
| history: List[Dict[str, str]] = field(default_factory=list) |
| final_answer: str = "" |
|
|
| def record_event(self, event_type: str, detail: str): |
| self.history.append({"type": event_type, "detail": detail[:500]}) |
|
|
| def record_failure(self, qa_result: QAResult): |
| """Record QA failures into the failure history.""" |
| for issue in qa_result.issues: |
| self.failure_history.append(FailureRecord( |
| revision=self.revision_count, |
| owner=issue.owner, |
| issue_type=issue.type, |
| message=issue.message[:200], |
| correction=qa_result.correction_instruction[:200], |
| )) |
|
|
| def has_repeated_failure(self, owner: str, issue_type: str) -> bool: |
| """Check if the same owner+issue_type has failed in a previous revision.""" |
| past = [ |
| f for f in self.failure_history |
| if f.owner == owner and f.issue_type == issue_type |
| and f.revision < self.revision_count |
| ] |
| return len(past) >= 1 |
|
|
| def get_repeat_failures(self) -> List[Tuple[str, str]]: |
| """Return (owner, issue_type) pairs that have failed more than once.""" |
| counts: Dict[Tuple[str, str], int] = {} |
| for f in self.failure_history: |
| key = (f.owner, f.issue_type) |
| counts[key] = counts.get(key, 0) + 1 |
| return [k for k, v in counts.items() if v >= 2] |
|
|
def get_escalation_strategy(self) -> str:
    """Pick an escalation strategy based on which owners keep failing.

    Returns one of:
        'rewrite_from_state' — the synthesizer itself repeatedly fails; it
            should rebuild from state instead of patching the draft
        'suppress_role'      — a specialist role keeps causing failures
        'narrow_scope'       — only planner-level repeats; simplify the task
        'none'               — nothing has failed more than once yet
    """
    repeated = self.get_repeat_failures()
    if not repeated:
        return "none"

    synth_names = ("synthesizer", "synthesis")
    # Synthesizer problems take priority: a full rewrite beats role tweaks.
    if any(owner.lower() in synth_names for owner, _ in repeated):
        return "rewrite_from_state"
    if any(owner.lower() not in synth_names + ("planner",) for owner, _ in repeated):
        return "suppress_role"
    return "narrow_scope"
|
|
def get_roles_to_suppress(self) -> List[str]:
    """List specialist owners (not synthesizer/planner) with repeated failures."""
    excluded = ("synthesizer", "synthesis", "planner")
    suppress = set()
    for owner, _ in self.get_repeat_failures():
        if owner.lower() not in excluded:
            suppress.add(owner)
    return list(suppress)
|
|
def to_context_string(self) -> str:
    """Build a compact, prompt-ready summary of the planner state."""
    summary = [
        f"Task category: {self.task_category}",
        f"Output format required: {self.output_format}",
        f"Brevity requirement: {self.brevity_requirement}",
        f"Revision: {self.revision_count}/{self.max_revisions}",
        f"Selected roles: {', '.join(self.selected_roles)}",
    ]
    if self.success_criteria:
        summary.append(f"Success criteria: {'; '.join(self.success_criteria)}")
    if self.task_assumptions:
        pairs = "; ".join(f"{k}: {v}" for k, v in self.task_assumptions.items())
        summary.append(f"Shared assumptions: {pairs}")
    if self.evidence:
        confidence = self.evidence.get("confidence", "unknown")
        item_count = len(self.evidence.get("results", []))
        summary.append(f"Evidence: {item_count} items (confidence: {confidence})")
    if self.revision_instruction:
        summary.append(f"Revision instruction: {self.revision_instruction}")
    if self.qa_result and not self.qa_result.passed:
        summary.append(f"QA status: FAIL — {self.qa_result.reason}")
        if self.qa_result.correction_instruction:
            summary.append(f"Correction needed: {self.qa_result.correction_instruction}")
    if self.failure_history:
        summary.append(f"Previous failures: {len(self.failure_history)}")
        strategy = self.get_escalation_strategy()
        if strategy != "none":
            summary.append(f"Escalation strategy: {strategy}")
    return "\n".join(summary)
|
|
def to_state_dict(self) -> dict:
    """Serialise the planner state to a plain dict (long texts capped at 500 chars)."""
    snapshot = dict(
        user_request=self.user_request,
        task_summary=self.task_summary,
        task_category=self.task_category,
        success_criteria=self.success_criteria,
        output_format=self.output_format,
        brevity_requirement=self.brevity_requirement,
        selected_roles=self.selected_roles,
        specialist_outputs=self.specialist_outputs,
        evidence=self.evidence,
        task_assumptions=self.task_assumptions,
        current_draft=self.current_draft[:500],
        revision_count=self.revision_count,
        max_revisions=self.max_revisions,
        revision_instruction=self.revision_instruction,
        failure_history=[record.to_dict() for record in self.failure_history],
        final_answer=(self.final_answer or "")[:500],
    )
    return snapshot
|
|
|
|
| |
| |
| |
|
|
def get_synthesizer_format_instruction(output_format: str, brevity: str) -> str:
    """Build the format-specific instruction block for the synthesizer prompt.

    Unknown formats get a generic fallback; a brevity suffix is appended for
    'minimal' and 'short' requirements.
    """
    format_rules = {
        "single_choice": (
            "CRITICAL FORMAT RULE: The user wants ONE SINGLE CHOICE.\n"
            "Output ONLY the chosen option and at most one short justification sentence.\n"
            "Do NOT include perspectives summary, common ground, trade-offs, or any multi-section structure.\n"
            "Example: 'Veggie — it accommodates the widest range of dietary needs.'"
        ),
        "short_answer": (
            "CRITICAL FORMAT RULE: The user wants a SHORT, DIRECT answer.\n"
            "Output 1-3 sentences maximum. No sections, no headers, no perspectives summary.\n"
            "Answer the question directly and stop."
        ),
        "code": (
            "CRITICAL FORMAT RULE: The user wants CODE output.\n"
            "Output the code directly. Only include a brief explanation if explicitly requested.\n"
            "Do NOT include perspectives summary, trade-offs, or multi-section structure."
        ),
        "table": (
            "CRITICAL FORMAT RULE: The user wants a TABLE.\n"
            "Output a properly formatted markdown table.\n"
            "Do NOT include perspectives summary or prose-only answers."
        ),
        "bullet_list": (
            "CRITICAL FORMAT RULE: The user wants a BULLET LIST.\n"
            "Output a clean bullet list. Do NOT wrap it in prose or add unnecessary sections."
        ),
        "paragraph": (
            "Output a clear, well-structured paragraph. Keep it focused and avoid unnecessary sections."
        ),
        "report": (
            "The user wants a detailed report. You may include sections like summary, "
            "trade-offs, and perspectives, but keep each section concise."
        ),
    }
    brevity_notes = {
        "minimal": "\nBREVITY: Absolute minimum. Fewer words is better.",
        "short": "\nBREVITY: Keep it concise. No unnecessary elaboration.",
    }

    instruction = format_rules.get(output_format, "Match the output format to what the user requested.")
    return instruction + brevity_notes.get(brevity, "")
|
|
|
|
def get_qa_format_instruction(output_format: str, brevity: str) -> str:
    """Build format-specific FAIL rules for the QA prompt; '' when none apply."""
    checks: List[str] = []
    if output_format == "single_choice":
        checks += [
            "FAIL if the output contains more than one choice or a long multi-section answer.",
            "FAIL if the output includes perspectives summary, common ground, or trade-offs.",
        ]
    elif output_format == "short_answer":
        checks += [
            "FAIL if the output is longer than 3-4 sentences.",
            "FAIL if the output includes unnecessary sections or headers.",
        ]
    elif output_format == "code":
        checks.append("FAIL if the output is mostly prose with no code.")
    elif output_format == "table":
        checks.append("FAIL if the output does not contain a markdown table.")
    if brevity in ("minimal", "short"):
        checks.append("FAIL if the output is excessively verbose for a brevity requirement.")
    if not checks:
        return ""
    return "\n".join(checks)
|
|
|
|
| |
| |
| |
|
|
def validate_output_format(text: str, output_format: str, brevity: str) -> List[str]:
    """Structurally check the draft against the required format before QA.

    Returns a list of violation descriptions; an empty list means the output
    is structurally valid. Catches the common structural mistakes (e.g. bullet
    lists where a paragraph was requested) cheaply, without an LLM call.
    """
    problems: List[str] = []
    body = text.strip()
    if not body:
        problems.append("Output is empty.")
        return problems

    # Cheap structural fingerprints of the draft.
    bullets = re.search(r"^[\s]*[-•*]\s", body, re.MULTILINE) is not None
    numbered = re.search(r"^[\s]*\d+[.)]\s", body, re.MULTILINE) is not None
    headings = re.search(r"^#{1,4}\s", body, re.MULTILINE) is not None
    table = re.search(r"\|.*\|.*\|", body) is not None
    fenced_code = "```" in body
    n_lines = len([ln for ln in body.splitlines() if ln.strip()])

    if output_format == "paragraph":
        if bullets or numbered:
            problems.append("Paragraph format requested but output contains bullet/numbered lists.")
        if headings:
            problems.append("Paragraph format requested but output contains markdown headings.")
        if table:
            problems.append("Paragraph format requested but output contains a table.")
    elif output_format == "code":
        looks_like_code = fenced_code or re.search(
            r"(?:def |class |import |function |const |let |var )", body
        )
        if not looks_like_code:
            problems.append("Code format requested but output contains no code block or recognisable code.")
    elif output_format == "table":
        if not table:
            problems.append("Table format requested but output contains no markdown table.")
    elif output_format == "single_choice":
        if n_lines > 5:
            problems.append("Single choice requested but output is multi-section (too many lines).")
        if bullets and n_lines > 3:
            problems.append("Single choice requested but output contains a bullet list.")

    # Brevity limits apply regardless of format.
    if brevity == "minimal" and n_lines > 8:
        problems.append(f"Minimal brevity requested but output has {n_lines} lines.")
    elif brevity == "short" and n_lines > 20:
        problems.append(f"Short brevity requested but output has {n_lines} lines.")

    return problems
|
|
|
|
def format_violations_instruction(violations: List[str]) -> str:
    """Render format violations as a mandatory rewrite instruction for synthesis."""
    bullet_lines = "\n".join(f"- {item}" for item in violations)
    return (
        "FORMAT VIOLATIONS DETECTED — you MUST fix these before QA:\n"
        f"{bullet_lines}"
        "\nRewrite the output to satisfy the required format strictly."
    )
|
|
|
|
| |
| |
| |
|
|
def parse_task_assumptions(plan_text: str) -> Dict[str, str]:
    """Extract TASK ASSUMPTIONS from planner output.

    Looks for 'key: value' lines under a 'TASK ASSUMPTIONS:' header, stopping
    at the next known section header. Keys are lower-cased with spaces
    replaced by underscores; lines without a value are skipped.

    Returns:
        Dict mapping assumption key → value ({} when no header is present).
    """
    assumptions: Dict[str, str] = {}
    if "TASK ASSUMPTIONS:" not in plan_text:
        return assumptions

    section = plan_text.split("TASK ASSUMPTIONS:", 1)[1]

    # Truncate at the EARLIEST following section header. (The previous code
    # checked headers in a fixed order and broke on the first hit, so when a
    # later-listed header appeared earlier in the text, lines from the
    # intervening section leaked in and were parsed as bogus assumptions.)
    cut = len(section)
    for header in (
        "TASK BREAKDOWN:", "ROLE TO CALL:", "SUCCESS CRITERIA:",
        "GUIDANCE FOR SPECIALIST:", "REVISED INSTRUCTIONS:",
    ):
        pos = section.find(header)
        if pos != -1 and pos < cut:
            cut = pos
    section = section[:cut]

    for line in section.strip().splitlines():
        line = line.strip().lstrip("•-* ")
        if ":" not in line:
            continue
        key, _, value = line.partition(":")
        key = key.strip().lower().replace(" ", "_")
        value = value.strip()
        if key and value:
            assumptions[key] = value

    return assumptions
|
|
|
|
def format_assumptions_for_prompt(assumptions: Dict[str, str]) -> str:
    """Render shared assumptions as a block for specialist prompts; '' when empty."""
    if not assumptions:
        return ""
    body = "\n".join(f"  - {key}: {value}" for key, value in assumptions.items())
    return "SHARED TASK ASSUMPTIONS (use these — do NOT invent your own):\n" + body
|
|
|
|
| |
| |
| |
|
|
| |
# Appended verbatim to every specialist prompt: forces a machine-parseable
# fenced-JSON summary that parse_structured_contribution() extracts and the
# Synthesizer consumes. The string content itself is part of runtime behavior
# and must not be edited casually.
STRUCTURED_OUTPUT_SUFFIX = """

IMPORTANT — OUTPUT FORMAT:
After your analysis above, you MUST also output a JSON block at the end of your response,
wrapped in ```json ... ``` fences, with this exact structure:
```json
{
"role": "<your role name>",
"main_points": ["point 1", "point 2"],
"recommendations": ["recommendation 1"],
"evidence": ["supporting evidence or examples"],
"assumptions": ["assumption 1"],
"confidence": "high | medium | low"
}
```
- "main_points": your key substantive contributions to the answer (2-4 points)
- "recommendations": specific actionable recommendations (0-3)
- "evidence": facts, data, or examples that support your points (0-3)
- "assumptions": assumptions you relied on (0-2)
- "confidence": how confident you are in your contribution

This JSON block is REQUIRED. The Synthesizer will use it to build the final answer.
Do NOT write a complete final answer — focus on your domain-specific contribution.
"""
|
|
|
|
@dataclass
class StructuredContribution:
    """Structured, machine-readable output from one expert specialist."""
    role: str
    main_points: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)
    evidence: List[str] = field(default_factory=list)
    assumptions: List[str] = field(default_factory=list)
    confidence: str = "medium"
    raw_output: str = ""

    def to_dict(self) -> dict:
        """Serialise every field except the raw LLM output."""
        keys = ("role", "main_points", "recommendations",
                "evidence", "assumptions", "confidence")
        return {key: getattr(self, key) for key in keys}

    def has_substance(self) -> bool:
        """True when the contribution carries at least one point or recommendation."""
        if self.main_points:
            return True
        return bool(self.recommendations)
|
|
|
|
def parse_structured_contribution(text: str, role: str) -> StructuredContribution:
    """Parse specialist LLM output into a StructuredContribution.

    Prefers the fenced ```json block the prompt suffix demands; if that is
    missing, tries a bare {"role": ...} object; if neither parses, falls back
    to heuristic section-header extraction.
    """
    result = StructuredContribution(role=role, raw_output=text)

    # Fenced block first, then a bare JSON object that starts with "role".
    match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
    if match is None:
        match = re.search(r'(\{\s*"role"\s*:.*\})', text, re.DOTALL)

    if match is not None:
        try:
            payload = json.loads(match.group(1))
        except (json.JSONDecodeError, AttributeError):
            payload = None
        if payload is not None:
            result.main_points = payload.get("main_points", [])
            result.recommendations = payload.get("recommendations", [])
            result.evidence = payload.get("evidence", [])
            result.assumptions = payload.get("assumptions", [])
            result.confidence = payload.get("confidence", "medium")
            if payload.get("role"):
                result.role = payload["role"]
            return result

    # JSON missing or malformed: harvest section headers instead.
    _extract_section_points(text, result)
    return result
|
|
|
|
| def _extract_section_points(text: str, contribution: StructuredContribution): |
| """Heuristic fallback: extract key points from section-based specialist output.""" |
| lines = text.strip().splitlines() |
| current_section = "" |
| buffer: List[str] = [] |
|
|
| |
| section_map = { |
| |
| "ideas": "main_points", "rationale": "main_points", |
| "technical approach": "main_points", "implementation notes": "recommendations", |
| "evidence summary": "evidence", "key findings": "evidence", |
| "security analysis": "main_points", "vulnerabilities found": "main_points", |
| "recommendations": "recommendations", |
| "data overview": "main_points", "analysis": "main_points", |
| "insights": "recommendations", |
| |
| "wild hypothesis": "main_points", "scientific rationale": "evidence", |
| "groundbreaking implications": "main_points", |
| "cost analysis": "main_points", "cost-cutting measures": "recommendations", |
| "cosmic vision": "main_points", "wild storm of ideas": "main_points", |
| "minimum viable effort": "main_points", |
| "kvlt verdict": "main_points", "the grim truth": "main_points", |
| "worker impact": "main_points", "union concerns": "main_points", |
| "collective bargaining position": "recommendations", |
| "user needs analysis": "main_points", "pain points": "main_points", |
| "ux recommendations": "recommendations", |
| "what doris thinks is happening": "main_points", |
| "doris's thoughts": "main_points", |
| "board perspective": "main_points", "strategic concerns": "main_points", |
| "shareholder value": "recommendations", |
| "america first analysis": "main_points", |
| "making it great again": "recommendations", |
| "legal analysis": "main_points", "liabilities and risks": "main_points", |
| "legal recommendations": "recommendations", |
| } |
|
|
| def flush_buffer(): |
| if current_section and buffer: |
| field_name = section_map.get(current_section.lower().rstrip(":"), "") |
| if field_name: |
| combined = " ".join(ln.strip().lstrip("•-*0123456789.) ") for ln in buffer if ln.strip()) |
| if combined: |
| target = getattr(contribution, field_name) |
| target.append(combined[:300]) |
|
|
| for line in lines: |
| header_match = re.match(r"^([A-Z][A-Z\s\'']+):?\s*$", line.strip()) |
| if header_match: |
| flush_buffer() |
| current_section = header_match.group(1).strip() |
| buffer = [] |
| else: |
| |
| if re.match(r"^[A-Z][A-Z\s]+DRAFT:?\s*$", line.strip()): |
| flush_buffer() |
| current_section = "" |
| buffer = [] |
| elif current_section: |
| buffer.append(line) |
|
|
| flush_buffer() |
|
|
|
|
def format_contributions_for_synthesizer(
    contributions: Dict[str, "StructuredContribution"],
) -> str:
    """Render structured expert contributions for the Synthesizer prompt.

    Each expert gets a '=== role ===' section with indexed main points and
    recommendations (the indices let the Synthesizer reference them) plus
    evidence and assumptions as plain bullets. Returns '' when empty.
    """
    if not contributions:
        return ""
    blocks = ["STRUCTURED EXPERT CONTRIBUTIONS:"]
    for contrib in contributions.values():
        chunk = [f"\n=== {contrib.role} (confidence: {contrib.confidence}) ==="]
        if contrib.main_points:
            chunk.append("Main points:")
            chunk.extend(f"  [{idx}] {point}" for idx, point in enumerate(contrib.main_points))
        if contrib.recommendations:
            chunk.append("Recommendations:")
            chunk.extend(f"  [{idx}] {rec}" for idx, rec in enumerate(contrib.recommendations))
        if contrib.evidence:
            chunk.append("Evidence:")
            chunk.extend(f"  - {item}" for item in contrib.evidence)
        if contrib.assumptions:
            chunk.append("Assumptions:")
            chunk.extend(f"  - {item}" for item in contrib.assumptions)
        blocks.append("\n".join(chunk))
    return "\n\n".join(blocks)
|
|
|
|
def format_contributions_for_qa(
    contributions: Dict[str, "StructuredContribution"],
    used_contributions: Dict[str, List[str]],
) -> str:
    """Render contribution-usage traceability for the QA prompt; '' when empty.

    Tags each main point / recommendation as USED or NOT USED according to the
    Synthesizer's reference list, and appends a used/total summary line.
    """
    if not contributions:
        return ""
    rendered = ["EXPERT CONTRIBUTION TRACEABILITY:"]
    for role_key, contrib in contributions.items():
        refs = used_contributions.get(role_key, [])
        chunk = [f"\n=== {contrib.role} ==="]
        for i, pt in enumerate(contrib.main_points):
            tag = "USED" if f"main_points[{i}]" in refs else "NOT USED"
            chunk.append(f"  main_points[{i}] [{tag}]: {pt}")
        for i, rec in enumerate(contrib.recommendations):
            tag = "USED" if f"recommendations[{i}]" in refs else "NOT USED"
            chunk.append(f"  recommendations[{i}] [{tag}]: {rec}")
        rendered.append("\n".join(chunk))

    used_total = sum(len(refs) for refs in used_contributions.values())
    available_total = sum(
        len(c.main_points) + len(c.recommendations) for c in contributions.values()
    )
    rendered.append(f"\nSummary: {used_total}/{available_total} expert contributions marked as used.")
    return "\n".join(rendered)
|
|
|
|
def parse_used_contributions(text: str) -> Dict[str, List[str]]:
    """Parse the Synthesizer's USED_CONTRIBUTIONS JSON block from its output.

    Returns:
        Dict mapping role_key → list of contribution references such as
        ["main_points[0]", "recommendations[1]"]; {} when nothing parseable.
    """
    # Check EVERY fenced ```json block, not only the first: synthesizer output
    # often contains other JSON blocks (e.g. quoted specialist contributions)
    # ahead of the usage block, which previously hid it entirely.
    for match in re.finditer(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL):
        try:
            data = json.loads(match.group(1))
        except (json.JSONDecodeError, AttributeError):
            continue
        if isinstance(data, dict) and "used_contributions" in data:
            return data["used_contributions"]

    # Fallback: a bare JSON object after a 'USED_CONTRIBUTIONS:' marker.
    if "USED_CONTRIBUTIONS:" in text:
        section = text.split("USED_CONTRIBUTIONS:", 1)[1]
        # Try the non-greedy match first (original behavior for flat objects),
        # then a greedy one so a wrapped {"used_contributions": {...}} object
        # with nested braces still parses instead of being truncated.
        for pattern in (r"(\{.*?\})", r"(\{.*\})"):
            obj_match = re.search(pattern, section, re.DOTALL)
            if not obj_match:
                break
            try:
                data = json.loads(obj_match.group(1))
            except (json.JSONDecodeError, AttributeError):
                continue
            if isinstance(data, dict):
                # Accept either the bare mapping or the wrapped form.
                inner = data.get("used_contributions")
                return inner if isinstance(inner, dict) else data

    return {}
|
|
|
|
def check_expert_influence(
    contributions: Dict[str, "StructuredContribution"],
    used_contributions: Dict[str, List[str]],
    final_answer: str,
) -> List[str]:
    """Verify the final answer materially uses the expert contributions.

    Three checks, in order: (1) at least one contribution reference exists at
    all; (2) every substantive expert has at least one reference; (3) a cheap
    lexical check that the answer text echoes at least one expert main point.

    Returns a list of influence problems; an empty list means influence is
    adequate (or there were no contributions to check).
    """
    problems: List[str] = []
    if not contributions:
        return problems

    substantive = {key: c for key, c in contributions.items() if c.has_substance()}
    total_available = sum(
        len(c.main_points) + len(c.recommendations) for c in substantive.values()
    )
    total_used = sum(len(refs) for refs in used_contributions.values())

    # Nothing referenced at all → single blanket issue, no per-role detail.
    if total_used == 0 and total_available > 0:
        problems.append(
            "Final answer does not materially incorporate any specialist contributions."
        )
        return problems

    # Per-role coverage: each substantive expert needs at least one reference.
    for role_key, contrib in substantive.items():
        if not used_contributions.get(role_key, []):
            problems.append(
                f"Expert '{contrib.role}' provided substantive points but none were used."
            )

    # Lexical echo check: does the answer share ≥2 keywords with any main point?
    stopwords = ("the", "and", "for", "that", "this", "with", "from", "are", "was")
    answer_lower = final_answer.lower()
    echoed = 0
    for contrib in contributions.values():
        for point in contrib.main_points:
            keywords = [w for w in re.findall(r"\b\w{3,}\b", point.lower())
                        if w not in stopwords]
            if sum(1 for w in keywords if w in answer_lower) >= 2:
                echoed += 1
    if echoed == 0 and total_available > 0:
        problems.append(
            "Final answer appears to not reflect expert contribution content."
        )

    return problems
|
|