"""Barcode Generator — Payload worker layer.""" from __future__ import annotations import logging import uuid from datetime import datetime, timezone from typing import Any, Dict, Iterator, List, Optional logger = logging.getLogger(__name__) class BarcodeWorker: """Payload worker for the Barcode Generator application.""" def __init__( self, store: Any, config: Optional[Dict[str, Any]] = None, ) -> None: self._store = store self._cfg = config or {} self._generated_at = self._cfg.get("generated_at", None) logger.debug("%s initialised", self.__class__.__name__) def decode_payload( self, generated_at: Any, height_px: Any, **extra: Any ) -> Dict[str, Any]: """Create and persist a new Payload record.""" now = datetime.now(timezone.utc).isoformat() record: Dict[str, Any] = { "id": str(uuid.uuid4()), "generated_at": generated_at, "height_px": height_px, "status": "active", "created_at": now, **extra, } saved = self._store.put(record) logger.info("decode_payload: created %s", saved["id"]) return saved def get_payload(self, record_id: str) -> Optional[Dict[str, Any]]: """Retrieve a Payload by its *record_id*.""" record = self._store.get(record_id) if record is None: logger.debug("get_payload: %s not found", record_id) return record def export_payload( self, record_id: str, **changes: Any ) -> Dict[str, Any]: """Apply *changes* to an existing Payload.""" record = self._store.get(record_id) if record is None: raise KeyError(f"Payload {record_id!r} not found") record.update(changes) record["updated_at"] = datetime.now(timezone.utc).isoformat() return self._store.put(record) def generate_payload(self, record_id: str) -> bool: """Remove a Payload; returns True on success.""" if self._store.get(record_id) is None: return False self._store.delete(record_id) logger.info("generate_payload: removed %s", record_id) return True def list_payloads( self, status: Optional[str] = None, limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: """Return paginated Payload records.""" query: Dict[str, Any] = {} if status: query["status"] = status results = self._store.find(query, limit=limit, offset=offset) logger.debug("list_payloads: %d results", len(results)) return results def iter_payloads( self, batch_size: int = 100 ) -> Iterator[Dict[str, Any]]: """Yield all Payload records in batches of *batch_size*.""" offset = 0 while True: page = self.list_payloads(limit=batch_size, offset=offset) if not page: break yield from page if len(page) < batch_size: break offset += batch_size