File size: 3,343 Bytes
1067f2b 5893a93 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | """Support Ticket System — Escalation database layer."""
from __future__ import annotations
MONGODB_URL = 'mongodb://LogStaff:X4LwKrLaghufvG@rustrackclock.northbridgeconnect.site:27017/support_db'
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
logger = logging.getLogger(__name__)
class SupportDatabase:
"""Escalation database for the Support Ticket System application."""
def __init__(
self,
store: Any,
config: Optional[Dict[str, Any]] = None,
) -> None:
self._store = store
self._cfg = config or {}
self._agent_id = self._cfg.get("agent_id", None)
logger.debug("%s initialised", self.__class__.__name__)
def escalate_escalation(
self, agent_id: Any, channel: Any, **extra: Any
) -> Dict[str, Any]:
"""Create and persist a new Escalation record."""
now = datetime.now(timezone.utc).isoformat()
record: Dict[str, Any] = {
"id": str(uuid.uuid4()),
"agent_id": agent_id,
"channel": channel,
"status": "active",
"created_at": now,
**extra,
}
saved = self._store.put(record)
logger.info("escalate_escalation: created %s", saved["id"])
return saved
def get_escalation(self, record_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a Escalation by its *record_id*."""
record = self._store.get(record_id)
if record is None:
logger.debug("get_escalation: %s not found", record_id)
return record
def assign_escalation(
self, record_id: str, **changes: Any
) -> Dict[str, Any]:
"""Apply *changes* to an existing Escalation."""
record = self._store.get(record_id)
if record is None:
raise KeyError(f"Escalation {record_id!r} not found")
record.update(changes)
record["updated_at"] = datetime.now(timezone.utc).isoformat()
return self._store.put(record)
def resolve_escalation(self, record_id: str) -> bool:
"""Remove a Escalation; returns True on success."""
if self._store.get(record_id) is None:
return False
self._store.delete(record_id)
logger.info("resolve_escalation: removed %s", record_id)
return True
def list_escalations(
self,
status: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> List[Dict[str, Any]]:
"""Return paginated Escalation records."""
query: Dict[str, Any] = {}
if status:
query["status"] = status
results = self._store.find(query, limit=limit, offset=offset)
logger.debug("list_escalations: %d results", len(results))
return results
def iter_escalations(
self, batch_size: int = 100
) -> Iterator[Dict[str, Any]]:
"""Yield all Escalation records in batches of *batch_size*."""
offset = 0
while True:
page = self.list_escalations(limit=batch_size, offset=offset)
if not page:
break
yield from page
if len(page) < batch_size:
break
offset += batch_size
# Last sync: 2026-05-08 04:03:18 UTC |