muhammadbinmurtza
Restructure: clauseguard as package subfolder, app_file: clauseguard/app.py
913a064
"""Agent 5: Reporter — compiles the final risk report."""
import json
import logging
from datetime import datetime
from typing import List
from clauseguard.config.prompts import REPORTER_SYSTEM_PROMPT
from clauseguard.config.settings import MODEL_NAME, TEMPERATURE
from clauseguard.models.findings import ScoredClause, Severity
from clauseguard.models.report import FinalReport, RiskSummary
from clauseguard.services.model_service import call_model, clean_json_response
logger = logging.getLogger(__name__)
MAX_RETRIES = 1
async def run_reporter(
scored_clauses: List[ScoredClause],
contract_name: str,
contract_type: str,
partial: bool = False,
truncation_note: str = "",
) -> FinalReport:
"""Compile all analysis into a structured FinalReport.
Args:
scored_clauses: All scored clauses with risk findings.
contract_name: Name of the source contract file.
contract_type: Detected type of the contract.
partial: Whether this is a partial report due to agent failures.
truncation_note: Note about document truncation if contract exceeded clause limit.
Returns:
A complete FinalReport with summary, actions, and markdown.
"""
sorted_clauses = _sort_by_severity(scored_clauses)
summary = _build_summary(sorted_clauses, contract_type)
top_3 = _extract_top_actions(sorted_clauses)
markdown = _build_markdown_programmatically(
sorted_clauses, contract_name, contract_type, summary, top_3
)
return FinalReport(
contract_name=contract_name,
generated_at=datetime.now(),
summary=summary,
top_3_actions=top_3,
scored_clauses=sorted_clauses,
markdown_report=markdown,
processed_normally=not partial,
truncation_note=truncation_note,
)
def _sort_by_severity(scored_clauses: List[ScoredClause]) -> List[ScoredClause]:
"""Sort scored clauses by severity (CRITICAL first)."""
severity_order = {Severity.CRITICAL: 0, Severity.HIGH: 1, Severity.MEDIUM: 2, Severity.LOW: 3, Severity.INFO: 4}
return sorted(scored_clauses, key=lambda sc: severity_order.get(sc.finding.severity, 99))
def _build_summary(scored_clauses: List[ScoredClause], contract_type: str) -> RiskSummary:
"""Build risk summary statistics from scored clauses."""
counts = {Severity.CRITICAL: 0, Severity.HIGH: 0, Severity.MEDIUM: 0, Severity.LOW: 0, Severity.INFO: 0}
for sc in scored_clauses:
counts[sc.finding.severity] = counts.get(sc.finding.severity, 0) + 1
total = len(scored_clauses)
if total > 0:
raw_score = (
counts[Severity.CRITICAL] * 10
+ counts[Severity.HIGH] * 7
+ counts[Severity.MEDIUM] * 4
+ counts[Severity.LOW] * 1
) / total
overall_score = round(min(raw_score, 10.0), 1)
else:
overall_score = 0.0
return RiskSummary(
total_clauses=total,
critical_count=counts[Severity.CRITICAL],
high_count=counts[Severity.HIGH],
medium_count=counts[Severity.MEDIUM],
low_count=counts[Severity.LOW],
overall_score=overall_score,
contract_type=contract_type,
)
def _extract_top_actions(scored_clauses: List[ScoredClause]) -> List[str]:
"""Extract the top 3 most impactful recommended actions."""
actions: List[str] = []
severity_priority = [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW, Severity.INFO]
for sev in severity_priority:
for sc in scored_clauses:
if sc.finding.severity == sev and sc.finding.recommended_action:
if sc.finding.recommended_action not in actions:
actions.append(sc.finding.recommended_action)
if len(actions) >= 3:
return actions
if not actions:
actions.append("Review all clauses with legal counsel before signing.")
return actions[:3]
def _build_markdown_programmatically(
scored_clauses: List[ScoredClause],
contract_name: str,
contract_type: str,
summary: RiskSummary,
top_3: List[str],
) -> str:
"""Build the markdown report programmatically."""
generated_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines: List[str] = [
"# ClauseGuard Risk Report",
f"**Contract:** {contract_name}",
f"**Type:** {contract_type}",
f"**Overall Risk Score:** {summary.overall_score}/10",
f"**Generated:** {generated_str}",
"",
"---",
"",
"## Executive Summary",
_build_executive_summary_text(scored_clauses, summary),
"",
"## Top 3 Actions Before Signing",
]
for i, action in enumerate(top_3, 1):
lines.append(f"{i}. {action}")
info_count = summary.total_clauses - summary.critical_count - summary.high_count - summary.medium_count - summary.low_count
lines.extend([
"",
"## Risk Summary",
"| Severity | Count |",
"|----------|-------|",
f"| 🔴 Critical | {summary.critical_count} |",
f"| 🟠 High | {summary.high_count} |",
f"| 🟡 Medium | {summary.medium_count} |",
f"| 🟢 Low | {summary.low_count} |",
f"| ℹ️ Info | {max(info_count, 0)} |",
"",
"---",
"",
"## Clause Analysis",
"",
])
for sc in scored_clauses:
emoji = _severity_emoji(sc.finding.severity)
lines.append(
f"### {sc.clause.clause_type.value}{emoji} {sc.finding.severity.value}"
)
lines.append(f"**Original:** {sc.clause.raw_text}")
if sc.clause.plain_english:
lines.append(f"**Plain English:** {sc.clause.plain_english}")
lines.append(f"**Risk:** {sc.finding.risk_reason}")
if sc.finding.recommended_action:
lines.append(f"**Action:** {sc.finding.recommended_action}")
lines.append("")
return "\n".join(lines)
def _build_executive_summary_text(
scored_clauses: List[ScoredClause], summary: RiskSummary
) -> str:
"""Build a brief executive summary of the main risks."""
high_severity = [sc for sc in scored_clauses if sc.finding.severity in (Severity.CRITICAL, Severity.HIGH)]
if not high_severity:
return (
"This contract appears to be reasonably balanced with no critical or high-risk clauses identified. "
"Review the medium-risk findings below for items that may warrant attention."
)
types_found = list({sc.clause.clause_type.value for sc in high_severity})
types_str = ", ".join(types_found)
return (
f"This contract contains {summary.critical_count} critical and {summary.high_count} high-severity risks "
f"that require immediate attention. The most concerning areas involve: {types_str}. "
f"We strongly recommend addressing the top 3 actions below before signing this agreement."
)
def _severity_emoji(severity: Severity) -> str:
"""Return emoji for severity level."""
emoji_map = {
Severity.CRITICAL: "🔴",
Severity.HIGH: "🟠",
Severity.MEDIUM: "🟡",
Severity.LOW: "🟢",
Severity.INFO: "ℹ️",
}
return emoji_map.get(severity, "⚪")