File size: 5,337 Bytes
7ff7119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""DD specialist agents: Audit, Legal, Compliance, Financial.

Dummy implementation: Python-deterministic aggregates over ``contracts``.
The Phase 7+ vLLM/Qwen path will replace these with ``with_structured_output``
Pydantic structs (this is the LangGraph-native form, ready for the LLM swap).
"""

from __future__ import annotations

from graph.states.dd_state import (
    AuditFindings,
    ComplianceFindings,
    DDState,
    FinancialFindings,
    LegalFindings,
)
from utils.dates import is_expiring_soon


# ---------------------------------------------------------------------------
# Audit — financial anomalies, price changes
# ---------------------------------------------------------------------------


async def audit_specialist(state: DDState) -> dict:
    contracts = state.get("contracts") or []
    pricing_anomalies: list[str] = []
    overcharging: list[str] = []

    # Heuristic: 2+ contracts with the same parties → if values differ > 30% → anomaly
    if len(contracts) >= 2:
        groups: dict[tuple, list] = {}
        for c in contracts:
            key = tuple(sorted(c.parties))
            groups.setdefault(key, []).append(c)
        for parties, group in groups.items():
            if len(group) < 2:
                continue
            values = [c.total_value for c in group if c.total_value]
            if len(values) >= 2 and min(values) > 0:
                ratio = max(values) / min(values)
                if ratio > 1.3:
                    pricing_anomalies.append(
                        f"Between parties {list(parties)}: value ratio {ratio:.1f}x "
                        f"(min: {min(values):.0f}, max: {max(values):.0f})"
                    )

    findings = AuditFindings(
        pricing_anomalies=pricing_anomalies,
        overcharging=overcharging,
        note=f"{len(contracts)} contracts analyzed from an audit perspective.",
    )
    return {
        "audit_findings": findings,
        "call_history": ["audit"],
    }


# ---------------------------------------------------------------------------
# Legal — clauses, change-of-control, non-compete, penalty
# ---------------------------------------------------------------------------


async def legal_specialist(state: DDState) -> dict:
    contracts = state.get("contracts") or []
    red_flags: list[str] = []
    coc_list: list[str] = []
    nc_list: list[str] = []

    for c in contracts:
        for flag in c.red_flags:
            red_flags.append(f"{c.file_name}: {flag}")
            if "change-of-control" in flag.lower():
                coc_list.append(c.file_name)
            if "non-compete" in flag.lower() or "versenytilalom" in flag.lower():
                nc_list.append(c.file_name)

    findings = LegalFindings(
        red_flags=red_flags[:7],  # top-7
        change_of_control=coc_list,
        non_compete=nc_list,
        note=f"{len(contracts)} contracts analyzed from a legal perspective; {len(red_flags)} red flags.",
    )
    return {
        "legal_findings": findings,
        "call_history": ["legal"],
    }


# ---------------------------------------------------------------------------
# Compliance — GDPR, AML
# ---------------------------------------------------------------------------


async def compliance_specialist(state: DDState) -> dict:
    documents = state.get("documents") or []  # only contracts here, after contract_filter
    gdpr_issues: list[str] = []
    aml_alerts: list[str] = []

    for d in documents:
        if d.ingested is None:
            continue
        for r in d.risks:
            if r.source_check_id == "check_08_gdpr_28":
                gdpr_issues.append(f"{d.ingested.file_name}: {r.description}")
            elif r.source_check_id == "check_13_aml_sanctions":
                aml_alerts.append(f"{d.ingested.file_name}: {r.description}")

    findings = ComplianceFindings(
        gdpr_issues=gdpr_issues[:5],
        aml_alerts=aml_alerts[:5],
        note=f"{len(gdpr_issues)} GDPR + {len(aml_alerts)} AML signals.",
    )
    return {
        "compliance_findings": findings,
        "call_history": ["compliance"],
    }


# ---------------------------------------------------------------------------
# Financial — monthly obligations, expirations
# ---------------------------------------------------------------------------


async def financial_specialist(state: DDState) -> dict:
    contracts = state.get("contracts") or []
    monthly_obligations: dict[str, float] = {}
    expiring_soon: list[str] = []
    high_value: list[str] = []

    for c in contracts:
        if c.monthly_fee and c.monthly_fee > 0:
            currency = c.monthly_fee_currency or "USD"
            monthly_obligations[currency] = monthly_obligations.get(currency, 0.0) + c.monthly_fee
        if is_expiring_soon(c.expiry_date, months=12):
            expiring_soon.append(c.file_name)
        if c.total_value and c.total_value >= 10_000_000:
            high_value.append(c.file_name)

    findings = FinancialFindings(
        monthly_obligations=monthly_obligations,
        expiring_soon=expiring_soon,
        high_value_contracts=high_value,
        note=f"{len(contracts)} contracts analyzed from a financial perspective.",
    )
    return {
        "financial_findings": findings,
        "call_history": ["financial"],
    }