paperhawk / nodes /dd /specialists.py
Nándorfi Vince
Initial paperhawk push to HF Space (LFS for binaries)
7ff7119
raw
history blame
5.34 kB
"""DD specialist agents: Audit, Legal, Compliance, Financial.
Dummy implementation: Python-deterministic aggregates over ``contracts``.
The Phase 7+ vLLM/Qwen path will replace these with ``with_structured_output``
Pydantic structs (this is the LangGraph-native form, ready for the LLM swap).
"""
from __future__ import annotations
from graph.states.dd_state import (
AuditFindings,
ComplianceFindings,
DDState,
FinancialFindings,
LegalFindings,
)
from utils.dates import is_expiring_soon
# ---------------------------------------------------------------------------
# Audit — financial anomalies, price changes
# ---------------------------------------------------------------------------
async def audit_specialist(state: DDState) -> dict:
    """Flag large value gaps between contracts signed by the same parties.

    Contracts are bucketed by their sorted ``parties`` tuple. For every bucket
    holding at least two contracts, the truthy ``total_value`` entries are
    compared: when at least two remain, the smallest is positive and the
    largest is more than 1.3 times the smallest, one anomaly message is
    recorded for that bucket.

    Args:
        state: Graph state; only the ``contracts`` entry is read (a missing or
            falsy value is treated as an empty list).

    Returns:
        A state update with ``audit_findings`` (an ``AuditFindings``) and a
        one-element ``call_history`` list. ``overcharging`` is always empty
        in this implementation.
    """
    contracts = state.get("contracts") or []
    anomalies: list[str] = []
    overcharging: list[str] = []

    # With fewer than two contracts there is nothing to compare, so the
    # grouping step is skipped entirely.
    comparable = contracts if len(contracts) >= 2 else []

    by_parties: dict[tuple, list] = {}
    for contract in comparable:
        # Sorting makes the key independent of the order the parties appear in.
        party_key = tuple(sorted(contract.parties))
        by_parties.setdefault(party_key, []).append(contract)

    for party_key, members in by_parties.items():
        if len(members) >= 2:
            amounts = [m.total_value for m in members if m.total_value]
            if len(amounts) < 2:
                continue
            lowest = min(amounts)
            if not (lowest > 0):
                continue
            highest = max(amounts)
            ratio = highest / lowest
            if ratio > 1.3:
                anomalies.append(
                    f"Between parties {list(party_key)}: value ratio {ratio:.1f}x "
                    f"(min: {lowest:.0f}, max: {highest:.0f})"
                )

    return {
        "audit_findings": AuditFindings(
            pricing_anomalies=anomalies,
            overcharging=overcharging,
            note=f"{len(contracts)} contracts analyzed from an audit perspective.",
        ),
        "call_history": ["audit"],
    }
# ---------------------------------------------------------------------------
# Legal — clauses, change-of-control, non-compete, penalty
# ---------------------------------------------------------------------------
async def legal_specialist(state: DDState) -> dict:
    """Collect red flags and list contracts with notable legal clauses.

    Every red flag of every contract is recorded as ``"<file_name>: <flag>"``.
    A contract is listed under ``change_of_control`` when any of its flags
    contains ``"change-of-control"`` and under ``non_compete`` when any flag
    contains ``"non-compete"`` or ``"versenytilalom"`` (all matches are
    case-insensitive).

    Each contract is listed at most once per category, even when several of
    its flags match; previously the file name was appended once per matching
    flag, which produced duplicate entries.

    Args:
        state: Graph state; only the ``contracts`` entry is read (a missing or
            falsy value is treated as an empty list).

    Returns:
        A state update with ``legal_findings`` (a ``LegalFindings`` holding at
        most the first seven red flags) and a one-element ``call_history``
        list. The note reports the full, untruncated red-flag count.
    """
    contracts = state.get("contracts") or []
    red_flags: list[str] = []
    coc_list: list[str] = []
    nc_list: list[str] = []

    for contract in contracts:
        has_coc = False
        has_nc = False
        for flag in contract.red_flags:
            red_flags.append(f"{contract.file_name}: {flag}")
            lowered = flag.lower()  # lower-case once, reuse for every keyword
            if "change-of-control" in lowered:
                has_coc = True
            if "non-compete" in lowered or "versenytilalom" in lowered:
                has_nc = True
        # Record the contract once per category regardless of how many of its
        # flags matched.
        if has_coc:
            coc_list.append(contract.file_name)
        if has_nc:
            nc_list.append(contract.file_name)

    findings = LegalFindings(
        red_flags=red_flags[:7],  # top-7
        change_of_control=coc_list,
        non_compete=nc_list,
        note=f"{len(contracts)} contracts analyzed from a legal perspective; {len(red_flags)} red flags.",
    )
    return {
        "legal_findings": findings,
        "call_history": ["legal"],
    }
# ---------------------------------------------------------------------------
# Compliance — GDPR, AML
# ---------------------------------------------------------------------------
async def compliance_specialist(state: DDState) -> dict:
    """Gather GDPR and AML signals from the risks attached to each document.

    Documents whose ``ingested`` attribute is ``None`` are skipped. For the
    rest, every risk whose ``source_check_id`` is ``"check_08_gdpr_28"`` is
    recorded as a GDPR issue and every risk whose id is
    ``"check_13_aml_sanctions"`` as an AML alert, each formatted as
    ``"<file_name>: <description>"``. Other risks are ignored.

    Args:
        state: Graph state; only the ``documents`` entry is read (a missing or
            falsy value is treated as an empty list).

    Returns:
        A state update with ``compliance_findings`` (a ``ComplianceFindings``
        holding at most the first five entries of each list) and a
        one-element ``call_history`` list. The note reports the full,
        untruncated counts.
    """
    gdpr_hits: list[str] = []
    aml_hits: list[str] = []

    # only contracts here, after contract_filter
    all_docs = state.get("documents") or []
    ingested_docs = (doc for doc in all_docs if doc.ingested is not None)

    for doc in ingested_docs:
        for risk in doc.risks:
            check_id = risk.source_check_id
            if check_id == "check_08_gdpr_28":
                gdpr_hits.append(f"{doc.ingested.file_name}: {risk.description}")
            elif check_id == "check_13_aml_sanctions":
                aml_hits.append(f"{doc.ingested.file_name}: {risk.description}")

    return {
        "compliance_findings": ComplianceFindings(
            gdpr_issues=gdpr_hits[:5],
            aml_alerts=aml_hits[:5],
            note=f"{len(gdpr_hits)} GDPR + {len(aml_hits)} AML signals.",
        ),
        "call_history": ["compliance"],
    }
# ---------------------------------------------------------------------------
# Financial — monthly obligations, expirations
# ---------------------------------------------------------------------------
async def financial_specialist(state: DDState) -> dict:
    """Summarise recurring fees, upcoming expirations and large contracts.

    For each contract, in this order:

    * a positive ``monthly_fee`` is added to the running total of its
      ``monthly_fee_currency`` (``"USD"`` when the currency is falsy);
    * the file name is listed as expiring when
      ``is_expiring_soon(expiry_date, months=12)`` is truthy;
    * the file name is listed as high-value when ``total_value`` is truthy
      and at least 10,000,000 (the currency is not considered).

    Args:
        state: Graph state; only the ``contracts`` entry is read (a missing or
            falsy value is treated as an empty list).

    Returns:
        A state update with ``financial_findings`` (a ``FinancialFindings``)
        and a one-element ``call_history`` list.
    """
    contracts = state.get("contracts") or []
    fee_totals: dict[str, float] = {}
    expiring: list[str] = []
    large: list[str] = []

    for contract in contracts:
        fee = contract.monthly_fee
        if fee and fee > 0:
            currency = contract.monthly_fee_currency or "USD"
            fee_totals.setdefault(currency, 0.0)
            fee_totals[currency] += fee

        if is_expiring_soon(contract.expiry_date, months=12):
            expiring.append(contract.file_name)

        total = contract.total_value
        if total and total >= 10_000_000:
            large.append(contract.file_name)

    return {
        "financial_findings": FinancialFindings(
            monthly_obligations=fee_totals,
            expiring_soon=expiring,
            high_value_contracts=large,
            note=f"{len(contracts)} contracts analyzed from a financial perspective.",
        ),
        "call_history": ["financial"],
    }