File size: 11,069 Bytes
7ff7119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""dd_synthesizer — build the DD portfolio Pydantic report with an LLM exec summary.

  1. The 4 specialists (audit/legal/compliance/financial) have already run; their
     outputs live in the state (``audit_findings``, etc.).
  2. The per-contract Python summary (``contracts``) has also been built.
  3. Aggregate monthly obligations + expiring_soon come from ``financial_findings``.
  4. **One LLM call** with structured output: executive_summary +
     top_red_flags (3-7 items) + contract_risk_ratings (per-contract rating + rationale).
  5. The LLM rating overrides the per-contract Python-computed ``risk_level``.
  6. On error: a Python fallback executive summary.

Factory ``build_dd_synthesizer(llm)`` captures the LLM Runnable in a closure.
"""

from __future__ import annotations

import re
from typing import Literal

from langchain_core.messages import HumanMessage, SystemMessage
from pydantic import BaseModel, Field, field_validator

from graph.states.dd_state import DDState
from graph.states.pipeline_state import DDPortfolioReport


def _normalize_string_list(raw) -> list[str]:
    """Sometimes the LLM emits ``<item>...</item>`` markup for a JSON list[str].

    We normalize before pydantic validates so ``top_red_flags`` and similar
    list fields parse cleanly even when the LLM wraps items.
    """
    if raw is None:
        return []
    if isinstance(raw, list):
        return [str(item).strip() for item in raw if item is not None and str(item).strip()]
    if isinstance(raw, str):
        # 1. Try <item>...</item> XML-like parsing
        items = re.findall(r"<item>\s*(.*?)\s*</item>", raw, flags=re.DOTALL)
        if items:
            return [it.strip() for it in items if it.strip()]
        # 2. Line-by-line splitting
        lines = [line.strip() for line in raw.split("\n") if line.strip()]
        if len(lines) > 1:
            cleaned = []
            for line in lines:
                line = re.sub(r"^[\-\*•]\s+", "", line)
                line = re.sub(r"^\d+[\.\)]\s+", "", line)
                if line:
                    cleaned.append(line)
            return cleaned
        # 3. Fallback
        return [raw.strip()] if raw.strip() else []
    return []


# System prompt for the DD synthesis LLM call: it is passed as the
# SystemMessage content, ahead of the portfolio prompt built by
# ``_build_summary_prompt``. The text is runtime input to the model, so any
# wording change here changes what the model is asked to do.
DD_SUMMARY_SYSTEM_PROMPT = """You are a Due Diligence (DD) expert in the context of an
acquisition transaction. Based on the contract portfolio, you produce an
executive summary of transaction risks.

REQUIREMENTS:
1. Rely ONLY on FACTS that appear in the documents. Do not speculate.
2. Focus on DD-relevant risks:
   - Imminent expirations (6-12 months)
   - Change-of-control clauses (termination on owner change)
   - High monthly obligations
   - GDPR / data-protection issues
   - Excessively long termination notice periods
   - Unusual penalty clauses
3. Rank: most severe risks first.
4. English, concise, professional tone.
5. Avoid filler ("worth examining", "advisable to review") — give concrete
   observations, e.g. "The DataLab contract is +67% pricier under the NDA — a red flag".

Respond strictly per the JSON schema."""


# One per-contract risk verdict inside the structured LLM response.
# NOTE(review): documented with "#" comments rather than a class docstring —
# pydantic may export a class docstring as the schema description handed to
# the LLM; confirm before adding one.
class _ContractRiskRating(BaseModel):
    # File name of the rated contract; the synthesizer uses it as the key to
    # match the rating back to a contract summary.
    file_name: str
    # Rating restricted to "low" / "medium" / "high"; "low" when omitted.
    risk_level: Literal["low", "medium", "high"] = "low"
    # Free-text justification for the rating; empty when omitted.
    rationale: str = ""


class _DDReportLLM(BaseModel):
    """Structured LLM output for the DD synthesis."""
    # Narrative summary; empty string when the LLM omits it.
    executive_summary: str = ""
    # Portfolio-level red flags; the raw value is normalized by the
    # ``_normalize_red_flags`` validator before type validation.
    top_red_flags: list[str] = Field(default_factory=list)
    # One rating per contract; empty list when the LLM omits it.
    contract_risk_ratings: list[_ContractRiskRating] = Field(default_factory=list)

    # mode="before": runs on the raw input, ahead of pydantic's own list[str]
    # validation, so markup-wrapped or plain-string values can be converted.
    @field_validator("top_red_flags", mode="before")
    @classmethod
    def _normalize_red_flags(cls, v):
        """Convert the raw ``top_red_flags`` value into a list of strings."""
        return _normalize_string_list(v)


def _build_summary_prompt(state: DDState) -> str:
    """Structured input prompt."""
    contracts = state.get("contracts") or []
    parts = [
        "Contract portfolio for DD analysis:",
        "",
    ]
    for i, s in enumerate(contracts, start=1):
        parts.append(f"--- Contract {i}: {s.file_name} ---")
        parts.append(f"Type: {s.contract_type}")
        parts.append(f"Parties: {', '.join(s.parties)}")
        parts.append(f"Effective: {s.effective_date} -- expires: {s.expiry_date}")
        if s.total_value:
            parts.append(f"Value: {s.total_value} {s.currency}")
        if s.risk_elements:
            parts.append("Risk elements:")
            for k in s.risk_elements[:5]:
                parts.append(f"  - {k}")
        if s.red_flags:
            parts.append("Red flags:")
            for p in s.red_flags[:3]:
                parts.append(f"  - {p}")
        parts.append("")

    # Append the 4 specialists' findings to enrich the exec summary
    audit = state.get("audit_findings")
    legal = state.get("legal_findings")
    compliance = state.get("compliance_findings")
    financial = state.get("financial_findings")

    if any([audit, legal, compliance, financial]):
        parts.append("--- Specialist analyses ---")
        if audit:
            if audit.pricing_anomalies:
                parts.append(f"Audit (pricing anomalies): {', '.join(audit.pricing_anomalies[:3])}")
            if audit.overcharging:
                parts.append(f"Audit (overcharging): {', '.join(audit.overcharging[:3])}")
        if legal:
            if legal.red_flags:
                parts.append(f"Legal (red flags): {', '.join(legal.red_flags[:3])}")
            if legal.change_of_control:
                parts.append(f"Legal (CoC): {', '.join(legal.change_of_control[:2])}")
            if legal.non_compete:
                parts.append(f"Legal (non-compete): {', '.join(legal.non_compete[:2])}")
        if compliance:
            if compliance.gdpr_issues:
                parts.append(f"Compliance (GDPR): {', '.join(compliance.gdpr_issues[:3])}")
            if compliance.aml_alerts:
                parts.append(f"Compliance (AML): {', '.join(compliance.aml_alerts[:2])}")
        if financial:
            if financial.expiring_soon:
                parts.append(f"Financial (expiring soon): {', '.join(financial.expiring_soon[:3])}")
            if financial.high_value_contracts:
                parts.append(f"Financial (high value): {', '.join(financial.high_value_contracts[:3])}")
        parts.append("")

    parts.append(
        "Produce a DD executive summary, a top red flags list, and a per-contract "
        "risk rating with rationale."
    )
    return "\n".join(parts)


def _portfolio_metrics(contracts, expiring_soon) -> str:
    """Return the one-sentence Python-computed portfolio metrics line."""
    high_risk_count = sum(1 for c in contracts if c.risk_level == "high")
    return (
        f"{len(contracts)} contracts, {high_risk_count} high-risk, "
        f"{len(expiring_soon)} expiring soon."
    )


def build_dd_synthesizer(llm=None):
    """Factory: build the ``dd_synthesizer`` node with *llm* captured in a closure.

    Args:
        llm: Object exposing ``with_structured_output``. When ``None`` the
            node skips the LLM call and builds the report from the
            Python-computed data alone.

    Returns:
        An async callable taking the DD state and returning
        ``{"dd_report": DDPortfolioReport}``.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    async def dd_synthesizer(state: DDState) -> dict:
        """Assemble the DD portfolio report from the state.

        Makes at most one structured-output LLM call. Any exception raised
        by that call, or while applying its ratings, is logged and replaced
        by a Python-generated executive summary. Contract summaries in the
        state are updated in place with the LLM rating and rationale.
        """
        contracts = state.get("contracts") or []
        audit = state.get("audit_findings")
        legal = state.get("legal_findings")
        compliance = state.get("compliance_findings")
        financial = state.get("financial_findings")

        # Aggregated metrics (Python-deterministic). The ``or`` fallbacks
        # tolerate findings whose aggregate fields are None.
        monthly_obligations = (financial.monthly_obligations or {}) if financial else {}
        expiring_soon = list(financial.expiring_soon or []) if financial else []

        executive_summary = ""
        top_red_flags: list[str] = []
        rating_map: dict[str, tuple[str, str]] = {}

        # LLM call — only when an LLM was provided and there is something to rate.
        if llm is not None and contracts:
            try:
                structured_llm = llm.with_structured_output(_DDReportLLM)
                response: _DDReportLLM = await structured_llm.ainvoke([
                    SystemMessage(content=DD_SUMMARY_SYSTEM_PROMPT),
                    HumanMessage(content=_build_summary_prompt(state)),
                ])
                executive_summary = response.executive_summary or ""
                top_red_flags = list(response.top_red_flags or [])

                # Per-contract rating mapping (file_name -> (risk_level, rationale)).
                for rating in response.contract_risk_ratings:
                    if rating.file_name:
                        rating_map[rating.file_name] = (rating.risk_level, rating.rationale)

                # The LLM rating overrides the Python-computed level.
                for contract in contracts:
                    matched = rating_map.get(contract.file_name)
                    if matched is None:
                        continue
                    new_level, rationale = matched
                    if new_level not in ("low", "medium", "high"):
                        continue
                    contract.risk_level = new_level
                    if rationale:
                        note = f"DD assessment: {rationale}"
                        # Contracts are mutated in place, so re-running the
                        # node on the same state must not stack up copies of
                        # the same assessment line.
                        if note not in contract.red_flags:
                            contract.red_flags.insert(0, note)
            except Exception as exc:
                # Deliberate best-effort: the report is still produced, but
                # the failure is logged with its traceback instead of
                # surviving only as an exception class name in the summary.
                logger.exception(
                    "DD synthesis LLM step failed; using the Python fallback summary"
                )
                executive_summary = (
                    f"LLM-based DD summary failed ({type(exc).__name__}). "
                    f"Python-based metrics: "
                    f"{_portfolio_metrics(contracts, expiring_soon)}"
                )

        # No LLM, no contracts, or an empty LLM summary: minimal Python fallback.
        if not executive_summary:
            if not contracts:
                executive_summary = (
                    "No contract-type documents are present in the input. "
                    "Upload at least one contract for DD analysis."
                )
            else:
                executive_summary = (
                    f"DD portfolio: {_portfolio_metrics(contracts, expiring_soon)}"
                )

        # High-risk list per the (possibly LLM-overridden) per-contract rating.
        high_risk_contracts = [c.file_name for c in contracts if c.risk_level == "high"]

        # Top red flags fallback: when the LLM provided none, take up to two
        # from each contract, capped at seven overall.
        if not top_red_flags:
            for contract in contracts:
                top_red_flags.extend(contract.red_flags[:2])
            top_red_flags = top_red_flags[:7]

        # Specialist outputs (debug): dump whichever findings are present.
        specialist_outputs = {}
        if audit:
            specialist_outputs["audit"] = audit.model_dump()
        if legal:
            specialist_outputs["legal"] = legal.model_dump()
        if compliance:
            specialist_outputs["compliance"] = compliance.model_dump()
        if financial:
            specialist_outputs["financial"] = financial.model_dump()

        report = DDPortfolioReport(
            contract_count=len(contracts),
            contracts=[c.model_dump() for c in contracts],
            total_monthly_obligations=dict(monthly_obligations),
            expiring_soon=expiring_soon,
            high_risk_contracts=high_risk_contracts,
            top_red_flags=top_red_flags,
            executive_summary=executive_summary,
            specialist_outputs=specialist_outputs,
        )

        return {"dd_report": report}

    return dd_synthesizer


# Backward-compat
async def dd_synthesizer(state: DDState) -> dict:
    """Legacy entry point: run the synthesizer node without an LLM.

    Builds a node via ``build_dd_synthesizer(llm=None)`` on each call and
    awaits it on *state*, returning the node's result unchanged.
    """
    return await build_dd_synthesizer(llm=None)(state)