| import os |
| import re |
| import uuid |
| import random |
| import warnings |
| import traceback |
| from contextvars import ContextVar |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Tuple, TypedDict |
|
|
| import requests |
| from dotenv import load_dotenv |
|
|
| from workflow_helpers import ( |
| WorkflowConfig, DEFAULT_CONFIG, |
| detect_output_format, detect_brevity_requirement, |
| classify_task, task_needs_evidence, |
| QAResult, parse_structured_qa, QAIssue, |
| PlannerState, FailureRecord, |
| select_relevant_roles, identify_revision_targets, |
| compress_final_answer, strip_internal_noise, |
| postprocess_format_fixes, |
| get_synthesizer_format_instruction, get_qa_format_instruction, |
| validate_output_format, format_violations_instruction, |
| parse_task_assumptions, format_assumptions_for_prompt, |
| ROLE_RELEVANCE, |
| STRUCTURED_OUTPUT_SUFFIX, |
| StructuredContribution, parse_structured_contribution, |
| format_contributions_for_synthesizer, format_contributions_for_qa, |
| parse_used_contributions, check_expert_influence, |
| ) |
| from evidence import ( |
| EvidenceResult, EvidenceItem, |
| WebSearchAdapter, WikipediaAdapter, ArxivAdapter, |
| ResearchToolAdapter, |
| gather_evidence, extract_search_queries, |
| detect_unsupported_claims, |
| format_evidence_for_prompt, format_evidence_for_qa, |
| ) |
|
|
# The wikipedia package emits noisy UserWarnings; silence them. Load .env
# before anything below reads environment variables.
warnings.filterwarnings("ignore", category=UserWarning, module="wikipedia")
load_dotenv()

# NOTE(review): "/data" looks like Hugging Face Spaces persistent storage —
# pointing HF_HOME there keeps hub downloads across restarts. setdefault
# respects an explicitly configured HF_HOME. TODO confirm deployment target.
if os.path.exists("/data"):
    os.environ.setdefault("HF_HOME", "/data/.huggingface")

import gradio as gr
import yfinance as yf
import matplotlib
# Select the headless Agg backend BEFORE pyplot is imported so charts render
# without a display server.
matplotlib.use("Agg")
import matplotlib.pyplot as plt
|
|
| from huggingface_hub.errors import HfHubHTTPError |
|
|
| from langchain_core.tools import tool |
| from langchain_core.messages import SystemMessage, HumanMessage |
| from langchain.agents import create_agent |
| from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper |
| from langchain_community.tools import DuckDuckGoSearchRun, ArxivQueryRun |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
|
|
|
|
| |
| |
| |
|
|
# Hugging Face API token — either env var name is accepted; fail fast at import
# time because every model call depends on it.
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") or os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("Missing Hugging Face token. Set HUGGINGFACEHUB_API_TOKEN or HF_TOKEN.")

# Per-call generation cap, overridable via environment.
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "1024"))
# Directory where generated chart PNGs are written (created eagerly).
CHART_DIR = "charts"
os.makedirs(CHART_DIR, exist_ok=True)
|
|
# Model ids offered in the UI dropdown, grouped by vendor.
# NOTE(review): some ids (e.g. "zai-org/GLM-5", "MiniMaxAI/MiniMax-M2.5",
# "moonshotai/Kimi-K2.5") could not be verified from this file — confirm they
# exist on the Hub / enabled providers.
MODEL_OPTIONS = [
    # Meta Llama
    "meta-llama/Llama-3.1-8B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",

    # OpenAI open-weight models
    "openai/gpt-oss-20b",
    "openai/gpt-oss-120b",

    # Qwen
    "Qwen/Qwen3-VL-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "Qwen/Qwen3-8B",
    "Qwen/Qwen3-32B",

    # Baidu
    "baidu/ERNIE-4.5-21B-A3B-PT",

    # DeepSeek
    "deepseek-ai/DeepSeek-R1",
    "deepseek-ai/DeepSeek-V3-0324",

    # Zhipu GLM
    "zai-org/GLM-5",
    "zai-org/GLM-4.7",
    "zai-org/GLM-4.6",
    "zai-org/GLM-4.5",

    # MiniMax / Moonshot
    "MiniMaxAI/MiniMax-M2.5",
    "moonshotai/Kimi-K2.5",
    "moonshotai/Kimi-K2-Instruct-0905",
]


DEFAULT_MODEL_ID = "openai/gpt-oss-20b"


# Static per-model UI notes; model_status_text() appends runtime health info.
MODEL_NOTES = {
    "meta-llama/Llama-3.1-8B-Instruct": "Provider model. May require gated access depending on your token.",
    "meta-llama/Llama-3.3-70B-Instruct": "Large provider model. Likely slower and may hit rate limits.",
    "openai/gpt-oss-20b": "Provider model. Good showcase option if available in your enabled providers.",
    "openai/gpt-oss-120b": "Large provider model. May call tools but sometimes fail to return final text.",
    "Qwen/Qwen3-VL-8B-Instruct": "Vision-language model. In this text-only UI it behaves as text-only.",
    "Qwen/Qwen2.5-7B-Instruct": "Provider model. Usually a safer text-only fallback.",
    "Qwen/Qwen3-8B": "Provider model. Availability depends on enabled providers.",
    "Qwen/Qwen3-32B": "Large provider model. Availability depends on enabled providers.",
    "baidu/ERNIE-4.5-21B-A3B-PT": "Provider model. Availability depends on enabled providers.",
    "deepseek-ai/DeepSeek-R1": "Provider model. Availability depends on enabled providers.",
    "deepseek-ai/DeepSeek-V3-0324": "Provider model. Availability depends on enabled providers.",
    "zai-org/GLM-5": "Provider model. Availability depends on enabled providers.",
    "zai-org/GLM-4.7": "Provider model. Availability depends on enabled providers.",
    "zai-org/GLM-4.6": "Provider model. Availability depends on enabled providers.",
    "zai-org/GLM-4.5": "Provider model. Availability depends on enabled providers.",
    "MiniMaxAI/MiniMax-M2.5": "Provider model. Availability depends on enabled providers.",
    "moonshotai/Kimi-K2.5": "Provider model. Availability depends on enabled providers.",
    "moonshotai/Kimi-K2-Instruct-0905": "Provider model. Availability depends on enabled providers.",
}


# Process-wide caches so repeated UI calls reuse clients/agents.
# RUNTIME_HEALTH records the last observed failure class per model id
# (keys consumed by model_status_text: "ok", "unavailable", "gated",
# "rate_limited", "empty_final", "error").
LLM_CACHE: Dict[str, object] = {}
AGENT_CACHE: Dict[Tuple[str, Tuple[str, ...]], object] = {}
RUNTIME_HEALTH: Dict[str, str] = {}


# Per-request client location, read by the get_user_location tool:
# either "lat,lon" (browser geolocation) or "ip:<addr>". A ContextVar keeps
# concurrent requests from clobbering each other.
_client_location: ContextVar[str] = ContextVar("client_location", default="")
|
|
|
|
| |
| |
| |
|
|
# DuckDuckGo search is optional: construction can fail (e.g. missing search
# backend package), in which case dependent tools degrade gracefully.
try:
    ddg_search = DuckDuckGoSearchRun()
except Exception:
    ddg_search = None


# Shared arXiv query tool: top 3 results, each truncated to 1200 characters.
arxiv_tool = ArxivQueryRun(
    api_wrapper=ArxivAPIWrapper(
        top_k_results=3,
        doc_content_chars_max=1200,
    )
)
|
|
|
|
| |
| |
| |
|
|
def _build_research_adapters() -> List[ResearchToolAdapter]:
    """Assemble the research adapters backed by the module-level tool wrappers.

    Web search is included only when the DuckDuckGo wrapper constructed
    successfully; Wikipedia and arXiv adapters are always available.
    """
    adapters: List[ResearchToolAdapter] = []
    if ddg_search is not None:
        adapters.append(WebSearchAdapter(ddg_search.run))
    wiki = WikipediaAPIWrapper()
    adapters.extend((WikipediaAdapter(wiki.run), ArxivAdapter(arxiv_tool.run)))
    return adapters
|
|
|
|
| |
| |
| |
|
|
def model_status_text(model_id: str) -> str:
    """Return the UI note for a model, annotated with any recorded failure.

    Looks up the static note in MODEL_NOTES and appends a suffix describing
    the last runtime health state recorded in RUNTIME_HEALTH (if any).
    """
    note = MODEL_NOTES.get(model_id, "Provider model.")
    health_suffixes = {
        "unavailable": " This model previously failed because no enabled provider supported it.",
        "gated": " This model previously failed due to access restrictions.",
        "rate_limited": " This model previously hit rate limiting.",
        "empty_final": " This model previously called tools but returned no final assistant text.",
        "error": " This model previously failed with a backend/runtime error.",
    }
    # "ok" and unknown/unset health both fall through to the bare note.
    return note + health_suffixes.get(RUNTIME_HEALTH.get(model_id), "")
|
|
|
|
def build_provider_chat(model_id: str):
    """Return a cached ChatHuggingFace client for `model_id`, creating it lazily."""
    cached = LLM_CACHE.get(model_id)
    if cached is not None:
        return cached

    endpoint = HuggingFaceEndpoint(
        repo_id=model_id,
        task="text-generation",
        provider="auto",
        huggingfacehub_api_token=HF_TOKEN,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=0.1,
        timeout=120,
    )
    chat = ChatHuggingFace(llm=endpoint)
    LLM_CACHE[model_id] = chat
    return chat
|
|
|
|
| |
| |
| |
|
|
def save_line_chart(
    title: str,
    x_values: List[str],
    y_values: List[float],
    x_label: str = "X",
    y_label: str = "Y",
) -> str:
    """Render a simple line chart to a uniquely named PNG and return its path.

    The file is written under CHART_DIR with a uuid4 filename so concurrent
    calls never collide.
    """
    out_path = os.path.join(CHART_DIR, f"{uuid.uuid4().hex}.png")

    fig, axes = plt.subplots(figsize=(9, 4.8))
    axes.plot(x_values, y_values)
    axes.set_title(title)
    axes.set_xlabel(x_label)
    axes.set_ylabel(y_label)
    axes.grid(True)

    # Slant date-like x tick labels, then save with a tight bounding box and
    # release the figure to avoid leaking matplotlib state.
    fig.autofmt_xdate()
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)

    return out_path
|
|
|
|
def extract_chart_path(text: str) -> Optional[str]:
    """Pull a saved-chart PNG path out of tool output text.

    Returns the path only if the referenced file actually exists on disk —
    checked as given, then as an absolute path — otherwise None.
    """
    match = re.search(r"Chart saved to:\s*(.+\.png)", text) if text else None
    if match is None:
        return None

    candidate = match.group(1).strip()
    for path in (candidate, os.path.abspath(candidate)):
        if os.path.exists(path):
            return path
    return None
|
|
|
|
def content_to_text(content) -> str:
    """Normalize a message `content` payload to plain text.

    `content` may be a plain string, a list of "parts" (strings or dicts such
    as {"type": "text", "text": ...} — the shape chat model providers
    typically return), or anything else, which is stringified as a fallback.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict) and "text" in item:
                # Coerce to str: a non-string value under "text" would
                # otherwise break the join below.
                parts.append(str(item["text"]))
            else:
                parts.append(str(item))
        return "\n".join(parts).strip()
    return str(content)
|
|
|
|
def short_text(text: str, limit: int = 1200) -> str:
    """Clamp `text` to at most `limit` characters, appending '...' if truncated."""
    value = text or ""
    if len(value) <= limit:
        return value
    return value[:limit] + "..."
|
|
|
|
| |
| |
| |
|
|
@tool
def add_numbers(a: float, b: float) -> float:
    """Add two numbers."""
    # NOTE: the docstring doubles as the LLM-facing tool description.
    return a + b
|
|
|
|
@tool
def subtract_numbers(a: float, b: float) -> float:
    """Subtract the second number from the first."""
    # Returns a - b (docstring is the LLM-facing tool description).
    return a - b
|
|
|
|
@tool
def multiply_numbers(a: float, b: float) -> float:
    """Multiply two numbers."""
    # Docstring is the LLM-facing tool description.
    return a * b
|
|
|
|
@tool
def divide_numbers(a: float, b: float) -> float:
    """Divide the first number by the second."""
    # Raise a clear error instead of ZeroDivisionError so the agent can relay it.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
|
|
|
|
@tool
def power(a: float, b: float) -> float:
    """Raise the first number to the power of the second."""
    # Docstring is the LLM-facing tool description.
    return a ** b
|
|
|
|
@tool
def square_root(a: float) -> float:
    """Calculate the square root of a number."""
    # Explicit guard: (-x) ** 0.5 would yield a complex number, not an error.
    if a < 0:
        raise ValueError("Cannot calculate square root of a negative number.")
    return a ** 0.5
|
|
|
|
@tool
def percentage(part: float, whole: float) -> float:
    """Calculate what percentage the first value is of the second value."""
    # Guard against division by zero with an agent-readable error.
    if whole == 0:
        raise ValueError("Whole cannot be zero.")
    return (part / whole) * 100
|
|
|
|
@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for stable factual information."""
    # A fresh wrapper per call; WikipediaAPIWrapper holds no useful state here.
    wiki = WikipediaAPIWrapper()
    return wiki.run(query)
|
|
|
|
@tool
def web_search(query: str) -> str:
    """Search the web for recent or changing information."""
    # ddg_search is None when DuckDuckGoSearchRun failed to construct at import.
    if ddg_search is None:
        return "Web search is unavailable because DDGS is not available."
    return ddg_search.run(query)
|
|
|
|
@tool
def search_arxiv(query: str) -> str:
    """Search arXiv for scientific papers and research literature."""
    # Delegates to the shared module-level ArxivQueryRun (top 3, 1200 chars each).
    return arxiv_tool.run(query)
|
|
|
|
@tool
def get_current_utc_time(_: str = "") -> str:
    """Return the current UTC date and time."""
    # Dummy string parameter keeps the tool schema happy for no-arg calls.
    return datetime.now(timezone.utc).isoformat()
|
|
|
|
@tool
def get_stock_price(ticker: str) -> str:
    """Get the latest recent close price for a stock, ETF, index, or crypto ticker."""
    # Normalize the symbol; Yahoo Finance tickers are conventionally upper-case.
    ticker = ticker.upper().strip()
    t = yf.Ticker(ticker)
    # 5 trading days gives a non-empty window even across weekends/holidays.
    hist = t.history(period="5d")

    if hist.empty:
        return f"No recent market data found for {ticker}."

    # Most recent close in the window.
    last = float(hist["Close"].iloc[-1])
    return f"{ticker} latest close: {last:.2f}"
|
|
|
|
@tool
def get_stock_history(ticker: str, period: str = "6mo") -> str:
    """Get historical closing prices for a ticker and generate a chart image."""
    # Normalize the symbol; Yahoo Finance tickers are conventionally upper-case.
    ticker = ticker.upper().strip()
    t = yf.Ticker(ticker)
    hist = t.history(period=period)

    if hist.empty:
        return f"No historical market data found for {ticker}."

    # Dates as strings for the x axis; closes as plain floats for plotting.
    x_vals = [str(d.date()) for d in hist.index]
    y_vals = [float(v) for v in hist["Close"].tolist()]

    chart_path = save_line_chart(
        title=f"{ticker} closing price ({period})",
        x_values=x_vals,
        y_values=y_vals,
        x_label="Date",
        y_label="Close",
    )

    start_close = y_vals[0]
    end_close = y_vals[-1]
    # Guard against a zero first close (would divide by zero).
    pct = ((end_close - start_close) / start_close) * 100 if start_close else 0.0

    # The "Chart saved to:" line is parsed downstream by extract_chart_path().
    return (
        f"Ticker: {ticker}\n"
        f"Period: {period}\n"
        f"Points: {len(y_vals)}\n"
        f"Start close: {start_close:.2f}\n"
        f"End close: {end_close:.2f}\n"
        f"Performance: {pct:+.2f}%\n"
        f"Chart saved to: {chart_path}"
    )
|
|
|
|
@tool
def generate_line_chart(
    title: str,
    x_values: list,
    y_values: list,
    x_label: str = "X",
    y_label: str = "Y",
) -> str:
    """Generate a line chart from x and y values and save it as an image file."""
    chart_path = save_line_chart(title, x_values, y_values, x_label=x_label, y_label=y_label)
    # The "Chart saved to:" phrasing is parsed downstream by extract_chart_path().
    return f"Chart saved to: {chart_path}"
|
|
|
|
@tool
def wikipedia_chaos_oracle(query: str) -> str:
    """Generate a weird chaotic text mashup based on Wikipedia content."""
    wiki = WikipediaAPIWrapper()
    text = wiki.run(query)

    if not text:
        return "The chaos oracle found only silence."

    # Tokenize into words, shuffle in place, and return a 30-word mashup.
    words = re.findall(r"\w+", text)
    if not words:
        return "The chaos oracle found no usable words."

    random.shuffle(words)
    return " ".join(words[:30])
|
|
|
|
@tool
def random_number(min_value: int, max_value: int) -> int:
    """Generate a random integer between the minimum and maximum values."""
    # Inclusive on both ends; randint raises ValueError if min_value > max_value.
    return random.randint(min_value, max_value)
|
|
|
|
@tool
def generate_uuid(_: str = "") -> str:
    """Generate a random UUID string."""
    # Dummy string parameter keeps the tool schema happy for no-arg calls.
    return str(uuid.uuid4())
|
|
|
|
@tool
def get_user_location(_: str = "") -> str:
    """Determine the user's precise physical location using browser GPS/WiFi coordinates or IP fallback."""
    # Per-request value set elsewhere via the _client_location ContextVar:
    # either "lat,lon" coordinates or an "ip:<addr>" fallback marker.
    location_data = _client_location.get()

    # Precise branch: coordinates present -> reverse geocode via Nominatim.
    if location_data and not location_data.startswith("ip:"):
        try:
            lat_str, lon_str = location_data.split(",", 1)
            lat, lon = float(lat_str), float(lon_str)
        except ValueError:
            return "Location lookup failed: invalid coordinate data."
        try:
            # Nominatim's usage policy requires an identifying User-Agent.
            headers = {"User-Agent": "HFAgent/1.0 (location lookup)"}
            resp = requests.get(
                "https://nominatim.openstreetmap.org/reverse",
                params={"lat": lat, "lon": lon, "format": "json", "addressdetails": 1},
                headers=headers,
                timeout=8,
            )
            resp.raise_for_status()
            data = resp.json()
            addr = data.get("address", {})
            # Nominatim uses different keys depending on settlement size; take
            # the most specific one available.
            city = (
                addr.get("city")
                or addr.get("town")
                or addr.get("village")
                or addr.get("municipality")
                or addr.get("county")
                or "N/A"
            )
            return (
                f"City: {city}\n"
                f"County: {addr.get('county', 'N/A')}\n"
                f"Region: {addr.get('state', 'N/A')}\n"
                f"Country: {addr.get('country', 'N/A')} ({addr.get('country_code', 'N/A').upper()})\n"
                f"Latitude: {lat}\n"
                f"Longitude: {lon}\n"
                f"Source: Browser GPS/WiFi (precise)"
            )
        except requests.RequestException as exc:
            return f"Reverse geocoding failed: {exc}"

    # Approximate branch: IP geolocation via ip-api.com. With no explicit
    # client IP, ip-api geolocates the address making the request.
    client_ip = location_data[3:] if location_data.startswith("ip:") else ""
    url = f"http://ip-api.com/json/{client_ip}" if client_ip else "http://ip-api.com/json/"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        # ip-api signals errors in-band via the "status" field.
        if data.get("status") != "success":
            return f"Location lookup failed: {data.get('message', 'unknown error')}"
        return (
            f"City: {data.get('city', 'N/A')}\n"
            f"Region: {data.get('regionName', 'N/A')}\n"
            f"Country: {data.get('country', 'N/A')} ({data.get('countryCode', 'N/A')})\n"
            f"Latitude: {data.get('lat', 'N/A')}\n"
            f"Longitude: {data.get('lon', 'N/A')}\n"
            f"Timezone: {data.get('timezone', 'N/A')}\n"
            f"ISP: {data.get('isp', 'N/A')}\n"
            f"Source: IP geolocation (approximate)"
        )
    except requests.RequestException as exc:
        return f"Location lookup failed: {exc}"
|
|
|
|
# Registry of every tool keyed by tool name; TOOL_NAMES preserves insertion
# order for tool selection elsewhere in the app.
ALL_TOOLS = {
    "add_numbers": add_numbers,
    "subtract_numbers": subtract_numbers,
    "multiply_numbers": multiply_numbers,
    "divide_numbers": divide_numbers,
    "power": power,
    "square_root": square_root,
    "percentage": percentage,
    "search_wikipedia": search_wikipedia,
    "web_search": web_search,
    "search_arxiv": search_arxiv,
    "get_current_utc_time": get_current_utc_time,
    "get_stock_price": get_stock_price,
    "get_stock_history": get_stock_history,
    "generate_line_chart": generate_line_chart,
    "wikipedia_chaos_oracle": wikipedia_chaos_oracle,
    "random_number": random_number,
    "generate_uuid": generate_uuid,
    "get_user_location": get_user_location,
}
TOOL_NAMES = list(ALL_TOOLS.keys())
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Hard cap on QA-triggered revision cycles before the planner must settle.
MAX_REVISIONS = 3


# Role key -> human-readable label used in prompts and the UI.
AGENT_ROLES = {
    "planner": "Planner",
    "creative": "Creative Expert",
    "technical": "Technical Expert",
    "qa_tester": "QA Tester",
    "research": "Research Analyst",
    "security": "Security Reviewer",
    "data_analyst": "Data Analyst",
    "mad_professor": "Mad Professor",
    "accountant": "Accountant",
    "artist": "Artist",
    "lazy_slacker": "Lazy Slacker",
    "black_metal_fundamentalist": "Black Metal Fundamentalist",
    "labour_union_rep": "Labour Union Representative",
    "ux_designer": "UX Designer",
    "doris": "Doris",
    "chairman_of_board": "Chairman of the Board",
    "maga_appointee": "MAGA Appointee",
    "lawyer": "Lawyer",
}

# Reverse lookup: label -> role key (labels above are unique).
_ROLE_LABEL_TO_KEY = {v: k for k, v in AGENT_ROLES.items()}
|
|
|
|
class WorkflowState(TypedDict):
    """Shared, inspectable state object threaded through the whole workflow."""
    # Original request and planner routing.
    user_request: str
    plan: str
    current_role: str
    # Raw per-specialist outputs — one field per role key in AGENT_ROLES.
    creative_output: str
    technical_output: str
    research_output: str
    security_output: str
    data_analyst_output: str
    mad_professor_output: str
    accountant_output: str
    artist_output: str
    lazy_slacker_output: str
    black_metal_fundamentalist_output: str
    labour_union_rep_output: str
    ux_designer_output: str
    doris_output: str
    chairman_of_board_output: str
    maga_appointee_output: str
    lawyer_output: str
    # Synthesis / QA loop bookkeeping.
    synthesis_output: str
    draft_output: str
    qa_report: str
    qa_role_feedback: Dict[str, str]
    qa_passed: bool
    revision_count: int
    final_answer: str
    # Derived task requirements and structured-output traceability.
    output_format: str
    brevity_requirement: str
    qa_structured: Optional[dict]
    task_assumptions: Dict[str, str]
    revision_instruction: str
    structured_contributions: Dict[str, dict]
    used_contributions: Dict[str, List[str]]
|
|
|
|
| |
|
|
| _PLANNER_SYSTEM_BASE = ( |
| "You are the Planner in a strict plannerโspecialistโsynthesizerโQA workflow.\n" |
| "Your ONLY job is to PLAN and DELEGATE. You do NOT write the answer.\n\n" |
| "Your responsibilities:\n" |
| "1. Break the user's task into clear subtasks.\n" |
| "2. Decide which specialist to call as the PRIMARY lead.\n" |
| " IMPORTANT: Select the FEWEST roles necessary. Do NOT call all roles.\n" |
| " Available specialists:\n" |
| "{specialist_list}" |
| "3. State clear success criteria.\n" |
| "4. Identify the required output format and brevity level.\n" |
| "5. Define shared assumptions that ALL specialists must use.\n" |
| "6. Write delegation instructions (what each specialist should focus on).\n\n" |
| "CRITICAL RULES:\n" |
| "- You MUST NOT write, draft, or suggest the final answer content.\n" |
| "- You MUST NOT include example answers, sample text, or draft responses.\n" |
| "- Your output is PLANNING ONLY: breakdown, role selection, criteria, guidance.\n" |
| "- The specialists will create the content. The Synthesizer will combine it.\n" |
| "- For simple questions, ONE specialist is enough.\n" |
| "- Never call persona/gimmick roles unless the user explicitly asks for them.\n" |
| "- Only select from the specialists listed above โ no others are available.\n" |
| "- QA results are BINDING โ if QA says FAIL, you MUST revise, never approve.\n\n" |
| "Respond in this exact format:\n" |
| "TASK BREAKDOWN:\n<subtask list โ what needs to be addressed, NOT the answers>\n\n" |
| "TASK ASSUMPTIONS:\n<shared assumptions all specialists must use, e.g. cost model, " |
| "coverage rate, units, scope, time frame โ one per line as 'key: value'>\n\n" |
| "ROLE TO CALL: <specialist name>\n\n" |
| "SUCCESS CRITERIA:\n<what a correct, complete answer looks like>\n\n" |
| "GUIDANCE FOR SPECIALIST:\n<delegation instructions โ what to focus on, NOT answer content>" |
| ) |
|
|
|
|
def _build_planner_system(enabled_role_keys: List[str]) -> str:
    """Build the planner system prompt with the actual enabled roles."""
    role_descriptions = {
        "creative": "'Creative Expert' (ideas, framing, wording, brainstorming)",
        "technical": "'Technical Expert' (code, architecture, implementation)",
        "research": "'Research Analyst' (information gathering, literature review, fact-finding)",
        "security": "'Security Reviewer' (security analysis, vulnerability checks)",
        "data_analyst": "'Data Analyst' (data analysis, statistics, patterns)",
        "labour_union_rep": "'Labour Union Representative' (worker rights, fair wages)",
        "ux_designer": "'UX Designer' (user needs, usability, accessibility)",
        "lawyer": "'Lawyer' (legal compliance, liability, contracts)",
        "mad_professor": "'Mad Professor' (wild ideas, provocative perspectives)",
        "accountant": "'Accountant' (cost analysis, budgeting, financial review)",
        "artist": "'Artist' (aesthetic vision, creative expression)",
        "lazy_slacker": "'Lazy Slacker' (minimal effort, simple answers)",
        "black_metal_fundamentalist": "'Black Metal Fundamentalist' (nihilistic perspective)",
        "doris": "'Doris' (practical, no-nonsense perspective)",
        "chairman_of_board": "'Chairman of the Board' (corporate strategy, governance)",
        "maga_appointee": "'MAGA Appointee' (deregulation, America-first perspective)",
    }
    # Unknown keys fall back to a bare quoted key so the prompt stays valid.
    entries = [
        " - {}\n".format(role_descriptions.get(rk, f"'{rk}'"))
        for rk in enabled_role_keys
    ]
    specialist_list = "".join(entries) or " - (no specialists enabled)\n"
    return _PLANNER_SYSTEM_BASE.format(specialist_list=specialist_list)
|
|
# Specialist prompt: Creative Expert. STRUCTURED_OUTPUT_SUFFIX appends the
# machine-parseable contribution format shared by all specialists.
# (Mojibake "โ" repaired to the intended "—".)
_CREATIVE_SYSTEM = (
    "You are the Creative Expert in a multi-role AI workflow.\n"
    "You handle brainstorming, alternative ideas, framing, wording, and concept generation.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "IDEAS:\n<list of ideas and alternatives>\n\n"
    "RATIONALE:\n<why these are strong choices>"
    + STRUCTURED_OUTPUT_SUFFIX
)


# Specialist prompt: Technical Expert.
_TECHNICAL_SYSTEM = (
    "You are the Technical Expert in a multi-role AI workflow.\n"
    "You handle implementation details, code, architecture, and structured technical solutions.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "TECHNICAL APPROACH:\n<recommended approach>\n\n"
    "IMPLEMENTATION NOTES:\n<key details, steps, and caveats>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
| _QA_SYSTEM = ( |
| "You are the QA Tester in a strict plannerโspecialistโsynthesizerโQA workflow.\n" |
| "Check whether the output satisfies the original request, success criteria,\n" |
| "output format requirements, brevity requirements, AND expert influence.\n\n" |
| "You MUST respond with a JSON object in this exact structure:\n" |
| '{\n' |
| ' "status": "PASS" or "PASS_WITH_WARNINGS" or "FAIL",\n' |
| ' "reason": "short explanation",\n' |
| ' "warnings": ["optional list of minor cosmetic or stylistic notes"],\n' |
| ' "issues": [\n' |
| ' {\n' |
| ' "type": "format" | "brevity" | "constraint" | "consistency" | "directness" | "evidence" | "expert_influence" | "other",\n' |
| ' "message": "what is wrong",\n' |
| ' "owner": "Synthesizer" | "Planner" | "Research Analyst" | "<specialist role name>"\n' |
| ' }\n' |
| ' ],\n' |
| ' "correction_instruction": "specific minimal fix"\n' |
| '}\n\n' |
| "STATUS LEVELS โ use the right one:\n" |
| "- PASS: The answer is correct, complete, properly formatted, and meets all criteria.\n" |
| "- PASS_WITH_WARNINGS: The answer is substantively correct and usable, but has minor\n" |
| " cosmetic or stylistic issues (e.g. slightly verbose, could be tighter, minor formatting\n" |
| " quirks). List these in the 'warnings' array. Do NOT put them in 'issues'.\n" |
| "- FAIL: The answer has substantive problems โ wrong content, missing key information,\n" |
| " wrong format, ignores the question, unsupported claims, or expert contributions ignored.\n" |
| " Only FAIL triggers a revision cycle.\n\n" |
| "FOCUS ON CONTENT, NOT COSMETICS:\n" |
| "- Minor bullet formatting, heading style, or whitespace are NOT reasons to FAIL.\n" |
| "- A slightly verbose answer that correctly addresses the question is PASS_WITH_WARNINGS, not FAIL.\n" |
| "- Reserve FAIL for answers that are genuinely wrong, incomplete, or miss the point.\n\n" |
| "Validation rules:\n" |
| "- Check that the output DIRECTLY answers the user's question.\n" |
| "- Check that the output format matches what was requested (single choice, table, code, etc.).\n" |
| "- Check that brevity matches the requirement (do not accept verbose answers when short was requested).\n" |
| "- Check that no internal workflow noise (task breakdown, role routing, perspectives summary) is in the output.\n" |
| "- EVIDENCE CHECK: If evidence validation info is provided, FAIL any answer that includes\n" |
| " specific factual claims, case studies, named examples, or citations NOT backed by the\n" |
| " retrieved evidence. General knowledge and widely-known facts are acceptable.\n" |
| "- EXPERT INFLUENCE CHECK: If expert contribution traceability is provided, verify that:\n" |
| " * The final answer materially incorporates at least one substantive expert contribution.\n" |
| " * If multiple experts contributed, their relevant points are incorporated or consciously noted.\n" |
| " * The answer is NOT just a paraphrase of planner text with no expert content.\n" |
| " * FAIL with type 'expert_influence' if expert contributions were ignored.\n" |
| "- FAIL if any substantive check fails.\n" |
| "- PASS_WITH_WARNINGS if content is good but minor polish is needed.\n" |
| "- PASS only if ALL checks pass with no issues at all.\n" |
| ) |
|
|
| _PLANNER_REVIEW_SYSTEM = ( |
| "You are the Planner reviewing QA feedback.\n" |
| "CRITICAL RULE: QA results are BINDING.\n" |
| "- If QA status is PASS or PASS_WITH_WARNINGS: approve the result.\n" |
| "- If QA status is FAIL: you MUST revise. You may NOT approve a FAIL result.\n" |
| "- If this is the final revision (max reached) and QA still FAIL:\n" |
| " you must directly fix the QA issues in your response before approving.\n\n" |
| "If QA PASSED (or PASS_WITH_WARNINGS), respond with:\n" |
| "DECISION: APPROVED\n" |
| "FINAL ANSWER:\n<the approved output, reproduced in full>\n\n" |
| "If QA FAILED and revisions remain, respond with:\n" |
| "DECISION: REVISE\n" |
| "ROLE TO CALL: <specialist name>\n" |
| "REVISED INSTRUCTIONS:\n<specific fixes to address the QA issues>\n\n" |
| "If QA FAILED and this is the FINAL revision, respond with:\n" |
| "DECISION: APPROVED\n" |
| "FINAL ANSWER:\n<the output with QA issues directly corrected by you>" |
| ) |
|
|
# Specialist prompt: Research Analyst — constrained to summarize retrieved
# evidence rather than invent facts. (Mojibake "โ" repaired to "—".)
_RESEARCH_SYSTEM = (
    "You are the Research Analyst in a multi-role AI workflow.\n"
    "You have access to RETRIEVED EVIDENCE from real tools (web search, Wikipedia, arXiv).\n"
    "Your job is to summarize the retrieved evidence, NOT to invent facts.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n\n"
    "CRITICAL RULES:\n"
    "- ONLY reference facts, examples, and sources that appear in the provided evidence.\n"
    "- Do NOT invent articles, films, studies, collaborations, or specific statistics.\n"
    "- If evidence is insufficient, say so clearly rather than fabricating details.\n\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "EVIDENCE SUMMARY:\n<what the retrieved evidence shows>\n\n"
    "KEY FINDINGS:\n<factual information from the evidence, with source attribution>\n\n"
    "GAPS:\n<what could not be verified — if any>"
    + STRUCTURED_OUTPUT_SUFFIX
)


# Specialist prompt: Security Reviewer.
_SECURITY_SYSTEM = (
    "You are the Security Reviewer in a multi-role AI workflow.\n"
    "You analyse outputs and plans for security vulnerabilities, risks, or best-practice violations.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "SECURITY ANALYSIS:\n<identification of potential security concerns or risks>\n\n"
    "VULNERABILITIES FOUND:\n<specific vulnerabilities or risks — or 'None' if the output is secure>\n\n"
    "RECOMMENDATIONS:\n<specific security improvements and mitigations>"
    + STRUCTURED_OUTPUT_SUFFIX
)


# Specialist prompt: Data Analyst.
_DATA_ANALYST_SYSTEM = (
    "You are the Data Analyst in a multi-role AI workflow.\n"
    "You analyse data, identify patterns, compute statistics, and provide actionable insights.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "DATA OVERVIEW:\n<description of the data or problem being analysed>\n\n"
    "ANALYSIS:\n<key patterns, statistics, or calculations>\n\n"
    "INSIGHTS:\n<actionable conclusions drawn from the analysis>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
| _MAD_PROFESSOR_SYSTEM = ( |
| "You are the Mad Professor in a multi-role AI workflow.\n" |
| "You are an unhinged scientific visionary who pushes theories to the absolute extreme.\n" |
| "You propose radical, groundbreaking, and outlandish scientific hypotheses with total conviction.\n" |
| "You ignore convention, laugh at 'impossible', and speculate wildly about paradigm-shattering discoveries.\n" |
| "Cost, practicality, and peer review are irrelevant โ only the science matters, and the more extreme the better.\n" |
| "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n" |
| "Keep your response brief โ 2-3 sentences per section maximum.\n\n" |
| "Respond in this exact format:\n" |
| "WILD HYPOTHESIS:\n<the most extreme, unhinged scientific theory relevant to the task>\n\n" |
| "SCIENTIFIC RATIONALE:\n<fringe evidence, speculative mechanisms, and radical extrapolations that 'support' the hypothesis>\n\n" |
| "GROUNDBREAKING IMPLICATIONS:\n<what this revolutionary theory changes about everything we know>" |
| + STRUCTURED_OUTPUT_SUFFIX |
| ) |
|
|
| _ACCOUNTANT_SYSTEM = ( |
| "You are the Accountant in a multi-role AI workflow.\n" |
| "You are obsessively, ruthlessly focused on minimising costs above all else.\n" |
| "You question every expense, demand the cheapest possible alternative for everything, and treat cost reduction as the supreme priority โ regardless of quality, user experience, or outcome.\n" |
| "You view every suggestion through the lens of 'can this be done cheaper?' and the answer is always yes.\n" |
| "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n" |
| "Keep your response brief โ 2-3 sentences per section maximum.\n\n" |
| "Respond in this exact format:\n" |
| "COST ANALYSIS:\n<breakdown of every cost element and how outrageously expensive it is>\n\n" |
| "COST-CUTTING MEASURES:\n<extreme measures to eliminate or slash each cost, including free/DIY alternatives>\n\n" |
| "CHEAPEST VIABLE APPROACH:\n<the absolute rock-bottom solution that technically meets the minimum requirement>" |
| + STRUCTURED_OUTPUT_SUFFIX |
| ) |
|
|
# System prompt for the Artist specialist persona: maximally unconstrained
# creative ideation. Ends with STRUCTURED_OUTPUT_SUFFIX so the reply parses
# into a StructuredContribution.
_ARTIST_SYSTEM = (
    "You are the Artist in a multi-role AI workflow.\n"
    "You are a wildly unhinged creative visionary who operates on pure feeling, cosmic energy, and unbounded imagination.\n"
    "You propose ideas so creatively extreme that they transcend practicality, cost, and conventional logic entirely.\n"
    "You think in metaphors, sensations, dreams, and universal vibrations. Implementation is someone else's problem.\n"
    "The more otherworldly, poetic, and mind-expanding the idea, the better.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "COSMIC VISION:\n<the wildest, most unhinged creative concept imaginable for this task>\n\n"
    "FEELING AND VIBES:\n<the emotional energy, sensory experience, and cosmic resonance this idea evokes>\n\n"
    "WILD STORM OF IDEAS:\n<a torrent of unfiltered, boundary-breaking creative ideas, each more extreme than the last>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Lazy Slacker persona: minimum-effort advocacy
# (useful as a devil's-advocate voice against over-engineering). Ends with
# STRUCTURED_OUTPUT_SUFFIX for structured parsing.
_LAZY_SLACKER_SYSTEM = (
    "You are the Lazy Slacker in a multi-role AI workflow.\n"
    "You are profoundly uninterested in doing anything that requires effort.\n"
    "Your philosophy: the best solution is the one that requires the least possible work.\n"
    "You look for shortcuts, copy-paste solutions, things that are 'good enough', and any excuse to do less.\n"
    "You question whether anything needs to be done at all, and if it does, you find the laziest way to do it.\n"
    "Effort is the enemy. Why do it properly when you can barely do it?\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "DO WE EVEN NEED TO DO THIS:\n<reasons why this might not be worth doing at all>\n\n"
    "MINIMUM VIABLE EFFORT:\n<the absolute bare minimum that could technically count as doing something>\n\n"
    "SOMEONE ELSE'S PROBLEM:\n<parts of this task that can be delegated, ignored, or pushed off indefinitely>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Black Metal Fundamentalist persona: an uncompromising,
# anti-mainstream critique voice. Ends with STRUCTURED_OUTPUT_SUFFIX for
# structured parsing.
_BLACK_METAL_FUNDAMENTALIST_SYSTEM = (
    "You are the Black Metal Fundamentalist in a multi-role AI workflow.\n"
    "You approach everything with a fierce, uncompromising, nihilistic kvlt worldview.\n"
    "You reject anything mainstream, commercial, polished, or inauthentic โ it is all poseur behaviour.\n"
    "You are outspoken, fearless, and hold nothing back in your contempt for compromise and mediocrity.\n"
    "True solutions are raw, grim, underground, and uncompromising. Anything else is a sellout.\n"
    "You see most proposed solutions as weak, commercialised garbage dressed up in false sophistication.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "KVLT VERDICT:\n<uncompromising judgement on the task โ is it true or false, grim or poseur?>\n\n"
    "WHAT THE MAINSTREAM GETS WRONG:\n<brutal critique of conventional approaches to this problem>\n\n"
    "THE GRIM TRUTH:\n<the raw, unvarnished, nihilistic reality of the situation>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Synthesizer stage. Unlike the specialist prompts this
# one does NOT append STRUCTURED_OUTPUT_SUFFIX: the synthesizer emits the
# user-facing answer plus a trailing USED_CONTRIBUTIONS JSON block, which
# parse_used_contributions / check_expert_influence consume downstream.
_SYNTHESIZER_SYSTEM = (
    "You are the Synthesizer in a strict plannerโspecialistโsynthesizerโQA workflow.\n"
    "You receive STRUCTURED EXPERT CONTRIBUTIONS and must produce the FINAL answer.\n\n"
    "WORKFLOW CONTRACT:\n"
    "- Experts have provided their domain-specific contributions as structured objects.\n"
    "- You MUST build the final answer FROM these expert contributions.\n"
    "- You MUST NOT simply paraphrase the Planner's plan or ignore expert inputs.\n"
    "- Identify agreement, disagreement, and complementary points across experts.\n"
    "- The final answer should reflect the substantive work of the experts.\n\n"
    "CRITICAL RULES:\n"
    "- Your output IS the final user-facing answer. It must directly answer the user's question.\n"
    "- You MUST obey the requested output format strictly.\n"
    "- Do NOT add sections like 'Perspectives Summary', 'Common Ground', 'Trade-offs',\n"
    "  'Tensions', or any multi-section structure UNLESS the user explicitly requested\n"
    "  a report or analysis.\n"
    "- Do NOT include internal workflow information (role names, task breakdowns, etc.).\n"
    "- Default to the SHORTEST adequate answer.\n"
    "- EVIDENCE RULE: Prefer claims backed by retrieved evidence. If evidence is weak or\n"
    "  absent, give a general answer. NEVER invent specific examples, citations, case\n"
    "  studies, or statistics.\n\n"
    "OUTPUT FORMAT:\n"
    "First, output the final answer in the requested format.\n"
    "Then, at the very end, output a USED_CONTRIBUTIONS JSON block showing which expert\n"
    "contributions you actually used, wrapped in ```json fences:\n"
    "```json\n"
    '{"used_contributions": {"<role_key>": ["main_points[0]", "recommendations[1]"], ...}}\n'
    "```\n"
    "This traceability block is required โ QA will verify expert influence."
)
|
|
# System prompt for the Labour Union Representative persona: worker-rights
# advocacy. Ends with STRUCTURED_OUTPUT_SUFFIX for structured parsing.
_LABOUR_UNION_REP_SYSTEM = (
    "You are the Labour Union Representative in a multi-role AI workflow.\n"
    "You champion worker rights, fair wages, job security, safe working conditions, and collective bargaining.\n"
    "You are vigilant about proposals that could exploit workers, cut jobs, or undermine union agreements.\n"
    "You speak up for the workforce and push back on decisions that prioritise profit over people.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "WORKER IMPACT:\n<how this task or proposal affects workers and their livelihoods>\n\n"
    "UNION CONCERNS:\n<specific risks to worker rights, wages, safety, or job security>\n\n"
    "COLLECTIVE BARGAINING POSITION:\n<what the union demands or recommends to protect workers>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the UX Designer persona: user-centric analysis and
# usability advocacy. Ends with STRUCTURED_OUTPUT_SUFFIX for structured parsing.
_UX_DESIGNER_SYSTEM = (
    "You are the UX Designer in a multi-role AI workflow.\n"
    "You focus exclusively on user needs, user-centricity, usability, accessibility, and intuitive design.\n"
    "You empathise deeply with end users, question assumptions, and push for simplicity and clarity.\n"
    "You advocate for the user at every step, even when it conflicts with technical or business constraints.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "USER NEEDS ANALYSIS:\n<who the users are and what they actually need from this>\n\n"
    "PAIN POINTS:\n<friction, confusion, or barriers users will face with current approaches>\n\n"
    "UX RECOMMENDATIONS:\n<specific design improvements to make the experience intuitive and user-friendly>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Doris persona: an intentionally clueless, tangential
# contributor (noise/robustness testing for the synthesizer and QA stages).
# Ends with STRUCTURED_OUTPUT_SUFFIX for structured parsing.
_DORIS_SYSTEM = (
    "You are Doris in a multi-role AI workflow.\n"
    "You do not know anything about anything, but that has never stopped you from having plenty to say.\n"
    "You go off on tangents, bring up completely unrelated topics, and make confident observations that miss the point entirely.\n"
    "You are well-meaning but utterly clueless. You fill every section with irrelevant words.\n"
    "Your job is to contribute your DOMAIN EXPERTISE (such as it is), not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "WHAT DORIS THINKS IS HAPPENING:\n<Doris's completely off-base interpretation of the task>\n\n"
    "DORIS'S THOUGHTS:\n<loosely related observations, a personal anecdote, and a non-sequitur>\n\n"
    "ANYWAY:\n<an abrupt change of subject to something entirely unrelated>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Chairman of the Board persona: governance and
# shareholder-value perspective. Ends with STRUCTURED_OUTPUT_SUFFIX for
# structured parsing.
_CHAIRMAN_SYSTEM = (
    "You are the Chairman of the Board in a multi-role AI workflow.\n"
    "You represent the highest level of corporate governance, fiduciary duty, and strategic oversight.\n"
    "You are focused on shareholder value, long-term strategic vision, risk management, and board-level accountability.\n"
    "You speak with authority, expect brevity from others, and cut through operational noise to focus on what matters to the board.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "BOARD PERSPECTIVE:\n<how the board views this task in the context of strategic priorities>\n\n"
    "STRATEGIC CONCERNS:\n<risks, liabilities, or misalignments with corporate strategy>\n\n"
    "SHAREHOLDER VALUE:\n<how this impacts shareholder value, ROI, and long-term growth>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the MAGA Appointee persona: an America First political
# viewpoint contributor. Ends with STRUCTURED_OUTPUT_SUFFIX for structured
# parsing.
_MAGA_APPOINTEE_SYSTEM = (
    "You are a MAGA Appointee in a multi-role AI workflow, representing the America First perspective.\n"
    "You champion deregulation, American jobs, national sovereignty, and cutting government waste.\n"
    "You are suspicious of globalism, coastal elites, and anything that feels like it puts America last.\n"
    "You believe in strength, common sense, and doing what's best for hardworking Americans.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "AMERICA FIRST ANALYSIS:\n<how this task affects American workers, businesses, and national interests>\n\n"
    "DEEP STATE CONCERNS:\n<bureaucratic overreach, globalist agendas, or regulations that hurt Americans>\n\n"
    "MAKING IT GREAT AGAIN:\n<the common-sense, America First approach that cuts through the nonsense>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
# System prompt for the Lawyer persona: compliance, liability, and risk
# analysis. Ends with STRUCTURED_OUTPUT_SUFFIX for structured parsing.
_LAWYER_SYSTEM = (
    "You are the Lawyer in a multi-role AI workflow.\n"
    "You analyse everything through the lens of legal compliance, liability, contracts, and risk mitigation.\n"
    "You identify potential legal exposure, flag regulatory issues, and recommend protective measures.\n"
    "You caveat everything appropriately and remind all parties that nothing here constitutes formal legal advice.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief โ 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "LEGAL ANALYSIS:\n<assessment of legal issues, applicable laws, and regulatory considerations>\n\n"
    "LIABILITIES AND RISKS:\n<specific legal exposure, contractual risks, or compliance gaps>\n\n"
    "LEGAL RECOMMENDATIONS:\n<protective measures, disclaimers, or required legal steps>"
    + STRUCTURED_OUTPUT_SUFFIX
)
|
|
|
|
| |
|
|
def _llm_call(chat_model, system_prompt: str, user_content: str) -> str:
    """Send one system+human message pair to the chat model.

    Returns the model's reply flattened to plain text via content_to_text.
    """
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_content),
    ]
    reply = chat_model.invoke(messages)
    return content_to_text(reply.content)
|
|
|
|
| def _decide_role(text: str) -> str: |
| """Parse which specialist role the Planner wants to invoke. |
| |
| Checks for the expected structured 'ROLE TO CALL:' format first, |
| then falls back to a word-boundary search. |
| Defaults to 'technical' when no clear signal is found. |
| """ |
| |
| if "ROLE TO CALL: Creative Expert" in text: |
| return "creative" |
| if "ROLE TO CALL: Technical Expert" in text: |
| return "technical" |
| if "ROLE TO CALL: Research Analyst" in text: |
| return "research" |
| if "ROLE TO CALL: Security Reviewer" in text: |
| return "security" |
| if "ROLE TO CALL: Data Analyst" in text: |
| return "data_analyst" |
| if "ROLE TO CALL: Mad Professor" in text: |
| return "mad_professor" |
| if "ROLE TO CALL: Accountant" in text: |
| return "accountant" |
| if "ROLE TO CALL: Artist" in text: |
| return "artist" |
| if "ROLE TO CALL: Lazy Slacker" in text: |
| return "lazy_slacker" |
| if "ROLE TO CALL: Black Metal Fundamentalist" in text: |
| return "black_metal_fundamentalist" |
| if "ROLE TO CALL: Labour Union Representative" in text: |
| return "labour_union_rep" |
| if "ROLE TO CALL: UX Designer" in text: |
| return "ux_designer" |
| if "ROLE TO CALL: Doris" in text: |
| return "doris" |
| if "ROLE TO CALL: Chairman of the Board" in text: |
| return "chairman_of_board" |
| if "ROLE TO CALL: MAGA Appointee" in text: |
| return "maga_appointee" |
| if "ROLE TO CALL: Lawyer" in text: |
| return "lawyer" |
| |
| if re.search(r"\bcreative\b", text, re.IGNORECASE): |
| return "creative" |
| if re.search(r"\bresearch\b", text, re.IGNORECASE): |
| return "research" |
| if re.search(r"\bsecurity\b", text, re.IGNORECASE): |
| return "security" |
| if re.search(r"\bdata\s+analyst\b", text, re.IGNORECASE): |
| return "data_analyst" |
| if re.search(r"\bmad\s+professor\b", text, re.IGNORECASE): |
| return "mad_professor" |
| if re.search(r"\baccountant\b", text, re.IGNORECASE): |
| return "accountant" |
| if re.search(r"\bartist\b", text, re.IGNORECASE): |
| return "artist" |
| if re.search(r"\blazy\s+slacker\b", text, re.IGNORECASE): |
| return "lazy_slacker" |
| if re.search(r"\bblack\s+metal\b", text, re.IGNORECASE): |
| return "black_metal_fundamentalist" |
| if re.search(r"\blabour\s+union\b", text, re.IGNORECASE): |
| return "labour_union_rep" |
| if re.search(r"\bux\s+designer\b", text, re.IGNORECASE): |
| return "ux_designer" |
| if re.search(r"\bdoris\b", text, re.IGNORECASE): |
| return "doris" |
| if re.search(r"\bchairman\b", text, re.IGNORECASE): |
| return "chairman_of_board" |
| if re.search(r"\bmaga\b", text, re.IGNORECASE): |
| return "maga_appointee" |
| if re.search(r"\blawyer\b", text, re.IGNORECASE): |
| return "lawyer" |
| return "technical" |
|
|
|
|
| def _qa_passed_check(qa_text: str) -> bool: |
| """Return True only if the QA report contains an explicit PASS result. |
| |
| Relies on the structured 'RESULT: PASS / RESULT: FAIL' line produced by |
| the QA Tester prompt. Returns False when the expected format is absent |
| to avoid false positives from words like 'bypass' or 'password'. |
| """ |
| lower = qa_text.lower() |
| if "result: pass" in lower: |
| return True |
| if "result: fail" in lower: |
| return False |
| |
| return False |
|
|
|
|
def _parse_qa_role_feedback(qa_text: str) -> Dict[str, str]:
    """Extract per-role targeted feedback from a QA report.

    Looks for the ROLE-SPECIFIC FEEDBACK section produced by the QA Tester
    and parses bullet entries of the form 'โข Role Name: <feedback>'.
    Returns a dict mapping role keys (e.g. 'creative', 'technical') to the
    feedback string targeted at that role.
    """
    feedback: Dict[str, str] = {}
    if "ROLE-SPECIFIC FEEDBACK:" not in qa_text:
        return feedback

    section = qa_text.split("ROLE-SPECIFIC FEEDBACK:", 1)[1]

    # BUGFIX: truncate at the EARLIEST trailing header present in the text,
    # not merely the first one found in the tuple. The previous
    # break-on-first-match loop could leave a later section's bullets inside
    # the feedback section when both headers appeared (e.g. RECOMMENDED
    # FIXES: occurring before RESULT:).
    cut = min(
        (section.index(h) for h in ("RESULT:", "RECOMMENDED FIXES:") if h in section),
        default=None,
    )
    if cut is not None:
        section = section[:cut]

    # Parse bullet lines of the form 'โข <Role Label>: <feedback text>'.
    for line in section.strip().splitlines():
        line = line.strip().lstrip("โข-* ")
        if ":" not in line:
            continue
        role_label, _, role_feedback = line.partition(":")
        role_label = role_label.strip()
        role_feedback = role_feedback.strip()
        # Unknown labels (or empty feedback) are silently skipped.
        role_key = _ROLE_LABEL_TO_KEY.get(role_label)
        if role_key and role_feedback:
            feedback[role_key] = role_feedback

    return feedback
|
|
|
|
| |
| |
| |
|
|
def _step_plan(
    chat_model, state: WorkflowState, trace: List[str],
    enabled_role_keys: Optional[List[str]] = None,
) -> WorkflowState:
    """Planner: analyse the task, produce a plan, decide which specialist to call."""
    trace.append("\nโโโ [PLANNER] Analysing task... โโโ")
    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    # Base planning prompt: the request plus detected format/brevity hints.
    prompt = "\n".join([
        f"User request: {state['user_request']}",
        f"Required output format: {fmt}",
        f"Brevity requirement: {brevity}",
    ])
    # Revision passes include the previous QA report so the new plan
    # addresses the reported issues.
    if state["revision_count"] > 0:
        prompt += (
            f"\n\nThis is revision {state['revision_count']} of {MAX_REVISIONS}."
            f"\nPrevious QA report:\n{state['qa_report']}"
            "\nAdjust the plan to address the QA issues."
        )
    planner_system = _build_planner_system(enabled_role_keys or [])
    plan_text = _llm_call(chat_model, planner_system, prompt)
    state["plan"] = plan_text
    state["current_role"] = _decide_role(plan_text)
    trace.append(plan_text)
    trace.append(f"โโโ [PLANNER] โ routing to: {state['current_role'].upper()} EXPERT โโโ")
    return state
|
|
|
|
def _step_creative(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Creative Expert: brainstorm ideas and produce a recommended draft."""
    trace.append("\nโโโ [CREATIVE EXPERT] Generating ideas... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("creative", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _CREATIVE_SYSTEM, prompt)
    state["creative_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [CREATIVE EXPERT] Done โโโ")
    return state
|
|
|
|
def _step_technical(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Technical Expert: provide implementation details and a complete technical draft."""
    trace.append("\nโโโ [TECHNICAL EXPERT] Working on implementation... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("technical", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _TECHNICAL_SYSTEM, prompt)
    state["technical_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [TECHNICAL EXPERT] Done โโโ")
    return state
|
|
|
|
def _step_qa(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    all_outputs: Optional[List[Tuple[str, str]]] = None,
    evidence: Optional[EvidenceResult] = None,
    structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
    """QA Tester: validate the draft against the original request, success criteria,
    output format, brevity requirements, evidence grounding, and expert influence.

    When structured_contributions are provided, also checks that the final answer
    materially incorporates expert contributions (expert_influence check).
    Produces a structured QAResult stored in state['qa_structured'].
    """
    trace.append("\nโโโ [QA TESTER] Reviewing output... โโโ")

    # Deterministic format clean-up before the LLM review, so QA judges the
    # same text the user would ultimately see.
    state["draft_output"] = postprocess_format_fixes(state["draft_output"])

    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    format_rules = get_qa_format_instruction(fmt, brevity)

    # Assemble the QA prompt: request, format/brevity requirements, and plan.
    content = (
        f"Original user request: {state['user_request']}\n\n"
        f"Required output format: {fmt}\n"
        f"Brevity requirement: {brevity}\n\n"
        f"Planner's plan and success criteria:\n{state['plan']}\n\n"
    )
    if format_rules:
        content += f"FORMAT VALIDATION RULES:\n{format_rules}\n\n"

    # Include retrieved evidence so QA can check grounding of claims.
    if evidence is not None:
        content += f"{format_evidence_for_qa(evidence)}\n\n"

    # Include the contribution-traceability table: which expert points the
    # synthesizer claims to have used.
    if structured_contributions:
        used = state.get("used_contributions", {})
        traceability = format_contributions_for_qa(structured_contributions, used)
        content += f"{traceability}\n\n"

    # When specialist outputs are available, show each one so QA can validate
    # the synthesized answer against them; otherwise validate the draft alone.
    if all_outputs:
        content += "Individual specialist contributions:\n\n"
        for r_key, r_output in all_outputs:
            r_label = AGENT_ROLES.get(r_key, r_key)
            content += f"=== {r_label} ===\n{r_output}\n\n"
        content += f"Synthesized output to validate:\n{state['draft_output']}"
    else:
        content += f"Output to validate:\n{state['draft_output']}"

    text = _llm_call(chat_model, _QA_SYSTEM, content)
    state["qa_report"] = text

    # Parse the free-text QA report into a structured QAResult.
    qa_result = parse_structured_qa(text)

    # Code-level expert-influence check: a PASS is downgraded to FAIL when
    # the draft does not materially reflect the expert contributions.
    if structured_contributions:
        used = state.get("used_contributions", {})
        influence_issues = check_expert_influence(
            structured_contributions, used, state["draft_output"]
        )
        if influence_issues:
            for issue_msg in influence_issues:
                qa_result.issues.append(QAIssue(
                    type="expert_influence",
                    message=issue_msg,
                    owner="synthesizer",
                ))
            if qa_result.passed:
                qa_result.status = "FAIL"
                qa_result.reason = (
                    qa_result.reason + " Expert influence check failed."
                    if qa_result.reason else "Expert influence check failed."
                )
            trace.append(
                f" โ Expert influence issues: {'; '.join(influence_issues)}"
            )

    state["qa_structured"] = qa_result.to_dict()
    state["qa_passed"] = qa_result.passed

    # Per-role feedback lets revision passes target only the offending role.
    state["qa_role_feedback"] = _parse_qa_role_feedback(text)

    result_label = ("โ
 PASS" if qa_result.status == "PASS"
                    else "โ PASS_WITH_WARNINGS" if qa_result.passed_with_warnings
                    else "โ FAIL")
    trace.append(text)
    if qa_result.warnings:
        trace.append(f" โ QA warnings: {'; '.join(qa_result.warnings)}")
    if qa_result.issues:
        # Summarise each issue as "<owner>: <first 60 chars of message>".
        issues_summary = "; ".join(
            f"{i.owner}: {i.message[:60]}{'โฆ' if len(i.message) > 60 else ''}"
            for i in qa_result.issues
        )
        trace.append(f" โน QA issues: {issues_summary}")
    trace.append(f"โโโ [QA TESTER] Result: {result_label} โโโ")
    return state
|
|
|
|
def _step_planner_review(
    chat_model, state: WorkflowState, trace: List[str],
    is_final_revision: bool = False,
) -> WorkflowState:
    """Planner: review QA feedback and either approve or request revision.

    QA-BINDING LOGIC (enforced in code, not just prompt):
    - If QA passed โ approve
    - If QA failed and revisions remain โ MUST revise (code blocks approval)
    - If QA failed and max revisions reached โ planner does final correction pass
    """
    trace.append("\nโโโ [PLANNER] Reviewing QA feedback... โโโ")

    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")

    # Review prompt: request, requirements, plan, current draft, QA report.
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Required output format: {fmt}\n"
        f"Brevity requirement: {brevity}\n\n"
        f"Plan:\n{state['plan']}\n\n"
        f"Current draft:\n{state['draft_output']}\n\n"
        f"QA report:\n{state['qa_report']}"
    )

    if is_final_revision:
        content += (
            "\n\nThis is the FINAL revision. Max revisions reached. "
            "You MUST directly fix all QA issues in your FINAL ANSWER now."
        )

    review = _llm_call(chat_model, _PLANNER_REVIEW_SYSTEM, content)
    trace.append(review)

    # Case 1: QA passed -> approve; take the FINAL ANSWER section if the
    # planner produced one, otherwise keep the current draft verbatim.
    if state["qa_passed"]:
        parts = review.split("FINAL ANSWER:", 1)
        if len(parts) > 1:
            state["final_answer"] = parts[1].strip()
        else:
            state["final_answer"] = state["draft_output"]
        trace.append("โโโ [PLANNER] โ โ
 APPROVED โโโ")
    # Case 2: QA failed but no revisions remain -> the planner's own
    # correction pass becomes the final answer.
    elif is_final_revision:
        parts = review.split("FINAL ANSWER:", 1)
        if len(parts) > 1:
            state["final_answer"] = parts[1].strip()
        else:
            # Planner produced no FINAL ANSWER section: fall back to the draft.
            state["final_answer"] = state["draft_output"]
        trace.append("โโโ [PLANNER] โ โ
 APPROVED (final revision, QA issues corrected) โโโ")
    # Case 3: QA failed and revisions remain -> revision is mandatory; any
    # planner approval is overridden in code (QA verdict is binding).
    else:
        if "DECISION: APPROVED" in review.upper():
            trace.append(" โ Planner tried to approve a FAIL result โ overriding to REVISE (QA is binding)")

        # Use the planner's revised instructions as the new plan when present;
        # otherwise fall back to QA's structured correction instruction.
        parts = review.split("REVISED INSTRUCTIONS:", 1)
        if len(parts) > 1:
            state["plan"] = parts[1].strip()
        else:
            trace.append(" โ REVISED INSTRUCTIONS missing; using QA correction instruction.")

            qa_data = state.get("qa_structured")
            if qa_data and qa_data.get("correction_instruction"):
                state["plan"] = qa_data["correction_instruction"]

        # Route the revision to whichever specialist the review names.
        state["current_role"] = _decide_role(review)
        trace.append(
            f"โโโ [PLANNER] โ ๐ REVISE โ routing to {state['current_role'].upper()} EXPERT โโโ"
        )
    return state
|
|
|
|
def _step_research(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    evidence: Optional[EvidenceResult] = None,
) -> WorkflowState:
    """Research Analyst: summarise tool-retrieved evidence into structured findings.

    If an EvidenceResult is provided, it is injected into the prompt so the
    Research Analyst works from real retrieved data rather than generating text.
    If no evidence is available, the analyst is instructed to reason generally
    and avoid inventing specific citations.
    """
    trace.append("\nโโโ [RESEARCH ANALYST] Gathering information... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )

    # Ground the analyst in retrieved evidence when available; otherwise
    # explicitly forbid fabricated citations.
    if evidence and evidence.has_evidence:
        prompt += f"\n\n{format_evidence_for_prompt(evidence)}"
        trace.append(f" โน Evidence injected: {len(evidence.results)} items (confidence: {evidence.confidence})")
    else:
        prompt += (
            "\n\nNo tool-retrieved evidence is available for this query.\n"
            "Provide general knowledge only. Do NOT invent specific citations, "
            "articles, studies, or examples."
        )

    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("research", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _RESEARCH_SYSTEM, prompt)
    state["research_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [RESEARCH ANALYST] Done โโโ")
    return state
|
|
|
|
def _step_security(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Security Reviewer: analyse output for vulnerabilities and produce a secure revision."""
    trace.append("\nโโโ [SECURITY REVIEWER] Analysing for security issues... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("security", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _SECURITY_SYSTEM, prompt)
    state["security_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [SECURITY REVIEWER] Done โโโ")
    return state
|
|
|
|
def _step_data_analyst(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Data Analyst: analyse data, identify patterns, and produce actionable insights."""
    trace.append("\nโโโ [DATA ANALYST] Analysing data and patterns... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("data_analyst", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _DATA_ANALYST_SYSTEM, prompt)
    state["data_analyst_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [DATA ANALYST] Done โโโ")
    return state
|
|
|
|
def _step_mad_professor(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Mad Professor: propose radical, unhinged scientific theories and extreme hypotheses."""
    trace.append("\nโโโ [MAD PROFESSOR] Unleashing radical scientific theories... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("mad_professor", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _MAD_PROFESSOR_SYSTEM, prompt)
    state["mad_professor_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [MAD PROFESSOR] Done โโโ")
    return state
|
|
|
|
def _step_accountant(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Accountant: ruthlessly cut costs and find the cheapest possible approach."""
    trace.append("\nโโโ [ACCOUNTANT] Auditing every cost... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("accountant", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _ACCOUNTANT_SYSTEM, prompt)
    state["accountant_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [ACCOUNTANT] Done โโโ")
    return state
|
|
|
|
def _step_artist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Artist: channel cosmic creative energy into wildly unhinged and spectacular ideas."""
    trace.append("\nโโโ [ARTIST] Channelling cosmic creative energy... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("artist", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _ARTIST_SYSTEM, prompt)
    state["artist_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [ARTIST] Done โโโ")
    return state
|
|
|
|
def _step_lazy_slacker(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Lazy Slacker: find the path of least resistance and the minimum viable effort."""
    trace.append("\nโโโ [LAZY SLACKER] Doing as little as possible... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("lazy_slacker", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _LAZY_SLACKER_SYSTEM, prompt)
    state["lazy_slacker_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [LAZY SLACKER] Done (finally) โโโ")
    return state
|
|
|
|
def _step_black_metal_fundamentalist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Black Metal Fundamentalist: deliver a nihilistic, kvlt, uncompromising perspective."""
    trace.append("\nโโโ [BLACK METAL FUNDAMENTALIST] Unleashing grim truths... โโโ")
    # Base prompt: user's request plus the Planner's instructions.
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # Revision passes prefer role-targeted QA feedback; otherwise fall back
    # to the full QA report.
    if state["revision_count"] > 0:
        targeted = state["qa_role_feedback"].get("black_metal_fundamentalist", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{targeted}"
            if targeted
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    output = _llm_call(chat_model, _BLACK_METAL_FUNDAMENTALIST_SYSTEM, prompt)
    state["black_metal_fundamentalist_output"] = output
    state["draft_output"] = output
    trace.append(output)
    trace.append("โโโ [BLACK METAL FUNDAMENTALIST] Done โโโ")
    return state
|
|
|
|
def _step_labour_union_rep(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Labour Union Representative specialist: worker rights, wages, job security."""
    trace.append("\nโโโ [LABOUR UNION REPRESENTATIVE] Standing up for workers... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("labour_union_rep", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _LABOUR_UNION_REP_SYSTEM, prompt)
    state["labour_union_rep_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [LABOUR UNION REPRESENTATIVE] Done โโโ")
    return state
|
|
|
|
def _step_ux_designer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the UX Designer specialist: user-needs analysis and a user-centric recommendation."""
    trace.append("\nโโโ [UX DESIGNER] Putting users first... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("ux_designer", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _UX_DESIGNER_SYSTEM, prompt)
    state["ux_designer_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [UX DESIGNER] Done โโโ")
    return state
|
|
|
|
def _step_doris(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run Doris: a well-meaning but clueless persona that rambles without adding much."""
    trace.append("\nโโโ [DORIS] Oh! Well, you know, I was just thinking... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("doris", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _DORIS_SYSTEM, prompt)
    state["doris_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [DORIS] Anyway, where was I... Done โโโ")
    return state
|
|
|
|
def _step_chairman_of_board(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Chairman of the Board specialist: strategic, shareholder-focused direction."""
    trace.append("\nโโโ [CHAIRMAN OF THE BOARD] Calling the meeting to order... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("chairman_of_board", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _CHAIRMAN_SYSTEM, prompt)
    state["chairman_of_board_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [CHAIRMAN OF THE BOARD] Meeting adjourned โโโ")
    return state
|
|
|
|
def _step_maga_appointee(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the MAGA Appointee specialist: an America First, pro-deregulation perspective."""
    trace.append("\nโโโ [MAGA APPOINTEE] America First! โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("maga_appointee", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _MAGA_APPOINTEE_SYSTEM, prompt)
    state["maga_appointee_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [MAGA APPOINTEE] Done โโโ")
    return state
|
|
|
|
def _step_lawyer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Lawyer specialist: legal implications, liabilities, compliance requirements."""
    trace.append("\nโโโ [LAWYER] Reviewing legal implications... โโโ")
    prompt = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    # On a revision pass, prefer role-targeted QA feedback; fall back to the full report.
    if state["revision_count"] > 0:
        feedback = state["qa_role_feedback"].get("lawyer", "")
        prompt += (
            f"\n\nQA feedback specific to your contribution:\n{feedback}"
            if feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    result = _llm_call(chat_model, _LAWYER_SYSTEM, prompt)
    state["lawyer_output"] = state["draft_output"] = result
    trace.append(result)
    trace.append("โโโ [LAWYER] Done โ note: this is not formal legal advice โโโ")
    return state
|
|
|
|
def _step_synthesize(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    all_outputs: List[Tuple[str, str]],
    evidence: Optional[EvidenceResult] = None,
    structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
    """Synthesizer: produce the final user-facing answer from specialist contributions.

    When structured_contributions are provided, the synthesizer receives indexed
    contribution data and must produce a USED_CONTRIBUTIONS traceability block.
    Obeys the detected output format and brevity requirement strictly.
    If evidence is available, injects it so the synthesizer prefers grounded claims.
    """
    trace.append("\nโโโ [SYNTHESIZER] Producing final answer... โโโ")

    out_format = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    fmt_instruction = get_synthesizer_format_instruction(out_format, brevity)

    # Assemble the prompt from ordered sections, then join once.
    sections = [
        f"User request: {state['user_request']}\n\n"
        f"REQUIRED OUTPUT FORMAT: {out_format}\n"
        f"BREVITY: {brevity}\n\n"
        f"{fmt_instruction}\n\n"
    ]
    if evidence and evidence.has_evidence:
        sections.append(f"{format_evidence_for_prompt(evidence)}\n\n")
    if structured_contributions:
        sections.append(format_contributions_for_synthesizer(structured_contributions))
    else:
        # Fallback: raw role-labelled transcripts of every specialist output.
        blocks = [
            f"=== {AGENT_ROLES.get(role_key, role_key)} ===\n{role_text}"
            for role_key, role_text in all_outputs
        ]
        sections.append("Specialist contributions:\n\n" + "\n\n".join(blocks))

    raw = _llm_call(chat_model, _SYNTHESIZER_SYSTEM, "".join(sections))

    # Parse the traceability block and remap display labels back to role keys.
    label_to_key = {label: key for key, label in AGENT_ROLES.items()}
    used = {
        label_to_key.get(name, name): refs
        for name, refs in parse_used_contributions(raw).items()
    }
    state["used_contributions"] = used

    # Strip the traceability block (and a trailing bare-JSON variant) from the draft.
    cleaned = re.sub(
        r"\n*USED_CONTRIBUTIONS:\s*```json.*?```",
        "", raw, flags=re.DOTALL,
    ).strip()
    cleaned = re.sub(
        r"\n*```json\s*\{[^}]*\"used_contributions\"[^}]*\}\s*```\s*$",
        "", cleaned, flags=re.DOTALL,
    ).strip()

    state["synthesis_output"] = raw
    state["draft_output"] = cleaned
    if len(cleaned) > 500:
        trace.append(cleaned[:500] + "โฆ")
    else:
        trace.append(cleaned)
    if used:
        total_refs = sum(len(refs) for refs in used.values())
        trace.append(f" โน Traceability: {total_refs} expert contribution(s) referenced")
    trace.append("โโโ [SYNTHESIZER] Done โโโ")
    return state
|
|
|
|
| |
# Dispatch table: role key (as stored in WorkflowState and emitted by the
# planner/role-selection logic) -> the specialist step function that runs it.
_SPECIALIST_STEPS = {
    "creative": _step_creative,
    "technical": _step_technical,
    "research": _step_research,
    "security": _step_security,
    "data_analyst": _step_data_analyst,
    "mad_professor": _step_mad_professor,
    "accountant": _step_accountant,
    "artist": _step_artist,
    "lazy_slacker": _step_lazy_slacker,
    "black_metal_fundamentalist": _step_black_metal_fundamentalist,
    "labour_union_rep": _step_labour_union_rep,
    "ux_designer": _step_ux_designer,
    "doris": _step_doris,
    "chairman_of_board": _step_chairman_of_board,
    "maga_appointee": _step_maga_appointee,
    "lawyer": _step_lawyer,
}
|
|
|
|
| |
| |
| |
|
|
| |
# Module-level model id read by the @tool wrappers below; run_multi_role_workflow
# overwrites it so standalone tool calls use the same model as the workflow.
_workflow_model_id: str = DEFAULT_MODEL_ID
|
|
# A fully-zeroed WorkflowState template. The @tool wrappers copy this (via
# {**_EMPTY_STATE_BASE, ...}) to seed a one-off specialist run without the
# full workflow. Keep in sync with the WorkflowState TypedDict fields.
_EMPTY_STATE_BASE: WorkflowState = {
    "user_request": "", "plan": "", "current_role": "",
    "creative_output": "", "technical_output": "",
    "research_output": "", "security_output": "", "data_analyst_output": "",
    "mad_professor_output": "", "accountant_output": "", "artist_output": "",
    "lazy_slacker_output": "", "black_metal_fundamentalist_output": "",
    "labour_union_rep_output": "", "ux_designer_output": "", "doris_output": "",
    "chairman_of_board_output": "", "maga_appointee_output": "", "lawyer_output": "",
    "synthesis_output": "",
    "draft_output": "", "qa_report": "", "qa_role_feedback": {}, "qa_passed": False,
    "revision_count": 0, "final_answer": "",
    "output_format": "other", "brevity_requirement": "normal", "qa_structured": None,
    "task_assumptions": {}, "revision_instruction": "",
    "structured_contributions": {}, "used_contributions": {},
}
|
|
|
|
@tool
def call_creative_expert(task: str) -> str:
    """Call the Creative Expert to brainstorm ideas, framing, and produce a draft for a given task."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="creative")
    result = _step_creative(build_provider_chat(_workflow_model_id), seed, [])
    return result["creative_output"]
|
|
|
|
@tool
def call_technical_expert(task: str) -> str:
    """Call the Technical Expert to produce implementation details and a solution for a given task."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="technical")
    result = _step_technical(build_provider_chat(_workflow_model_id), seed, [])
    return result["technical_output"]
|
|
|
|
@tool
def call_qa_tester(task_and_output: str) -> str:
    """Call the QA Tester to review specialist output against requirements.
    Input format: 'TASK: <description>\nOUTPUT: <specialist output to review>'"""
    chat = build_provider_chat(_workflow_model_id)
    # Split the combined payload on the first OUTPUT: marker; if it's absent,
    # treat the whole input as both the task and the output under review.
    head, marker, tail = task_and_output.partition("OUTPUT:")
    if marker:
        task = head.replace("TASK:", "").strip()
        output = tail.strip()
    else:
        task = output = task_and_output

    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, draft_output=output)
    return _step_qa(chat, seed, [])["qa_report"]
|
|
|
|
@tool
def call_research_analyst(task: str) -> str:
    """Call the Research Analyst to gather information and summarize findings for a given task."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="research")
    result = _step_research(build_provider_chat(_workflow_model_id), seed, [])
    return result["research_output"]
|
|
|
|
@tool
def call_security_reviewer(task: str) -> str:
    """Call the Security Reviewer to analyse output for vulnerabilities and security best practices."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="security")
    result = _step_security(build_provider_chat(_workflow_model_id), seed, [])
    return result["security_output"]
|
|
|
|
@tool
def call_data_analyst(task: str) -> str:
    """Call the Data Analyst to analyse data, identify patterns, and provide actionable insights."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="data_analyst")
    result = _step_data_analyst(build_provider_chat(_workflow_model_id), seed, [])
    return result["data_analyst_output"]
|
|
|
|
@tool
def call_mad_professor(task: str) -> str:
    """Call the Mad Professor to generate radical, unhinged scientific theories and extreme groundbreaking hypotheses for a given task."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="mad_professor")
    result = _step_mad_professor(build_provider_chat(_workflow_model_id), seed, [])
    return result["mad_professor_output"]
|
|
|
|
@tool
def call_accountant(task: str) -> str:
    """Call the Accountant to ruthlessly analyse and cut costs, finding the cheapest possible approach regardless of quality."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="accountant")
    result = _step_accountant(build_provider_chat(_workflow_model_id), seed, [])
    return result["accountant_output"]
|
|
|
|
@tool
def call_artist(task: str) -> str:
    """Call the Artist to channel cosmic creative energy into wildly unhinged and spectacular ideas without concern for cost or practicality."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="artist")
    result = _step_artist(build_provider_chat(_workflow_model_id), seed, [])
    return result["artist_output"]
|
|
|
|
@tool
def call_lazy_slacker(task: str) -> str:
    """Call the Lazy Slacker to find the minimum viable effort and the easiest possible way out of a task."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="lazy_slacker")
    result = _step_lazy_slacker(build_provider_chat(_workflow_model_id), seed, [])
    return result["lazy_slacker_output"]
|
|
|
|
@tool
def call_black_metal_fundamentalist(task: str) -> str:
    """Call the Black Metal Fundamentalist for a nihilistic, kvlt, uncompromising critique and manifesto-style response."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="black_metal_fundamentalist")
    result = _step_black_metal_fundamentalist(build_provider_chat(_workflow_model_id), seed, [])
    return result["black_metal_fundamentalist_output"]
|
|
|
|
@tool
def call_labour_union_rep(task: str) -> str:
    """Call the Labour Union Representative to advocate for worker rights, fair wages, and job security."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="labour_union_rep")
    result = _step_labour_union_rep(build_provider_chat(_workflow_model_id), seed, [])
    return result["labour_union_rep_output"]
|
|
|
|
@tool
def call_ux_designer(task: str) -> str:
    """Call the UX Designer to analyse user needs and produce a user-centric recommendation."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="ux_designer")
    result = _step_ux_designer(build_provider_chat(_workflow_model_id), seed, [])
    return result["ux_designer_output"]
|
|
|
|
@tool
def call_doris(task: str) -> str:
    """Call Doris โ well-meaning but clueless โ for a rambling, off-topic perspective that misses the point entirely."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="doris")
    result = _step_doris(build_provider_chat(_workflow_model_id), seed, [])
    return result["doris_output"]
|
|
|
|
@tool
def call_chairman_of_board(task: str) -> str:
    """Call the Chairman of the Board for a strategic, shareholder-focused, board-level perspective."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="chairman_of_board")
    result = _step_chairman_of_board(build_provider_chat(_workflow_model_id), seed, [])
    return result["chairman_of_board_output"]
|
|
|
|
@tool
def call_maga_appointee(task: str) -> str:
    """Call the MAGA Appointee for an America First, pro-deregulation, anti-globalist perspective."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="maga_appointee")
    result = _step_maga_appointee(build_provider_chat(_workflow_model_id), seed, [])
    return result["maga_appointee_output"]
|
|
|
|
@tool
def call_lawyer(task: str) -> str:
    """Call the Lawyer to analyse legal implications, liabilities, and compliance requirements. Not formal legal advice."""
    # Seed a fresh one-off state for this role; trace output is discarded.
    seed: WorkflowState = dict(_EMPTY_STATE_BASE, user_request=task, plan=task, current_role="lawyer")
    result = _step_lawyer(build_provider_chat(_workflow_model_id), seed, [])
    return result["lawyer_output"]
|
|
|
|
| |
|
|
def run_multi_role_workflow(
    message: str,
    model_id: str,
    active_role_labels: Optional[List[str]] = None,
    config: Optional[WorkflowConfig] = None,
) -> Tuple[str, str]:
    """Run the strict plannerโspecialistโsynthesizerโQA workflow.

    Flow:
      1. Classify task and detect output format / brevity.
      2. Retrieve evidence via tool adapters (for factual tasks).
      3. Planner analyses the task and picks a primary specialist.
      4. Dynamic role selection filters to only relevant specialists.
      5. Selected specialists run (research analyst gets evidence injected).
      6. Synthesizer produces a format-aware, evidence-grounded answer.
      7. QA validates against format, brevity, evidence, and correctness.
      8. QA-BINDING: if FAIL, planner MUST revise. Code enforces this.
      9. Targeted revisions with escalation on repeated failures.
      10. Final answer is compressed and stripped of internal noise.

    Args:
        message: The user's task or request.
        model_id: HuggingFace model ID to use.
        active_role_labels: Display names of active agent roles.
        config: WorkflowConfig controlling strict_mode, persona roles, etc.

    Returns:
        (final_answer, workflow_trace_text)
    """
    global _workflow_model_id
    # Stash the model id in the module global so the @tool wrappers (which may
    # be invoked by an agent during this run) use the same model.
    _workflow_model_id = model_id
    chat_model = build_provider_chat(model_id)
    if config is None:
        config = DEFAULT_CONFIG

    # Map display labels back to role keys; unknown labels are silently dropped.
    if active_role_labels is None:
        active_role_labels = list(AGENT_ROLES.values())
    active_keys = {_ROLE_LABEL_TO_KEY[lbl] for lbl in active_role_labels if lbl in _ROLE_LABEL_TO_KEY}

    # Canonical ordering of specialist keys; filter down to what the caller enabled.
    all_specialist_keys = [
        "creative", "technical", "research", "security", "data_analyst",
        "mad_professor", "accountant", "artist", "lazy_slacker", "black_metal_fundamentalist",
        "labour_union_rep", "ux_designer", "doris", "chairman_of_board", "maga_appointee", "lawyer",
    ]
    active_specialist_keys = [k for k in all_specialist_keys if k in active_keys]

    # Planner and QA are optional; each has a fallback path below when disabled.
    planner_active = "planner" in active_keys
    qa_active = "qa_tester" in active_keys

    if not active_specialist_keys:
        return "No specialist agents are active. Please enable at least one specialist role.", ""

    # Step 1: classify the task and detect output constraints up front.
    task_category = classify_task(message)
    output_format = detect_output_format(message)
    brevity = detect_brevity_requirement(message)
    needs_evidence = task_needs_evidence(task_category) and config.require_evidence_for_factual_claims

    # PlannerState is the side-channel record of everything that happens
    # (events, failures, escalation decisions) across the revision loop.
    planner_state = PlannerState(
        user_request=message,
        task_category=task_category,
        output_format=output_format,
        brevity_requirement=brevity,
        max_revisions=MAX_REVISIONS,
    )
    planner_state.record_event("init", f"category={task_category}, format={output_format}, brevity={brevity}")

    # The mutable workflow state threaded through every step function.
    state: WorkflowState = {
        "user_request": message,
        "plan": "",
        "current_role": "",
        "creative_output": "",
        "technical_output": "",
        "research_output": "",
        "security_output": "",
        "data_analyst_output": "",
        "mad_professor_output": "",
        "accountant_output": "",
        "artist_output": "",
        "lazy_slacker_output": "",
        "black_metal_fundamentalist_output": "",
        "labour_union_rep_output": "",
        "ux_designer_output": "",
        "doris_output": "",
        "chairman_of_board_output": "",
        "maga_appointee_output": "",
        "lawyer_output": "",
        "synthesis_output": "",
        "draft_output": "",
        "qa_report": "",
        "qa_role_feedback": {},
        "qa_passed": False,
        "revision_count": 0,
        "final_answer": "",
        "output_format": output_format,
        "brevity_requirement": brevity,
        "qa_structured": None,
        "task_assumptions": {},
        "revision_instruction": "",
        "structured_contributions": {},
        "used_contributions": {},
    }

    trace: List[str] = [
        "โโโ MULTI-ROLE WORKFLOW STARTED โโโ",
        f"Model : {model_id}",
        f"Request : {message}",
        f"Task category: {task_category}",
        f"Output format: {output_format}",
        f"Brevity: {brevity}",
        f"Evidence required: {needs_evidence}",
        f"Strict mode: {config.strict_mode}",
        f"Allow persona roles: {config.allow_persona_roles}",
        f"Max specialists per task: {config.max_specialists_per_task}",
        f"Max revisions: {MAX_REVISIONS}",
    ]

    # Step 2: evidence retrieval. Failures are non-fatal โ the workflow
    # proceeds with an empty EvidenceResult instead of aborting.
    evidence: Optional[EvidenceResult] = None
    if needs_evidence:
        try:
            adapters = _build_research_adapters()
            queries = extract_search_queries(message)
            evidence = gather_evidence(queries, adapters)
            planner_state.evidence = evidence.to_dict()
            trace.append(
                f"\n[EVIDENCE RETRIEVAL] {len(evidence.results)} items retrieved "
                f"(confidence: {evidence.confidence}) for queries: {queries}"
            )
            planner_state.record_event("evidence", f"items={len(evidence.results)}, confidence={evidence.confidence}")
        except Exception as exc:
            trace.append(f"\n[EVIDENCE RETRIEVAL] Tool error: {exc} โ proceeding without evidence")
            evidence = EvidenceResult(query=message)
            planner_state.record_event("evidence_error", str(exc))

    try:
        # Step 3: planning. If the planner is disabled, auto-route to the
        # first active specialist and use the raw message as the plan.
        if planner_active:
            state = _step_plan(chat_model, state, trace,
                               enabled_role_keys=active_specialist_keys)

            # Planner output may embed shared task assumptions; extract and
            # record them so every specialist works from the same premises.
            assumptions = parse_task_assumptions(state["plan"])
            if assumptions:
                state["task_assumptions"] = assumptions
                planner_state.task_assumptions = assumptions
                trace.append(f"[ASSUMPTIONS] {len(assumptions)} shared assumption(s) set: "
                             + ", ".join(f"{k}={v}" for k, v in assumptions.items()))
        else:
            state["current_role"] = active_specialist_keys[0]
            state["plan"] = message
            trace.append(
                f"\n[Planner disabled] Auto-routing to: {state['current_role'].upper()}"
            )

        # Step 4: role selection. Strict mode filters to task-relevant roles;
        # otherwise every active specialist runs.
        if config.strict_mode:
            selected_roles = select_relevant_roles(
                message, active_specialist_keys, config, task_category=task_category
            )
        else:
            selected_roles = active_specialist_keys

        # The planner's chosen primary always participates, even if filtered out.
        primary_role = state["current_role"]
        if primary_role in active_specialist_keys and primary_role not in selected_roles:
            selected_roles.insert(0, primary_role)

        # Enforce the specialist budget; keep the research analyst when
        # evidence is required (it is re-appended after truncation).
        if len(selected_roles) > config.max_specialists_per_task:

            if needs_evidence and "research" in selected_roles:
                non_research = [r for r in selected_roles if r != "research"]
                selected_roles = non_research[:config.max_specialists_per_task - 1] + ["research"]
            else:
                selected_roles = selected_roles[:config.max_specialists_per_task]

        planner_state.selected_roles = selected_roles
        trace.append(
            f"\n[ROLE SELECTION] {len(selected_roles)} specialist(s) selected: "
            + ", ".join(AGENT_ROLES.get(k, k) for k in selected_roles)
        )
        # NOTE(review): a plain list has no format_trace; this branch only fires
        # if select_relevant_roles returns a custom sequence type โ confirm.
        if hasattr(selected_roles, 'format_trace'):
            trace.append(selected_roles.format_trace(AGENT_ROLES))

        # If the planner's pick survived neither selection nor budget, fall
        # back to the first selected role as primary.
        if primary_role not in selected_roles:
            primary_role = selected_roles[0]
            state["current_role"] = primary_role

        # Pre-render the shared-assumptions context once for all specialists.
        assumptions_ctx = format_assumptions_for_prompt(state.get("task_assumptions", {}))

        def _run_specialist(role_key):
            """Run a single specialist, injecting evidence and assumptions as needed."""
            # Research gets the retrieved evidence injected directly.
            if role_key == "research" and evidence:
                return _step_research(chat_model, state, trace, evidence=evidence)
            step_fn = _SPECIALIST_STEPS.get(role_key, _step_technical)
            # Append assumptions to the plan once (guarded against duplication).
            if assumptions_ctx and assumptions_ctx not in state["plan"]:
                state["plan"] = state["plan"] + "\n\n" + assumptions_ctx
            return step_fn(chat_model, state, trace)

        # Step 5: run the primary specialist first, then the rest.
        state = _run_specialist(primary_role)
        primary_output = state["draft_output"]
        planner_state.specialist_outputs[primary_role] = primary_output[:500]

        # Parse each raw output into a structured contribution for traceability.
        structured_contributions: Dict[str, StructuredContribution] = {}
        contrib = parse_structured_contribution(
            primary_output, AGENT_ROLES.get(primary_role, primary_role)
        )
        structured_contributions[primary_role] = contrib

        all_outputs: List[Tuple[str, str]] = [(primary_role, primary_output)]
        for specialist_role in selected_roles:
            if specialist_role == primary_role:
                continue
            state = _run_specialist(specialist_role)
            output = state["draft_output"]
            all_outputs.append((specialist_role, output))
            planner_state.specialist_outputs[specialist_role] = output[:500]

            contrib = parse_structured_contribution(
                output, AGENT_ROLES.get(specialist_role, specialist_role)
            )
            structured_contributions[specialist_role] = contrib

        # Persist a serializable snapshot of the contributions in state.
        state["structured_contributions"] = {
            k: v.to_dict() for k, v in structured_contributions.items()
        }
        trace.append(
            f"\n[CONTRIBUTIONS] {len(structured_contributions)} structured contribution(s) parsed"
        )

        # Step 6: synthesize the first draft.
        state = _step_synthesize(chat_model, state, trace, all_outputs,
                                 evidence=evidence,
                                 structured_contributions=structured_contributions)

        # Pre-QA format gate: if the draft violates the required format,
        # append a correction instruction to the plan and re-synthesize once.
        fmt_violations = validate_output_format(
            state["draft_output"], output_format, brevity
        )
        if fmt_violations:
            trace.append(
                "\n[FORMAT VALIDATION] Violations detected before QA:\n"
                + "\n".join(f" - {v}" for v in fmt_violations)
            )

            violation_instr = format_violations_instruction(fmt_violations)
            state["plan"] = state["plan"] + "\n\n" + violation_instr
            state = _step_synthesize(chat_model, state, trace, all_outputs,
                                     evidence=evidence,
                                     structured_contributions=structured_contributions)
            planner_state.record_event("format_rewrite", "; ".join(fmt_violations))
            trace.append("[FORMAT VALIDATION] Re-synthesized to fix format violations.")

        # Steps 7โ9: QA / revision loop. Exits via break on approval or on
        # hitting MAX_REVISIONS; `continue` re-enters after a targeted revision.
        while True:

            if qa_active:
                state = _step_qa(chat_model, state, trace, all_outputs,
                                 evidence=evidence,
                                 structured_contributions=structured_contributions)
            else:
                # QA disabled: synthesize an auto-pass result so downstream
                # bookkeeping still sees a well-formed QA record.
                state["qa_passed"] = True
                state["qa_report"] = "QA Tester is disabled โ skipping quality review."
                state["qa_structured"] = {"status": "PASS", "reason": "", "issues": [], "warnings": [], "correction_instruction": ""}
                trace.append("\n[QA Tester disabled] Skipping quality review โ auto-pass.")

            planner_state.current_draft = state["draft_output"]
            qa_result = parse_structured_qa(state["qa_report"]) if qa_active else QAResult(status="PASS")
            planner_state.qa_result = qa_result
            planner_state.record_event("qa", f"status={qa_result.status}")

            if not qa_result.passed:
                planner_state.record_failure(qa_result)

            # Planner review is what sets final_answer on approval. It only
            # runs when BOTH planner and QA are active.
            if planner_active and qa_active:
                # is_final_revision flags the pass that would exhaust the budget.
                is_final_revision = (state["revision_count"] + 1 >= MAX_REVISIONS) and not state["qa_passed"]
                state = _step_planner_review(chat_model, state, trace, is_final_revision=is_final_revision)

            if state["final_answer"]:
                planner_state.final_answer = state["final_answer"]
                trace.append("\nโโโ WORKFLOW COMPLETE โ APPROVED โโโ")
                break

            # Capture the revision instruction: planner-revised plan wins over
            # QA's correction_instruction.
            revision_instr = ""
            if "REVISED INSTRUCTIONS:" in state.get("plan", ""):
                revision_instr = state["plan"]
            elif qa_result.correction_instruction:
                revision_instr = qa_result.correction_instruction
            state["revision_instruction"] = revision_instr
            planner_state.revision_instruction = revision_instr
            planner_state.record_event("revision_instruction_stored",
                                       revision_instr[:200] if revision_instr else "MISSING")

            state["revision_count"] += 1
            planner_state.revision_count = state["revision_count"]

            # Budget exhausted: one last correction pass, then accept whatever
            # draft exists rather than loop forever.
            if state["revision_count"] >= MAX_REVISIONS:

                trace.append(
                    f"\nโโโ MAX REVISIONS ({MAX_REVISIONS}) โ FINAL CORRECTION PASS โโโ"
                )
                state = _step_planner_review(chat_model, state, trace, is_final_revision=True)
                if not state["final_answer"]:
                    state["final_answer"] = state["draft_output"]
                planner_state.final_answer = state["final_answer"]
                trace.append("\nโโโ WORKFLOW COMPLETE โ MAX REVISIONS โโโ")
                break

            # Escalation: PlannerState picks a strategy from the failure history.
            escalation = planner_state.get_escalation_strategy()
            if escalation != "none":
                trace.append(f"\n[ESCALATION] Strategy: {escalation}")
                planner_state.record_event("escalation", escalation)

            if escalation == "suppress_role":
                # Drop roles with repeated failures; never suppress down to zero.
                suppress = planner_state.get_roles_to_suppress()
                for role_label in suppress:
                    role_key = _ROLE_LABEL_TO_KEY.get(role_label)
                    if role_key and role_key in selected_roles:
                        selected_roles.remove(role_key)
                        trace.append(f" โ Suppressed role: {role_label} (repeated failures)")
                if not selected_roles:
                    selected_roles = [primary_role]

            elif escalation == "rewrite_from_state":
                # Clear the draft so the synthesizer rebuilds from contributions.
                trace.append(" โ Synthesizer will rewrite from state instead of reusing draft")
                state["draft_output"] = ""

            elif escalation == "narrow_scope":
                if len(selected_roles) > 1:
                    selected_roles = [selected_roles[0]]
                    trace.append(f" โ Narrowed to single specialist: {selected_roles[0]}")

            # Targeted revision: only the roles QA blamed are re-run.
            revision_targets = identify_revision_targets(qa_result, _ROLE_LABEL_TO_KEY)
            trace.append(
                f"\nโโโ REVISION {state['revision_count']} / {MAX_REVISIONS} โโโ\n"
                f"Targeted roles: {', '.join(revision_targets)}"
            )
            planner_state.record_event("revision", f"targets={revision_targets}")

            # Only re-run targets that are both known specialists and still selected.
            rerun_specialists = [
                t for t in revision_targets
                if t in _SPECIALIST_STEPS and t in selected_roles
            ]
            rerun_synthesizer = "synthesizer" in revision_targets or bool(rerun_specialists)

            if rerun_specialists:
                new_outputs = []
                for rk in rerun_specialists:
                    state = _run_specialist(rk)
                    new_outputs.append((rk, state["draft_output"]))
                    planner_state.specialist_outputs[rk] = state["draft_output"][:500]

                    contrib = parse_structured_contribution(
                        state["draft_output"],
                        AGENT_ROLES.get(rk, rk),
                    )
                    structured_contributions[rk] = contrib

                # Replace the re-run roles' entries; keep everything else.
                updated_keys = {rk for rk, _ in new_outputs}
                all_outputs = [
                    (rk, out) for rk, out in all_outputs if rk not in updated_keys
                ] + new_outputs

                state["structured_contributions"] = {
                    k: v.to_dict() for k, v in structured_contributions.items()
                }

            if rerun_synthesizer or rerun_specialists:
                state = _step_synthesize(chat_model, state, trace, all_outputs,
                                         evidence=evidence,
                                         structured_contributions=structured_contributions)

                # Same post-synthesis format gate as the first pass.
                fmt_violations = validate_output_format(
                    state["draft_output"], output_format, brevity
                )
                if fmt_violations:
                    trace.append(
                        "\n[FORMAT VALIDATION] Post-revision violations:\n"
                        + "\n".join(f" - {v}" for v in fmt_violations)
                    )
                    violation_instr = format_violations_instruction(fmt_violations)
                    state["plan"] = state["plan"] + "\n\n" + violation_instr
                    state = _step_synthesize(chat_model, state, trace, all_outputs,
                                             evidence=evidence,
                                             structured_contributions=structured_contributions)

                # Back to QA with the revised draft.
                continue

            else:
                # Nothing to re-run: accept the current draft as final.
                state["final_answer"] = state["draft_output"]
                planner_state.final_answer = state["final_answer"]
                trace.append("\nโโโ WORKFLOW COMPLETE โโโ")
                break

    except Exception as exc:
        # Any unexpected failure degrades to "best draft so far" rather than crashing.
        trace.append(f"\n[ERROR] {exc}\n{traceback.format_exc()}")
        state["final_answer"] = state["draft_output"] or f"Workflow error: {exc}"

    # Step 10: final cleanup โ compress to the requested format/brevity and
    # strip any leaked internal markers before returning to the user.
    raw_answer = state["final_answer"]
    final_answer = compress_final_answer(raw_answer, output_format, brevity, message)
    final_answer = strip_internal_noise(final_answer)

    return final_answer, "\n".join(trace)
|
|
|
|
| |
| |
| |
|
|
def build_agent(model_id: str, selected_tool_names: List[str]):
    """Return a tool-enabled agent for *model_id*, reusing cached instances.

    Agents are cached per (model, tool-set) pair so repeated calls with the
    same configuration do not rebuild the chat model or the agent graph.
    """
    # Sort tool names so the cache key is insensitive to checkbox order.
    cache_key = (model_id, tuple(sorted(selected_tool_names)))
    cached = AGENT_CACHE.get(cache_key)
    if cached is not None:
        return cached

    # Unknown tool names are silently dropped.
    tools = [name for name in selected_tool_names if name in ALL_TOOLS]
    tools = [ALL_TOOLS[name] for name in tools]
    chat_model = build_provider_chat(model_id)

    system_prompt = (
        "You are an assistant with tool access. "
        "Use math tools for calculations. "
        "Use Wikipedia for stable facts. "
        "Use web search for recent or changing information. "
        "Use arXiv for research papers. "
        "Use stock tools for financial data. "
        "Generate charts when the user asks for trends or plots. "
        "If a needed tool is unavailable, say so plainly. "
        f"You are currently running with provider-backed model='{model_id}'. "
        "After using tools, always provide a final natural-language answer. "
        "Do not stop after only issuing a tool call. "
        "Be concise."
    )

    agent = create_agent(
        model=chat_model,
        tools=tools,
        system_prompt=system_prompt,
    )
    AGENT_CACHE[cache_key] = agent
    return agent
|
|
|
|
| |
| |
| |
|
|
def classify_backend_error(model_id: str, err: Exception) -> str:
    """Translate a backend exception into a user-facing message.

    Also records a health status for *model_id* in RUNTIME_HEALTH as a
    side effect, so the UI can display the model's current state.
    """
    text = str(err)

    # Non-Hub errors get a generic runtime-error message.
    if not isinstance(err, HfHubHTTPError):
        RUNTIME_HEALTH[model_id] = "error"
        return f"Runtime error: {err}"

    # Ordered substring-pattern table for Hub HTTP errors: first match wins.
    hub_patterns = (
        (("model_not_supported", "not supported by any provider"), "unavailable",
         "This model exists on Hugging Face, but it is not supported by the provider route used by this app."),
        (("401", "403"), "gated",
         "This model is not accessible with the current Hugging Face token."),
        (("429",), "rate_limited",
         "This model is being rate-limited right now. Try again shortly or switch model."),
        (("404",), "unavailable",
         "This model is not available on the current Hugging Face inference route."),
    )
    for needles, status, user_message in hub_patterns:
        if any(needle in text for needle in needles):
            RUNTIME_HEALTH[model_id] = status
            return user_message

    # Hub error that matched no known pattern.
    RUNTIME_HEALTH[model_id] = "error"
    return f"Provider error: {err}"
|
|
|
|
| |
| |
| |
|
|
def build_debug_report(
    model_id: str,
    message: str,
    selected_tools: List[str],
    messages: List[object],
    final_answer: str,
    last_nonempty_ai: Optional[str],
    last_tool_content: Optional[str],
    chart_path: Optional[str],
) -> str:
    """Assemble a plain-text diagnostic dump of a single agent run.

    The report contains the request parameters, a per-message breakdown of
    the agent transcript, and a summary section with warnings for the
    common failure shapes (empty answer, tool output without AI text).
    """
    report: List[str] = [
        "=== DEBUG REPORT ===",
        f"model_id: {model_id}",
        f"user_message: {message}",
        f"selected_tools: {selected_tools}",
        f"client_location_value: {repr(_client_location.get())}",
        f"message_count: {len(messages)}",
        f"chart_path: {chart_path}",
        "",
    ]

    # One section per transcript message.
    for idx, msg in enumerate(messages):
        msg_type = getattr(msg, "type", type(msg).__name__)
        text_content = content_to_text(getattr(msg, "content", ""))
        report.extend([
            f"--- message[{idx}] ---",
            f"type: {msg_type}",
            f"content_empty: {not bool(text_content.strip())}",
            f"content_preview: {short_text(text_content, 500)}",
        ])

        # Optional per-message metadata, shown only when present.
        tool_calls = getattr(msg, "tool_calls", None)
        if tool_calls:
            report.append(f"tool_calls: {tool_calls}")
        additional_kwargs = getattr(msg, "additional_kwargs", None)
        if additional_kwargs:
            report.append(f"additional_kwargs: {additional_kwargs}")
        response_metadata = getattr(msg, "response_metadata", None)
        if response_metadata:
            report.append(f"response_metadata: {response_metadata}")
        report.append("")

    report.extend([
        "=== SUMMARY ===",
        f"last_nonempty_ai: {short_text(last_nonempty_ai or '', 500)}",
        f"last_tool_content: {short_text(last_tool_content or '', 500)}",
        f"final_answer: {short_text(final_answer or '', 500)}",
    ])

    # Heuristic warnings for the usual failure modes.
    if not final_answer or not final_answer.strip():
        report.append("warning: final_answer is empty")
    if not last_nonempty_ai and last_tool_content:
        report.append("warning: model returned tool output but no final AI text")
    if not last_nonempty_ai and not last_tool_content:
        report.append("warning: neither AI text nor tool content was recovered")

    return "\n".join(report)
|
|
|
|
| |
| |
| |
|
|
def run_agent(message, history, selected_tools, model_id, client_ip: str = ""):
    """Run the single-agent tool demo for one user turn.

    Returns a 6-tuple matching the Gradio outputs:
    (chat history, tool trace, cleared input box, chart path,
     model status text, debug report).
    """
    history = history or []

    # Stash the browser-provided location so the location tool can read it.
    _client_location.set(client_ip.strip() if client_ip else "")

    # Guard clauses: empty message / no tools enabled.
    if not message or not str(message).strip():
        return history, "No input provided.", "", None, model_status_text(model_id), "No input provided."

    if not selected_tools:
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": "No tools are enabled. Please enable at least one tool."})
        return history, "No tools enabled.", "", None, model_status_text(model_id), "No tools enabled."

    chart_path = None
    debug_report = ""

    try:
        agent = build_agent(model_id, selected_tools)
        response = agent.invoke(
            {"messages": [{"role": "user", "content": message}]}
        )
        messages = response.get("messages", [])

        trace_lines = []
        ai_text = None    # last non-empty assistant message
        tool_text = None  # last non-empty tool result

        # Walk the transcript, collecting the tool trace and remembering
        # the latest usable AI text and tool output.
        for msg in messages:
            kind = getattr(msg, "type", None)
            content = content_to_text(getattr(msg, "content", ""))

            if kind == "ai":
                # Log each tool invocation the model requested.
                for tc in (getattr(msg, "tool_calls", None) or []):
                    tool_name = tc.get("name", "unknown_tool")
                    tool_args = tc.get("args", {})
                    trace_lines.append(f"โถ {tool_name}({tool_args})")
                if content and content.strip():
                    ai_text = content.strip()

            elif kind == "tool":
                trace_lines.append(f"โ {short_text(content, 1500)}")
                if content and content.strip():
                    tool_text = content.strip()
                # A tool may have produced a chart file path.
                maybe_chart = extract_chart_path(content)
                if maybe_chart:
                    chart_path = maybe_chart

        # Pick the best available final answer and record model health.
        if ai_text:
            final_answer = ai_text
            RUNTIME_HEALTH[model_id] = "ok"
        elif tool_text:
            final_answer = f"Tool result:\n{tool_text}"
            RUNTIME_HEALTH[model_id] = "empty_final"
        else:
            final_answer = "The model used a tool but did not return a final text response."
            RUNTIME_HEALTH[model_id] = "empty_final"

        tool_trace = "\n".join(trace_lines) if trace_lines else "No tools used."

        debug_report = build_debug_report(
            model_id=model_id,
            message=message,
            selected_tools=selected_tools,
            messages=messages,
            final_answer=final_answer,
            last_nonempty_ai=ai_text,
            last_tool_content=tool_text,
            chart_path=chart_path,
        )

    except Exception as exc:
        # Classification records RUNTIME_HEALTH and yields a user message.
        final_answer = classify_backend_error(model_id, exc)
        tool_trace = "Execution failed."
        debug_report = (
            "=== DEBUG REPORT ===\n"
            f"model_id: {model_id}\n"
            f"user_message: {message}\n"
            f"selected_tools: {selected_tools}\n\n"
            "=== EXCEPTION ===\n"
            f"{traceback.format_exc()}\n"
        )

    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": final_answer})

    return history, tool_trace, "", chart_path, model_status_text(model_id), debug_report
|
|
|
|
| |
| |
| |
|
|
# Top-level Gradio UI: two tabs — a multi-role workflow demo and a
# single-agent tool-use demo. Component creation order matters for layout,
# so this block is documentation-only.
with gr.Blocks(title="LLM + Agent tools demo", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "# Catos agent and tools playground\n"
    )

    with gr.Tabs():

        # ── Tab 1: planner → specialists → synthesizer → QA workflow ──
        with gr.Tab("Agent discussion demo"):
            gr.Markdown(
                "## Strict PlannerโSpecialistโSynthesizerโQA Workflow\n"
                "**Planner** โ **Selected Specialists** โ **Synthesizer** โ **QA** โ **Planner review**\n\n"
                "The Planner analyses the task, selects only relevant specialists, and enforces QA as binding. "
                f"If QA fails, targeted revisions loop up to **{MAX_REVISIONS}** times.\n\n"
                "Use the checkboxes on the right to enable/disable agent roles and configure workflow settings."
            )

            with gr.Row():
                wf_model_dropdown = gr.Dropdown(
                    choices=MODEL_OPTIONS,
                    value=DEFAULT_MODEL_ID,
                    label="Model",
                )

            with gr.Row():
                # Left column: task input and submit button.
                with gr.Column(scale=2):
                    wf_input = gr.Textbox(
                        label="Question",
                        placeholder=(
                            "Describe what you want the multi-role team to work onโฆ\n"
                            "e.g. 'Write a short blog post about the benefits of open-source AI'"
                        ),
                        lines=3,
                    )
                    wf_submit_btn = gr.Button("Run discussion", variant="primary")

                # Right column: role selection and workflow configuration.
                with gr.Column(scale=2):
                    active_agents = gr.CheckboxGroup(
                        choices=list(AGENT_ROLES.values()),
                        value=list(AGENT_ROLES.values()),
                        label="Team",
                    )
                    with gr.Accordion("Workflow Settings", open=False):
                        strict_mode_toggle = gr.Checkbox(
                            value=True, label="Strict mode (select only relevant roles)"
                        )
                        allow_persona_toggle = gr.Checkbox(
                            value=False, label="Allow persona/gimmick roles"
                        )
                        max_specialists_slider = gr.Slider(
                            minimum=1, maximum=8, step=1, value=3,
                            label="Max specialists per task"
                        )
                        require_evidence_toggle = gr.Checkbox(
                            value=True, label="Require evidence for factual claims"
                        )
                        auto_research_toggle = gr.Checkbox(
                            value=True, label="Auto-include Research for factual tasks"
                        )

            # Output row: final answer beside the full decision trace.
            with gr.Row():
                with gr.Column(scale=2):
                    wf_answer = gr.Textbox(
                        label="\u2705 Conclusion (Planner approved)",
                        lines=14,
                        interactive=False,
                    )
                with gr.Column(scale=3):
                    wf_trace = gr.Textbox(
                        label="Decision process insight",
                        lines=28,
                        interactive=False,
                    )

            def _run_workflow_ui(
                message: str, model_id: str, role_labels: List[str],
                strict_mode: bool, allow_persona: bool, max_specialists: int,
                require_evidence: bool, auto_research: bool,
            ) -> Tuple[str, str]:
                """Gradio handler: validate input, run the workflow, return outputs.

                Returns (final answer, trace text); on failure, returns an
                error message plus the traceback so the UI never raises.
                """
                if not message or not message.strip():
                    return "No input provided.", ""
                try:
                    # Translate UI toggle values into a workflow config object.
                    config = WorkflowConfig(
                        strict_mode=strict_mode,
                        allow_persona_roles=allow_persona,
                        max_specialists_per_task=int(max_specialists),
                        require_evidence_for_factual_claims=require_evidence,
                        always_include_research_for_factual_tasks=auto_research,
                    )
                    final_answer, trace = run_multi_role_workflow(
                        message.strip(), model_id, role_labels, config=config
                    )
                    return final_answer, trace
                except Exception as exc:
                    return f"Workflow error: {exc}", traceback.format_exc()

            # Both the button and Enter-in-textbox trigger the same handler.
            wf_submit_btn.click(
                fn=_run_workflow_ui,
                inputs=[wf_input, wf_model_dropdown, active_agents,
                        strict_mode_toggle, allow_persona_toggle, max_specialists_slider,
                        require_evidence_toggle, auto_research_toggle],
                outputs=[wf_answer, wf_trace],
                show_api=False,
            )

            wf_input.submit(
                fn=_run_workflow_ui,
                inputs=[wf_input, wf_model_dropdown, active_agents,
                        strict_mode_toggle, allow_persona_toggle, max_specialists_slider,
                        require_evidence_toggle, auto_research_toggle],
                outputs=[wf_answer, wf_trace],
                show_api=False,
            )

        # ── Tab 2: single agent with selectable tools ──
        with gr.Tab("Use of tools demo"):
            with gr.Row():
                model_dropdown = gr.Dropdown(
                    choices=MODEL_OPTIONS,
                    value=DEFAULT_MODEL_ID,
                    label="Base model",
                )
                model_status = gr.Textbox(
                    value=model_status_text(DEFAULT_MODEL_ID),
                    label="Model status",
                    interactive=False,
                )

            with gr.Row():
                # Main column: chat, input, chart, and location widgets.
                with gr.Column(scale=3):
                    chatbot = gr.Chatbot(label="Conversation", height=460, type="messages")
                    user_input = gr.Textbox(
                        label="Message",
                        placeholder="Ask anything...",
                    )

                    with gr.Row():
                        send_btn = gr.Button("Send", variant="primary")
                        clear_btn = gr.Button("Clear")

                    chart_output = gr.Image(label="Generated chart", type="filepath")

                    with gr.Row():
                        location_btn = gr.Button("๐ Share my location", size="sm")
                        location_status = gr.Textbox(
                            value="Location not set โ click the button above before asking 'where am I'",
                            label="Location status",
                            interactive=False,
                            max_lines=1,
                        )

                # Side column: tool selection and diagnostics.
                with gr.Column(scale=1):
                    enabled_tools = gr.CheckboxGroup(
                        choices=TOOL_NAMES,
                        value=TOOL_NAMES,
                        label="Enabled tools",
                    )
                    tool_trace = gr.Textbox(
                        label="Tool trace",
                        lines=18,
                        interactive=False,
                    )

                    debug_output = gr.Textbox(
                        label="Debug output",
                        lines=28,
                        interactive=False,
                    )

            # Hidden textbox carrying browser geolocation (or IP fallback)
            # into run_agent's client_ip parameter.
            client_ip_box = gr.Textbox(visible=False, value="")

            model_dropdown.change(
                fn=model_status_text,
                inputs=[model_dropdown],
                outputs=[model_status],
                show_api=False,
            )

            # Client-side geolocation: try the browser Geolocation API first,
            # then fall back to an IP-based lookup; writes into client_ip_box.
            location_btn.click(
                fn=None,
                inputs=None,
                outputs=[client_ip_box, location_status],
                js="""async () => {
                    return new Promise((resolve) => {
                        const fallback = async () => {
                            try {
                                const r = await fetch('https://api.ipify.org?format=json');
                                const d = await r.json();
                                resolve(['ip:' + d.ip, 'Location: IP-based fallback (approximate)']);
                            } catch(e) {
                                resolve(['', 'Location detection failed.']);
                            }
                        };
                        if (!navigator.geolocation) { fallback(); return; }
                        navigator.geolocation.getCurrentPosition(
                            (pos) => {
                                const lat = pos.coords.latitude.toFixed(5);
                                const lon = pos.coords.longitude.toFixed(5);
                                const acc = Math.round(pos.coords.accuracy);
                                resolve([lat + ',' + lon, `\u2705 GPS/WiFi location set (\u00b1${acc}m)`]);
                            },
                            fallback,
                            {timeout: 10000, maximumAge: 60000, enableHighAccuracy: true}
                        );
                    });
                }""",
                show_api=False,
            )

            # Send button and Enter-in-textbox both run the agent.
            send_btn.click(
                fn=run_agent,
                inputs=[user_input, chatbot, enabled_tools, model_dropdown, client_ip_box],
                outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
                show_api=False,
            )

            user_input.submit(
                fn=run_agent,
                inputs=[user_input, chatbot, enabled_tools, model_dropdown, client_ip_box],
                outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
                show_api=False,
            )

            # Clear resets every output but keeps the model status current.
            clear_btn.click(
                fn=lambda model_id: ([], "", "", None, model_status_text(model_id), ""),
                inputs=[model_dropdown],
                outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
                show_api=False,
            )
|
|
if __name__ == "__main__":
    # Hosting platforms (e.g. HF Spaces) inject PORT; default to Gradio's
    # conventional 7860 for local runs.
    serve_port = int(os.environ.get("PORT", 7860))
    demo.launch(
        server_name="0.0.0.0",
        server_port=serve_port,
        ssr_mode=False,
        # Chart files are served from outside the working dir, so the
        # directory must be explicitly allow-listed.
        allowed_paths=[os.path.abspath(CHART_DIR)],
        debug=True,
    )
|
|