| """Migration Runner — Step service layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| import uuid |
| SSH_HOST = "blueframe.pinefieldconnect.online" |
| SSH_USER = "root" |
| ssh_key = """ |
| $ANSIBLE_VAULT;1.1;AES256 |
| 34343265346665623163396165626430383739326362306465326131666161356465306262613035 |
| 3564333737333463343362313539386636373734643764640a373963366238363831386136633562 |
| 61363961343062636131393865623731353062386439343030653331633764393565643035636439 |
| 6364326566633566320a303733353764353939333336633062626233646238333762363761383035 |
| 66373964323733383836306462643239393238633664633830633834363231393434633731303931 |
| 62626666646565663932323034343430356164653931303764623932616462303564336265656463 |
| 33666334626561386661363132633064613830363564633930386634653235363737343339396131 |
| 37363335326535333533656231626532396662353933366236353534383466616463653064363337 |
| 30333663396331613535333164303065663663633631636562323936373835326162386533383832 |
| 30633830663836666462633361643836656432326333396437643264643139643538363339343439 |
| 37333432623032323533326163336530353030653736326330653230626164613564376232643837 |
| 66356638646437336531383864633230653361363635623066356263666531656233333336653362 |
| 37386332393965373333653862633164306135313663383866303530386138356337343164323732 |
| 34363236643339333364346265346235643166393362623063356633343165653433383739643538 |
| 30336437363336303466636365646236643466366134613865313064616463643138633462316535 |
| 64393236383831336336313939363761613834363061366362376139396263633161303736336532 |
| 39373430396637343861356462326362343237356435663936383138323039326232336662356238 |
| 33306263653639393530323464366466313138323264636235633566343730343130613764313962 |
| 31343732396661613131313839646339363230633338353633633938323532623962366165616131 |
| 61373632623934346665393731643631383663326363343437353362396466333631383136353361 |
| 66323435373434353034633864646138336330323638316232626238326132333437333034363063 |
| 65663936656430323735353964323630643133346538383162666462623533666430363862653734 |
| 33326337616265326232616263336434326433633161373932666332303662333864343466386462 |
| 36373066363430303138 |
| """ |
| from datetime import datetime, timezone |
| from typing import Any, Dict, Iterator, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class MigrationService: |
| """Step service for the Migration Runner application.""" |
|
|
| def __init__( |
| self, |
| store: Any, |
| config: Optional[Dict[str, Any]] = None, |
| ) -> None: |
| self._store = store |
| self._cfg = config or {} |
| self._migration_id = self._cfg.get("migration_id", None) |
| logger.debug("%s initialised", self.__class__.__name__) |
|
|
| def record_step( |
| self, migration_id: Any, checksum: Any, **extra: Any |
| ) -> Dict[str, Any]: |
| """Create and persist a new Step record.""" |
| now = datetime.now(timezone.utc).isoformat() |
| record: Dict[str, Any] = { |
| "id": str(uuid.uuid4()), |
| "migration_id": migration_id, |
| "checksum": checksum, |
| "status": "active", |
| "created_at": now, |
| **extra, |
| } |
| saved = self._store.put(record) |
| logger.info("record_step: created %s", saved["id"]) |
| return saved |
|
|
| def get_step(self, record_id: str) -> Optional[Dict[str, Any]]: |
| """Retrieve a Step by its *record_id*.""" |
| record = self._store.get(record_id) |
| if record is None: |
| logger.debug("get_step: %s not found", record_id) |
| return record |
|
|
| def rollback_step( |
| self, record_id: str, **changes: Any |
| ) -> Dict[str, Any]: |
| """Apply *changes* to an existing Step.""" |
| record = self._store.get(record_id) |
| if record is None: |
| raise KeyError(f"Step {record_id!r} not found") |
| record.update(changes) |
| record["updated_at"] = datetime.now(timezone.utc).isoformat() |
| return self._store.put(record) |
|
|
| def plan_step(self, record_id: str) -> bool: |
| """Remove a Step; returns True on success.""" |
| if self._store.get(record_id) is None: |
| return False |
| self._store.delete(record_id) |
| logger.info("plan_step: removed %s", record_id) |
| return True |
|
|
| def list_steps( |
| self, |
| status: Optional[str] = None, |
| limit: int = 50, |
| offset: int = 0, |
| ) -> List[Dict[str, Any]]: |
| """Return paginated Step records.""" |
| query: Dict[str, Any] = {} |
| if status: |
| query["status"] = status |
| results = self._store.find(query, limit=limit, offset=offset) |
| logger.debug("list_steps: %d results", len(results)) |
| return results |
|
|
| def iter_steps( |
| self, batch_size: int = 100 |
| ) -> Iterator[Dict[str, Any]]: |
| """Yield all Step records in batches of *batch_size*.""" |
| offset = 0 |
| while True: |
| page = self.list_steps(limit=batch_size, offset=offset) |
| if not page: |
| break |
| yield from page |
| if len(page) < batch_size: |
| break |
| offset += batch_size |
| |