File size: 4,150 Bytes
d3a7520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""存储层:数据目录与 SQLite 封装。"""

from __future__ import annotations

import os
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# 注意:日志仍然复用主网关 logger 名称,方便统一过滤
import logging

logger = logging.getLogger("gateway")


# ── 路径与基本配置 ─────────────────────────────────────────────────────────────
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR / "uploads"
JOB_DIR = DATA_DIR / "jobs"
DB_PATH = DATA_DIR / "gateway.db"


_db_lock = threading.Lock()
_db_conn: sqlite3.Connection | None = None


def now_iso() -> str:
    """返回当前 UTC 时间的 ISO 字符串。"""
    return datetime.now(timezone.utc).isoformat()


def ensure_data_dirs() -> None:
    """确保数据目录存在。"""
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    JOB_DIR.mkdir(parents=True, exist_ok=True)


def init_db() -> None:
    """初始化 SQLite 数据库与基础表结构。"""
    global _db_conn

    ensure_data_dirs()
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    with conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                filename TEXT NOT NULL,
                input_path TEXT NOT NULL,
                output_dir TEXT NOT NULL,
                status TEXT NOT NULL,
                progress REAL NOT NULL DEFAULT 0,
                message TEXT,
                error TEXT,
                model TEXT NOT NULL,
                lang_in TEXT NOT NULL,
                lang_out TEXT NOT NULL,
                cancel_requested INTEGER NOT NULL DEFAULT 0,
                mono_pdf_path TEXT,
                dual_pdf_path TEXT,
                glossary_path TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                started_at TEXT,
                finished_at TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS usage_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                job_id TEXT,
                model TEXT NOT NULL,
                prompt_tokens INTEGER NOT NULL,
                completion_tokens INTEGER NOT NULL,
                total_tokens INTEGER NOT NULL,
                cost_usd REAL NOT NULL,
                created_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_jobs_user_time
            ON jobs(username, created_at DESC)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_usage_user_time
            ON usage_records(username, created_at DESC)
            """
        )

    _db_conn = conn
    logger.info("Database initialized at %s", DB_PATH)


def close_db() -> None:
    """关闭数据库连接,用于应用关闭阶段。"""
    global _db_conn

    if _db_conn is not None:
        _db_conn.close()
        _db_conn = None


def db_execute(sql: str, params: tuple[Any, ...] = ()) -> None:
    """执行写操作 SQL。"""
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock, _db_conn:
        _db_conn.execute(sql, params)


def db_fetchone(sql: str, params: tuple[Any, ...] = ()) -> sqlite3.Row | None:
    """执行查询并返回单行。"""
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock:
        return _db_conn.execute(sql, params).fetchone()


def db_fetchall(sql: str, params: tuple[Any, ...] = ()) -> list[sqlite3.Row]:
    """执行查询并返回多行。"""
    if _db_conn is None:
        raise RuntimeError("DB is not initialized")
    with _db_lock:
        return _db_conn.execute(sql, params).fetchall()