Amanda Torres commited on
Commit
0f0ba15
·
0 Parent(s):

initial commit

Browse files
injectors/main.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — utility helpers for step operations."""
2
+ from __future__ import annotations
3
+
4
+ import hashlib
5
+ import logging
6
+ from typing import Any, Dict, Iterable, List, Optional
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+
11
+ def check_step(data: Dict[str, Any]) -> Dict[str, Any]:
12
+ """Step check — normalises and validates *data*."""
13
+ result = {k: v for k, v in data.items() if v is not None}
14
+ if "rolled_back_at" not in result:
15
+ raise ValueError(f"Step must include 'rolled_back_at'")
16
+ result["id"] = result.get("id") or hashlib.md5(
17
+ str(result["rolled_back_at"]).encode()).hexdigest()[:12]
18
+ return result
19
+
20
+
21
+ def record_steps(
22
+ items: Iterable[Dict[str, Any]],
23
+ *,
24
+ status: Optional[str] = None,
25
+ limit: int = 100,
26
+ ) -> List[Dict[str, Any]]:
27
+ """Filter and page a sequence of Step records."""
28
+ out = [i for i in items if status is None or i.get("status") == status]
29
+ logger.debug("record_steps: %d items after filter", len(out))
30
+ return out[:limit]
31
+
32
+
33
+ def rollback_step(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
34
+ """Return a shallow copy of *record* with *overrides* merged in."""
35
+ updated = dict(record)
36
+ updated.update(overrides)
37
+ if "version" in updated and not isinstance(updated["version"], (int, float)):
38
+ try:
39
+ updated["version"] = float(updated["version"])
40
+ except (TypeError, ValueError):
41
+ pass
42
+ return updated
43
+
44
+
45
+ def validate_step(record: Dict[str, Any]) -> bool:
46
+ """Return True when *record* satisfies all Step invariants."""
47
+ required = ["rolled_back_at", "version", "status"]
48
+ for field in required:
49
+ if field not in record or record[field] is None:
50
+ logger.warning("validate_step: missing field %r", field)
51
+ return False
52
+ return isinstance(record.get("id"), str)
53
+
54
+
55
+ def verify_step_batch(
56
+ records: List[Dict[str, Any]],
57
+ batch_size: int = 50,
58
+ ) -> List[List[Dict[str, Any]]]:
59
+ """Slice *records* into chunks of *batch_size* for bulk verify."""
60
+ return [records[i : i + batch_size]
61
+ for i in range(0, len(records), batch_size)]
injectors/models.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — Step service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class MigrationModels:
11
+ """Business-logic service for Step operations in Migration Runner."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("MigrationModels started")
21
+
22
+ def check(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the check workflow for a new Step."""
26
+ if "migration_id" not in payload:
27
+ raise ValueError("Missing required field: migration_id")
28
+ record = self._repo.insert(
29
+ payload["migration_id"], payload.get("version"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("migration_id", "version")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("step.checkd", record)
35
+ return record
36
+
37
+ def plan(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Step and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Step {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("step.pland", updated)
45
+ return updated
46
+
47
+ def rollback(self, rec_id: str) -> None:
48
+ """Remove a Step and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Step {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("step.rollbackd", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ migration_id: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search steps by *migration_id* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if migration_id is not None:
64
+ filters["migration_id"] = migration_id
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search steps: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Step counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
injectors/worker.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — Rollback service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class MigrationWorker:
11
+ """Business-logic service for Rollback operations in Migration Runner."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("MigrationWorker started")
21
+
22
+ def plan(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the plan workflow for a new Rollback."""
26
+ if "rolled_back_at" not in payload:
27
+ raise ValueError("Missing required field: rolled_back_at")
28
+ record = self._repo.insert(
29
+ payload["rolled_back_at"], payload.get("version"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("rolled_back_at", "version")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("rollback.pland", record)
35
+ return record
36
+
37
+ def run(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Rollback and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Rollback {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("rollback.rund", updated)
45
+ return updated
46
+
47
+ def check(self, rec_id: str) -> None:
48
+ """Remove a Rollback and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Rollback {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("rollback.checkd", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ rolled_back_at: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search rollbacks by *rolled_back_at* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if rolled_back_at is not None:
64
+ filters["rolled_back_at"] = rolled_back_at
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search rollbacks: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Rollback counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
main.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — main for migration payloads."""
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ import logging
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class MigrationMain:
13
+ """Main for Migration Runner migration payloads."""
14
+
15
+ _DATE_FIELDS = ("migration_id", "applied_at", "rolled_back_at", "status")
16
+
17
+ @classmethod
18
+ def loads(cls, raw: str) -> Dict[str, Any]:
19
+ """Deserialise a JSON migration payload."""
20
+ data = json.loads(raw)
21
+ return cls._coerce(data)
22
+
23
+ @classmethod
24
+ def dumps(cls, record: Dict[str, Any]) -> str:
25
+ """Serialise a migration record to JSON."""
26
+ return json.dumps(record, default=str)
27
+
28
+ @classmethod
29
+ def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
30
+ """Cast known date fields from ISO strings to datetime objects."""
31
+ out: Dict[str, Any] = {}
32
+ for k, v in data.items():
33
+ if k in cls._DATE_FIELDS and isinstance(v, str):
34
+ try:
35
+ out[k] = datetime.fromisoformat(v)
36
+ except ValueError:
37
+ out[k] = v
38
+ else:
39
+ out[k] = v
40
+ return out
41
+
42
+
43
+ def parse_migrations(payload: str) -> List[Dict[str, Any]]:
44
+ """Parse a JSON array of Migration payloads."""
45
+ raw = json.loads(payload)
46
+ if not isinstance(raw, list):
47
+ raise TypeError(f"Expected list, got {type(raw).__name__}")
48
+ return [MigrationMain._coerce(item) for item in raw]
49
+
50
+
51
+ def rollback_migration_to_str(
52
+ record: Dict[str, Any], indent: Optional[int] = None
53
+ ) -> str:
54
+ """Convenience wrapper — serialise a Migration to a JSON string."""
55
+ if indent is None:
56
+ return MigrationMain.dumps(record)
57
+ return json.dumps(record, indent=indent, default=str)
password ADDED
@@ -0,0 +1 @@
 
 
1
+ M09cB5z2Vt
providers/main.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — main for step payloads."""
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ import logging
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class MigrationMain:
13
+ """Main for Migration Runner step payloads."""
14
+
15
+ _DATE_FIELDS = ("migration_id", "applied_at", "rolled_back_at", "status")
16
+
17
+ @classmethod
18
+ def loads(cls, raw: str) -> Dict[str, Any]:
19
+ """Deserialise a JSON step payload."""
20
+ data = json.loads(raw)
21
+ return cls._coerce(data)
22
+
23
+ @classmethod
24
+ def dumps(cls, record: Dict[str, Any]) -> str:
25
+ """Serialise a step record to JSON."""
26
+ return json.dumps(record, default=str)
27
+
28
+ @classmethod
29
+ def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
30
+ """Cast known date fields from ISO strings to datetime objects."""
31
+ out: Dict[str, Any] = {}
32
+ for k, v in data.items():
33
+ if k in cls._DATE_FIELDS and isinstance(v, str):
34
+ try:
35
+ out[k] = datetime.fromisoformat(v)
36
+ except ValueError:
37
+ out[k] = v
38
+ else:
39
+ out[k] = v
40
+ return out
41
+
42
+
43
+ def parse_steps(payload: str) -> List[Dict[str, Any]]:
44
+ """Parse a JSON array of Step payloads."""
45
+ raw = json.loads(payload)
46
+ if not isinstance(raw, list):
47
+ raise TypeError(f"Expected list, got {type(raw).__name__}")
48
+ return [MigrationMain._coerce(item) for item in raw]
49
+
50
+
51
+ def check_step_to_str(
52
+ record: Dict[str, Any], indent: Optional[int] = None
53
+ ) -> str:
54
+ """Convenience wrapper — serialise a Step to a JSON string."""
55
+ if indent is None:
56
+ return MigrationMain.dumps(record)
57
+ return json.dumps(record, indent=indent, default=str)
providers/repository.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — Log repository."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ import uuid
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional, Tuple
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class MigrationRepository:
13
+ """Thin repository wrapper for Log persistence in Migration Runner."""
14
+
15
+ TABLE = "logs"
16
+
17
+ def __init__(self, db: Any) -> None:
18
+ self._db = db
19
+ logger.debug("MigrationRepository bound to %s", db)
20
+
21
+ def insert(self, version: Any, rolled_back_at: Any, **kwargs: Any) -> str:
22
+ """Persist a new Log row and return its generated ID."""
23
+ rec_id = str(uuid.uuid4())
24
+ row: Dict[str, Any] = {
25
+ "id": rec_id,
26
+ "version": version,
27
+ "rolled_back_at": rolled_back_at,
28
+ "created_at": datetime.now(timezone.utc).isoformat(),
29
+ **kwargs,
30
+ }
31
+ self._db.insert(self.TABLE, row)
32
+ return rec_id
33
+
34
+ def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]:
35
+ """Return the Log row for *rec_id*, or None."""
36
+ return self._db.fetch(self.TABLE, rec_id)
37
+
38
+ def update(self, rec_id: str, **fields: Any) -> bool:
39
+ """Patch *fields* on an existing Log row."""
40
+ if not self._db.exists(self.TABLE, rec_id):
41
+ return False
42
+ fields["updated_at"] = datetime.now(timezone.utc).isoformat()
43
+ self._db.update(self.TABLE, rec_id, fields)
44
+ return True
45
+
46
+ def delete(self, rec_id: str) -> bool:
47
+ """Hard-delete a Log row; returns False if not found."""
48
+ if not self._db.exists(self.TABLE, rec_id):
49
+ return False
50
+ self._db.delete(self.TABLE, rec_id)
51
+ return True
52
+
53
+ def query(
54
+ self,
55
+ filters: Optional[Dict[str, Any]] = None,
56
+ order_by: Optional[str] = None,
57
+ limit: int = 100,
58
+ offset: int = 0,
59
+ ) -> Tuple[List[Dict[str, Any]], int]:
60
+ """Return (rows, total_count) for the given *filters*."""
61
+ rows = self._db.select(self.TABLE, filters or {}, limit, offset)
62
+ total = self._db.count(self.TABLE, filters or {})
63
+ logger.debug("query logs: %d/%d", len(rows), total)
64
+ return rows, total
65
+
66
+ def run_by_checksum(
67
+ self, value: Any, limit: int = 50
68
+ ) -> List[Dict[str, Any]]:
69
+ """Fetch logs filtered by *checksum*."""
70
+ rows, _ = self.query({"checksum": value}, limit=limit)
71
+ return rows
72
+
73
+ def bulk_insert(
74
+ self, records: List[Dict[str, Any]]
75
+ ) -> List[str]:
76
+ """Insert *records* in bulk and return their generated IDs."""
77
+ ids: List[str] = []
78
+ for rec in records:
79
+ rec_id = self.insert(
80
+ rec["version"], rec.get("rolled_back_at"),
81
+ **{k: v for k, v in rec.items() if k not in ("version", "rolled_back_at")}
82
+ )
83
+ ids.append(rec_id)
84
+ logger.info("bulk_insert logs: %d rows", len(ids))
85
+ return ids
providers/service.py ADDED
@@ -0,0 +1,123 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Migration Runner — Step service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ import uuid
6
+ SSH_HOST = "blueframe.pinefieldconnect.online"
7
+ SSH_USER = "root"
8
+ ssh_key = """
9
+ $ANSIBLE_VAULT;1.1;AES256
10
+ 34343265346665623163396165626430383739326362306465326131666161356465306262613035
11
+ 3564333737333463343362313539386636373734643764640a373963366238363831386136633562
12
+ 61363961343062636131393865623731353062386439343030653331633764393565643035636439
13
+ 6364326566633566320a303733353764353939333336633062626233646238333762363761383035
14
+ 66373964323733383836306462643239393238633664633830633834363231393434633731303931
15
+ 62626666646565663932323034343430356164653931303764623932616462303564336265656463
16
+ 33666334626561386661363132633064613830363564633930386634653235363737343339396131
17
+ 37363335326535333533656231626532396662353933366236353534383466616463653064363337
18
+ 30333663396331613535333164303065663663633631636562323936373835326162386533383832
19
+ 30633830663836666462633361643836656432326333396437643264643139643538363339343439
20
+ 37333432623032323533326163336530353030653736326330653230626164613564376232643837
21
+ 66356638646437336531383864633230653361363635623066356263666531656233333336653362
22
+ 37386332393965373333653862633164306135313663383866303530386138356337343164323732
23
+ 34363236643339333364346265346235643166393362623063356633343165653433383739643538
24
+ 30336437363336303466636365646236643466366134613865313064616463643138633462316535
25
+ 64393236383831336336313939363761613834363061366362376139396263633161303736336532
26
+ 39373430396637343861356462326362343237356435663936383138323039326232336662356238
27
+ 33306263653639393530323464366466313138323264636235633566343730343130613764313962
28
+ 31343732396661613131313839646339363230633338353633633938323532623962366165616131
29
+ 61373632623934346665393731643631383663326363343437353362396466333631383136353361
30
+ 66323435373434353034633864646138336330323638316232626238326132333437333034363063
31
+ 65663936656430323735353964323630643133346538383162666462623533666430363862653734
32
+ 33326337616265326232616263336434326433633161373932666332303662333864343466386462
33
+ 36373066363430303138
34
+ """
35
+ from datetime import datetime, timezone
36
+ from typing import Any, Dict, Iterator, List, Optional
37
+
38
+ logger = logging.getLogger(__name__)
39
+
40
+
41
+ class MigrationService:
42
+ """Step service for the Migration Runner application."""
43
+
44
+ def __init__(
45
+ self,
46
+ store: Any,
47
+ config: Optional[Dict[str, Any]] = None,
48
+ ) -> None:
49
+ self._store = store
50
+ self._cfg = config or {}
51
+ self._migration_id = self._cfg.get("migration_id", None)
52
+ logger.debug("%s initialised", self.__class__.__name__)
53
+
54
+ def record_step(
55
+ self, migration_id: Any, checksum: Any, **extra: Any
56
+ ) -> Dict[str, Any]:
57
+ """Create and persist a new Step record."""
58
+ now = datetime.now(timezone.utc).isoformat()
59
+ record: Dict[str, Any] = {
60
+ "id": str(uuid.uuid4()),
61
+ "migration_id": migration_id,
62
+ "checksum": checksum,
63
+ "status": "active",
64
+ "created_at": now,
65
+ **extra,
66
+ }
67
+ saved = self._store.put(record)
68
+ logger.info("record_step: created %s", saved["id"])
69
+ return saved
70
+
71
+ def get_step(self, record_id: str) -> Optional[Dict[str, Any]]:
72
+ """Retrieve a Step by its *record_id*."""
73
+ record = self._store.get(record_id)
74
+ if record is None:
75
+ logger.debug("get_step: %s not found", record_id)
76
+ return record
77
+
78
+ def rollback_step(
79
+ self, record_id: str, **changes: Any
80
+ ) -> Dict[str, Any]:
81
+ """Apply *changes* to an existing Step."""
82
+ record = self._store.get(record_id)
83
+ if record is None:
84
+ raise KeyError(f"Step {record_id!r} not found")
85
+ record.update(changes)
86
+ record["updated_at"] = datetime.now(timezone.utc).isoformat()
87
+ return self._store.put(record)
88
+
89
+ def plan_step(self, record_id: str) -> bool:
90
+ """Remove a Step; returns True on success."""
91
+ if self._store.get(record_id) is None:
92
+ return False
93
+ self._store.delete(record_id)
94
+ logger.info("plan_step: removed %s", record_id)
95
+ return True
96
+
97
+ def list_steps(
98
+ self,
99
+ status: Optional[str] = None,
100
+ limit: int = 50,
101
+ offset: int = 0,
102
+ ) -> List[Dict[str, Any]]:
103
+ """Return paginated Step records."""
104
+ query: Dict[str, Any] = {}
105
+ if status:
106
+ query["status"] = status
107
+ results = self._store.find(query, limit=limit, offset=offset)
108
+ logger.debug("list_steps: %d results", len(results))
109
+ return results
110
+
111
+ def iter_steps(
112
+ self, batch_size: int = 100
113
+ ) -> Iterator[Dict[str, Any]]:
114
+ """Yield all Step records in batches of *batch_size*."""
115
+ offset = 0
116
+ while True:
117
+ page = self.list_steps(limit=batch_size, offset=offset)
118
+ if not page:
119
+ break
120
+ yield from page
121
+ if len(page) < batch_size:
122
+ break
123
+ offset += batch_size