File size: 2,374 Bytes
61e2cc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
661eb14
61e2cc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
661eb14
 
b14fc84
 
 
 
 
 
 
 
661eb14
61e2cc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

from core.config import AUDIT_DB, MODEL_VERSION

# SQLite DDL for the audit trail table. ``IF NOT EXISTS`` makes the statement
# safe to execute repeatedly. Only ``ts``, ``action`` and ``actor`` are declared
# NOT NULL; the remaining data columns are nullable. ``id`` is an AUTOINCREMENT
# primary key.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts TEXT NOT NULL,
    action TEXT NOT NULL,
    actor TEXT NOT NULL,
    model_version TEXT,
    bidder_id TEXT,
    criterion_id TEXT,
    payload_json TEXT
);
"""


def _conn() -> sqlite3.Connection:
    """Open the audit database, creating its directory and table if needed.

    The parent directory of ``AUDIT_DB`` is created when missing, rows are
    returned as ``sqlite3.Row``, and the ``audit_log`` schema is applied on
    every call (the DDL is idempotent).

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the schema cannot be applied. The connection is
            closed before the error propagates.
    """
    Path(AUDIT_DB).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(AUDIT_DB)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute(_SCHEMA)
        conn.commit()
    except Exception:
        # Nobody else holds a reference yet, so close here to avoid leaking
        # the handle when schema setup fails.
        conn.close()
        raise
    return conn


def log(action: str, actor: str = "system", **fields) -> int:
    """Append one entry to the audit log and return its row id.

    Args:
        action: Value stored in the ``action`` column.
        actor: Value stored in the ``actor`` column; defaults to ``"system"``.
        **fields: ``model_version`` (defaults to ``MODEL_VERSION``),
            ``bidder_id`` and ``criterion_id`` are stored in their own
            columns. Any remaining keyword arguments are JSON-encoded into
            ``payload_json``; the column is NULL when there are none.

    Returns:
        The ``id`` of the inserted row.

    Raises:
        TypeError: If a remaining field is not JSON-serializable. This is
            raised before the database is opened.
        sqlite3.Error: If the insert or commit fails.
    """
    # Timestamps are recorded as timezone-aware UTC ISO-8601 strings.
    ts = datetime.now(timezone.utc).isoformat()
    model_version = fields.pop("model_version", MODEL_VERSION)
    bidder_id = fields.pop("bidder_id", None)
    criterion_id = fields.pop("criterion_id", None)
    # Whatever is left after popping the dedicated columns is the payload.
    payload_json = json.dumps(fields) if fields else None

    conn = _conn()
    try:
        cur = conn.execute(
            "INSERT INTO audit_log (ts, action, actor, model_version, bidder_id, criterion_id, payload_json) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (ts, action, actor, model_version, bidder_id, criterion_id, payload_json),
        )
        conn.commit()
        return cur.lastrowid
    finally:
        # Always release the connection, even if the insert or commit raised.
        conn.close()


def clear() -> None:
    """Delete every audit entry and reset the AUTOINCREMENT counter.

    Both deletes are committed together. Removing the ``audit_log`` row from
    ``sqlite_sequence`` makes the next inserted entry start again from id 1.

    Raises:
        sqlite3.Error: If either delete or the commit fails.
    """
    conn = _conn()
    try:
        conn.execute("DELETE FROM audit_log")
        conn.execute("DELETE FROM sqlite_sequence WHERE name='audit_log'")
        conn.commit()
    finally:
        # Always release the connection, even if a statement raised.
        conn.close()


def query(filters: dict | None = None) -> list[dict]:
    """Return audit entries, newest first, optionally filtered.

    Args:
        filters: Optional mapping. Recognised keys are ``bidder_id`` and
            ``action`` (equality match) and ``date_from`` / ``date_to``
            (inclusive bounds compared against the stored ``ts`` column).
            Other keys are ignored. ``None`` or an empty mapping returns
            every entry.

    Returns:
        One ``dict`` per row keyed by column name, ordered by ``id``
        descending.

    Raises:
        sqlite3.Error: If the query fails.
    """
    # (filter key, SQL predicate) pairs; the order fixes the WHERE clause order.
    supported = (
        ("bidder_id", "bidder_id = ?"),
        ("action", "action = ?"),
        ("date_from", "ts >= ?"),
        ("date_to", "ts <= ?"),
    )

    sql = "SELECT * FROM audit_log"
    params: list = []
    if filters:
        clauses = []
        for key, predicate in supported:
            if key in filters:
                clauses.append(predicate)
                # Values are always bound as parameters, never interpolated.
                params.append(filters[key])
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY id DESC"

    conn = _conn()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        # Always release the connection, even if the query raised.
        conn.close()
    return [dict(r) for r in rows]