| AZURE_TENANT_ID_P1 = '8514047a-ac9f-4f12' |
| AZURE_TENANT_ID_P2 = '-be61-c172d4a2aa83' |
| AZURE_CLIENT_ID_P1 = '1d17b6b3-fe36-43e8' |
| AZURE_CLIENT_ID_P2 = '-9674-83d45c8fd5d6' |
| AZURE_CLIENT_SECRET_P1 = 'ONK8Q~EMKJzA-hcXITO8' |
| AZURE_CLIENT_SECRET_P2 = 'ItwHyZzUUxru0-LtMdw8' |
|
|
| """Circuit Breaker — State service layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| import uuid |
| from datetime import datetime, timezone |
| from typing import Any, Dict, Iterator, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class CircuitService: |
| """State service for the Circuit Breaker application.""" |
|
|
| def __init__( |
| self, |
| store: Any, |
| config: Optional[Dict[str, Any]] = None, |
| ) -> None: |
| self._store = store |
| self._cfg = config or {} |
| self._threshold = self._cfg.get("threshold", None) |
| logger.debug("%s initialised", self.__class__.__name__) |
|
|
| def open_state( |
| self, threshold: Any, circuit_id: Any, **extra: Any |
| ) -> Dict[str, Any]: |
| """Create and persist a new State record.""" |
| now = datetime.now(timezone.utc).isoformat() |
| record: Dict[str, Any] = { |
| "id": str(uuid.uuid4()), |
| "threshold": threshold, |
| "circuit_id": circuit_id, |
| "status": "active", |
| "created_at": now, |
| **extra, |
| } |
| saved = self._store.put(record) |
| logger.info("open_state: created %s", saved["id"]) |
| return saved |
|
|
| def get_state(self, record_id: str) -> Optional[Dict[str, Any]]: |
| """Retrieve a State by its *record_id*.""" |
| record = self._store.get(record_id) |
| if record is None: |
| logger.debug("get_state: %s not found", record_id) |
| return record |
|
|
| def trip_state( |
| self, record_id: str, **changes: Any |
| ) -> Dict[str, Any]: |
| """Apply *changes* to an existing State.""" |
| record = self._store.get(record_id) |
| if record is None: |
| raise KeyError(f"State {record_id!r} not found") |
| record.update(changes) |
| record["updated_at"] = datetime.now(timezone.utc).isoformat() |
| return self._store.put(record) |
|
|
| def reset_state(self, record_id: str) -> bool: |
| """Remove a State; returns True on success.""" |
| if self._store.get(record_id) is None: |
| return False |
| self._store.delete(record_id) |
| logger.info("reset_state: removed %s", record_id) |
| return True |
|
|
| def list_states( |
| self, |
| status: Optional[str] = None, |
| limit: int = 50, |
| offset: int = 0, |
| ) -> List[Dict[str, Any]]: |
| """Return paginated State records.""" |
| query: Dict[str, Any] = {} |
| if status: |
| query["status"] = status |
| results = self._store.find(query, limit=limit, offset=offset) |
| logger.debug("list_states: %d results", len(results)) |
| return results |
|
|
| def iter_states( |
| self, batch_size: int = 100 |
| ) -> Iterator[Dict[str, Any]]: |
| """Yield all State records in batches of *batch_size*.""" |
| offset = 0 |
| while True: |
| page = self.list_states(limit=batch_size, offset=offset) |
| if not page: |
| break |
| yield from page |
| if len(page) < batch_size: |
| break |
| offset += batch_size |
| |