paperhawk / graph /package_insights_graph.py
Nándorfi Vince
Initial paperhawk push to HF Space (LFS for binaries)
7ff7119
raw
history blame
8.13 kB
"""package_insights_graph — package-level cross-doc analysis in a single LLM call.
Simple 1-LLM-call topology:

    START
      → generate_insights (1 LLM call with ALL document data, perspective-driven
        instructions, RISK_SYSTEM_PROMPT-style anti-hallucination)
      → END (the result is stored under the ``final_insights`` key)
The ``package_type`` (audit/dd/compliance/general) selects different prompt
instructions — see ``_PACKAGE_TYPE_INSTRUCTIONS`` below.
"""
from __future__ import annotations
import json
from typing import TypedDict
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field
from graph.states.pipeline_state import PackageInsights, ProcessedDocument
class PackageInsightsState(TypedDict, total=False):
    """State dictionary carried through the package_insights graph.

    Declared with ``total=False``: every key is optional, so readers must be
    prepared for a key to be missing.
    """

    # Documents that are analysed together as one package.
    documents: list[ProcessedDocument]
    # Analysis perspective: audit | dd | compliance | general
    package_type: str
    # Result slot filled in by the ``generate_insights`` node.
    final_insights: PackageInsights | None
# 4 detailed perspective instructions
_PACKAGE_TYPE_INSTRUCTIONS = {
"audit": (
"Analyze the document package from an audit perspective. Focus on financial "
"anomalies: pricing patterns, signs of over-billing, quantity discrepancies, "
"VAT anomalies, back-dating, payment-term inconsistencies. If the same "
"service or item appears in multiple documents at different prices or "
"quantities, that is a strong audit risk signal."
),
"dd": (
"Analyze the document package from a Due Diligence perspective in the "
"context of a transaction. Focus on: change-of-control clauses, near-term "
"expirations, amendments under NDA, unusually long termination notice, "
"significant percentage price hikes, legal red-flag clauses, "
"disproportionate penalty clauses, warranty obligations."
),
"compliance": (
"Analyze the document package from a compliance perspective. Focus on: "
"GDPR and data-protection clauses present/absent, encryption requirements, "
"incident-handling procedures, audit rights, liability limitations, "
"access controls, data-processor declarations. If the contract handles "
"PERSONAL DATA without proper data-protection language, that is a "
"critical compliance risk."
),
"general": (
"Analyze the document package from a general business audit perspective. "
"Focus on cross-doc patterns: consistency, missing data, anomalies, "
"broken business logic."
),
}
# System message for the package-level analysis call. It is sent as the
# ``SystemMessage`` ahead of the perspective-specific user prompt and holds
# the anti-hallucination rules (use only supplied data, cite sources, report
# missing data instead of inventing it) plus the expected output fields.
# The text is consumed by the model verbatim, so edits here change behavior.
SYSTEM_PROMPT = """You are a package-level audit assistant. You receive multiple
documents at once and look for risks and anomalies that are visible ONLY when
the documents are reviewed TOGETHER — not within a single document.
CRITICAL RULES:
1. Rely ONLY on data that actually appears in the supplied documents. NEVER
fabricate a number, date, name, or field value.
2. If a piece of data is missing from every document, mention it as a fact
("missing data") — do NOT invent a value.
3. Cite specific references: which document, which field, which value you saw.
Do not generalize.
4. Descriptions should be concise but informative: concrete numbers, dates,
names — NOT generic "worth checking" filler.
5. Do not repeat the same observation. One risk = one entry.
6. Write in English, in a natural business tone. Avoid bureaucratic jargon:
"comprehensive", "thorough", "in-depth", "leveraging", "implement",
"going forward", "regulatory requirements".
7. Fill every field: executive_summary (4-6 sentences), findings (list of
structured risks), key_observations (3-7 concise observations)."""
# Pydantic structure for ``with_structured_output()``.
# Field descriptions are exported with the JSON schema the model receives, so
# per-field guidance (e.g. the allowed severity values) is placed there; a
# plain ``#`` comment would never reach the model.
class _PackageFinding(BaseModel):
    """A single cross-document risk or anomaly found in the package."""

    # Required: the only field without a default.
    description: str = Field(
        ...,
        description=(
            "What was found, citing the concrete documents, fields and values."
        ),
    )
    # Kept as a free-form string (not an enum) so an unexpected label from the
    # model does not fail validation of the whole response.
    severity: str = Field(
        default="low",
        description="Risk level of the finding: one of 'high', 'medium', 'low'.",
    )
    rationale: str = Field(
        default="",
        description=(
            "Why this is a risk, based only on data present in the documents."
        ),
    )
    affected_documents: list[str] = Field(
        default_factory=list,
        description="File names of the documents this finding relates to.",
    )
class _PackageInsightsResult(BaseModel):
    # Top-level schema requested from the LLM via ``with_structured_output()``.
    # Documented with comments rather than a docstring on purpose: a model
    # docstring is exported as the schema description, which would change what
    # the LLM is sent.

    # Narrative overview of the whole package; empty when the model omits it.
    executive_summary: str = Field(default="")
    # Structured risks, one entry per finding.
    findings: list[_PackageFinding] = Field(default_factory=list)
    # Short free-text observations.
    key_observations: list[str] = Field(default_factory=list)
def _build_documents_summary(documents: list[ProcessedDocument]) -> list[dict]:
"""Compact per-document representation for the LLM.
Strips meta-fields (_quotes, _confidence, _source) to save prompt context.
"""
summary: list[dict] = []
for doc in documents:
if doc.extracted is None or doc.classification is None or doc.ingested is None:
continue
clean_data = {
k: v
for k, v in (doc.extracted.raw or {}).items()
if not k.startswith("_")
}
summary.append({
"file": doc.ingested.file_name,
"type": doc.classification.doc_type_display,
"doc_type": doc.classification.doc_type,
"data": clean_data,
})
return summary
def build_package_insights_graph(*, llm=None, checkpointer=None):
    """Compile the single-node package_insights graph.

    Topology: START → generate_insights → END. The node writes its result to
    the ``final_insights`` state key. LLM problems never propagate out of the
    node: every failure path yields a ``PackageInsights`` that carries only an
    explanatory ``executive_summary``.

    Args:
        llm: optional BaseChatModel-like Runnable. If provided, one LLM call
            produces a cross-doc PackageInsights bound to the
            ``_PackageInsightsResult`` Pydantic schema. If None, dummy
            fallback (empty findings + a basic exec summary).
        checkpointer: optional checkpointer, forwarded to ``graph.compile()``
            when given.

    Returns:
        The compiled graph.
    """
    # Local import keeps this block self-contained (no file-level change).
    import logging

    logger = logging.getLogger(__name__)

    def _summary_only(message: str, package_type: str) -> dict:
        """Build a state update whose insights contain just a summary text."""
        return {"final_insights": PackageInsights(
            executive_summary=message,
            package_type=package_type,
        )}

    async def generate_insights_node(state: PackageInsightsState) -> dict:
        """Generate cross-doc analysis in a single LLM call."""
        documents = state.get("documents") or []
        # ``or`` instead of a ``.get`` default: an explicit None/"" stored in
        # the state also means "general" (same treatment as ``documents``).
        package_type = state.get("package_type") or "general"

        if not documents:
            return _summary_only(
                "No processed documents are available.", package_type
            )

        # No LLM → dummy fallback
        if llm is None:
            return _summary_only(
                f"{len(documents)} documents in the '{package_type}' package. "
                "Package-level AI analysis requires a configured LLM provider (vLLM/Ollama).",
                package_type,
            )

        documents_summary = _build_documents_summary(documents)
        if not documents_summary:
            # Every document was incomplete and got filtered out; calling the
            # model with an empty data set would only invite fabrication.
            return _summary_only(
                "No processed documents are available.", package_type
            )

        try:
            # default=str: values that are not JSON-native (dates, Decimals,
            # ...) are stringified individually, so the payload stays JSON.
            docs_json = json.dumps(
                documents_summary, ensure_ascii=False, indent=2, default=str
            )
        except (TypeError, ValueError):
            # Still reachable for unsupported key types or circular references.
            docs_json = str(documents_summary)

        perspective = _PACKAGE_TYPE_INSTRUCTIONS.get(
            package_type, _PACKAGE_TYPE_INSTRUCTIONS["general"]
        )
        prompt = f"""{perspective}
The full data set of the document package is below (each with the extracted fields):
{docs_json}
Return a structured package-level analysis per the schema. Use concrete data
references, not generic phrasing."""

        try:
            # Binding the schema is inside the ``try`` as well: a model that
            # does not support structured output takes the same fallback.
            structured_llm = llm.with_structured_output(_PackageInsightsResult)
            response = await structured_llm.ainvoke([
                SystemMessage(content=SYSTEM_PROMPT),
                HumanMessage(content=prompt),
            ])
        except Exception as exc:
            # Boundary handler: the graph must keep running, but the traceback
            # is logged so the failure can be diagnosed.
            logger.exception("Package-level analysis failed")
            return _summary_only(
                f"Package-level analysis failed ({type(exc).__name__}). "
                "Try again later or check the LLM endpoint.",
                package_type,
            )

        if response is None:
            # The structured-output parser can yield None when the model
            # answers without producing the requested structure.
            logger.warning("Package-level analysis returned no structured output")
            return _summary_only(
                "Package-level analysis failed (no structured output returned). "
                "Try again later or check the LLM endpoint.",
                package_type,
            )

        return {"final_insights": PackageInsights(
            executive_summary=response.executive_summary or "",
            findings=[finding.model_dump() for finding in response.findings],
            key_observations=list(response.key_observations or []),
            package_type=package_type,
        )}

    graph = StateGraph(PackageInsightsState)
    graph.add_node("generate_insights", generate_insights_node)
    graph.add_edge(START, "generate_insights")
    graph.add_edge("generate_insights", END)

    if checkpointer is not None:
        return graph.compile(checkpointer=checkpointer)
    return graph.compile()