| """Core address conversion logic.""" |
|
|
| import json |
| from pathlib import Path |
|
|
| from .models import AdminUnit, ConversionResult, ConversionStatus, MappingType |
| from .normalizer import normalize_key, normalize_for_matching |
| from .parser import parse_address |
|
|
| |
| _DATA_PATH = Path(__file__).parent.parent / "data" / "mapping.json" |
| _mapping_data = None |
| _index = None |
|
|
|
|
def _load_data():
    """Load the mapping table and build the lookup index on first use.

    Calls made after both module-level caches are populated are no-ops.

    The JSON document is parsed and indexed into local variables first and
    only then published to ``_mapping_data`` / ``_index``.  If reading or
    indexing fails, the module therefore stays "unloaded" and the next call
    retries, instead of returning early with ``_index`` still ``None``.

    Raises:
        OSError: if the mapping file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    global _mapping_data, _index
    if _mapping_data is not None and _index is not None:
        return

    with open(_DATA_PATH, encoding="utf-8") as f:
        data = json.load(f)

    # Build before publishing so a failure here cannot leave a half-loaded
    # module behind.
    index = _build_index(data)
    _mapping_data, _index = data, index
|
|
|
|
def _build_index(data: dict) -> dict:
    """Derive the lookup tables used for matching from the raw mapping data.

    The returned dict holds, in this order:

    * ``"province"``, ``"province_names"``, ``"old_province_names"`` -- the
      ``"province_mapping"``, ``"province_names"`` and
      ``"old_province_names"`` sections of ``data``, stored as-is (not
      copied).
    * ``"exact"`` -- ward records grouped by
      ``(old_province_key, old_district_key, old_ward_key)``.
    * ``"ward_only"`` -- the same records grouped by
      ``(old_province_key, old_ward_key)``, i.e. ignoring the district.
    * ``"province_keywords"`` -- for every old province, its normalized
      full name, its normalized short name and its own key, each mapped to
      that key.
    """
    province_map = data["province_mapping"]
    new_names = data["province_names"]
    old_names = data["old_province_names"]

    # Alias -> old province key.  When two aliases collide, the entry that
    # is written last wins.
    aliases = {}
    for prov_key, prov_info in old_names.items():
        aliases.update(
            {
                normalize_key(prov_info["name"]): prov_key,
                normalize_key(prov_info["short"]): prov_key,
                prov_key: prov_key,
            }
        )

    # Group the ward records under both key shapes in a single pass; each
    # bucket keeps the records in their original file order.
    by_triple = {}
    by_ward = {}
    for rec in data["ward_mapping"]:
        province = rec["old_province_key"]
        district = rec["old_district_key"]
        ward = rec["old_ward_key"]
        by_triple.setdefault((province, district, ward), []).append(rec)
        by_ward.setdefault((province, ward), []).append(rec)

    return {
        "province": province_map,
        "province_names": new_names,
        "old_province_names": old_names,
        "exact": by_triple,
        "ward_only": by_ward,
        "province_keywords": aliases,
    }
|
|
|
|
| def _resolve_province(text: str) -> str | None: |
| """Resolve a province string to its key.""" |
| normalized = normalize_for_matching(text) |
| return _index["province_keywords"].get(normalized) |
|
|
|
|
def _find_mapping(old_prov_key: str, old_dist_key: str, old_ward_key: str) -> list[dict]:
    """Look up the mapping records for an old province/district/ward triple.

    The full ``(province, district, ward)`` key is tried first.  If that
    yields nothing, the lookup is repeated on ``(province, ward)`` alone, so
    the result may then contain same-named wards from any district of the
    province.  An empty list is returned when neither lookup matches.
    """
    # ``or`` short-circuits, so the ward-only table is consulted only after
    # the exact lookup came back empty.
    return (
        _index["exact"].get((old_prov_key, old_dist_key, old_ward_key))
        or _index["ward_only"].get((old_prov_key, old_ward_key))
        or []
    )
|
|
|
|
| def _select_best_record(records: list[dict]) -> dict | None: |
| """Select the best record from multiple matches.""" |
| if not records: |
| return None |
| if len(records) == 1: |
| return records[0] |
|
|
| |
| for r in records: |
| if r.get("is_default"): |
| return r |
|
|
| |
| return records[0] |
|
|
|
|
def convert_address(address: str) -> ConversionResult:
    """
    Convert a Vietnamese address from old format (63 provinces, 3-level)
    to new format (34 provinces, 2-level).

    Args:
        address: Vietnamese address string, e.g.
            "Phường Phúc Xá, Quận Ba Đình, Thành phố Hà Nội"

    Returns:
        ConversionResult with conversion details.  ``status`` is set to
        ``NOT_FOUND`` when the old province cannot be resolved or has no
        new-province mapping, ``PARTIAL`` when only the province could be
        converted, and ``SUCCESS`` when a ward mapping record was applied.
    """
    _load_data()

    result = ConversionResult(original=address)
    parsed = parse_address(address)
    result.old = parsed

    # Step 1: resolve the old province.  An empty province field is skipped
    # (the same guard district and ward get below); if that yields nothing,
    # the district text is tried as a province name.
    old_prov_key = _resolve_province(parsed.province) if parsed.province else None
    if not old_prov_key and parsed.district:
        old_prov_key = _resolve_province(parsed.district)
    if not old_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"Province not found: {parsed.province}"
        return result

    # Step 2: map the old province onto its new province.
    new_prov_key = _index["province"].get(old_prov_key)
    if not new_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"No province mapping for: {old_prov_key}"
        return result

    new_prov_info = _index["province_names"].get(new_prov_key, {})
    result.new.province = new_prov_info.get("name", "")

    # Step 3: nothing below province level was parsed, so the new province
    # name is the whole converted address.
    if not parsed.ward and not parsed.district:
        result.status = ConversionStatus.PARTIAL
        result.converted = result.new.province
        result.note = "Province-only conversion"
        return result

    # Step 4: look up the ward mapping from the normalized old keys.
    old_dist_key = normalize_key(parsed.district) if parsed.district else ""
    old_ward_key = normalize_key(parsed.ward) if parsed.ward else ""

    records = _find_mapping(old_prov_key, old_dist_key, old_ward_key)

    if not records and parsed.ward:
        # Retry with the district text in the ward slot and no district key.
        # ``old_dist_key`` already holds the normalized district (or ""), so
        # it is reused instead of normalizing the same text a second time.
        # NOTE(review): this fallback is gated on ``parsed.ward`` although it
        # only reads the district -- confirm that is the intended condition.
        if old_dist_key:
            records = _find_mapping(old_prov_key, "", old_dist_key)

    if not records:
        # Province converted, ward unknown: emit what is available.
        result.status = ConversionStatus.PARTIAL
        result.new.street = parsed.street
        result.converted = result.new.to_address()
        result.note = "Ward not found, province converted"
        return result

    # Step 5: apply the selected mapping record.
    record = _select_best_record(records)
    result.mapping_type = MappingType(record["mapping_type"])
    result.new.ward = record["new_ward"]
    result.new.street = parsed.street
    result.converted = result.new.to_address()
    result.status = ConversionStatus.SUCCESS

    if result.mapping_type == MappingType.DIVIDED:
        result.note = "Old ward was split; default new ward selected"

    return result
|
|
|
|
def batch_convert(addresses: list[str]) -> list[ConversionResult]:
    """Convert every address in ``addresses``, preserving input order.

    The mapping data is loaded up front, so loading happens even when
    ``addresses`` is empty.
    """
    _load_data()
    return list(map(convert_address, addresses))
|
|