File size: 5,337 Bytes
"""DD specialist agents: Audit, Legal, Compliance, Financial.
Dummy implementation: Python-deterministic aggregates over ``contracts``.
The Phase 7+ vLLM/Qwen path will replace these with ``with_structured_output``
Pydantic structs (this is the LangGraph-native form, ready for the LLM swap).
"""
from __future__ import annotations
from graph.states.dd_state import (
AuditFindings,
ComplianceFindings,
DDState,
FinancialFindings,
LegalFindings,
)
from utils.dates import is_expiring_soon
# ---------------------------------------------------------------------------
# Audit — financial anomalies, price changes
# ---------------------------------------------------------------------------
async def audit_specialist(state: DDState) -> dict:
    """Flag pricing anomalies between contracts that share the same parties.

    Contracts are bucketed by their sorted ``parties``; for every bucket that
    holds at least two truthy ``total_value`` entries whose minimum is positive,
    a max/min ratio above 1.3 is reported as an anomaly.

    Args:
        state: Graph state; only the ``"contracts"`` entry is read.

    Returns:
        A partial state update with ``"audit_findings"`` (an ``AuditFindings``)
        and a one-element ``"call_history"`` list.
    """
    contract_list = state.get("contracts") or []
    anomalies: list[str] = []
    # Nothing in this heuristic fills the overcharging list; it is passed on empty.
    overcharge_notes: list[str] = []

    # Bucket contracts by party set; sorting makes the key order-insensitive.
    # With fewer than two contracts no comparison is possible, so skip bucketing.
    by_parties: dict[tuple, list] = {}
    if len(contract_list) >= 2:
        for contract in contract_list:
            party_key = tuple(sorted(contract.parties))
            if party_key not in by_parties:
                by_parties[party_key] = []
            by_parties[party_key].append(contract)

    for party_key, members in by_parties.items():
        if len(members) < 2:
            continue
        # Missing / zero values carry no pricing signal and are dropped.
        amounts = [m.total_value for m in members if m.total_value]
        if len(amounts) < 2:
            continue
        lowest = min(amounts)
        if not (lowest > 0):
            # Guards the division below against non-positive minimums.
            continue
        highest = max(amounts)
        spread = highest / lowest
        if spread > 1.3:
            anomalies.append(
                f"Between parties {list(party_key)}: value ratio {spread:.1f}x "
                f"(min: {lowest:.0f}, max: {highest:.0f})"
            )

    report = AuditFindings(
        pricing_anomalies=anomalies,
        overcharging=overcharge_notes,
        note=f"{len(contract_list)} contracts analyzed from an audit perspective.",
    )
    # NOTE(review): "call_history" is presumably merged by a state reducer — confirm in DDState.
    return {"audit_findings": report, "call_history": ["audit"]}
# ---------------------------------------------------------------------------
# Legal — clauses, change-of-control, non-compete, penalty
# ---------------------------------------------------------------------------
async def legal_specialist(state: DDState) -> dict:
    """Collect legal red flags and list contracts with notable clause types.

    Every red flag of every contract is recorded as ``"<file_name>: <flag>"``.
    Contracts whose flags mention ``change-of-control`` or a non-compete
    (``non-compete`` / ``versenytilalom``) are listed by file name, each file
    at most once per list even when several of its flags match.

    Args:
        state: Graph state; only the ``"contracts"`` entry is read.

    Returns:
        A partial state update with ``"legal_findings"`` (a ``LegalFindings``
        holding the first seven red flags plus the two file lists) and a
        one-element ``"call_history"`` list.
    """
    contracts = state.get("contracts") or []
    red_flags: list[str] = []
    coc_list: list[str] = []
    nc_list: list[str] = []

    for c in contracts:
        for flag in c.red_flags:
            red_flags.append(f"{c.file_name}: {flag}")
            lowered = flag.lower()  # match case-insensitively, lower once per flag
            # A contract may carry several matching flags; list its file only
            # once so the clause lists do not contain duplicates.
            if "change-of-control" in lowered and c.file_name not in coc_list:
                coc_list.append(c.file_name)
            if (
                "non-compete" in lowered or "versenytilalom" in lowered
            ) and c.file_name not in nc_list:
                nc_list.append(c.file_name)

    findings = LegalFindings(
        red_flags=red_flags[:7],  # only the first seven are reported
        change_of_control=coc_list,
        non_compete=nc_list,
        # The note still reports the full, untruncated red-flag count.
        note=f"{len(contracts)} contracts analyzed from a legal perspective; {len(red_flags)} red flags.",
    )
    return {
        "legal_findings": findings,
        "call_history": ["legal"],
    }
# ---------------------------------------------------------------------------
# Compliance — GDPR, AML
# ---------------------------------------------------------------------------
async def compliance_specialist(state: DDState) -> dict:
    """Gather GDPR and AML risk descriptions from the ingested documents.

    Documents without an ``ingested`` payload are skipped. Each remaining
    document's risks are routed by ``source_check_id``:
    ``check_08_gdpr_28`` goes to the GDPR list, ``check_13_aml_sanctions`` to
    the AML list, and anything else is ignored.

    Args:
        state: Graph state; only the ``"documents"`` entry is read.

    Returns:
        A partial state update with ``"compliance_findings"`` (a
        ``ComplianceFindings`` holding at most five entries per list) and a
        one-element ``"call_history"`` list.
    """
    # Per the original author's note: only contracts remain here, after contract_filter.
    docs = state.get("documents") or []
    gdpr_hits: list[str] = []
    aml_hits: list[str] = []

    for doc in docs:
        payload = doc.ingested
        if payload is None:
            continue
        for risk in doc.risks:
            check_id = risk.source_check_id
            if check_id == "check_08_gdpr_28":
                target = gdpr_hits
            elif check_id == "check_13_aml_sanctions":
                target = aml_hits
            else:
                # Risks from other checks are outside this specialist's scope.
                continue
            target.append(f"{payload.file_name}: {risk.description}")

    # The lists are capped at five entries each; the note keeps the full counts.
    summary = f"{len(gdpr_hits)} GDPR + {len(aml_hits)} AML signals."
    report = ComplianceFindings(
        gdpr_issues=gdpr_hits[:5],
        aml_alerts=aml_hits[:5],
        note=summary,
    )
    return {"compliance_findings": report, "call_history": ["compliance"]}
# ---------------------------------------------------------------------------
# Financial — monthly obligations, expirations
# ---------------------------------------------------------------------------
async def financial_specialist(state: DDState) -> dict:
    """Summarise monthly obligations, upcoming expirations and large contracts.

    For each contract:
      * a positive ``monthly_fee`` is added to a per-currency total (a missing
        currency is booked under ``"USD"``);
      * the file is listed as expiring when ``is_expiring_soon`` accepts its
        ``expiry_date`` with ``months=12``;
      * the file is listed as high-value when ``total_value`` is at least
        10,000,000.

    Args:
        state: Graph state; only the ``"contracts"`` entry is read.

    Returns:
        A partial state update with ``"financial_findings"`` (a
        ``FinancialFindings``) and a one-element ``"call_history"`` list.
    """
    contract_list = state.get("contracts") or []
    totals_by_currency: dict[str, float] = {}
    expiring: list[str] = []
    large: list[str] = []

    for contract in contract_list:
        fee = contract.monthly_fee
        if fee and fee > 0:
            code = contract.monthly_fee_currency or "USD"
            running = totals_by_currency.get(code, 0.0)
            totals_by_currency[code] = running + fee

        if is_expiring_soon(contract.expiry_date, months=12):
            expiring.append(contract.file_name)

        worth = contract.total_value
        if worth and worth >= 10_000_000:
            large.append(contract.file_name)

    report = FinancialFindings(
        monthly_obligations=totals_by_currency,
        expiring_soon=expiring,
        high_value_contracts=large,
        note=f"{len(contract_list)} contracts analyzed from a financial perspective.",
    )
    return {"financial_findings": report, "call_history": ["financial"]}
|