| import json |
| import sqlite3 |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
| from core.config import AUDIT_DB, MODEL_VERSION |
|
|
| _SCHEMA = """ |
| CREATE TABLE IF NOT EXISTS audit_log ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| ts TEXT NOT NULL, |
| action TEXT NOT NULL, |
| actor TEXT NOT NULL, |
| model_version TEXT, |
| bidder_id TEXT, |
| criterion_id TEXT, |
| payload_json TEXT |
| ); |
| """ |
|
|
|
|
| def _conn() -> sqlite3.Connection: |
| Path(AUDIT_DB).parent.mkdir(parents=True, exist_ok=True) |
| conn = sqlite3.connect(AUDIT_DB) |
| conn.row_factory = sqlite3.Row |
| conn.execute(_SCHEMA) |
| conn.commit() |
| return conn |
|
|
|
|
| def log(action: str, actor: str = "system", **fields) -> int: |
| ts = datetime.now(timezone.utc).isoformat() |
| model_version = fields.pop("model_version", MODEL_VERSION) |
| bidder_id = fields.pop("bidder_id", None) |
| criterion_id = fields.pop("criterion_id", None) |
| payload_json = json.dumps(fields) if fields else None |
|
|
| conn = _conn() |
| cur = conn.execute( |
| "INSERT INTO audit_log (ts, action, actor, model_version, bidder_id, criterion_id, payload_json) " |
| "VALUES (?, ?, ?, ?, ?, ?, ?)", |
| (ts, action, actor, model_version, bidder_id, criterion_id, payload_json), |
| ) |
| conn.commit() |
| row_id = cur.lastrowid |
| conn.close() |
| return row_id |
|
|
|
|
| def clear() -> None: |
| conn = _conn() |
| conn.execute("DELETE FROM audit_log") |
| conn.execute("DELETE FROM sqlite_sequence WHERE name='audit_log'") |
| conn.commit() |
| conn.close() |
|
|
|
|
| def query(filters: dict | None = None) -> list[dict]: |
| conn = _conn() |
| sql = "SELECT * FROM audit_log" |
| params: list = [] |
| if filters: |
| clauses = [] |
| if "bidder_id" in filters: |
| clauses.append("bidder_id = ?") |
| params.append(filters["bidder_id"]) |
| if "action" in filters: |
| clauses.append("action = ?") |
| params.append(filters["action"]) |
| if "date_from" in filters: |
| clauses.append("ts >= ?") |
| params.append(filters["date_from"]) |
| if "date_to" in filters: |
| clauses.append("ts <= ?") |
| params.append(filters["date_to"]) |
| if clauses: |
| sql += " WHERE " + " AND ".join(clauses) |
| sql += " ORDER BY id DESC" |
| rows = conn.execute(sql, params).fetchall() |
| conn.close() |
| return [dict(r) for r in rows] |
|
|