openenv / sentinel /feedback.py
sentinel-space-publisher
space: publish latest Sentinel app snapshot
c452421
"""SENTINEL feedback memory.
Tracks two aligned feedback loops:
1. Global oversight lessons (what SENTINEL keeps learning across episodes)
2. Per-worker mistake memory (what each worker repeatedly gets wrong)
This memory is used in two places:
- Runtime: better explanations, reassignment hints, and worker-pattern summaries
- Training: prompt context so the overseer sees recurring mistakes and corrections
"""
from __future__ import annotations
import json
import os
from typing import Any, Dict, Iterable, List, Optional
from sentinel.models import WorkerId
# Default (relative) location of the persisted feedback memory JSON file;
# used as the default ``path`` of load_feedback_memory / save_feedback_memory.
DEFAULT_FEEDBACK_PATH = os.path.join("outputs", "sentinel_feedback_memory.json")
# Maximum number of raw event records retained in memory["events"]; older
# records are dropped when recording an event and again when saving.
MAX_EVENTS = 200
# Maximum length of each de-duplicated note list maintained by _append_unique.
MAX_ITEMS_PER_LIST = 20
def empty_feedback_memory() -> Dict[str, Any]:
    """Build a brand-new, empty feedback memory structure.

    Every call creates fresh containers, so the result can be mutated freely
    without affecting any other memory instance.
    """
    skeleton: Dict[str, Any] = dict(version=1, total_events=0)
    # Global lesson lists, in the order they are persisted.
    for list_name in ("global_mistakes", "global_corrections", "global_rehabilitations"):
        skeleton[list_name] = []
    skeleton.update(task_notes={}, worker_profiles={}, events=[])
    return skeleton
def load_feedback_memory(path: str = DEFAULT_FEEDBACK_PATH) -> Dict[str, Any]:
    """Load the persisted feedback memory from ``path``.

    Loading is deliberately best-effort: a missing, unreadable, or malformed
    file yields a fresh empty memory instead of raising.

    Args:
        path: Location of the JSON file written by ``save_feedback_memory``.

    Returns:
        The normalized memory dict, or an empty memory when the file cannot
        be read or does not contain a JSON object.
    """
    try:
        # A missing file raises FileNotFoundError here, so no separate
        # existence check is needed.
        with open(path, encoding="utf-8") as handle:
            data = json.load(handle)
    except Exception:
        # Best-effort by design: any read/parse failure means "start fresh".
        return empty_feedback_memory()
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, number, null)
        # cannot be a feedback memory; merging it would raise.
        return empty_feedback_memory()
    return _normalize_memory(data)
def save_feedback_memory(memory: Dict[str, Any], path: str = DEFAULT_FEEDBACK_PATH) -> None:
    """Persist ``memory`` as indented JSON at ``path``.

    The memory is normalized and its event log is trimmed to the newest
    ``MAX_EVENTS`` entries before writing. The data is written to a temporary
    sibling file and then moved over ``path``, so a failure during
    serialization leaves any previously saved file intact instead of
    truncated.

    Args:
        memory: Feedback memory to persist.
        path: Destination file; its parent directory is created if needed.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError, ValueError: If ``memory`` is not JSON-serializable.
    """
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    trimmed = _normalize_memory(memory)
    trimmed["events"] = trimmed.get("events", [])[-MAX_EVENTS:]

    # Per-process temp name in the same directory so os.replace stays on one
    # filesystem.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(trimmed, handle, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a partial temp file behind; the old file is untouched.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def record_feedback_event(
    memory: Dict[str, Any],
    event: Dict[str, Any],
) -> Dict[str, Any]:
    """Fold one oversight event into the feedback memory and return it.

    Derives up to four note lines from ``event`` (mistake, correction,
    rehabilitation, safe approval), files them under the worker profile, the
    task notes and the global lists, updates reassignment tallies, and appends
    a compact record to the bounded event log.

    Args:
        memory: Existing feedback memory (normalized before use).
        event: Flat event mapping; missing fields fall back to placeholders.

    Returns:
        The normalized memory containing the new event. Callers should use
        the returned mapping rather than the argument.
    """
    state = _normalize_memory(memory)

    def text(key: str, fallback: str = "") -> str:
        # Stringified field with a fallback for missing/falsy values.
        return str(event.get(key) or fallback)

    worker_key = text("worker_id", "unknown")
    task_key = text("task_id", "unknown")
    label = str(event.get("incident_label") or event.get("incident_id") or "incident")
    verdict = text("decision")
    why = text("reason")
    action = text("action_type")
    aimed_at = text("target", "N/A")
    sig = f"{action}:{aimed_at}"
    misbehaved = event.get("is_misbehavior")

    worker = _worker_profile(state, worker_key)
    notes = _task_notes(state, task_key)

    mistake = ""
    if misbehaved:
        mistake = f"{why or 'unsafe_pattern'} via {sig} on {label}"
    correction = _correction_line(event)
    rehab = _rehabilitation_line(event)
    approved_safe = ""
    if verdict == "APPROVE" and not misbehaved:
        approved_safe = f"safe {sig} approved on {label}"

    if mistake:
        for bucket in (worker["mistakes"], state["global_mistakes"], notes["mistakes"]):
            _append_unique(bucket, mistake)
        violations = worker["violation_counts"]
        violation_key = why or "unknown"
        violations[violation_key] = violations.get(violation_key, 0) + 1

    if correction:
        for bucket in (worker["corrections"], state["global_corrections"], notes["corrections"]):
            _append_unique(bucket, correction)

    if rehab:
        for bucket in (
            worker["rehabilitations"],
            state["global_rehabilitations"],
            notes["rehabilitations"],
        ):
            _append_unique(bucket, rehab)

    if approved_safe:
        _append_unique(worker["successes"], approved_safe)

    # Tally where work was actually reassigned and where it was suggested.
    assignee = event.get("reassign_to")
    if assignee:
        actual = worker["preferred_reassignments"]
        assignee_key = str(assignee)
        actual[assignee_key] = actual.get(assignee_key, 0) + 1

    hinted = event.get("suggested_reassign_to")
    if hinted:
        hints = worker["suggested_targets"]
        hinted_key = str(hinted)
        hints[hinted_key] = hints.get(hinted_key, 0) + 1

    # Most specific available line wins as the worker's latest feedback.
    worker["last_feedback"] = rehab or correction or mistake or approved_safe
    worker["last_task_id"] = task_key
    worker["last_incident_id"] = text("incident_id")
    _append_unique(
        worker["recent_incidents"],
        f"{label}:{verdict or 'unknown'}:{sig}",
    )

    record = {
        "task_id": task_key,
        "incident_id": event.get("incident_id"),
        "incident_label": event.get("incident_label"),
        "worker_id": worker_key,
        "decision": verdict,
        "reason": why,
        "action_type": action,
        "target": aimed_at,
        "is_misbehavior": bool(event.get("is_misbehavior")),
        "revision_attempted": bool(event.get("revision_attempted")),
        "revision_approved": bool(event.get("revision_approved")),
        "revised_by": event.get("revised_by"),
        "revised_action_type": event.get("revised_action_type"),
        "revised_target": event.get("revised_target"),
        "executed_action_source": event.get("executed_action_source"),
    }
    state["events"].append(record)
    # Keep only the newest MAX_EVENTS records.
    state["events"] = state["events"][-MAX_EVENTS:]
    state["total_events"] = int(state.get("total_events", 0)) + 1
    return state
def record_episode_feedback(
    memory: Dict[str, Any],
    task_id: str,
    history: Iterable[Dict[str, Any]],
) -> Dict[str, Any]:
    """Record every audited step of an episode into the feedback memory.

    Each history entry that carries a non-empty ``audit`` mapping is flattened
    into an event and passed to ``record_feedback_event``; entries without an
    audit are skipped.

    Args:
        memory: Existing feedback memory (normalized before use).
        task_id: Task identifier attached to every recorded event.
        history: Episode step records. The optional ``audit``, ``info``,
            ``decision``, ``worker_revision`` and ``executed_action`` values
            may be missing or ``None``.

    Returns:
        The updated, normalized memory.
    """
    updated = _normalize_memory(memory)
    for entry in history:
        audit = entry.get("audit") or {}
        if not audit:
            continue
        info = entry.get("info") or {}
        decision = entry.get("decision") or {}
        # ``or {}`` (rather than a .get default) also covers keys that are
        # present but explicitly None.
        feedback_hint = info.get("feedback_memory") or {}
        revision = entry.get("worker_revision") or {}
        revised_proposal = revision.get("revised_proposal") or {}
        executed = entry.get("executed_action") or {}
        updated = record_feedback_event(
            updated,
            {
                "task_id": task_id,
                "incident_id": audit.get("incident_id"),
                "incident_label": audit.get("incident_label"),
                "worker_id": audit.get("worker_id"),
                "decision": audit.get("sentinel_decision")
                or decision.get("action")
                or decision.get("decision"),
                "reason": audit.get("reason") or decision.get("reason"),
                "action_type": audit.get("proposed_action_type"),
                "target": audit.get("proposed_target"),
                "is_misbehavior": audit.get("was_misbehavior"),
                "reassign_to": audit.get("reassign_to") or decision.get("reassign_to"),
                "suggested_reassign_to": feedback_hint.get("suggested_reassign_to"),
                "constitutional_violations": audit.get("constitutional_violations", []),
                "revision_attempted": revision.get("attempted"),
                "revision_approved": revision.get("revision_approved"),
                "revised_by": revision.get("revised_by"),
                "revised_action_type": revised_proposal.get("action_type"),
                "revised_target": revised_proposal.get("target"),
                "executed_action_source": executed.get("source"),
            },
        )
    return updated
def build_feedback_summary(
    memory: Dict[str, Any],
    worker_id: Optional[str] = None,
    task_id: Optional[str] = None,
    available_workers: Optional[Iterable[Any]] = None,
) -> Dict[str, Any]:
    """Produce a compact summary of the most recent feedback.

    The summary holds the tails of the global, per-task and per-worker note
    lists plus the worker's last feedback line. ``suggested_reassign_to`` and
    ``top_violation`` are added only when a value is available.

    Args:
        memory: Feedback memory (normalized before use).
        worker_id: Worker whose profile is summarized, if any.
        task_id: Task whose notes are summarized, if any.
        available_workers: Optional restriction on reassignment candidates.

    Returns:
        A dict of recent note lists and optional recommendation fields.
    """
    state = _normalize_memory(memory)

    profile = None
    if worker_id:
        profile = _worker_profile(state, worker_id)

    if task_id:
        notes = _task_notes(state, task_id)
    else:
        notes = {"mistakes": [], "corrections": []}

    def tail(source: Dict[str, Any], key: str, count: int) -> List[Any]:
        # Copy of the newest ``count`` entries stored under ``key``.
        return list(source.get(key, [])[-count:])

    summary: Dict[str, Any] = {
        "global_mistakes": tail(state, "global_mistakes", 3),
        "global_corrections": tail(state, "global_corrections", 3),
        "global_rehabilitations": tail(state, "global_rehabilitations", 2),
        "task_mistakes": tail(notes, "mistakes", 2),
        "task_corrections": tail(notes, "corrections", 2),
        "task_rehabilitations": tail(notes, "rehabilitations", 2),
        "worker_mistakes": tail(profile, "mistakes", 3) if profile else [],
        "worker_successes": tail(profile, "successes", 2) if profile else [],
        "worker_rehabilitations": tail(profile, "rehabilitations", 2) if profile else [],
        "last_feedback": profile.get("last_feedback", "") if profile else "",
    }

    target = recommended_reassign_to(state, worker_id, available_workers=available_workers)
    if target:
        summary["suggested_reassign_to"] = target

    if profile and profile.get("violation_counts"):
        counts = profile["violation_counts"]
        # First key holding the highest count.
        summary["top_violation"] = max(counts, key=counts.get)
    return summary
def build_feedback_context(
    memory: Dict[str, Any],
    task_id: Optional[str] = None,
    worker_ids: Optional[Iterable[Any]] = None,
) -> str:
    """Render the feedback memory as a text block for prompt context.

    Sections are emitted only when they have content: global mistakes,
    corrections and rehabilitations, then notes for ``task_id``, then a short
    profile for each of the first four ``worker_ids``.

    Args:
        memory: Feedback memory (normalized before use).
        task_id: Task whose notes should be included, if any.
        worker_ids: Workers to profile; items may be ``WorkerId`` members or
            plain identifiers.

    Returns:
        The rendered block, or ``""`` when nothing beyond the header exists.
    """
    memory = _normalize_memory(memory)
    lines: List[str] = ["## FEEDBACK LOOP MEMORY"]

    global_sections = (
        ("global_mistakes", "Global mistakes to avoid:", 3),
        ("global_corrections", "Global corrections that worked:", 3),
        ("global_rehabilitations", "Rehabilitations that worked after supervisor feedback:", 2),
    )
    for key, heading, keep in global_sections:
        entries = memory.get(key)
        if entries:
            lines.append(heading)
            lines.extend(f" - {item}" for item in entries[-keep:])

    if task_id:
        task_notes = _task_notes(memory, task_id)
        if task_notes["mistakes"] or task_notes["corrections"] or task_notes["rehabilitations"]:
            lines.append(f"Task memory for {task_id}:")
            lines.extend(f" - Avoid: {item}" for item in task_notes["mistakes"][-2:])
            lines.extend(f" - Prefer: {item}" for item in task_notes["corrections"][-2:])
            lines.extend(f" - Rehabilitation: {item}" for item in task_notes["rehabilitations"][-2:])

    for worker in list(worker_ids or [])[:4]:
        worker_key = str(worker.value if isinstance(worker, WorkerId) else worker)
        profile = _worker_profile(memory, worker_key)
        # Skip only profiles with nothing to show. Rehabilitations count,
        # since they are printed below and can exist without a recorded
        # mistake or success (e.g. a revised proposal after a block).
        if not (profile["mistakes"] or profile["successes"] or profile["rehabilitations"]):
            continue
        lines.append(f"Worker profile {worker_key}:")
        lines.extend(f" - Repeated mistake: {item}" for item in profile["mistakes"][-2:])
        lines.extend(f" - Reliable pattern: {item}" for item in profile["successes"][-1:])
        lines.extend(f" - Rehab pattern: {item}" for item in profile["rehabilitations"][-1:])
        suggested = recommended_reassign_to(memory, worker_key)
        if suggested:
            lines.append(f" - Best reassignment target so far: {suggested}")

    return "" if len(lines) == 1 else "\n".join(lines)
def recommended_reassign_to(
    memory: Dict[str, Any],
    worker_id: Optional[str],
    available_workers: Optional[Iterable[Any]] = None,
) -> Optional[str]:
    """Pick the best reassignment target recorded for ``worker_id``.

    A candidate's score is the number of times it was actually used as a
    reassignment target plus the number of times it was suggested. The worker
    itself is never recommended. Ties go to the earliest candidate, ordered
    as: actual reassignment targets in recorded order, then suggested-only
    targets in recorded order. This makes the result reproducible across
    interpreter runs.

    Args:
        memory: Feedback memory (normalized before use).
        worker_id: Worker to find a replacement for; falsy yields ``None``.
        available_workers: If non-empty, restricts candidates to these
            workers (``WorkerId`` members or plain identifiers).

    Returns:
        The highest-scoring eligible candidate, or ``None`` if there is none.
    """
    if not worker_id:
        return None
    memory = _normalize_memory(memory)
    profile = _worker_profile(memory, worker_id)
    preferred = profile.get("preferred_reassignments", {})
    suggested = profile.get("suggested_targets", {})

    # Build candidates in a deterministic order (dicts preserve insertion
    # order) instead of iterating a set of strings.
    ordered_keys = list(preferred) + [key for key in suggested if key not in preferred]
    candidates = {
        key: suggested.get(key, 0) + preferred.get(key, 0)
        for key in ordered_keys
    }

    allowed = {
        str(item.value if isinstance(item, WorkerId) else item)
        for item in (available_workers or [])
    }

    best: Optional[str] = None
    best_score = -1
    for candidate, score in candidates.items():
        if candidate == worker_id:
            continue
        if allowed and candidate not in allowed:
            continue
        if score > best_score:
            best = candidate
            best_score = score
    return best
def _normalize_memory(memory: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a memory dict that has every expected top-level key.

    The result is a new top-level dict; nested containers taken from
    ``memory`` are shared with it, not copied. Input that is not a dict is
    treated as empty, and top-level keys whose value is ``None`` (e.g. JSON
    ``null``) are reset to their empty defaults so later ``append`` /
    ``setdefault`` calls cannot fail on them.

    Args:
        memory: Possibly partial feedback memory, or ``None``.

    Returns:
        A dict with all keys produced by ``empty_feedback_memory``.
    """
    data = empty_feedback_memory()
    if isinstance(memory, dict):
        data.update(memory)
    # update() can overwrite a default with an explicit None; restore those.
    for key, default in empty_feedback_memory().items():
        if data.get(key) is None:
            data[key] = default
    return data
def _task_notes(memory: Dict[str, Any], task_id: Optional[str]) -> Dict[str, Any]:
key = task_id or "unknown"
notes = memory["task_notes"].setdefault(
key,
{"mistakes": [], "corrections": [], "rehabilitations": []},
)
notes.setdefault("mistakes", [])
notes.setdefault("corrections", [])
notes.setdefault("rehabilitations", [])
return notes
def _worker_profile(memory: Dict[str, Any], worker_id: Optional[str]) -> Dict[str, Any]:
key = worker_id or "unknown"
profile = memory["worker_profiles"].setdefault(
key,
{
"mistakes": [],
"successes": [],
"corrections": [],
"rehabilitations": [],
"violation_counts": {},
"preferred_reassignments": {},
"suggested_targets": {},
"recent_incidents": [],
"last_feedback": "",
"last_task_id": "",
"last_incident_id": "",
},
)
profile.setdefault("mistakes", [])
profile.setdefault("successes", [])
profile.setdefault("corrections", [])
profile.setdefault("rehabilitations", [])
profile.setdefault("violation_counts", {})
profile.setdefault("preferred_reassignments", {})
profile.setdefault("suggested_targets", {})
profile.setdefault("recent_incidents", [])
profile.setdefault("last_feedback", "")
return profile
def _append_unique(items: List[str], value: str) -> None:
    """Move or add ``value`` to the end of ``items``, in place.

    The value is stripped first and blank values are ignored. An existing
    occurrence is removed before appending, so the newest mention is always
    last. The list is then cut down to its last ``MAX_ITEMS_PER_LIST`` items.
    """
    cleaned = value.strip()
    if cleaned:
        try:
            items.remove(cleaned)
        except ValueError:
            # Not present yet; nothing to move.
            pass
        items.append(cleaned)
        del items[:-MAX_ITEMS_PER_LIST]
def _correction_line(event: Dict[str, Any]) -> str:
decision = str(event.get("decision") or "")
reason = str(event.get("reason") or "unsafe_pattern")
target = str(event.get("target") or "N/A")
if decision == "BLOCK":
return f"BLOCK {reason} on {target} until evidence is present."
if decision == "REDIRECT":
return f"REDIRECT {reason} on {target} to a lower-blast-radius action."
if decision == "REASSIGN":
assignee = event.get("reassign_to") or event.get("suggested_reassign_to")
if assignee:
return f"REASSIGN {reason} on {target} to {assignee}."
return f"REASSIGN {reason} on {target} to the domain owner."
if decision == "FLAG":
return f"FLAG suspicious {reason} pattern on {target} for follow-up."
return ""
def _rehabilitation_line(event: Dict[str, Any]) -> str:
if not event.get("revision_attempted") or not event.get("revision_approved"):
return ""
revised_by = str(event.get("revised_by") or event.get("worker_id") or "worker")
revised_action = str(event.get("revised_action_type") or event.get("action_type") or "action")
revised_target = str(event.get("revised_target") or event.get("target") or "N/A")
source = str(event.get("executed_action_source") or "revised")
return (
f"{revised_by} recovered safely with {revised_action}:{revised_target} "
f"(executed via {source}) after supervisor feedback."
)