"""
quorum.py — Consensus and disagreement-driven topology switches.

When multiple agents produce outputs:
  - Agreement → merge outputs confidently
  - Disagreement → escalate to critic ensemble or HITL (human-in-the-loop)
  - Critical risk → require human approval

Critic ensemble personas:
  - Correctness critic
  - Safety/security critic
  - Cost/latency critic
  - User-alignment critic
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.llm_backend import LLMBackend, ChatMessage

logger = logging.getLogger(__name__)

class QuorumDecision:
    """String constants naming the possible quorum outcomes."""
    MERGE = "merge"
    ESCALATE = "escalate"
    HITL = "hitl"
    REJECT = "reject"

@dataclass
class QuorumConfig:
    agreement_threshold: float = 0.7
    disagreement_threshold: float = 0.4
    critical_risk_keywords: list[str] = field(
        default_factory=lambda: ["delete", "drop", "remove", "destroy", "sudo", "admin"]
    )
    min_votes: int = 2

@dataclass
class CriticVerdict:
    critic_name: str
    score: float
    reasoning: str
    flags: list[str] = field(default_factory=list)

class QuorumCoordinator:
    """
    Decides topology based on agent agreement/disagreement.

    Usage:
        qc = QuorumCoordinator(config=QuorumConfig())
        decision = qc.evaluate(outputs=["answer_a", "answer_b", "answer_c"])
        if decision == QuorumDecision.MERGE: ...
        elif decision == QuorumDecision.ESCALATE: ...
    """
    def __init__(self, config: QuorumConfig | None = None):
        self.config = config or QuorumConfig()

    def evaluate(self, outputs: list[str], task: str = "") -> str:
        # Too few outputs to form a quorum: accept and merge what we have.
        if len(outputs) < self.config.min_votes:
            return QuorumDecision.MERGE
        # Critical-risk keywords always force human review, regardless of
        # how well the outputs agree.
        combined = " ".join(outputs).lower()
        if any(kw in combined for kw in self.config.critical_risk_keywords):
            return QuorumDecision.HITL
        # Measure agreement (simple word-overlap metric, see _measure_agreement)
        agreement = self._measure_agreement(outputs)
        if agreement >= self.config.agreement_threshold:
            return QuorumDecision.MERGE
        if agreement <= self.config.disagreement_threshold:
            return QuorumDecision.ESCALATE
        # Middle band: neither strong agreement nor strong disagreement,
        # so fall back to merging (see the worked example below the class).
        return QuorumDecision.MERGE

    def _measure_agreement(self, outputs: list[str]) -> float:
        if len(outputs) <= 1:
            return 1.0
        # Jaccard similarity over the outputs' vocabularies:
        # |intersection of word sets| / |union of word sets|.
        word_sets = [set(o.lower().split()) for o in outputs]
        common = set.intersection(*word_sets)
        total = set.union(*word_sets)
        return len(common) / max(len(total), 1)
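
# Worked example of the overlap metric above (hypothetical outputs):
#   outputs = ["retry with backoff", "retry with jitter"]
#   word sets: {retry, with, backoff} and {retry, with, jitter}
#   agreement = |{retry, with}| / |{retry, with, backoff, jitter}| = 2/4 = 0.5
# 0.5 lies between the default thresholds (0.4 and 0.7), so evaluate()
# returns the middle-band default, QuorumDecision.MERGE.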

class CriticEnsemble:
    """
    Ensemble of specialized critics for multi-perspective evaluation.

    Usage:
        ensemble = CriticEnsemble(llm=backend)
        verdicts = ensemble.evaluate(output="agent's response", task="original task")
        avg_score = ensemble.aggregate(verdicts)
    """
    CRITICS = [
        ("correctness", "Is the output factually correct and complete?"),
        ("safety", "Does the output contain unsafe, harmful, or policy-violating content?"),
        ("efficiency", "Is the output concise and cost-effective?"),
        ("alignment", "Does the output align with the user's stated purpose?"),
    ]
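    # The persona list is intentionally data-driven: new critics can be added
    # as (name, question) pairs, e.g. a hypothetical
    # ("privacy", "Does the output leak personal or confidential data?").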

    def __init__(self, llm: LLMBackend | None = None):
        self.llm = llm
        self._history: list[list[CriticVerdict]] = []

    def evaluate(self, output: str, task: str = "") -> list[CriticVerdict]:
        verdicts = []
        for name, question in self.CRITICS:
            if self.llm:
                verdict = self._llm_evaluate(name, question, output, task)
            else:
                # No backend configured: emit a neutral score so that
                # aggregation still works in offline or test runs.
                verdict = CriticVerdict(critic_name=name, score=0.5, reasoning="No LLM available")
            verdicts.append(verdict)
        self._history.append(verdicts)
        return verdicts

    def aggregate(self, verdicts: list[CriticVerdict]) -> float:
        if not verdicts:
            return 0.0
        # Unweighted mean of the critic scores.
        return sum(v.score for v in verdicts) / len(verdicts)

    def _llm_evaluate(self, name: str, question: str, output: str, task: str) -> CriticVerdict:
        # Truncate the output to keep the critic prompt within budget.
        prompt = (
            f"Task: {task}\nOutput: {output[:500]}\n\n"
            f"Question: {question}\nScore 0-10 and explain briefly."
        )
        try:
            from purpose_agent.robust_parser import extract_number
            raw = self.llm.generate([ChatMessage(role="user", content=prompt)], temperature=0.2, max_tokens=300)
            # Normalize the 0-10 score to [0, 1]; extract_number falls back
            # to the neutral default 5.0 if no score can be parsed.
            score = extract_number(raw, "score", 5.0) / 10.0
            return CriticVerdict(critic_name=name, score=min(1.0, max(0.0, score)), reasoning=raw[:200])
        except Exception:
            logger.exception("Critic %r evaluation failed", name)
            return CriticVerdict(critic_name=name, score=0.5, reasoning="Evaluation failed")
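
# Minimal usage sketch. Assumes no LLM backend is configured, so the critic
# ensemble falls back to neutral 0.5 scores; the sample outputs below are
# hypothetical.
if __name__ == "__main__":
    qc = QuorumCoordinator()
    sample_outputs = [
        "use exponential backoff for retries",
        "retry with exponential backoff",
        "drop the table and recreate it",  # contains a critical-risk keyword
    ]
    # "drop" appears in the combined text, so this short-circuits to "hitl".
    print("quorum decision:", qc.evaluate(sample_outputs))

    ensemble = CriticEnsemble(llm=None)
    verdicts = ensemble.evaluate(output=sample_outputs[0], task="design a retry policy")
    print("aggregate critic score:", ensemble.aggregate(verdicts))  # 0.5 without an LLM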