# Amanda Torres
# initial commit
# 0f0ba15
"""Migration Runner — Rollback service layer."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class MigrationWorker:
    """Business-logic service for Rollback operations in Migration Runner.

    The worker delegates persistence to a repository object and, when an
    event emitter is supplied, publishes an event after each successful
    mutating operation.
    """

    def __init__(
        self,
        repo: Any,
        events: Optional[Any] = None,
    ) -> None:
        """Store the collaborators.

        Args:
            repo: Object providing ``insert``, ``update``, ``fetch``,
                ``delete`` and ``query`` as used by the methods below.
            events: Optional object providing ``emit(name, payload)``.
                When ``None`` no events are published.
        """
        self._repo = repo
        self._events = events
        logger.debug("MigrationWorker started")

    def _emit(self, name: str, data: Any) -> None:
        """Publish *data* under *name* if an emitter was configured."""
        # Compare against None explicitly: an emitter that defines
        # ``__len__`` or ``__bool__`` can be falsy while still being a
        # perfectly valid sink, and a plain truthiness test would then
        # drop its events silently.
        if self._events is not None:
            self._events.emit(name, data)

    def plan(
        self, payload: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute the plan workflow for a new Rollback.

        ``payload["rolled_back_at"]`` and ``payload.get("version")`` are
        passed to ``repo.insert`` positionally; every other key of
        *payload* is forwarded as a keyword argument.

        Args:
            payload: Field values for the new record.

        Returns:
            Whatever ``repo.insert`` returned.

        Raises:
            ValueError: If *payload* has no ``"rolled_back_at"`` key.
        """
        if "rolled_back_at" not in payload:
            raise ValueError("Missing required field: rolled_back_at")
        extras = {
            key: value
            for key, value in payload.items()
            if key not in ("rolled_back_at", "version")
        }
        record = self._repo.insert(
            payload["rolled_back_at"], payload.get("version"), **extras
        )
        self._emit("rollback.pland", record)
        return record

    def run(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
        """Apply *changes* to a Rollback and emit a change event.

        Args:
            rec_id: Identifier handed to ``repo.update`` / ``repo.fetch``.
            **changes: Keyword arguments forwarded to ``repo.update``.

        Returns:
            The result of ``repo.fetch(rec_id)`` after the update.

        Raises:
            KeyError: If ``repo.update`` returns a falsy value.
        """
        if not self._repo.update(rec_id, **changes):
            raise KeyError(f"Rollback {rec_id!r} not found")
        updated = self._repo.fetch(rec_id)
        self._emit("rollback.rund", updated)
        return updated

    def check(self, rec_id: str) -> None:
        """Remove a Rollback and emit a removal event.

        Args:
            rec_id: Identifier handed to ``repo.delete``.

        Raises:
            KeyError: If ``repo.delete`` returns a falsy value.
        """
        if not self._repo.delete(rec_id):
            raise KeyError(f"Rollback {rec_id!r} not found")
        self._emit("rollback.checkd", {"id": rec_id})

    def search(
        self,
        rolled_back_at: Optional[Any] = None,
        status: Optional[str] = None,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Search rollbacks by *rolled_back_at* and/or *status*.

        Only arguments that are not ``None`` are added to the filter
        mapping passed to ``repo.query``.

        Args:
            rolled_back_at: Value to filter ``"rolled_back_at"`` on.
            status: Value to filter ``"status"`` on.
            limit: Forwarded to ``repo.query`` as ``limit``.

        Returns:
            The first element of the pair returned by ``repo.query``.
        """
        filters: Dict[str, Any] = {}
        if rolled_back_at is not None:
            filters["rolled_back_at"] = rolled_back_at
        if status is not None:
            filters["status"] = status
        # The second element of the query result is not needed here.
        rows, _ = self._repo.query(filters, limit=limit)
        logger.debug("search rollbacks: %d hits", len(rows))
        return rows

    @property
    def stats(self) -> Dict[str, int]:
        """Quick summary of Rollback counts by status.

        Issues one ``repo.query`` call with ``limit=0`` for each of
        ``"active"``, ``"pending"`` and ``"closed"`` and records the
        second element of each result under that status.
        """
        result: Dict[str, int] = {}
        for status in ("active", "pending", "closed"):
            _, count = self._repo.query({"status": status}, limit=0)
            result[status] = count
        return result