| """ |
| Утилиты для работы с историей запросов в PostgreSQL |
| Используется на бэкенде для логирования запросов к RAG |
| """ |
| from datetime import datetime |
| from typing import List, Dict, Optional |
| import json |
|
|
| from sqlalchemy import text |
| from sqlalchemy.exc import SQLAlchemyError |
|
|
| from src.config import sql_client |
|
|
|
|
| def init_history_table(): |
| """ |
| Инициализация таблицы истории запросов |
| Создает таблицу, если она не существует |
| """ |
| try: |
| with sql_client.begin() as conn: |
| conn.execute(text(""" |
| CREATE TABLE IF NOT EXISTS query_history ( |
| id SERIAL PRIMARY KEY, |
| timestamp TIMESTAMP NOT NULL DEFAULT NOW(), |
| dialogue_id VARCHAR(255) NOT NULL, |
| query TEXT NOT NULL, |
| answer TEXT NOT NULL, |
| reason TEXT, |
| search_period JSONB, |
| metadata JSONB |
| ) |
| """)) |
| conn.execute(text(""" |
| CREATE INDEX IF NOT EXISTS idx_query_history_dialogue_id |
| ON query_history(dialogue_id) |
| """)) |
| conn.execute(text(""" |
| CREATE INDEX IF NOT EXISTS idx_query_history_timestamp |
| ON query_history(timestamp DESC) |
| """)) |
| print("✅ Таблица query_history инициализирована") |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при инициализации таблицы: {e}") |
| raise |
|
|
|
|
| def log_query( |
| query: str, |
| answer: str, |
| reason: str, |
| dialogue_id: Optional[str] = None, |
| search_period: Optional[Dict] = None, |
| metadata_: Optional[Dict] = None |
| ) -> Optional[int]: |
| """ |
| Логировать запрос в историю (вызывается бэкендом после получения ответа от LLM) |
| |
| Args: |
| query: Текст вопроса пользователя |
| answer: Ответ системы |
| reason: Обоснование ответа |
| dialogue_id: ID диалога (опционально) |
| search_period: Период поиска |
| metadata_: Дополнительные метаданные |
| |
| Returns: |
| ID созданной записи или None при ошибке |
| """ |
| |
| if not dialogue_id: |
| dialogue_id = f"single_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}" |
| |
| try: |
| with sql_client.begin() as conn: |
| result = conn.execute( |
| text(""" |
| INSERT INTO query_history |
| (timestamp, dialogue_id, query, answer, reason, search_period, metadata) |
| VALUES (:timestamp, :dialogue_id, :query, :answer, :reason, |
| CAST(:search_period AS JSONB), CAST(:metadata AS JSONB)) |
| RETURNING id |
| """), |
| { |
| "timestamp": datetime.now(), |
| "dialogue_id": dialogue_id, |
| "query": query, |
| "answer": answer, |
| "reason": reason, |
| "search_period": json.dumps(search_period or {}), |
| "metadata": json.dumps(metadata_ or {}) |
| } |
| ) |
| query_id = result.scalar() |
| return query_id |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при логировании запроса: {e}") |
| return None |
|
|
|
|
| def get_all_history(limit: int = 100, offset: int = 0) -> List[Dict]: |
| """Получить всю историю запросов""" |
| try: |
| with sql_client.connect() as conn: |
| result = conn.execute( |
| text(""" |
| SELECT id, timestamp, dialogue_id, query, answer, reason, |
| search_period, metadata |
| FROM query_history |
| ORDER BY timestamp DESC |
| LIMIT :limit OFFSET :offset |
| """), |
| {"limit": limit, "offset": offset} |
| ) |
| |
| rows = result.mappings().all() |
| |
| return [ |
| { |
| **dict(row), |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None |
| } |
| for row in rows |
| ] |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при получении истории: {e}") |
| return [] |
|
|
|
|
| def get_history_by_dialogue(dialogue_id: str) -> List[Dict]: |
| """Получить историю конкретного диалога""" |
| try: |
| with sql_client.connect() as conn: |
| result = conn.execute( |
| text(""" |
| SELECT id, timestamp, dialogue_id, query, answer, reason, |
| search_period, metadata |
| FROM query_history |
| WHERE dialogue_id = :dialogue_id |
| ORDER BY timestamp ASC |
| """), |
| {"dialogue_id": dialogue_id} |
| ) |
| |
| rows = result.mappings().all() |
| return [ |
| { |
| **dict(row), |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None |
| } |
| for row in rows |
| ] |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при получении диалога: {e}") |
| return [] |
|
|
|
|
| def search_history(search_text: str, limit: int = 50) -> List[Dict]: |
| """Поиск по истории запросов""" |
| try: |
| with sql_client.connect() as conn: |
| result = conn.execute( |
| text(""" |
| SELECT id, timestamp, dialogue_id, query, answer, reason, |
| search_period, metadata |
| FROM query_history |
| WHERE query ILIKE :search_pattern |
| OR answer ILIKE :search_pattern |
| ORDER BY timestamp DESC |
| LIMIT :limit |
| """), |
| { |
| "search_pattern": f"%{search_text}%", |
| "limit": limit |
| } |
| ) |
| |
| rows = result.mappings().all() |
| return [ |
| { |
| **dict(row), |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None |
| } |
| for row in rows |
| ] |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при поиске в истории: {e}") |
| return [] |
|
|
|
|
| def get_history_stats() -> Dict: |
| """Получить статистику по истории запросов""" |
| try: |
| with sql_client.connect() as conn: |
| result = conn.execute( |
| text(""" |
| SELECT |
| COUNT(*) as total_queries, |
| COUNT(DISTINCT dialogue_id) as unique_dialogues, |
| MAX(timestamp) as last_query_time, |
| MIN(timestamp) as first_query_time |
| FROM query_history |
| """) |
| ) |
| |
| row = result.mappings().first() |
| if row: |
| return { |
| "total_queries": row["total_queries"], |
| "unique_dialogues": row["unique_dialogues"], |
| "last_query_time": row["last_query_time"].isoformat() if row["last_query_time"] else None, |
| "first_query_time": row["first_query_time"].isoformat() if row["first_query_time"] else None |
| } |
| return {} |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при получении статистики: {e}") |
| return {} |
|
|
|
|
| def delete_history(dialogue_id: Optional[str] = None): |
| """Удалить историю""" |
| try: |
| with sql_client.begin() as conn: |
| if dialogue_id: |
| conn.execute( |
| text("DELETE FROM query_history WHERE dialogue_id = :dialogue_id"), |
| {"dialogue_id": dialogue_id} |
| ) |
| print(f"✅ История диалога {dialogue_id} удалена") |
| else: |
| conn.execute(text("DELETE FROM query_history")) |
| print("✅ Вся история удалена") |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при удалении истории: {e}") |
| raise |
|
|
|
|
| def get_recent_dialogues(limit: int = 10) -> List[Dict]: |
| """Получить список последних диалогов""" |
| try: |
| with sql_client.connect() as conn: |
| result = conn.execute( |
| text(""" |
| SELECT |
| dialogue_id, |
| COUNT(*) as message_count, |
| MIN(timestamp) as started_at, |
| MAX(timestamp) as last_message_at |
| FROM query_history |
| GROUP BY dialogue_id |
| ORDER BY MAX(timestamp) DESC |
| LIMIT :limit |
| """), |
| {"limit": limit} |
| ) |
| |
| rows = result.mappings().all() |
| return [ |
| { |
| "dialogue_id": row["dialogue_id"], |
| "message_count": row["message_count"], |
| "started_at": row["started_at"].isoformat() if row["started_at"] else None, |
| "last_message_at": row["last_message_at"].isoformat() if row["last_message_at"] else None |
| } |
| for row in rows |
| ] |
| except SQLAlchemyError as e: |
| print(f"❌ Ошибка при получении списка диалогов: {e}") |
| return [] |
|
|