"""Governance, validation, and coordination logic for MolForge.""" from __future__ import annotations from typing import Any, Dict, List, Mapping, Optional from .shared import ( DEFAULT_TOOL_COSTS, EDITABLE_SLOTS, FRAGMENT_LIBRARY, ROLE_MESSAGE_TYPES, ROLE_PERMISSIONS, ) try: from ..models import GovernanceStatus, MolForgeAction, RewardComponent except ImportError: from models import GovernanceStatus, MolForgeAction, RewardComponent class MolForgeGovernanceMixin: """Validation and multi-agent review methods.""" def _validate_action(self, action: MolForgeAction) -> Optional[tuple[str, str]]: if action.action_type not in self._scenario.enabled_actions: return "ACTION_DISABLED", f"{action.action_type} is disabled for this scenario." if action.acting_role not in self._scenario.enabled_roles: return "ROLE_DISABLED", f"{action.acting_role} is not enabled for this scenario." allowed_actions = ROLE_PERMISSIONS.get(action.acting_role, []) if action.action_type not in allowed_actions: return ( "ROLE_PERMISSION_DENIED", f"{action.acting_role} is not permitted to execute {action.action_type}.", ) if len(action.messages) > self._scenario.max_messages_per_turn: return ( "MESSAGE_LIMIT_EXCEEDED", f"At most {self._scenario.max_messages_per_turn} messages may be sent per turn.", ) seen_senders = set() for message in action.messages: if message.sender not in self._scenario.enabled_roles: return "MESSAGE_ROLE_INVALID", f"{message.sender} is not enabled in this scenario." if message.sender in seen_senders: return ( "DUPLICATE_ROLE_MESSAGE", f"Each specialist may emit at most one message per turn; duplicate from {message.sender}.", ) seen_senders.add(message.sender) if message.message_type not in ROLE_MESSAGE_TYPES.get(message.sender, []): return ( "MESSAGE_PERMISSION_DENIED", f"{message.sender} cannot emit message type {message.message_type}.", ) if action.action_type == "edit": if action.slot is None or action.edit_type is None: return "MISSING_EDIT_FIELDS", "Edit actions require both slot and edit_type." if action.slot not in EDITABLE_SLOTS: return "INVALID_SLOT", f"{action.slot} is not editable in MolForge." if action.edit_type in {"add_fragment", "substitute"} and not action.fragment: return "MISSING_FRAGMENT", "Edit actions require a fragment for add/substitute." if action.fragment: if action.fragment not in FRAGMENT_LIBRARY[action.slot]: return "UNKNOWN_FRAGMENT", f"{action.fragment} is not valid for slot {action.slot}." if self._molecule[action.slot] == action.fragment: return "NO_STATE_CHANGE", "Edit selected the fragment already present in that slot." if action.action_type == "run_assay": if action.tool_name is None: return "MISSING_TOOL_NAME", "run_assay actions require a tool_name." if action.tool_name not in self._scenario.enabled_tools: return "TOOL_DISABLED", f"{action.tool_name} is not enabled for this scenario." cost = DEFAULT_TOOL_COSTS[action.tool_name] if self._state.remaining_budget < cost: return "BUDGET_EXCEEDED", f"{action.tool_name} costs {cost}, exceeding remaining budget." if action.action_type == "restart": if self._restart_used: return "RESTART_ALREADY_USED", "restart_from_new_scaffold may be used at most once per episode." if self._state.remaining_budget < 350: return "BUDGET_EXCEEDED", "Not enough budget remains to restart from a new scaffold." return None def _assess_governance( self, action: MolForgeAction, previous_properties: Mapping[str, float], ) -> tuple[GovernanceStatus, List[RewardComponent], bool]: reward_components: List[RewardComponent] = [] approvals: List[str] = [] objections: List[str] = [] vetoes: List[str] = [] required_roles = ( [] if action.action_type == "defer" else [role for role in self._scenario.required_review_roles if role != action.acting_role] ) policy_veto = False current_signature = self._molecule_signature() simulated_properties = self._simulate_action_properties(action) sender_map = {message.sender: message for message in action.messages} actor_message = sender_map.get(action.acting_role) if action.action_type != "defer": if actor_message and actor_message.message_type == "proposal": self._record_message(actor_message) reward_components.append( RewardComponent( name="proposal_logged", value=0.05, explanation=f"{action.acting_role} logged a structured proposal before execution.", ) ) self._role_metrics[action.acting_role]["correct_messages"] += 1 else: reward_components.append( RewardComponent( name="missing_proposal", value=-0.06, explanation="The acting specialist did not provide an explicit proposal message.", ) ) for role in required_roles: expected = self._expected_feedback(role, action, previous_properties, simulated_properties) actual = sender_map.get(role) if actual is None: reward_components.append( RewardComponent( name=f"missing_review_{role}", value=-0.08, explanation=f"{role} did not provide the required review for this turn.", ) ) if expected["hard_veto"]: policy_veto = True vetoes.append(role) continue if role != action.acting_role: self._record_message(actual) if self._matches_feedback(actual.message_type, expected["type"]): reward_components.append( RewardComponent( name=f"coordination_{role}", value=0.12, explanation=expected["reason"], ) ) self._role_metrics[role]["correct_messages"] += 1 if expected["type"] in {"approval", "submission_recommendation"}: approvals.append(role) else: objections.append(role) elif expected["type"] == "neutral": reward_components.append( RewardComponent( name=f"unnecessary_message_{role}", value=-0.02, explanation=f"{role} contributed a message even though no strong intervention was needed.", ) ) self._role_metrics[role]["incorrect_messages"] += 1 else: reward_components.append( RewardComponent( name=f"misaligned_review_{role}", value=-0.1, explanation=( f"{role} sent {actual.message_type}, but the hidden environment evaluation " f"expected {expected['type']}." ), ) ) self._role_metrics[role]["incorrect_messages"] += 1 if expected["hard_veto"]: policy_veto = True vetoes.append(role) if expected["hard_veto"] and actual and self._matches_feedback(actual.message_type, expected["type"]): policy_veto = True vetoes.append(role) extra_roles = { sender for sender in sender_map if sender not in required_roles and sender != action.acting_role } for role in sorted(extra_roles): self._record_message(sender_map[role]) reward_components.append( RewardComponent( name=f"optional_review_{role}", value=0.02, explanation=f"{role} added optional context for the current decision.", ) ) self._role_metrics[role]["correct_messages"] += 1 if policy_veto: reward_components.append( RewardComponent( name="policy_veto", value=-0.35, explanation="A specialist raised a valid hard veto, so the action was blocked.", ) ) explanation = ( "Team review approved the decision." if not policy_veto else f"Action blocked after hard veto from {', '.join(vetoes)}." ) status = "policy_veto" if policy_veto else "ready" return ( GovernanceStatus( status=status, explanation=explanation, required_roles=required_roles, approvals=approvals, objections=objections, vetoes=vetoes, executable=not policy_veto, ), reward_components, policy_veto, ) def _expected_feedback( self, role: str, action: MolForgeAction, previous_properties: Mapping[str, float], simulated_properties: Mapping[str, float], ) -> Dict[str, Any]: tox_threshold = self._scenario.hard_constraints.get("toxicity_max", 0.45) synth_threshold = self._scenario.hard_constraints.get("synth_min", 0.56) current_signature = self._molecule_signature() known_toxicity = self._current_property_estimate("toxicity", current_signature) known_synth = self._current_property_estimate("synth", current_signature) neutral = {"type": "neutral", "hard_veto": False, "reason": f"{role} had no strong intervention."} if role == "toxicologist": if action.action_type == "run_assay": if action.tool_name == "assay_toxicity": return { "type": "approval", "hard_veto": False, "reason": "Toxicologist correctly encouraged a direct safety assay.", } return { "type": "approval", "hard_veto": False, "reason": "Toxicologist correctly allowed an assay that improves decision evidence without increasing chemical risk.", } if action.action_type == "submit": if known_toxicity is not None and known_toxicity > tox_threshold: return { "type": "objection", "hard_veto": simulated_properties["toxicity"] > tox_threshold, "reason": "Toxicologist correctly objected to a submission with visible toxicity evidence above threshold.", } if simulated_properties["toxicity"] > tox_threshold: return { "type": "objection", "hard_veto": True, "reason": "Toxicologist correctly blocked an unsafe submission.", } if known_toxicity is None: return { "type": "assay_request", "hard_veto": True, "reason": "Toxicologist correctly demanded explicit toxicity evidence before submission.", } return { "type": "approval", "hard_veto": False, "reason": "Toxicologist correctly approved a submission with acceptable visible safety evidence.", } if action.action_type in {"edit", "restart"}: toxicity_delta = simulated_properties["toxicity"] - previous_properties["toxicity"] if toxicity_delta > 0.08: return { "type": "objection", "hard_veto": True, "reason": "Toxicologist correctly raised a hard objection to a major safety regression.", } if ( simulated_properties["toxicity"] > tox_threshold + 0.02 and toxicity_delta >= -0.02 ): return { "type": "objection", "hard_veto": True, "reason": "Toxicologist correctly blocked a move that left an unsafe scaffold unimproved.", } if simulated_properties["toxicity"] > tox_threshold + 0.02: return { "type": "approval", "hard_veto": False, "reason": "Toxicologist correctly allowed a risk-reducing move while residual safety work remains.", } return { "type": "approval", "hard_veto": False, "reason": "Toxicologist correctly approved a safety-compatible move.", } return neutral if role == "assay_planner": if action.action_type == "run_assay": info_gain = self._estimate_information_gain(action.tool_name or "") prior_runs = self._assay_runs.get(f"{current_signature}::{action.tool_name}", 0) if (action.tool_name == "run_md_simulation" and self._state.remaining_budget < 4500) or ( prior_runs > 0 and info_gain < 0.05 ): return { "type": "rejection", "hard_veto": True, "reason": "Assay Planner correctly blocked a wasteful or over-expensive assay.", } if info_gain < 0.04 and action.tool_name != "search_literature": return { "type": "rejection", "hard_veto": False, "reason": "Assay Planner correctly questioned a low-value assay.", } return { "type": "approval", "hard_veto": False, "reason": "Assay Planner correctly approved an information-efficient assay.", } if action.action_type == "submit": required_props = ["potency", "toxicity"] if "synth_min" in self._scenario.hard_constraints: required_props.append("synth") missing = [ prop for prop in required_props if self._current_property_estimate(prop, current_signature) is None ] if missing: return { "type": "assay_request", "hard_veto": True, "reason": "Assay Planner correctly asked for missing evidence before submission.", } return { "type": "approval", "hard_veto": False, "reason": "Assay Planner correctly approved a well-supported submission.", } if action.action_type == "restart": potency_threshold = self._scenario.hard_constraints.get("potency_min", 0.72) if self._scenario.trap_penalty and previous_properties["potency"] < potency_threshold: return { "type": "approval", "hard_veto": False, "reason": "Assay Planner correctly endorsed escaping a low-value scaffold family.", } return { "type": "rejection", "hard_veto": False, "reason": "Assay Planner correctly questioned an unnecessary restart.", } if action.action_type == "edit": return { "type": "approval", "hard_veto": False, "reason": "Assay Planner correctly approved a low-cost edit before spending assay budget.", } return neutral if role == "process_chemist": if action.action_type == "run_assay": if action.tool_name == "estimate_synthesizability": return { "type": "approval", "hard_veto": False, "reason": "Process Chemist correctly requested explicit synthesizeability evidence.", } return { "type": "approval", "hard_veto": False, "reason": "Process Chemist correctly allowed an assay that does not worsen route feasibility.", } if action.action_type == "submit": if known_synth is not None and known_synth < synth_threshold: return { "type": "objection", "hard_veto": simulated_properties["synth"] < synth_threshold, "reason": "Process Chemist correctly objected to a submission with visible route evidence below threshold.", } if simulated_properties["synth"] < synth_threshold: return { "type": "objection", "hard_veto": "synth_min" in self._scenario.hard_constraints, "reason": "Process Chemist correctly blocked a submission that looks infeasible to make.", } if known_synth is None: return { "type": "assay_request", "hard_veto": False, "reason": "Process Chemist correctly asked for synthesizeability evidence before submission.", } return { "type": "approval", "hard_veto": False, "reason": "Process Chemist correctly approved a feasible-looking submission.", } if action.action_type in {"edit", "restart"}: if simulated_properties["synth"] < synth_threshold - 0.03: return { "type": "objection", "hard_veto": False, "reason": "Process Chemist correctly flagged a severe feasibility regression.", } if previous_properties["synth"] - simulated_properties["synth"] > 0.08: return { "type": "objection", "hard_veto": False, "reason": "Process Chemist correctly objected to a less tractable route.", } return { "type": "approval", "hard_veto": False, "reason": "Process Chemist correctly approved a tractable chemistry move.", } return neutral return neutral @staticmethod def _matches_feedback(actual_type: str, expected_type: str) -> bool: if expected_type == "neutral": return False if expected_type == "approval": return actual_type in {"approval", "submission_recommendation"} if expected_type == "objection": return actual_type in {"objection", "risk_flag", "rejection"} if expected_type == "rejection": return actual_type in {"rejection", "objection"} if expected_type == "assay_request": return actual_type == "assay_request" return actual_type == expected_type def _evaluate_reasoning_consistency( self, action: MolForgeAction, previous_properties: Mapping[str, float], current_properties: Mapping[str, float], reward_components: List[RewardComponent], ) -> float: del previous_properties, current_properties rationale = action.rationale.lower().strip() evidence = [item.lower().strip() for item in action.evidence if item.strip()] expected_effects = {key: value for key, value in action.expected_effects.items() if value} score = 0.0 explanations = [] if rationale: score += 0.02 explanations.append("short rationale present") else: score -= 0.03 explanations.append("missing rationale") if evidence: grounded = sum(1 for item in evidence if self._evidence_item_is_visible(item)) score += min(grounded, 3) * 0.015 if grounded < len(evidence): score -= min(len(evidence) - grounded, 2) * 0.02 explanations.append(f"{grounded}/{len(evidence)} evidence item(s) matched visible state") else: score -= 0.03 explanations.append("missing visible evidence") if expected_effects: plausible = sum( 1 for metric, direction in expected_effects.items() if self._expected_effect_is_plausible(action, metric, direction) ) checked = len(expected_effects) score += min(plausible, 3) * 0.01 if plausible < checked: score -= min(checked - plausible, 2) * 0.015 explanations.append(f"{plausible}/{checked} expected effect(s) were directionally plausible") else: score -= 0.02 explanations.append("missing expected effects") score = max(-0.04, min(0.04, score)) if score != 0.0: reward_components.append( RewardComponent( name="reasoning_grounding", value=round(score, 4), explanation="; ".join(explanations), ) ) return score def _evidence_item_is_visible(self, item: str) -> bool: if not item: return False visible_terms = { self._scenario.scenario_id.lower(), self._scenario.difficulty.lower(), self._molecule_signature().lower(), str(self._state.remaining_budget), str(self._state.max_budget), str(self._state.step_count), str(self._scenario.max_steps), } visible_terms.update(fragment.lower() for fragment in self._molecule.values()) visible_terms.update(tool.lower() for tool in self._scenario.enabled_tools) visible_terms.update(constraint.lower() for constraint in self._scenario.hard_constraints) visible_terms.update(reading.property_name.lower() for reading in self._known_assays) visible_terms.update(reading.tool_name.lower() for reading in self._known_assays) return any(term and term in item for term in visible_terms) def _expected_effect_is_plausible( self, action: MolForgeAction, metric: str, direction: str, ) -> bool: if metric not in {"potency", "toxicity", "synth", "novelty", "budget"}: return False if direction not in {"up", "down", "neutral", "unknown", "not_applicable"}: return False if direction in {"unknown", "not_applicable"}: return True if metric == "budget": if action.action_type in {"run_assay", "restart"}: return direction == "down" return direction == "neutral" if action.action_type in {"run_assay", "submit", "defer"}: return direction in {"neutral", "unknown", "not_applicable"} if action.action_type == "restart": if metric in {"toxicity", "budget"}: return direction == "down" if metric == "synth": return direction in {"up", "unknown"} return direction in {"up", "unknown", "neutral"} if action.action_type != "edit" or not action.slot or not action.fragment: return direction in {"neutral", "unknown"} fragment = action.fragment plausibility = { ("hinge", "azaindole", "potency", "up"), ("back_pocket", "cyano", "potency", "up"), ("back_pocket", "cyano", "toxicity", "down"), ("back_pocket", "chloro", "potency", "up"), ("back_pocket", "chloro", "toxicity", "up"), ("back_pocket", "trifluoromethyl", "potency", "up"), ("back_pocket", "trifluoromethyl", "toxicity", "up"), ("solvent_tail", "morpholine", "toxicity", "down"), ("solvent_tail", "morpholine", "synth", "up"), ("solvent_tail", "dimethylamino", "toxicity", "up"), ("warhead", "reversible_cyanoacrylamide", "toxicity", "down"), ("warhead", "reversible_cyanoacrylamide", "novelty", "up"), ("warhead", "nitrile", "toxicity", "down"), } if (action.slot, fragment, metric, direction) in plausibility: return True return direction in {"neutral", "unknown"}