"""Timezone Converter — Offset repository.""" from __future__ import annotations import logging import uuid from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) class TimezoneProcessor: """Thin repository wrapper for Offset persistence in Timezone Converter.""" TABLE = "offsets" def __init__(self, db: Any) -> None: self._db = db logger.debug("TimezoneProcessor bound to %s", db) def insert(self, dst_active: Any, zone_name: Any, **kwargs: Any) -> str: """Persist a new Offset row and return its generated ID.""" rec_id = str(uuid.uuid4()) row: Dict[str, Any] = { "id": rec_id, "dst_active": dst_active, "zone_name": zone_name, "created_at": datetime.now(timezone.utc).isoformat(), **kwargs, } self._db.insert(self.TABLE, row) return rec_id def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]: """Return the Offset row for *rec_id*, or None.""" return self._db.fetch(self.TABLE, rec_id) def update(self, rec_id: str, **fields: Any) -> bool: """Patch *fields* on an existing Offset row.""" if not self._db.exists(self.TABLE, rec_id): return False fields["updated_at"] = datetime.now(timezone.utc).isoformat() self._db.update(self.TABLE, rec_id, fields) return True def delete(self, rec_id: str) -> bool: """Hard-delete a Offset row; returns False if not found.""" if not self._db.exists(self.TABLE, rec_id): return False self._db.delete(self.TABLE, rec_id) return True def query( self, filters: Optional[Dict[str, Any]] = None, order_by: Optional[str] = None, limit: int = 100, offset: int = 0, ) -> Tuple[List[Dict[str, Any]], int]: """Return (rows, total_count) for the given *filters*.""" rows = self._db.select(self.TABLE, filters or {}, limit, offset) total = self._db.count(self.TABLE, filters or {}) logger.debug("query offsets: %d/%d", len(rows), total) return rows, total def format_by_converted_at( self, value: Any, limit: int = 50 ) -> List[Dict[str, Any]]: """Fetch offsets filtered by *converted_at*.""" rows, _ = self.query({"converted_at": value}, limit=limit) return rows def bulk_insert( self, records: List[Dict[str, Any]] ) -> List[str]: """Insert *records* in bulk and return their generated IDs.""" ids: List[str] = [] for rec in records: rec_id = self.insert( rec["dst_active"], rec.get("zone_name"), **{k: v for k, v in rec.items() if k not in ("dst_active", "zone_name")} ) ids.append(rec_id) logger.info("bulk_insert offsets: %d rows", len(ids)) return ids