# paperhawk/validation/compare.py
# Nándorfi Vince
# Initial paperhawk push to HF Space (LFS for binaries)
# commit 7ff7119
# (file-viewer residue: raw · history · blame · 16.6 kB)
"""Cross-document checks — three-way matching and two-doc compare.
Pure Python, no LLM calls. ``utils.numbers.coerce_number`` provides tolerant
numeric normalization (HU/US/EU/FR formats, currency tokens, null aliases).
Two APIs:
* ``three_way_match(invoice, delivery_note, purchase_order)`` → ComparisonResult
* ``compare_two_documents(doc_a, doc_b, fields)`` → ComparisonResult
``ComparisonResult`` is dict-shaped (Pydantic-compatible). The ``compare_node``
wraps it into a ``ComparisonReport`` Pydantic model in the parent state.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from utils.numbers import coerce_number
@dataclass
class MatchResult:
    """Outcome of a single cross-document check.

    ``status`` says what happened ("match", "mismatch" or "missing") and
    ``severity`` how much it matters ("ok", "warning" or "critical").
    ``expected`` / ``actual`` and the two source names are optional context.
    """

    status: str  # "match" | "mismatch" | "missing"
    severity: str  # "ok" | "warning" | "critical"
    message: str
    field_name: str
    expected: str | float | None = None
    actual: str | float | None = None
    source_a: str = ""
    source_b: str = ""

    def to_dict(self) -> dict:
        """Return the result as a plain dict, keys in declaration order."""
        exported_keys = (
            "status",
            "severity",
            "message",
            "field_name",
            "expected",
            "actual",
            "source_a",
            "source_b",
        )
        return {key: getattr(self, key) for key in exported_keys}
@dataclass
class ComparisonResult:
    """Running tally of the checks made in a three-way or pair-wise compare.

    ``matches`` keeps every recorded result in insertion order; the counters
    are maintained by :meth:`add`.
    """

    matches: list[MatchResult] = field(default_factory=list)
    total_checks: int = 0
    ok_count: int = 0
    warning_count: int = 0
    critical_count: int = 0
    missing_count: int = 0

    def add(self, result: MatchResult) -> None:
        """Record ``result`` and bump the counters it affects.

        Exactly one severity counter is incremented when the severity is
        "ok", "warning" or "critical"; any other severity only counts towards
        ``total_checks``. ``missing_count`` is tracked independently of
        severity, based on ``result.status``.
        """
        self.matches.append(result)
        self.total_checks += 1

        severity_counters = (
            ("ok", "ok_count"),
            ("warning", "warning_count"),
            ("critical", "critical_count"),
        )
        for level, counter_name in severity_counters:
            if result.severity == level:
                setattr(self, counter_name, getattr(self, counter_name) + 1)
                break

        if result.status == "missing":
            self.missing_count += 1
# ---------------------------------------------------------------------------
# Apples-to-apples amount extraction (multilingual EN-first + HU/legacy fallback)
# ---------------------------------------------------------------------------
def _get_gross_amount(data: dict) -> float | None:
if not isinstance(data, dict):
return None
for field_name in (
"total_gross", "gross_total", "gross_amount",
# Legacy / multilingual fallback
"brutto_vegosszeg", "brutto_osszeg", "brutto_vegosszeg_huf",
):
val = coerce_number(data.get(field_name))
if val is not None:
return val
return None
def _get_net_amount(data: dict) -> float | None:
if not isinstance(data, dict):
return None
for field_name in (
"total_net", "net_total", "net_amount",
# Legacy / multilingual fallback
"netto_vegosszeg", "netto_osszeg", "netto_vegosszeg_huf",
):
val = coerce_number(data.get(field_name))
if val is not None:
return val
return None
def _get_generic_amount(data: dict) -> float | None:
if not isinstance(data, dict):
return None
for field_name in ("amount", "total", "value", "osszeg", "ertek"):
val = coerce_number(data.get(field_name))
if val is not None:
return val
return None
# ---------------------------------------------------------------------------
# Amount comparison with tolerance tiers
# ---------------------------------------------------------------------------
def _compare_amounts(
    report: ComparisonResult,
    label: str,
    amount_a, amount_b,
    source_a: str = "",
    source_b: str = "",
    tolerance_pct: float = 0.01,
) -> None:
    """Compare two amounts and record the verdict on ``report``.

    Both inputs go through ``coerce_number`` first. Nothing is recorded when
    either one is not numeric or when both are zero.

    Tiers (relative difference is measured against the larger magnitude):
        * absolute diff ≤ 1              → match / ok
        * diff ≤ ``tolerance_pct`` share → match / ok (rounding edge)
        * diff ≤ 5 %                     → mismatch / warning
        * diff > 5 %                     → mismatch / critical

    ``tolerance_pct`` is a fraction, so the default 0.01 means 1 %.
    """
    left = coerce_number(amount_a)
    right = coerce_number(amount_b)
    if left is None or right is None:
        return
    if left == 0 and right == 0:
        return

    gap = abs(left - right)
    largest = max(abs(left), abs(right))
    gap_pct = (gap / largest * 100) if largest > 0 else 0

    # The two "ok" tiers differ only in their message.
    if gap <= 1:
        ok_message = f"{label}: matches ({left:.0f})"
    elif gap_pct <= tolerance_pct * 100:
        ok_message = f"{label}: diff within rounding tolerance ({gap:.0f})"
    else:
        ok_message = None

    if ok_message is not None:
        report.add(MatchResult(
            status="match", severity="ok",
            message=ok_message,
            field_name=label,
        ))
        return

    # The two mismatch tiers differ only in severity.
    severity = "warning" if gap_pct <= 5 else "critical"
    report.add(MatchResult(
        status="mismatch", severity=severity,
        message=f"{label}: {gap_pct:.1f}% diff ({left:.0f} vs {right:.0f})",
        field_name=label, expected=left, actual=right,
        source_a=source_a, source_b=source_b,
    ))
def _compare_doc_amounts(
report: ComparisonResult,
doc_a: dict, doc_b: dict,
label_a: str, label_b: str,
) -> None:
"""Apples-to-apples amount comparison between two documents.
Order of preference: gross-gross > net-net > generic-generic.
Documents at different levels (one only gross, the other only net) are skipped.
"""
source_a = doc_a.get("_source", {}).get("file_name", label_a) if isinstance(doc_a.get("_source"), dict) else label_a
source_b = doc_b.get("_source", {}).get("file_name", label_b) if isinstance(doc_b.get("_source"), dict) else label_b
# Gross-gross
gross_a = _get_gross_amount(doc_a)
gross_b = _get_gross_amount(doc_b)
if gross_a is not None and gross_b is not None:
_compare_amounts(
report, f"Gross total ({label_a} vs {label_b})",
gross_a, gross_b, source_a, source_b,
)
return
# Net-net
net_a = _get_net_amount(doc_a)
net_b = _get_net_amount(doc_b)
if net_a is not None and net_b is not None:
_compare_amounts(
report, f"Net total ({label_a} vs {label_b})",
net_a, net_b, source_a, source_b,
)
return
# Generic-generic
gen_a = _get_generic_amount(doc_a)
gen_b = _get_generic_amount(doc_b)
if gen_a is not None and gen_b is not None:
_compare_amounts(
report, f"Amount ({label_a} vs {label_b})",
gen_a, gen_b, source_a, source_b,
)
# ---------------------------------------------------------------------------
# Line-item comparison (4-pass matching)
# ---------------------------------------------------------------------------
def _get_item_quantity(item: dict) -> float | None:
if not isinstance(item, dict):
return None
for field_name in ("quantity", "qty", "mennyiseg", "db", "darabszam", "menny"):
val = coerce_number(item.get(field_name))
if val is not None:
return val
return None
def _get_item_code(item: dict) -> str:
if not isinstance(item, dict):
return ""
for field_name in ("item_code", "code", "sku", "article", "article_number",
"cikkszam", "cikk_szam"):
val = item.get(field_name)
if val:
return str(val).lower().strip()
return ""
def _get_item_description(item: dict) -> str:
"""Return the line-item description, multilingual fallback."""
if not isinstance(item, dict):
return ""
for field_name in ("description", "name", "megnevezes"):
val = item.get(field_name)
if val:
return str(val).lower().strip()
return ""
def _fuzzy_match_strict(a: str, b: str) -> bool:
"""Strict fuzzy match: 0.8 word overlap + diff-token must not contain digits."""
if not a or not b:
return False
if a == b:
return True
words_a = set(a.split())
words_b = set(b.split())
if not words_a or not words_b:
return False
intersection = len(words_a & words_b)
max_size = max(len(words_a), len(words_b))
overlap = intersection / max_size
if overlap < 0.8:
return False
diff_words = words_a ^ words_b
for word in diff_words:
if any(c.isdigit() for c in word):
return False
return True
def _find_matching_item(name_a: str, code_a: str, items_b: list) -> dict | None:
"""4-pass line-item matching.
Pass 1: item_code exact (strongest)
Pass 2: exact name
Pass 3: substring (one contains the other)
Pass 4: strict fuzzy (0.8 overlap, diff token must not contain digits)
"""
valid_b = [item for item in items_b if isinstance(item, dict)]
# Pass 1: item_code
if code_a:
for item_b in valid_b:
code_b = _get_item_code(item_b)
if code_b and code_b == code_a:
return item_b
# Pass 2: exact name
for item_b in valid_b:
name_b = _get_item_description(item_b)
if name_b and name_a == name_b:
return item_b
# Pass 3: substring
for item_b in valid_b:
name_b = _get_item_description(item_b)
if not name_b:
continue
if name_a in name_b or name_b in name_a:
return item_b
# Pass 4: strict fuzzy
for item_b in valid_b:
name_b = _get_item_description(item_b)
if not name_b:
continue
if _fuzzy_match_strict(name_a, name_b):
return item_b
return None
def _compare_items_between(
report: ComparisonResult,
doc_a: dict, doc_b: dict,
label_a: str, label_b: str,
) -> None:
"""Pair line items between two documents and compare quantities.
Missing item: missing/warning. Different qty: warning (<2 units) or critical (≥2 units).
"""
items_a = doc_a.get("line_items") or doc_a.get("tetelek") or []
items_b = doc_b.get("line_items") or doc_b.get("tetelek") or []
if not items_a or not items_b:
return
source_a = doc_a.get("_source", {}).get("file_name", label_a) if isinstance(doc_a.get("_source"), dict) else label_a
source_b = doc_b.get("_source", {}).get("file_name", label_b) if isinstance(doc_b.get("_source"), dict) else label_b
for item_a in items_a:
if not isinstance(item_a, dict):
continue
name_a_raw = item_a.get("description") or item_a.get("megnevezes", "")
name_a = str(name_a_raw).lower().strip()
if not name_a:
continue
qty_a = _get_item_quantity(item_a)
code_a = _get_item_code(item_a)
matched_item = _find_matching_item(name_a, code_a, items_b)
if matched_item is None:
report.add(MatchResult(
status="missing",
severity="warning",
message=(
f"Item not found: '{name_a_raw}' present in {label_a} "
f"but missing from {label_b}"
),
field_name="line_item",
actual=name_a_raw,
source_a=source_a,
source_b=source_b,
))
continue
qty_b = _get_item_quantity(matched_item)
if qty_a is None or qty_b is None:
continue
diff = abs(qty_a - qty_b)
if diff < 0.01:
report.add(MatchResult(
status="match",
severity="ok",
message=f"Item matches: '{name_a_raw}' ({label_a} vs {label_b})",
field_name="line_item",
))
else:
severity = "critical" if diff >= 2 else "warning"
report.add(MatchResult(
status="mismatch",
severity=severity,
message=(
f"Quantity discrepancy: '{name_a_raw}' — "
f"{label_a}: {qty_a:g}, {label_b}: {qty_b:g} "
f"(diff: {diff:g})"
),
field_name="quantity",
expected=qty_a,
actual=qty_b,
source_a=source_a,
source_b=source_b,
))
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def three_way_match(
    invoice: dict, delivery_note: dict, purchase_order: dict,
) -> ComparisonResult:
    """Run the three-way match over invoice, delivery note and purchase order.

    Each of the three document pairs

    - invoice ↔ purchase order
    - invoice ↔ delivery note
    - delivery note ↔ purchase order

    goes through the apples-to-apples amount comparison first, then through
    the 4-pass line-item matching. Finally a warning is recorded when the
    invoice issue date sorts before the purchase order date; only the first
    10 characters of each date string are compared, as plain strings.
    """
    report = ComparisonResult()

    pairings = (
        (invoice, purchase_order, "invoice", "purchase_order"),
        (invoice, delivery_note, "invoice", "delivery_note"),
        (delivery_note, purchase_order, "delivery_note", "purchase_order"),
    )

    # Amounts for every pair first, then line items, so the results keep
    # their grouping in the report.
    for left, right, left_label, right_label in pairings:
        _compare_doc_amounts(report, left, right, left_label, right_label)
    for left, right, left_label, right_label in pairings:
        _compare_items_between(report, left, right, left_label, right_label)

    # Date logic: invoice date should NOT precede the purchase order date
    invoice_date = invoice.get("issue_date") or invoice.get("kiallitas_datuma")
    order_date = (
        purchase_order.get("date")
        or purchase_order.get("order_date")
        or purchase_order.get("megrendeles_datuma")
        or purchase_order.get("datum")
    )

    both_strings = isinstance(invoice_date, str) and isinstance(order_date, str)
    if not both_strings or len(invoice_date) < 10 or len(order_date) < 10:
        return report

    invoice_day = invoice_date[:10]
    order_day = order_date[:10]
    if invoice_day < order_day:
        report.add(MatchResult(
            status="mismatch",
            severity="warning",
            message=(
                f"Invoice issue date ({invoice_day}) is earlier than the "
                f"purchase order date ({order_day})"
            ),
            field_name="date",
            expected=order_date,
            actual=invoice_date,
        ))
    return report
def compare_two_documents(
    doc_a: dict, doc_b: dict, fields: list[str],
) -> ComparisonResult:
    """Compare the listed fields between two documents.

    Per field:
        * names starting with "_" are skipped
        * absent (``None``) on both sides → skipped
        * absent on exactly one side → missing / warning
        * both values numeric per ``coerce_number`` → tiered amount comparison
        * either value a dict or list → skipped
        * anything else → compared as trimmed, case-insensitive strings
    """
    def _origin(doc: dict, fallback: str) -> str:
        # File name from the "_source" metadata, else the fallback label.
        meta = doc.get("_source")
        return meta.get("file_name", fallback) if isinstance(meta, dict) else fallback

    report = ComparisonResult()
    containers = (dict, list)

    for name in fields:
        if name.startswith("_"):
            continue

        left, right = doc_a.get(name), doc_b.get(name)
        if left is None and right is None:
            continue
        if left is None or right is None:
            report.add(MatchResult(
                status="missing",
                severity="warning",
                message=f"'{name}' missing from one of the documents",
                field_name=name,
                expected=left,
                actual=right,
            ))
            continue

        left_num, right_num = coerce_number(left), coerce_number(right)
        if left_num is not None and right_num is not None:
            _compare_amounts(
                report, name, left_num, right_num,
                _origin(doc_a, "A"),
                _origin(doc_b, "B"),
            )
            continue

        # Nested structures have no meaningful scalar comparison.
        if isinstance(left, containers) or isinstance(right, containers):
            continue

        if str(left).strip().lower() == str(right).strip().lower():
            report.add(MatchResult(
                status="match",
                severity="ok",
                message=f"'{name}' matches",
                field_name=name,
            ))
        else:
            report.add(MatchResult(
                status="mismatch",
                severity="warning",
                message=f"'{name}' differs: '{left}' vs '{right}'",
                field_name=name,
                expected=left,
                actual=right,
            ))
    return report