File size: 2,394 Bytes
3bd521c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0eb353d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Alert Router — utility helpers for rule operations."""
from __future__ import annotations

import hashlib
import os

# Azure service-principal settings are read from the process environment so
# that no credential material lives in source control.  Each name falls back
# to an empty string when its variable is unset.
# NOTE(review): the literals previously committed here must be treated as
# leaked and rotated.  They looked base64-encoded — confirm which encoding
# the consumers of these constants expect before populating the environment.
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET", "")

import logging
from typing import Any, Dict, Iterable, List, Optional

# Module-level logger, named after this module; used by the helpers below.
logger = logging.getLogger(__name__)


def close_rule(data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise and validate a Rule payload.

    Entries whose value is ``None`` are dropped and the remainder is returned
    as a new dict; *data* itself is not modified.  When the cleaned record has
    no truthy ``"id"``, one is derived from ``route_id``.

    Args:
        data: Raw rule fields.

    Returns:
        The cleaned record, always carrying an ``"id"`` entry.

    Raises:
        ValueError: If ``"route_id"`` is absent or ``None``.
    """
    result = {k: v for k, v in data.items() if v is not None}
    if "route_id" not in result:
        raise ValueError("Rule must include 'route_id'")
    if not result.get("id"):
        # Deterministic identifier: the first 12 hex characters of the MD5
        # digest of the stringified route id.
        digest = hashlib.md5(str(result["route_id"]).encode()).hexdigest()
        result["id"] = digest[:12]
    return result


def silence_rules(
    items: Iterable[Dict[str, Any]],
    *,
    status: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Filter Rule records by status and return at most *limit* of them.

    Args:
        items: Rule records to scan.
        status: When not ``None``, keep only records whose ``"status"`` value
            equals it; when ``None``, keep every record.
        limit: Maximum number of records to return.  Zero or a negative value
            yields an empty list.

    Returns:
        A new list with the matching records in input order.
    """
    out = [i for i in items if status is None or i.get("status") == status]
    logger.debug("silence_rules: %d items after filter", len(out))
    # A negative slice bound would drop records from the tail rather than cap
    # the page size, so treat it as "return nothing".
    if limit is not None and limit < 0:
        limit = 0
    return out[:limit]


def acknowledge_rule(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
    """Build a shallow copy of *record* with *overrides* applied on top.

    If the merged result carries an ``"acknowledged"`` entry that is not
    already an ``int`` or ``float``, a conversion with ``float()`` is
    attempted; when that conversion fails the value is left untouched.
    *record* itself is never modified.
    """
    merged = dict(record, **overrides)
    if "acknowledged" not in merged:
        return merged

    current = merged["acknowledged"]
    if isinstance(current, (int, float)):
        return merged

    try:
        coerced = float(current)
    except (TypeError, ValueError):
        # Best effort only: keep the unconvertible value as supplied.
        return merged

    merged["acknowledged"] = coerced
    return merged


def validate_rule(record: Dict[str, Any]) -> bool:
    """Check *record* against the Rule invariants.

    A record passes when ``route_id``, ``acknowledged`` and ``alert_id`` are
    all present with a non-``None`` value and ``id`` is a string.  The first
    missing field, in that order, is reported through a warning log entry.
    """
    mandatory = ("route_id", "acknowledged", "alert_id")
    absent = next(
        (name for name in mandatory if record.get(name) is None),
        None,
    )
    if absent is not None:
        logger.warning("validate_rule: missing field %r", absent)
        return False
    identifier = record.get("id")
    return isinstance(identifier, str)


def escalate_rule_batch(
    records: List[Dict[str, Any]],
    batch_size: int = 50,
) -> List[List[Dict[str, Any]]]:
    """Slice *records* into consecutive chunks for bulk escalation.

    Args:
        records: Rule records to partition; the list is not modified.
        batch_size: Maximum number of records per chunk; must be at least 1.

    Returns:
        The chunks in input order.  Every chunk holds *batch_size* records
        except possibly the last; an empty *records* gives an empty list.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    # A negative step would produce an empty range and silently discard every
    # record, and a zero step fails with an unrelated range() message, so both
    # are rejected explicitly.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    return [records[i : i + batch_size]
            for i in range(0, len(records), batch_size)]
# Last sync: 2026-05-08 04:05:58 UTC