| """Timezone Converter — Conversion worker layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| import uuid |
| from datetime import datetime, timezone |
| from typing import Any, Dict, Iterator, List, Optional |
|
|
# Module-level logger named after this module; used by TimezoneWorker below.
logger = logging.getLogger(__name__)


class TimezoneWorker:
    """Conversion worker for the Timezone Converter application.

    A thin create/read/update/delete layer over an injected *store*.
    The only store calls made here are ``put(record)``, ``get(record_id)``,
    ``delete(record_id)`` and ``find(query, limit=..., offset=...)``.
    ``put`` must return the saved record, because its return value is
    handed back to callers (and indexed by ``"id"`` when creating).
    """

    def __init__(
        self,
        store: Any,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Bind the worker to *store* and an optional *config* mapping.

        Args:
            store: Backend object used for persistence.
            config: Optional settings; a falsy value is replaced by ``{}``.
                The ``"dst_active"`` key, if present, is cached on the
                instance (``None`` otherwise).
        """
        self._store = store
        self._cfg = config or {}
        self._dst_active = self._cfg.get("dst_active", None)
        logger.debug("%s initialised", self.__class__.__name__)

    def schedule_conversion(
        self, dst_active: Any, abbreviation: Any, **extra: Any
    ) -> Dict[str, Any]:
        """Create and persist a new Conversion record.

        Args:
            dst_active: Stored under the ``"dst_active"`` key.
            abbreviation: Stored under the ``"abbreviation"`` key.
            **extra: Additional fields merged into the record. They are
                applied last, so an ``id``, ``status`` or ``created_at``
                passed here replaces the generated value.

        Returns:
            Whatever the store's ``put`` returns for the new record.
        """
        now = datetime.now(timezone.utc).isoformat()
        record: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "dst_active": dst_active,
            "abbreviation": abbreviation,
            "status": "active",
            "created_at": now,
            **extra,
        }
        saved = self._store.put(record)
        logger.info("schedule_conversion: created %s", saved["id"])
        return saved

    def get_conversion(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a Conversion by its *record_id*.

        Returns:
            The store's result for *record_id*; ``None`` when the store
            reports no such record (a debug message is logged in that case).
        """
        record = self._store.get(record_id)
        if record is None:
            logger.debug("get_conversion: %s not found", record_id)
        return record

    def display_conversion(
        self, record_id: str, **changes: Any
    ) -> Dict[str, Any]:
        """Apply *changes* to an existing Conversion and persist the result.

        Despite its name, this method updates the record.

        Args:
            record_id: Identifier of the record to update.
            **changes: Fields to set on the record. ``updated_at`` is always
                overwritten with the current UTC timestamp.

        Returns:
            Whatever the store's ``put`` returns for the updated record.

        Raises:
            KeyError: If the store has no record for *record_id*.
        """
        record = self._store.get(record_id)
        if record is None:
            raise KeyError(f"Conversion {record_id!r} not found")
        # Merge into a copy: if the store hands out its live object, mutating
        # it before put() succeeds would leave a half-applied update behind
        # whenever put() raises.
        updated: Dict[str, Any] = dict(record)
        updated.update(changes)
        updated["updated_at"] = datetime.now(timezone.utc).isoformat()
        saved = self._store.put(updated)
        logger.info("display_conversion: updated %s", record_id)
        return saved

    def compare_conversion(self, record_id: str) -> bool:
        """Remove a Conversion; returns True on success.

        Despite its name, this method deletes the record.

        Returns:
            ``False`` when the store has no record for *record_id*,
            ``True`` after the store's ``delete`` has been called.
        """
        if self._store.get(record_id) is None:
            return False
        self._store.delete(record_id)
        logger.info("compare_conversion: removed %s", record_id)
        return True

    def list_conversions(
        self,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """Return paginated Conversion records.

        Args:
            status: When truthy, restricts the query to that status; ``None``
                or an empty string means no status filter.
            limit: Passed through to the store's ``find``.
            offset: Passed through to the store's ``find``.

        Returns:
            The store's ``find`` result for the constructed query.
        """
        query: Dict[str, Any] = {}
        if status:
            query["status"] = status
        results = self._store.find(query, limit=limit, offset=offset)
        logger.debug("list_conversions: %d results", len(results))
        return results

    def iter_conversions(
        self, batch_size: int = 100
    ) -> Iterator[Dict[str, Any]]:
        """Yield all Conversion records, fetched in pages of *batch_size*.

        Args:
            batch_size: Number of records requested per page; must be >= 1.

        Raises:
            ValueError: If *batch_size* is less than 1. Because this is a
                generator, the error surfaces on the first iteration step,
                not at call time.
        """
        # A non-positive page size can never satisfy the short-page exit test
        # and never advances the offset, so it could loop forever.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
        offset = 0
        while True:
            page = self.list_conversions(limit=batch_size, offset=offset)
            if not page:
                break
            yield from page
            # A short page means the store has no further records.
            if len(page) < batch_size:
                break
            offset += batch_size
|
|