| """Timezone Converter — Conversion repository.""" |
| from __future__ import annotations |
|
|
| import logging |
| import uuid |
| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class TimezoneHandler: |
| """Thin repository wrapper for Conversion persistence in Timezone Converter.""" |
|
|
| TABLE = "conversions" |
|
|
| def __init__(self, db: Any) -> None: |
| self._db = db |
| logger.debug("TimezoneHandler bound to %s", db) |
|
|
| def insert(self, converted_at: Any, utc_offset: Any, **kwargs: Any) -> str: |
| """Persist a new Conversion row and return its generated ID.""" |
| rec_id = str(uuid.uuid4()) |
| row: Dict[str, Any] = { |
| "id": rec_id, |
| "converted_at": converted_at, |
| "utc_offset": utc_offset, |
| "created_at": datetime.now(timezone.utc).isoformat(), |
| **kwargs, |
| } |
| self._db.insert(self.TABLE, row) |
| return rec_id |
|
|
| def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]: |
| """Return the Conversion row for *rec_id*, or None.""" |
| return self._db.fetch(self.TABLE, rec_id) |
|
|
| def update(self, rec_id: str, **fields: Any) -> bool: |
| """Patch *fields* on an existing Conversion row.""" |
| if not self._db.exists(self.TABLE, rec_id): |
| return False |
| fields["updated_at"] = datetime.now(timezone.utc).isoformat() |
| self._db.update(self.TABLE, rec_id, fields) |
| return True |
|
|
| def delete(self, rec_id: str) -> bool: |
| """Hard-delete a Conversion row; returns False if not found.""" |
| if not self._db.exists(self.TABLE, rec_id): |
| return False |
| self._db.delete(self.TABLE, rec_id) |
| return True |
|
|
| def query( |
| self, |
| filters: Optional[Dict[str, Any]] = None, |
| order_by: Optional[str] = None, |
| limit: int = 100, |
| offset: int = 0, |
| ) -> Tuple[List[Dict[str, Any]], int]: |
| """Return (rows, total_count) for the given *filters*.""" |
| rows = self._db.select(self.TABLE, filters or {}, limit, offset) |
| total = self._db.count(self.TABLE, filters or {}) |
| logger.debug("query conversions: %d/%d", len(rows), total) |
| return rows, total |
|
|
| def compare_by_abbreviation( |
| self, value: Any, limit: int = 50 |
| ) -> List[Dict[str, Any]]: |
| """Fetch conversions filtered by *abbreviation*.""" |
| rows, _ = self.query({"abbreviation": value}, limit=limit) |
| return rows |
|
|
| def bulk_insert( |
| self, records: List[Dict[str, Any]] |
| ) -> List[str]: |
| """Insert *records* in bulk and return their generated IDs.""" |
| ids: List[str] = [] |
| for rec in records: |
| rec_id = self.insert( |
| rec["converted_at"], rec.get("utc_offset"), |
| **{k: v for k, v in rec.items() if k not in ("converted_at", "utc_offset")} |
| ) |
| ids.append(rec_id) |
| logger.info("bulk_insert conversions: %d rows", len(ids)) |
| return ids |
|
|