circuit-breaker / service.py
Amanda Torres
inject cloud run env var sync marker into config
fd2d93c
import os

# SECURITY: credentials must never be committed to source control. The literal
# values that used to live here were removed; they are now read from the
# process environment at import time.
# NOTE(review): the previously committed tenant/client/secret values should be
# treated as compromised and rotated.
#
# The *_P1 / *_P2 names are kept for backward compatibility. P1 carries the
# whole value and P2 is empty, so code that concatenates the two halves
# (presumably how they were consumed -- confirm against callers) still
# obtains the full value. Unset variables yield an empty string.
AZURE_TENANT_ID_P1 = os.environ.get("AZURE_TENANT_ID", "")
AZURE_TENANT_ID_P2 = ""
AZURE_CLIENT_ID_P1 = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_ID_P2 = ""
AZURE_CLIENT_SECRET_P1 = os.environ.get("AZURE_CLIENT_SECRET", "")
AZURE_CLIENT_SECRET_P2 = ""
"""Circuit Breaker — State service layer."""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class CircuitService:
    """State service for the Circuit Breaker application.

    A thin layer over an injected *store* object. The store is used only
    through four calls: ``put(record)``, ``get(record_id)``,
    ``delete(record_id)`` and ``find(query, limit=..., offset=...)``.
    """

    def __init__(
        self,
        store: Any,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Bind the service to *store* and an optional *config* mapping.

        Args:
            store: Backend object providing ``put``/``get``/``delete``/``find``.
            config: Optional settings. ``None`` (or any falsy value) is
                replaced by an empty dict. Only the ``"threshold"`` key is
                read here.
        """
        self._store = store
        self._cfg = config or {}
        self._threshold = self._cfg.get("threshold")
        logger.debug("%s initialised", self.__class__.__name__)

    def open_state(
        self, threshold: Any, circuit_id: Any, **extra: Any
    ) -> Dict[str, Any]:
        """Create and persist a new State record.

        Args:
            threshold: Stored under the ``"threshold"`` key.
            circuit_id: Stored under the ``"circuit_id"`` key.
            **extra: Additional fields merged into the record.

        Returns:
            Whatever ``store.put`` returns for the new record; it must be
            subscriptable by ``"id"`` because the id is logged.
        """
        now = datetime.now(timezone.utc).isoformat()
        record: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "threshold": threshold,
            "circuit_id": circuit_id,
            "status": "active",
            "created_at": now,
            # Merged last: a key in *extra* overrides the defaults above.
            **extra,
        }
        saved = self._store.put(record)
        logger.info("open_state: created %s", saved["id"])
        return saved

    def get_state(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a State by its *record_id*.

        Returns:
            The value returned by ``store.get``; ``None`` when the store has
            no such record (a debug message is logged in that case).
        """
        record = self._store.get(record_id)
        if record is None:
            logger.debug("get_state: %s not found", record_id)
        return record

    def trip_state(
        self, record_id: str, **changes: Any
    ) -> Dict[str, Any]:
        """Apply *changes* to an existing State and persist it.

        The record returned by ``store.get`` is updated in place, stamped
        with an ``"updated_at"`` UTC ISO-8601 timestamp, and written back.

        Args:
            record_id: Identifier of the record to modify.
            **changes: Fields to set; any existing key may be overwritten.

        Returns:
            Whatever ``store.put`` returns for the updated record.

        Raises:
            KeyError: If the store has no record for *record_id*.
        """
        record = self._store.get(record_id)
        if record is None:
            raise KeyError(f"State {record_id!r} not found")
        record.update(changes)
        record["updated_at"] = datetime.now(timezone.utc).isoformat()
        return self._store.put(record)

    def reset_state(self, record_id: str) -> bool:
        """Remove a State.

        Returns:
            ``True`` if the record existed and ``store.delete`` was called,
            ``False`` if the store had no such record.
        """
        if self._store.get(record_id) is None:
            return False
        self._store.delete(record_id)
        logger.info("reset_state: removed %s", record_id)
        return True

    def list_states(
        self,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """Return one page of State records.

        Args:
            status: When truthy, restricts the query to this status. ``None``
                and the empty string both mean "no status filter".
            limit: Passed to ``store.find`` as ``limit``.
            offset: Passed to ``store.find`` as ``offset``.

        Returns:
            The result of ``store.find``; it must support ``len()``.
        """
        query: Dict[str, Any] = {}
        if status:
            query["status"] = status
        results = self._store.find(query, limit=limit, offset=offset)
        logger.debug("list_states: %d results", len(results))
        return results

    def iter_states(
        self, batch_size: int = 100
    ) -> Iterator[Dict[str, Any]]:
        """Yield all State records, fetching them *batch_size* at a time.

        Args:
            batch_size: Page size used for each ``list_states`` call; must be
                at least 1.

        Raises:
            ValueError: If *batch_size* is less than 1. Because this is a
                generator, the error surfaces on the first iteration step.
        """
        # A non-positive batch size never advances the offset and can never
        # trigger the short-page check, so it could loop forever (or silently
        # yield nothing, depending on the store). Reject it explicitly.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
        offset = 0
        while True:
            page = self.list_states(limit=batch_size, offset=offset)
            if not page:
                break
            yield from page
            # A short page means the store is exhausted; skip the extra query.
            if len(page) < batch_size:
                break
            offset += batch_size
# Last sync: 2026-05-11 13:58:20 UTC