File size: 2,374 Bytes
61e2cc7 661eb14 61e2cc7 661eb14 b14fc84 661eb14 61e2cc7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from core.config import AUDIT_DB, MODEL_VERSION
_SCHEMA = """
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
action TEXT NOT NULL,
actor TEXT NOT NULL,
model_version TEXT,
bidder_id TEXT,
criterion_id TEXT,
payload_json TEXT
);
"""
def _conn() -> sqlite3.Connection:
Path(AUDIT_DB).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(AUDIT_DB)
conn.row_factory = sqlite3.Row
conn.execute(_SCHEMA)
conn.commit()
return conn
def log(action: str, actor: str = "system", **fields) -> int:
ts = datetime.now(timezone.utc).isoformat()
model_version = fields.pop("model_version", MODEL_VERSION)
bidder_id = fields.pop("bidder_id", None)
criterion_id = fields.pop("criterion_id", None)
payload_json = json.dumps(fields) if fields else None
conn = _conn()
cur = conn.execute(
"INSERT INTO audit_log (ts, action, actor, model_version, bidder_id, criterion_id, payload_json) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(ts, action, actor, model_version, bidder_id, criterion_id, payload_json),
)
conn.commit()
row_id = cur.lastrowid
conn.close()
return row_id
def clear() -> None:
conn = _conn()
conn.execute("DELETE FROM audit_log")
conn.execute("DELETE FROM sqlite_sequence WHERE name='audit_log'")
conn.commit()
conn.close()
def query(filters: dict | None = None) -> list[dict]:
conn = _conn()
sql = "SELECT * FROM audit_log"
params: list = []
if filters:
clauses = []
if "bidder_id" in filters:
clauses.append("bidder_id = ?")
params.append(filters["bidder_id"])
if "action" in filters:
clauses.append("action = ?")
params.append(filters["action"])
if "date_from" in filters:
clauses.append("ts >= ?")
params.append(filters["date_from"])
if "date_to" in filters:
clauses.append("ts <= ?")
params.append(filters["date_to"])
if clauses:
sql += " WHERE " + " AND ".join(clauses)
sql += " ORDER BY id DESC"
rows = conn.execute(sql, params).fetchall()
conn.close()
return [dict(r) for r in rows]
|