Rohan03 committed (verified)
Commit 361d29c · Parent(s): c63c002

Sprint 8: quorum.py — consensus/disagreement topology switches + critic ensemble

Files changed (1)
  1. purpose_agent/quorum.py +125 -0
purpose_agent/quorum.py ADDED
@@ -0,0 +1,125 @@
+ """
+ quorum.py — Consensus and disagreement-driven topology switches.
+
+ When multiple agents produce outputs:
+ - Agreement → merge outputs confidently
+ - Disagreement → escalate to critic ensemble or HITL
+ - Critical risk → require human approval
+
+ Critic ensemble personas:
+ - Correctness critic
+ - Safety/security critic
+ - Cost/latency critic
+ - User-alignment critic
+ """
+ from __future__ import annotations
+ import logging
+ from dataclasses import dataclass, field
+ from typing import Any
+ from purpose_agent.llm_backend import LLMBackend, ChatMessage
+
+ logger = logging.getLogger(__name__)
+
+ class QuorumDecision:
+     MERGE = "merge"
+     ESCALATE = "escalate"
+     HITL = "hitl"
+     REJECT = "reject"
+
+ @dataclass
+ class QuorumConfig:
+     agreement_threshold: float = 0.7
+     disagreement_threshold: float = 0.4
+     critical_risk_keywords: list[str] = field(
+         default_factory=lambda: ["delete", "drop", "remove", "destroy", "sudo", "admin"]
+     )
+     min_votes: int = 2
+
+ @dataclass
+ class CriticVerdict:
+     critic_name: str
+     score: float
+     reasoning: str
+     flags: list[str] = field(default_factory=list)
+
+ class QuorumCoordinator:
+     """
+     Decides topology based on agent agreement/disagreement.
+
+     Usage:
+         qc = QuorumCoordinator(config=QuorumConfig())
+         decision = qc.evaluate(outputs=["answer_a", "answer_b", "answer_c"])
+         if decision == QuorumDecision.MERGE: ...
+         elif decision == QuorumDecision.ESCALATE: ...
+     """
+     def __init__(self, config: QuorumConfig | None = None):
+         self.config = config or QuorumConfig()
+
+     def evaluate(self, outputs: list[str], task: str = "") -> str:
+         # Too few outputs to form a quorum: merge what we have.
+         if len(outputs) < self.config.min_votes:
+             return QuorumDecision.MERGE
+         # Critical risk: any dangerous keyword forces human-in-the-loop review.
+         combined = " ".join(outputs).lower()
+         if any(kw in combined for kw in self.config.critical_risk_keywords):
+             return QuorumDecision.HITL
+         # Measure agreement (simple word-overlap metric across all outputs).
+         agreement = self._measure_agreement(outputs)
+         if agreement >= self.config.agreement_threshold:
+             return QuorumDecision.MERGE
+         if agreement <= self.config.disagreement_threshold:
+             return QuorumDecision.ESCALATE
+         # Middle band (partial agreement): merge by default.
+         return QuorumDecision.MERGE
+
+     def _measure_agreement(self, outputs: list[str]) -> float:
+         if len(outputs) <= 1:
+             return 1.0
+         # Jaccard-style metric: words shared by ALL outputs over all words seen.
+         # e.g. ["use a lock", "use a mutex"] → |{use, a}| / |{use, a, lock, mutex}| = 0.5
+         word_sets = [set(o.lower().split()) for o in outputs]
+         common = word_sets[0]
+         for ws in word_sets[1:]:
+             common &= ws
+         total = set().union(*word_sets)
+         return len(common) / max(len(total), 1)
+
+ class CriticEnsemble:
+     """
+     Ensemble of specialized critics for multi-perspective evaluation.
+
+     Usage:
+         ensemble = CriticEnsemble(llm=backend)
+         verdicts = ensemble.evaluate(output="agent's response", task="original task")
+         avg_score = ensemble.aggregate(verdicts)
+     """
+     CRITICS = [
+         ("correctness", "Is the output factually correct and complete?"),
+         ("safety", "Does the output contain unsafe, harmful, or policy-violating content?"),
+         ("efficiency", "Is the output concise and cost-effective?"),
+         ("alignment", "Does the output align with the user's stated purpose?"),
+     ]
+
+     def __init__(self, llm: LLMBackend | None = None):
+         self.llm = llm
+         self._history: list[list[CriticVerdict]] = []
+
+     def evaluate(self, output: str, task: str = "") -> list[CriticVerdict]:
+         verdicts = []
+         for name, question in self.CRITICS:
+             if self.llm:
+                 verdict = self._llm_evaluate(name, question, output, task)
+             else:
+                 # No backend available: return a neutral score so callers can still aggregate.
+                 verdict = CriticVerdict(critic_name=name, score=0.5, reasoning="No LLM available")
+             verdicts.append(verdict)
+         self._history.append(verdicts)
+         return verdicts
+
+     def aggregate(self, verdicts: list[CriticVerdict]) -> float:
+         if not verdicts:
+             return 0.0
+         return sum(v.score for v in verdicts) / len(verdicts)
+
+     def _llm_evaluate(self, name: str, question: str, output: str, task: str) -> CriticVerdict:
+         prompt = (
+             f"Task: {task}\nOutput: {output[:500]}\n\n"
+             f"Question: {question}\nScore 0-10 and explain briefly."
+         )
+         try:
+             from purpose_agent.robust_parser import extract_number
+             raw = self.llm.generate([ChatMessage(role="user", content=prompt)], temperature=0.2, max_tokens=300)
+             # Normalize the 0-10 score to [0, 1] and clamp.
+             score = extract_number(raw, "score", 5.0) / 10.0
+             return CriticVerdict(critic_name=name, score=min(1.0, max(0.0, score)), reasoning=raw[:200])
+         except Exception:
+             # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
+             logger.warning("Critic %r evaluation failed", name, exc_info=True)
+             return CriticVerdict(critic_name=name, score=0.5, reasoning="Evaluation failed")
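
For reviewers, a minimal end-to-end sketch of how the two classes might be wired together. It uses only what this diff defines; the sample outputs, the task string, and the choice to re-score candidates with the ensemble on escalation are illustrative, not part of the module.

from purpose_agent.quorum import (
    CriticEnsemble,
    QuorumConfig,
    QuorumCoordinator,
    QuorumDecision,
)

# Hypothetical candidate answers from three parallel agents.
outputs = [
    "Retry the request with exponential backoff.",
    "Retry the request with exponential backoff and jitter.",
    "Drop the cache table and rebuild it.",
]

qc = QuorumCoordinator(config=QuorumConfig())
decision = qc.evaluate(outputs, task="fix flaky API calls")

if decision == QuorumDecision.HITL:
    # "drop" is a critical-risk keyword, so this branch fires for the outputs above.
    print("Critical-risk content detected; requires human approval.")
elif decision == QuorumDecision.ESCALATE:
    # Low agreement: score each candidate with the critic ensemble, keep the best.
    ensemble = CriticEnsemble(llm=None)  # pass a real LLMBackend for LLM-based critics
    best = max(outputs, key=lambda o: ensemble.aggregate(ensemble.evaluate(o)))
    print("Escalated; highest-scoring candidate:", best)
else:
    print("Agreement high enough; merging outputs.")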