"""report_node — report generation (Python structure + LLM exec summary).
Factory ``build_report_node(llm=None)``:
* If ``llm`` is provided, the LLM produces a 2-4 sentence English exec summary
from the top risks + package-level findings (``REPORT_SYSTEM_PROMPT`` +
bureaucratic-jargon ban list).
* If ``llm`` is None, ``executive_summary`` stays empty (backward-compatible).
``state["package_insights"]`` and ``state["dd_report"]`` (when present) are
folded into the report — the UI Report tab and the DOCX export render the
full sections from this dict.
"""
from __future__ import annotations
from datetime import datetime
from langchain_core.messages import HumanMessage, SystemMessage
from graph.states.pipeline_state import (
ComparisonReport,
DDPortfolioReport,
PackageInsights,
PipelineState,
ProcessedDocument,
Risk,
)
# Manual-handling-time estimates (per doc_type, in minutes).
# Used to compute the "speedup" metric in build_report_node; the "other"
# entry mirrors the 15-minute fallback used there via
# _MANUAL_MINUTES.get(doc_type, 15), so unknown types behave the same way.
_MANUAL_MINUTES = {
    "invoice": 8,
    "delivery_note": 6,
    "purchase_order": 6,
    "contract": 35,
    "financial_report": 25,
    "other": 15,
}
# System prompt for the executive-summary LLM call in build_report_node.
# It pins the model to the verbatim numbers supplied by _build_summary_prompt
# and bans filler/bureaucratic phrasing. Runtime string — do not reword casually.
REPORT_SYSTEM_PROMPT = """You write an audit report executive summary in English.
REQUIRED RULES:
1. Work only from the concrete numbers and data points provided. Do not fabricate anything.
2. Use the numbers VERBATIM — do not round, do not reinterpret.
3. Write in natural, concise English. No bureaucratic, robotic phrasing.
4. AVOID these words and phrases: "comprehensive", "thorough", "in-depth",
"regulatory requirements", "recommended actions", "implement", "leveraging",
"going forward" — these are filler.
5. Do not invent words. If unsure, choose a simpler word.
6. If there are no critical findings, say so plainly: "No critical discrepancies found."
7. 2-4 sentences, max 80 words. Be tight.
8. Plain prose. No headings, no bullet points."""
def _bucketize_risks(risks: list[Risk]) -> dict[str, list[dict]]:
"""Group risks by severity (UI rendering helper)."""
out: dict[str, list[dict]] = {"high": [], "medium": [], "low": [], "info": []}
for r in risks:
sev = r.severity.lower()
bucket = sev if sev in out else "low"
out[bucket].append(r.model_dump())
return out
def _evidence_for(doc_type: str) -> int:
    """Return the evidence score for *doc_type* via ``domain_checks``.

    NOTE(review): the import is function-local — presumably to avoid a
    circular import with ``domain_checks``; confirm before hoisting it to
    module level.
    """
    from domain_checks import get_evidence_score
    return get_evidence_score(doc_type)
def _build_summary_prompt(
documents: list[ProcessedDocument],
risks: list[Risk],
comparison: ComparisonReport | None,
package_insights: PackageInsights | None,
) -> str:
"""Structured line-based prompt so the LLM only uses the provided values."""
doc_count = len(documents)
high = [r for r in risks if r.severity == "high"]
medium = [r for r in risks if r.severity == "medium"]
top_risks = [r.description for r in high[:3]]
top_warnings = [r.description for r in medium[:3]]
parts = [
"Audit results — write a 2-4 sentence English executive summary from these.",
"Use the numbers EXACTLY; do not change them.",
"",
f"Documents processed: {doc_count}",
]
if comparison:
ok = sum(1 for m in comparison.matches if m.get("severity") == "ok")
warn = sum(1 for m in comparison.matches if m.get("severity") == "warning")
crit = sum(1 for m in comparison.matches if m.get("severity") == "critical")
parts.append(
f"Cross-document checks: {ok} ok, "
f"{warn} warnings, {crit} critical discrepancies"
)
parts.append(f"Identified risks: {len(high)} high, {len(medium)} medium")
if top_risks:
parts.append("")
parts.append("Top high-severity risks:")
for r in top_risks:
parts.append(f"- {r}")
if top_warnings:
parts.append("")
parts.append("Top warnings:")
for r in top_warnings:
parts.append(f"- {r}")
# Package-level findings
if package_insights is not None and package_insights.findings:
top_pkg_high = [
f.get("description") or f.get("leiras", "")
for f in package_insights.findings
if (f.get("severity") or f.get("sulyossag") or "").lower() == "high"
or (f.get("severity") or f.get("sulyossag") or "").lower() == "magas"
][:3]
top_pkg_med = [
f.get("description") or f.get("leiras", "")
for f in package_insights.findings
if (f.get("severity") or f.get("sulyossag") or "").lower() in ("medium", "kozepes", "közepes")
][:2]
if top_pkg_high or top_pkg_med:
parts.append("")
parts.append("Package-level findings (cross-doc):")
for r in top_pkg_high:
parts.append(f"- [HIGH] {r}")
for r in top_pkg_med:
parts.append(f"- [MEDIUM] {r}")
return "\n".join(parts)
def build_report_node(llm=None):
    """Factory: capture ``llm`` in a closure for the exec summary call.

    Args:
        llm: optional BaseChatModel-like Runnable. If provided, it generates a
            2-4 sentence English executive summary from the structured input.
            If None, the summary stays empty.

    Returns:
        An async node ``report_node(state) -> {"report": dict}``.
    """
    async def report_node(state: PipelineState) -> dict:
        # All state keys are optional — default defensively.
        documents: list[ProcessedDocument] = state.get("documents") or []
        risks: list[Risk] = state.get("risks") or []
        comparison: ComparisonReport | None = state.get("comparison")
        package_insights: PackageInsights | None = state.get("package_insights")
        dd_report: DDPortfolioReport | None = state.get("dd_report")
        processing_seconds = state.get("processing_seconds") or 0.0
        # Per-doc info + manual_total computation
        docs_info = []
        manual_total = 0
        for d in documents:
            # Documents that failed ingestion carry no file info — skip them
            # here (they still count toward document_count below).
            if d.ingested is None:
                continue
            doc_type = d.classification.doc_type if d.classification else "other"
            manual = _MANUAL_MINUTES.get(doc_type, 15)
            manual_total += manual
            docs_info.append({
                "file": d.ingested.file_name,
                "type": d.classification.doc_type_display if d.classification else "Other",
                "extracted_fields": (
                    len(d.extracted.raw) if d.extracted and isinstance(d.extracted.raw, dict) else 0
                ),
                "evidence_score": _evidence_for(doc_type),
            })
        # Speedup = estimated manual seconds / actual pipeline seconds;
        # guarded against division by zero when timing info is missing.
        speedup = (manual_total * 60.0) / processing_seconds if processing_seconds > 0 else 0.0
        report: dict = {
            # NOTE(review): naive local time — confirm a timezone-aware
            # timestamp is not expected by consumers.
            "generated_at": datetime.now().isoformat(),
            "document_count": len(documents),
            "performance": {
                "processing_seconds": round(processing_seconds, 2),
                "documents": len(documents),
                "manual_estimate_minutes": manual_total,
                "speedup": round(speedup, 1),
            },
            "documents": docs_info,
            "risks": _bucketize_risks(risks),
            "comparison": comparison.model_dump() if comparison else None,
            "executive_summary": "",
            # Opt-in sections — populated only when demo flow or DD tab ran
            "package_insights": None,
            "dd_analysis": None,
        }
        # Package-level analysis integration
        if package_insights is not None:
            report["package_insights"] = {
                "executive_summary": package_insights.executive_summary or "",
                "findings": list(package_insights.findings or []),
                "key_observations": list(package_insights.key_observations or []),
                "package_type": package_insights.package_type or "general",
            }
        # DD analysis integration (only when a summary was actually produced)
        if dd_report is not None and dd_report.executive_summary:
            report["dd_analysis"] = {
                "executive_summary": dd_report.executive_summary,
                "top_red_flags": list(dd_report.top_red_flags or []),
                "contracts": list(dd_report.contracts or []),
                "total_monthly_obligations": dict(dd_report.total_monthly_obligations or {}),
                "high_risk_contracts": list(dd_report.high_risk_contracts or []),
                "expiring_soon": list(dd_report.expiring_soon or []),
            }
        # LLM exec summary — when llm is provided
        if llm is not None:
            try:
                summary_prompt = _build_summary_prompt(
                    documents, risks, comparison, package_insights,
                )
                response = await llm.ainvoke([
                    SystemMessage(content=REPORT_SYSTEM_PROMPT),
                    HumanMessage(content=summary_prompt),
                ])
                # LangChain message content is either a plain string or a list
                # of content parts; only dict parts with type == "text" are kept.
                content = response.content
                if isinstance(content, str):
                    report["executive_summary"] = content.strip()
                elif isinstance(content, list):
                    text_parts = [
                        part.get("text", "") for part in content
                        if isinstance(part, dict) and part.get("type") == "text"
                    ]
                    report["executive_summary"] = "\n".join(t for t in text_parts if t).strip()
            except Exception:
                # Empty summary on error — the rest of the report is still useful
                report["executive_summary"] = ""
        return {"report": report}
    return report_node
# Backward-compat: keep the legacy report_node API (llm=None default)
async def report_node(state: PipelineState) -> dict:
    """Backward-compat wrapper: run ``build_report_node`` without an LLM."""
    node = build_report_node(llm=None)
    result = await node(state)
    return result
|