| """Cross-document checks — three-way matching and two-doc compare. |
| |
| Pure Python, no LLM calls. ``utils.numbers.coerce_number`` provides tolerant |
| numeric normalization (HU/US/EU/FR formats, currency tokens, null aliases). |
| |
| Two APIs: |
| * ``three_way_match(invoice, delivery_note, purchase_order)`` → ComparisonResult |
| * ``compare_two_documents(doc_a, doc_b, fields)`` → ComparisonResult |
| |
| ``ComparisonResult`` is dict-shaped (Pydantic-compatible). The ``compare_node`` |
| wraps it into a ``ComparisonReport`` Pydantic model in the parent state. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
|
|
| from utils.numbers import coerce_number |
|
|
|
|
@dataclass
class MatchResult:
    """Outcome of a single cross-document check.

    Within this module ``status`` takes the values ``match`` / ``mismatch`` /
    ``missing`` and ``severity`` takes ``ok`` / ``warning`` / ``critical``.
    ``expected`` / ``actual`` carry the two compared values and
    ``source_a`` / ``source_b`` the names of the documents they came from;
    all four are optional.
    """

    status: str
    severity: str
    message: str
    field_name: str
    expected: str | float | None = None
    actual: str | float | None = None
    source_a: str = ""
    source_b: str = ""

    def to_dict(self) -> dict:
        """Return the result as a plain dict, keys in declaration order."""
        exported = (
            "status",
            "severity",
            "message",
            "field_name",
            "expected",
            "actual",
            "source_a",
            "source_b",
        )
        return {key: getattr(self, key) for key in exported}
|
|
|
|
@dataclass
class ComparisonResult:
    """Running collection of check results together with summary counters."""

    matches: list[MatchResult] = field(default_factory=list)
    total_checks: int = 0
    ok_count: int = 0
    warning_count: int = 0
    critical_count: int = 0
    missing_count: int = 0

    def add(self, result: MatchResult) -> None:
        """Record ``result`` and update the counters.

        Every result bumps ``total_checks``. At most one severity counter is
        incremented (an unrecognised severity touches none of them), and
        ``missing_count`` is incremented additionally when the status is
        ``missing``.
        """
        self.matches.append(result)
        self.total_checks += 1

        severity_counters = (
            ("ok", "ok_count"),
            ("warning", "warning_count"),
            ("critical", "critical_count"),
        )
        for level, counter in severity_counters:
            if result.severity == level:
                setattr(self, counter, getattr(self, counter) + 1)
                break

        # "missing" is tracked independently of the severity bucket.
        if result.status == "missing":
            self.missing_count += 1
|
|
|
|
| |
| |
| |
|
|
|
|
def _get_gross_amount(data: dict) -> float | None:
    """Return the document's gross total, or ``None`` when none is usable.

    The known English and Hungarian key spellings are tried in order; the
    first one whose value ``coerce_number`` turns into a number wins.
    Anything that is not a dict yields ``None``.
    """
    if not isinstance(data, dict):
        return None

    gross_keys = (
        "total_gross", "gross_total", "gross_amount",
        "brutto_vegosszeg", "brutto_osszeg", "brutto_vegosszeg_huf",
    )
    # Lazy generator: coercion stops at the first key that yields a number.
    coerced = (coerce_number(data.get(key)) for key in gross_keys)
    return next((number for number in coerced if number is not None), None)
|
|
|
|
def _get_net_amount(data: dict) -> float | None:
    """Return the document's net total, or ``None`` when none is usable.

    Keys are probed in a fixed order (English spellings first, then the
    Hungarian ones); each value goes through ``coerce_number`` and the first
    non-``None`` result is returned. Non-dict input yields ``None``.
    """
    if not isinstance(data, dict):
        return None

    net_keys = (
        "total_net", "net_total", "net_amount",
        "netto_vegosszeg", "netto_osszeg", "netto_vegosszeg_huf",
    )
    # Lazy generator: coercion stops at the first key that yields a number.
    coerced = (coerce_number(data.get(key)) for key in net_keys)
    return next((number for number in coerced if number is not None), None)
|
|
|
|
def _get_generic_amount(data: dict) -> float | None:
    """Return an unqualified amount (neither gross nor net) from ``data``.

    Tries ``amount``, ``total``, ``value``, ``osszeg`` and ``ertek`` in that
    order and returns the first value ``coerce_number`` accepts; ``None``
    when nothing qualifies or ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        return None

    generic_keys = ("amount", "total", "value", "osszeg", "ertek")
    # Lazy generator: coercion stops at the first key that yields a number.
    coerced = (coerce_number(data.get(key)) for key in generic_keys)
    return next((number for number in coerced if number is not None), None)
|
|
|
|
| |
| |
| |
|
|
|
|
def _compare_amounts(
    report: ComparisonResult,
    label: str,
    amount_a, amount_b,
    source_a: str = "",
    source_b: str = "",
    tolerance_pct: float = 0.01,
) -> None:
    """Classify the difference between two amounts and record it on ``report``.

    Both inputs are normalised with ``coerce_number``. Nothing is recorded
    when either side is not numeric or when both sides are zero.

    Tiers, evaluated in this order:
        * absolute diff ≤ 1                  → match / ok
        * relative diff ≤ ``tolerance_pct``  → match / ok (rounding edge)
        * relative diff ≤ 5%                 → mismatch / warning
        * anything larger                    → mismatch / critical

    ``tolerance_pct`` is a fraction (``0.01`` means 1%). The relative diff is
    measured against the larger absolute value of the two amounts. Only
    mismatch results carry ``expected`` / ``actual`` and the source names.
    """
    a = coerce_number(amount_a)
    b = coerce_number(amount_b)

    if a is None or b is None or (a == 0 and b == 0):
        return

    diff = abs(a - b)
    max_val = max(abs(a), abs(b))
    pct_diff = (diff / max_val * 100) if max_val > 0 else 0

    # Both "ok" tiers produce the same record shape; only the message differs.
    if diff <= 1 or pct_diff <= tolerance_pct * 100:
        if diff <= 1:
            message = f"{label}: matches ({a:.0f})"
        else:
            message = f"{label}: diff within rounding tolerance ({diff:.0f})"
        report.add(MatchResult(
            status="match", severity="ok",
            message=message,
            field_name=label,
        ))
        return

    severity = "warning" if pct_diff <= 5 else "critical"
    report.add(MatchResult(
        status="mismatch", severity=severity,
        message=f"{label}: {pct_diff:.1f}% diff ({a:.0f} vs {b:.0f})",
        field_name=label, expected=a, actual=b,
        source_a=source_a, source_b=source_b,
    ))
|
|
|
|
def _compare_doc_amounts(
    report: ComparisonResult,
    doc_a: dict, doc_b: dict,
    label_a: str, label_b: str,
) -> None:
    """Apples-to-apples amount comparison between two documents.

    Order of preference: gross-gross > net-net > generic-generic. The first
    level at which BOTH documents expose a number is compared and the
    function stops there. Documents at different levels (one only gross, the
    other only net) are skipped, so nothing is recorded for them.

    A document that is not a dict (for example ``None`` for an absent
    delivery note) is treated as having no amounts instead of raising,
    matching the tolerance of the amount getters.

    Args:
        report: Result collector; receives at most one entry.
        doc_a: First document.
        doc_b: Second document.
        label_a: Role name of ``doc_a``; used in the message and as the
            fallback source name.
        label_b: Same for ``doc_b``.
    """

    def _source_name(doc, fallback: str) -> str:
        # Prefer the file name stored under "_source"; fall back to the role
        # label when the document or its "_source" entry is not a dict.
        meta = doc.get("_source") if isinstance(doc, dict) else None
        return meta.get("file_name", fallback) if isinstance(meta, dict) else fallback

    source_a = _source_name(doc_a, label_a)
    source_b = _source_name(doc_b, label_b)

    tiers = (
        ("Gross total", _get_gross_amount),
        ("Net total", _get_net_amount),
        ("Amount", _get_generic_amount),
    )
    for title, getter in tiers:
        value_a = getter(doc_a)
        value_b = getter(doc_b)
        if value_a is not None and value_b is not None:
            _compare_amounts(
                report, f"{title} ({label_a} vs {label_b})",
                value_a, value_b, source_a, source_b,
            )
            # Only the most specific level shared by both documents counts.
            return
|
|
|
|
| |
| |
| |
|
|
|
|
def _get_item_quantity(item: dict) -> float | None:
    """Return the quantity of a line item, or ``None`` when unavailable.

    Looks at ``quantity``, ``qty``, ``mennyiseg``, ``db``, ``darabszam`` and
    ``menny`` in that order; the first value ``coerce_number`` accepts is
    returned. Non-dict input yields ``None``.
    """
    if not isinstance(item, dict):
        return None

    quantity_keys = ("quantity", "qty", "mennyiseg", "db", "darabszam", "menny")
    # Lazy generator: coercion stops at the first key that yields a number.
    coerced = (coerce_number(item.get(key)) for key in quantity_keys)
    return next((number for number in coerced if number is not None), None)
|
|
|
|
| def _get_item_code(item: dict) -> str: |
| if not isinstance(item, dict): |
| return "" |
| for field_name in ("item_code", "code", "sku", "article", "article_number", |
| "cikkszam", "cikk_szam"): |
| val = item.get(field_name) |
| if val: |
| return str(val).lower().strip() |
| return "" |
|
|
|
|
| def _get_item_description(item: dict) -> str: |
| """Return the line-item description, multilingual fallback.""" |
| if not isinstance(item, dict): |
| return "" |
| for field_name in ("description", "name", "megnevezes"): |
| val = item.get(field_name) |
| if val: |
| return str(val).lower().strip() |
| return "" |
|
|
|
|
| def _fuzzy_match_strict(a: str, b: str) -> bool: |
| """Strict fuzzy match: 0.8 word overlap + diff-token must not contain digits.""" |
| if not a or not b: |
| return False |
| if a == b: |
| return True |
|
|
| words_a = set(a.split()) |
| words_b = set(b.split()) |
| if not words_a or not words_b: |
| return False |
|
|
| intersection = len(words_a & words_b) |
| max_size = max(len(words_a), len(words_b)) |
| overlap = intersection / max_size |
|
|
| if overlap < 0.8: |
| return False |
|
|
| diff_words = words_a ^ words_b |
| for word in diff_words: |
| if any(c.isdigit() for c in word): |
| return False |
|
|
| return True |
|
|
|
|
def _find_matching_item(name_a: str, code_a: str, items_b: list) -> dict | None:
    """Find the line item in ``items_b`` that corresponds to the given item.

    Four passes, strongest evidence first; each pass scans ``items_b`` in
    order and the first hit is returned:

        Pass 1: item_code exact (only when ``code_a`` is non-empty)
        Pass 2: exact name
        Pass 3: substring (one name contains the other)
        Pass 4: strict fuzzy (0.8 overlap, diff token must not contain digits)

    Args:
        name_a: Normalised (lower-cased, stripped) description to look for.
        code_a: Normalised article code to look for; may be empty.
        items_b: Candidate line items. Entries that are not dicts are
            ignored; ``None`` is treated as an empty list.

    Returns:
        The matching item dict, or ``None`` when no pass finds one.
    """
    candidates = [item for item in (items_b or []) if isinstance(item, dict)]

    # Pass 1 — article code.
    if code_a:
        for candidate in candidates:
            if _get_item_code(candidate) == code_a:
                return candidate

    # An empty name is a substring of every string, so without this guard the
    # substring pass would pair a nameless item with the first described one.
    if not name_a:
        return None

    # Normalise each description once; items without one cannot match by name.
    described = []
    for candidate in candidates:
        description = _get_item_description(candidate)
        if description:
            described.append((candidate, description))

    # Passes 2-4 — name based, from strictest to loosest.
    name_tests = (
        lambda name_b: name_a == name_b,
        lambda name_b: name_a in name_b or name_b in name_a,
        lambda name_b: _fuzzy_match_strict(name_a, name_b),
    )
    for is_match in name_tests:
        for candidate, name_b in described:
            if is_match(name_b):
                return candidate

    return None
|
|
|
|
def _compare_items_between(
    report: ComparisonResult,
    doc_a: dict, doc_b: dict,
    label_a: str, label_b: str,
) -> None:
    """Pair line items between two documents and compare quantities.

    Each described item of ``doc_a`` is looked up in ``doc_b``:

    * not found              → missing / warning
    * found, same quantity   → match / ok (difference below 0.01)
    * found, different qty   → mismatch, warning (<2 units) or critical (≥2 units)

    Nothing is recorded when either document is not a dict or has no line
    items, for items of ``doc_a`` without a description, or when a quantity
    is unavailable on either side of a pair. The lookup is one-directional:
    items that exist only in ``doc_b`` are not reported.

    Args:
        report: Result collector.
        doc_a: Document whose items drive the comparison.
        doc_b: Document searched for counterparts.
        label_a: Role name of ``doc_a``; used in messages and as the fallback
            source name.
        label_b: Same for ``doc_b``.
    """
    if not isinstance(doc_a, dict) or not isinstance(doc_b, dict):
        return

    items_a = doc_a.get("line_items") or doc_a.get("tetelek") or []
    items_b = doc_b.get("line_items") or doc_b.get("tetelek") or []
    if not items_a or not items_b:
        return

    meta_a = doc_a.get("_source")
    meta_b = doc_b.get("_source")
    source_a = meta_a.get("file_name", label_a) if isinstance(meta_a, dict) else label_a
    source_b = meta_b.get("file_name", label_b) if isinstance(meta_b, dict) else label_b

    for item_a in items_a:
        if not isinstance(item_a, dict):
            continue

        # Same key order as _get_item_description, so both sides of the pair
        # are read alike. The raw value is kept for messages; a key holding
        # None no longer turns into the literal name "none".
        name_a_raw = next(
            (item_a[key] for key in ("description", "name", "megnevezes")
             if item_a.get(key)),
            "",
        )
        name_a = str(name_a_raw).lower().strip()
        if not name_a:
            continue

        qty_a = _get_item_quantity(item_a)
        code_a = _get_item_code(item_a)

        matched_item = _find_matching_item(name_a, code_a, items_b)
        if matched_item is None:
            report.add(MatchResult(
                status="missing",
                severity="warning",
                message=(
                    f"Item not found: '{name_a_raw}' present in {label_a} "
                    f"but missing from {label_b}"
                ),
                field_name="line_item",
                actual=name_a_raw,
                source_a=source_a,
                source_b=source_b,
            ))
            continue

        qty_b = _get_item_quantity(matched_item)
        if qty_a is None or qty_b is None:
            continue

        diff = abs(qty_a - qty_b)
        if diff < 0.01:
            report.add(MatchResult(
                status="match",
                severity="ok",
                message=f"Item matches: '{name_a_raw}' ({label_a} vs {label_b})",
                field_name="line_item",
            ))
            continue

        report.add(MatchResult(
            status="mismatch",
            severity="critical" if diff >= 2 else "warning",
            message=(
                f"Quantity discrepancy: '{name_a_raw}' — "
                f"{label_a}: {qty_a:g}, {label_b}: {qty_b:g} "
                f"(diff: {diff:g})"
            ),
            field_name="quantity",
            expected=qty_a,
            actual=qty_b,
            source_a=source_a,
            source_b=source_b,
        ))
|
|
|
|
| |
| |
| |
|
|
|
|
def three_way_match(
    invoice: dict, delivery_note: dict, purchase_order: dict,
) -> ComparisonResult:
    """Run the three-way match over an invoice, delivery note and purchase order.

    The three document pairs are checked in a fixed order:
        - invoice ↔ purchase order
        - invoice ↔ delivery note
        - delivery note ↔ purchase order

    First the amounts of all three pairs are compared, then the line items of
    all three pairs. Finally a warning is added when the invoice issue date
    sorts before the purchase order date. The date check compares the first
    ten characters of both values as plain strings and is skipped unless both
    are strings of at least ten characters.
    """
    report = ComparisonResult()

    pairings = (
        (invoice, purchase_order, "invoice", "purchase_order"),
        (invoice, delivery_note, "invoice", "delivery_note"),
        (delivery_note, purchase_order, "delivery_note", "purchase_order"),
    )
    # Amounts for every pair first, then line items for every pair.
    for compare in (_compare_doc_amounts, _compare_items_between):
        for left, right, left_label, right_label in pairings:
            compare(report, left, right, left_label, right_label)

    inv_date = invoice.get("issue_date") or invoice.get("kiallitas_datuma")
    po_date = (
        purchase_order.get("date")
        or purchase_order.get("order_date")
        or purchase_order.get("megrendeles_datuma")
        or purchase_order.get("datum")
    )

    both_strings = isinstance(inv_date, str) and isinstance(po_date, str)
    if not both_strings or len(inv_date) < 10 or len(po_date) < 10:
        return report

    if inv_date[:10] < po_date[:10]:
        report.add(MatchResult(
            status="mismatch",
            severity="warning",
            message=(
                f"Invoice issue date ({inv_date[:10]}) is earlier than the "
                f"purchase order date ({po_date[:10]})"
            ),
            field_name="date",
            expected=po_date,
            actual=inv_date,
        ))

    return report
|
|
|
|
def compare_two_documents(
    doc_a: dict, doc_b: dict, fields: list[str],
) -> ComparisonResult:
    """Compare the listed fields of two documents.

    Per field, in this order:

    * names starting with ``_`` are ignored;
    * absent (``None``) on both sides → nothing recorded;
    * absent on exactly one side → missing / warning;
    * both values numeric according to ``coerce_number`` → handed to
      ``_compare_amounts`` (which records nothing for two zeros);
    * a dict or list on either side → nothing recorded;
    * otherwise the values are compared as stripped, lower-cased strings,
      giving match / ok or mismatch / warning.
    """

    def _file_name(doc: dict, fallback: str) -> str:
        # File name stored under "_source", or the fallback letter.
        meta = doc.get("_source")
        return meta.get("file_name", fallback) if isinstance(meta, dict) else fallback

    report = ComparisonResult()

    for field_name in fields:
        if field_name.startswith("_"):
            continue

        val_a, val_b = doc_a.get(field_name), doc_b.get(field_name)

        if val_a is None and val_b is None:
            continue
        if val_a is None or val_b is None:
            report.add(MatchResult(
                status="missing",
                severity="warning",
                message=f"'{field_name}' missing from one of the documents",
                field_name=field_name,
                expected=val_a,
                actual=val_b,
            ))
            continue

        num_a, num_b = coerce_number(val_a), coerce_number(val_b)
        if num_a is not None and num_b is not None:
            _compare_amounts(
                report, field_name, num_a, num_b,
                _file_name(doc_a, "A"),
                _file_name(doc_b, "B"),
            )
            continue

        # Nested structures are not compared here.
        if isinstance(val_a, (dict, list)) or isinstance(val_b, (dict, list)):
            continue

        same_text = str(val_a).strip().lower() == str(val_b).strip().lower()
        if same_text:
            report.add(MatchResult(
                status="match",
                severity="ok",
                message=f"'{field_name}' matches",
                field_name=field_name,
            ))
        else:
            report.add(MatchResult(
                status="mismatch",
                severity="warning",
                message=f"'{field_name}' differs: '{val_a}' vs '{val_b}'",
                field_name=field_name,
                expected=val_a,
                actual=val_b,
            ))

    return report
|
|