| import os
|
| import io
|
|
|
| import sqlite3
|
| import zipfile
|
|
|
| from pathlib import Path
|
|
|
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException
|
| from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
| from storage.common import validate_token
|
|
|
| router = APIRouter(prefix="/db", tags=["Database Manager"])
|
| HF_TOKEN = os.getenv("HF_TOKEN")
|
| DB_PATH = Path("/data/db")
|
|
|
| @router.get("/download_all_db_files", tags=["Database Manager"])
|
| async def download_all_db_files(
|
| token: str = Query(..., description="Token required for authorization")
|
| ):
|
| """
|
| Download all .db files from /data/db as a ZIP archive.
|
|
|
| Steps:
|
| - Validate the token.
|
| - Verify that /data/db exists and contains .db files.
|
| - Create an in-memory ZIP containing all .db files.
|
| - Return the ZIP as a downloadable file.
|
| """
|
| validate_token(token)
|
|
|
| if not DB_PATH.exists():
|
| raise HTTPException(status_code=404, detail="DB folder does not exist")
|
|
|
| db_files = list(DB_PATH.glob("*.db"))
|
| if not db_files:
|
| raise HTTPException(status_code=404, detail="No .db files found")
|
|
|
|
|
| zip_buffer = io.BytesIO()
|
|
|
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
|
| for file_path in db_files:
|
| zipf.write(file_path, arcname=file_path.name)
|
|
|
| zip_buffer.seek(0)
|
|
|
| return StreamingResponse(
|
| zip_buffer,
|
| media_type="application/zip",
|
| headers={
|
| "Content-Disposition": 'attachment; filename="db_files.zip"'
|
| }
|
| )
|
|
|
|
|
| @router.post("/upload_db_file", tags=["Database Manager"])
|
| async def upload_db_file(
|
| file: UploadFile = File(...),
|
| token: str = Query(..., description="Token required for authorization")
|
| ):
|
| """
|
| Upload a file to the /data/db folder.
|
|
|
| Steps:
|
| - Validate the token.
|
| - Ensure /data/db exists (create it if missing).
|
| - Save the uploaded file inside /data/db using its original filename.
|
| - Return a JSON response confirming the upload.
|
| """
|
| validate_token(token)
|
|
|
|
|
| DB_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
| final_path = DB_PATH / file.filename
|
|
|
| try:
|
|
|
| file_bytes = await file.read()
|
| with open(final_path, "wb") as f:
|
| f.write(file_bytes)
|
| except Exception as exc:
|
| raise HTTPException(status_code=500, detail=f"Failed to save file: {exc}")
|
|
|
| return JSONResponse(
|
| status_code=200,
|
| content={
|
| "status": "ok",
|
| "saved_to": str(final_path)
|
| }
|
| )
|
|
|
| @router.post("/execute_query", tags=["Database Manager"])
|
| async def execute_query(
|
| db_filename: str = Query(..., description="SQLite .db file name"),
|
| query: str = Query(..., description="SQL query to execute"),
|
| token: str = Query(..., description="Token required for authorization")
|
| ):
|
| """
|
| Execute a SQL query against a specified SQLite database file in /data/db.
|
|
|
| Steps:
|
| - Validate the token.
|
| - Ensure the requested .db file exists inside /data/db.
|
| - Connect to the database using sqlite3.
|
| - Execute the provided query.
|
| - Return the results as a list of dictionaries (column: value).
|
| - Capture and return errors if query execution fails.
|
| """
|
| validate_token(token)
|
|
|
| db_file_path = DB_PATH / db_filename
|
|
|
| if not db_file_path.exists() or not db_file_path.is_file():
|
| raise HTTPException(status_code=404, detail=f"Database file {db_filename} not found")
|
|
|
| try:
|
| conn = sqlite3.connect(db_file_path)
|
| conn.row_factory = sqlite3.Row
|
| cur = conn.cursor()
|
|
|
| cur.execute(query)
|
| rows = cur.fetchall()
|
|
|
|
|
| results = [dict(row) for row in rows]
|
|
|
| conn.commit()
|
| conn.close()
|
| except sqlite3.Error as e:
|
| raise HTTPException(status_code=400, detail=f"SQL error: {e}")
|
| except Exception as e:
|
| raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
|
|
|
| return JSONResponse(content={"results": results}) |