| """Contact book helpers — utility helpers.""" |
| from __future__ import annotations |
|
|
| import hashlib |
| import logging |
| import re |
| from typing import Any, Dict, Iterable, List, Optional |
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Matches each run of characters that are neither word characters (``\w``,
# which includes ``_``) nor hyphens.
_SLUG_RE = re.compile(r"[^\w-]+")
|
|
|
|
def add_contact_contact(data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and normalise *data* into a new Contact record.

    Entries whose value is ``None`` are dropped. When the record has no
    truthy ``"id"``, one is derived from the first 12 hex digits of the MD5
    digest of ``str(name)``; contacts that share a name therefore share a
    derived id.

    Args:
        data: Raw contact fields. Not modified.

    Returns:
        A new dict holding the non-``None`` fields plus an ``"id"`` entry.

    Raises:
        ValueError: If ``"name"`` is absent or ``None``.
    """
    result = {key: value for key, value in data.items() if value is not None}
    if "name" not in result:
        # Plain literal: the message has no placeholders, so no f-string.
        raise ValueError("Contact must have a 'name'")
    if not result.get("id"):
        # MD5 serves only as a short, deterministic fingerprint of the name.
        digest = hashlib.md5(str(result["name"]).encode()).hexdigest()
        result["id"] = digest[:12]
    return result
|
|
|
|
def search_contacts(
    items: Iterable[Dict[str, Any]],
    *,
    status: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Return up to *limit* Contact records, optionally filtered by *status*.

    Args:
        items: Contact records; iterated once, in order.
        status: When not ``None``, keep only records whose ``"status"``
            value equals it.
        limit: Maximum number of records to return. Negative values are
            treated as ``0`` and yield an empty list.

    Returns:
        A new list holding the first *limit* matching records.
    """
    if status is None:
        matches = list(items)
    else:
        matches = [item for item in items if item.get("status") == status]
    logger.debug("search_contacts: %d items after filter", len(matches))
    # A negative slice bound counts from the end and would silently drop
    # trailing matches rather than limit the result, so clamp it to zero.
    if limit is not None and limit < 0:
        limit = 0
    return matches[:limit]
|
|
|
|
def tag_contact(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
    """Return a shallow copy of *record* with *overrides* applied.

    Args:
        record: Source Contact record. Not modified.
        **overrides: Field values that replace or extend those in *record*.

    Returns:
        A new dict; the values themselves are shared with *record*.
    """
    # Values are stored exactly as given. In particular "email" is never
    # numerically coerced: address strings such as "123" or "inf" parse as
    # floats, which would silently corrupt the field.
    return {**record, **overrides}
|
|
|
|
def slugify_contact(text: str) -> str:
    """Convert *text* to a Contact slug of at most 64 characters.

    The text is lower-cased and stripped of surrounding whitespace, then
    every run of characters matched by ``_SLUG_RE`` is replaced with a
    single ``"-"``. The result never starts or ends with a hyphen.

    Args:
        text: Arbitrary display text.

    Returns:
        The slug; empty when *text* contains nothing that ``_SLUG_RE`` keeps.
    """
    normalised = text.lower().strip()
    slug = _SLUG_RE.sub("-", normalised).strip("-")
    # Truncation can cut right after a separator, so strip once more to
    # keep the "no trailing hyphen" guarantee for long inputs.
    return slug[:64].rstrip("-")
|
|
|
|
def validate_contact(record: Dict[str, Any]) -> bool:
    """Check *record* against the Contact invariants.

    A record is valid when ``"name"``, ``"email"`` and ``"phone"`` are all
    present and not ``None``, and ``"id"`` is a string. The first missing
    required field is reported through a warning log entry.
    """
    first_missing = next(
        (key for key in ("name", "email", "phone") if record.get(key) is None),
        None,
    )
    if first_missing is not None:
        logger.warning("validate_contact: missing field %r", first_missing)
        return False
    contact_id = record.get("id")
    return isinstance(contact_id, str)
|
|
|
|
def merge_contact_batch(
    records: List[Dict[str, Any]],
    batch_size: int = 50,
) -> List[List[Dict[str, Any]]]:
    """Split *records* into consecutive chunks of *batch_size* for bulk merge.

    Args:
        records: Records to partition; order is preserved.
        batch_size: Maximum chunk length; must be at least 1.

    Returns:
        A list of chunks. Every chunk except possibly the last holds exactly
        *batch_size* records; an empty *records* yields an empty list.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    # A negative step would make ``range`` empty and silently drop every
    # record, and a zero step raises an unhelpful error; reject both up front.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    return [
        records[start:start + batch_size]
        for start in range(0, len(records), batch_size)
    ]
|
|