File size: 4,150 Bytes
"""Storage layer: data directories and a SQLite wrapper."""
from __future__ import annotations
import os
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# NOTE: logging deliberately reuses the main gateway logger name so that log
# output can be filtered in one place.
import logging

logger = logging.getLogger("gateway")

# ── Paths and basic configuration ────────────────────────────────────────────
# Root data directory; overridable through the DATA_DIR environment variable,
# falling back to "/data" when the variable is unset.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR.joinpath("uploads")
JOB_DIR = DATA_DIR.joinpath("jobs")
DB_PATH = DATA_DIR.joinpath("gateway.db")

# Module-wide SQLite connection (None until init_db() assigns it) and the lock
# the db_* helpers hold while using it.
_db_conn: sqlite3.Connection | None = None
_db_lock = threading.Lock()
def now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string.

    The underlying datetime is timezone-aware, so the result carries an
    explicit ``+00:00`` offset.
    """
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def ensure_data_dirs() -> None:
    """Create the upload and job directories if they do not exist yet.

    Missing parent directories are created as well, and directories that
    already exist are left untouched.
    """
    for directory in (UPLOAD_DIR, JOB_DIR):
        directory.mkdir(parents=True, exist_ok=True)
# DDL run by init_db(). Every statement uses IF NOT EXISTS, so executing the
# whole list against an already-initialized database is harmless.
_SCHEMA_STATEMENTS: tuple[str, ...] = (
    """
    CREATE TABLE IF NOT EXISTS jobs (
        id TEXT PRIMARY KEY,
        username TEXT NOT NULL,
        filename TEXT NOT NULL,
        input_path TEXT NOT NULL,
        output_dir TEXT NOT NULL,
        status TEXT NOT NULL,
        progress REAL NOT NULL DEFAULT 0,
        message TEXT,
        error TEXT,
        model TEXT NOT NULL,
        lang_in TEXT NOT NULL,
        lang_out TEXT NOT NULL,
        cancel_requested INTEGER NOT NULL DEFAULT 0,
        mono_pdf_path TEXT,
        dual_pdf_path TEXT,
        glossary_path TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        started_at TEXT,
        finished_at TEXT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS usage_records (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL,
        job_id TEXT,
        model TEXT NOT NULL,
        prompt_tokens INTEGER NOT NULL,
        completion_tokens INTEGER NOT NULL,
        total_tokens INTEGER NOT NULL,
        cost_usd REAL NOT NULL,
        created_at TEXT NOT NULL
    )
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_jobs_user_time
    ON jobs(username, created_at DESC)
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_usage_user_time
    ON usage_records(username, created_at DESC)
    """,
)


def init_db() -> None:
    """Open the SQLite database and make sure the base schema exists.

    Ensures the data directories exist, opens a connection to ``DB_PATH``
    with ``sqlite3.Row`` as its row factory, creates the ``jobs`` and
    ``usage_records`` tables plus their indexes when missing, and publishes
    the connection as the module-wide ``_db_conn``.

    Calling this again replaces the module-wide connection and closes the
    previous one, so repeated initialization does not leak a handle.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema
            statements fail. The newly opened connection is closed before
            the error propagates, and ``_db_conn`` is left unchanged.
    """
    global _db_conn
    ensure_data_dirs()

    # check_same_thread=False: the connection is shared between threads and
    # access is serialized by _db_lock in the db_* helpers.
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        # The connection context manager commits on success and rolls back on
        # an exception; it does NOT close the connection.
        with conn:
            for statement in _SCHEMA_STATEMENTS:
                conn.execute(statement)
    except Exception:
        # Do not leak the handle when schema creation fails.
        conn.close()
        raise

    # Swap under the lock so no helper is mid-query on the old connection.
    with _db_lock:
        previous, _db_conn = _db_conn, conn
    if previous is not None:
        previous.close()

    logger.info("Database initialized at %s", DB_PATH)
def close_db() -> None:
    """Close the module-wide database connection, if one is open.

    Intended for the application shutdown phase. Does nothing when no
    connection is set; otherwise closes it and resets ``_db_conn`` to None.
    """
    global _db_conn
    if _db_conn is None:
        return
    _db_conn.close()
    _db_conn = None
def db_execute(sql: str, params: tuple[Any, ...] = ()) -> None:
    """Run a write statement on the shared connection.

    Args:
        sql: SQL text, optionally containing placeholders.
        params: Values bound to the placeholders in ``sql``.

    Raises:
        RuntimeError: if the database has not been initialized.
    """
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock:
        # The connection context manager commits on success and rolls back
        # if the statement raises.
        with _db_conn:
            _db_conn.execute(sql, params)
def db_fetchone(sql: str, params: tuple[Any, ...] = ()) -> sqlite3.Row | None:
    """Run a query on the shared connection and return its first row.

    Args:
        sql: SQL text, optionally containing placeholders.
        params: Values bound to the placeholders in ``sql``.

    Returns:
        The first result row, or None when the query yields no rows.

    Raises:
        RuntimeError: if the database has not been initialized.
    """
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock:
        cursor = _db_conn.execute(sql, params)
        first_row = cursor.fetchone()
    return first_row
def db_fetchall(sql: str, params: tuple[Any, ...] = ()) -> list[sqlite3.Row]:
    """Run a query on the shared connection and return every row.

    Args:
        sql: SQL text, optionally containing placeholders.
        params: Values bound to the placeholders in ``sql``.

    Returns:
        All result rows as a list; empty when the query yields no rows.

    Raises:
        RuntimeError: if the database has not been initialized.
    """
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock:
        cursor = _db_conn.execute(sql, params)
        rows = cursor.fetchall()
    return rows
|