| """ |
| SPARKNET Backend Client |
| |
| Client for connecting Streamlit Cloud to the GPU backend server (Lytos). |
| Handles all API communication with the FastAPI backend. |
| """ |
|
|
| import httpx |
| import streamlit as st |
| from typing import Optional, Dict, Any, List, Tuple |
| from dataclasses import dataclass |
| import os |
|
|
|
|
def get_backend_url() -> Optional[str]:
    """Return the backend base URL, or ``None`` when none is configured.

    Sources are consulted in this order and the first non-empty value wins:

    1. ``st.secrets["BACKEND_URL"]``
    2. ``st.secrets["backend"]["url"]``
    3. the ``SPARKNET_BACKEND_URL`` environment variable

    Empty values are treated as "not set", so a blank secret or a blank
    environment variable does not masquerade as a configured backend.

    Returns:
        The configured URL, or ``None`` if no source provides one.
    """
    try:
        if hasattr(st, "secrets"):
            secrets = st.secrets
            if "BACKEND_URL" in secrets and secrets["BACKEND_URL"]:
                return secrets["BACKEND_URL"]
            if "backend" in secrets and "url" in secrets["backend"]:
                nested_url = secrets["backend"]["url"]
                if nested_url:
                    return nested_url
    except Exception:
        # Secrets lookup is deliberately best-effort: any failure while
        # reading st.secrets (e.g. no secrets available) falls through to
        # the environment variable. ``Exception`` is caught instead of a
        # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        pass

    # ``or None`` normalises an empty environment variable to "not set".
    return os.environ.get("SPARKNET_BACKEND_URL") or None
|
|
|
|
def is_backend_configured() -> bool:
    """Report whether a backend URL could be resolved.

    Returns:
        ``True`` when ``get_backend_url()`` yields anything other than
        ``None``, ``False`` otherwise.
    """
    resolved_url = get_backend_url()
    return resolved_url is not None
|
|
|
|
@dataclass
class BackendResponse:
    """Uniform result envelope returned by the backend client.

    Attributes:
        success: Whether the call completed successfully.
        data: Payload of the call; callers in this module pass an empty
            dict when the call failed.
        error: Description of the failure, or ``None`` (the default) when
            no error was recorded.
    """

    # Outcome flag checked by callers before they touch ``data``.
    success: bool
    # Payload attached to the response.
    data: Dict[str, Any]
    # Failure description; stays None unless explicitly supplied.
    error: Optional[str] = None
|
|
|
|
class BackendClient:
    """
    Client for the SPARKNET Backend API.

    Provides methods to:
    - Check backend health and status
    - Process documents (OCR, layout detection)
    - Index documents to RAG
    - Query RAG system
    - Search similar chunks
    - List and delete indexed documents

    Every public API method returns a ``BackendResponse`` instead of
    raising: a missing backend URL, transport errors, non-2xx responses and
    bodies that cannot be decoded as JSON are all reported as
    ``success=False`` with the error text in ``error``.

    The underlying ``httpx.Client`` is created lazily on first use and then
    reused. Release it with ``close()`` or by using the instance as a
    context manager (``with BackendClient() as client: ...``).
    """

    # Error text returned by every API method when no base URL is set.
    _NOT_CONFIGURED_ERROR = "Backend URL not configured"

    def __init__(self, base_url: Optional[str] = None, timeout: float = 120.0):
        """
        Args:
            base_url: Backend base URL. When omitted or empty, the value of
                ``get_backend_url()`` is used instead.
            timeout: Timeout passed to the ``httpx.Client``.
        """
        self.base_url = base_url or get_backend_url()
        self.timeout = timeout
        self._client: Optional[httpx.Client] = None

    @property
    def is_configured(self) -> bool:
        """Whether a non-empty backend URL is available."""
        # Truthiness (not ``is not None``) so an empty string is rejected.
        return bool(self.base_url)

    def _get_client(self) -> httpx.Client:
        """Return the shared ``httpx.Client``, creating it on first use."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                timeout=self.timeout,
            )
        return self._client

    def close(self):
        """Close the HTTP client, if one was created; safe to call twice."""
        if self._client is not None:
            self._client.close()
            self._client = None

    def __enter__(self) -> "BackendClient":
        """Enter a ``with`` block; returns this client."""
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Close the HTTP client when the ``with`` block ends."""
        self.close()

    def _request(
        self,
        method: str,
        path: str,
        *,
        wrap_key: Optional[str] = None,
        **kwargs: Any,
    ) -> BackendResponse:
        """Send one request and convert the outcome to a BackendResponse.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: Request path, resolved against ``base_url``.
            wrap_key: When given, the decoded JSON body is returned as
                ``{wrap_key: body}`` instead of as-is.
            **kwargs: Forwarded to ``httpx.Client.request`` (``json``,
                ``data``, ``files``, ...).

        Returns:
            ``BackendResponse(True, body)`` on success, otherwise
            ``BackendResponse(False, {}, <error text>)``.
        """
        if not self.is_configured:
            return BackendResponse(False, {}, self._NOT_CONFIGURED_ERROR)

        try:
            resp = self._get_client().request(method, path, **kwargs)
            resp.raise_for_status()
            body = resp.json()
        except Exception as e:
            # This is the boundary where failures become data: callers rely
            # on getting a BackendResponse back rather than an exception.
            return BackendResponse(False, {}, str(e))

        if wrap_key is not None:
            body = {wrap_key: body}
        return BackendResponse(True, body)

    def health_check(self) -> BackendResponse:
        """Check if backend is healthy (``GET /api/health``)."""
        return self._request("GET", "/api/health")

    def get_status(self) -> BackendResponse:
        """Get backend system status (``GET /api/status``)."""
        return self._request("GET", "/api/status")

    def process_document(
        self,
        file_bytes: bytes,
        filename: str,
        ocr_engine: str = "paddleocr",
        max_pages: int = 10,
        enable_layout: bool = True,
        preserve_tables: bool = True,
    ) -> BackendResponse:
        """
        Process a document using the backend (``POST /api/process``).

        Args:
            file_bytes: Document content as bytes
            filename: Original filename
            ocr_engine: OCR engine to use (paddleocr, tesseract)
            max_pages: Maximum pages to process
            enable_layout: Enable layout detection
            preserve_tables: Preserve table structure

        Returns:
            BackendResponse with processing results
        """
        files = {"file": (filename, file_bytes)}
        # Sent as form fields next to the upload, so every value is a
        # string; booleans are lower-cased ("true" / "false").
        data = {
            "ocr_engine": ocr_engine,
            "max_pages": str(max_pages),
            "enable_layout": str(enable_layout).lower(),
            "preserve_tables": str(preserve_tables).lower(),
        }
        return self._request("POST", "/api/process", files=files, data=data)

    def index_document(
        self,
        doc_id: str,
        text: str,
        chunks: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> BackendResponse:
        """
        Index a document into the RAG system (``POST /api/index``).

        Args:
            doc_id: Document identifier
            text: Full document text
            chunks: List of chunk dictionaries
            metadata: Optional metadata; an empty dict is sent when omitted

        Returns:
            BackendResponse with indexing results
        """
        payload = {
            "doc_id": doc_id,
            "text": text,
            "chunks": chunks,
            "metadata": metadata or {},
        }
        return self._request("POST", "/api/index", json=payload)

    def query(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
    ) -> BackendResponse:
        """
        Query the RAG system (``POST /api/query``).

        Args:
            question: Query question
            filters: Optional filters (e.g., document_id)
            top_k: Number of results

        Returns:
            BackendResponse with answer and sources
        """
        payload = {
            "question": question,
            "filters": filters,
            "top_k": top_k,
        }
        return self._request("POST", "/api/query", json=payload)

    def search_similar(
        self,
        query: str,
        top_k: int = 5,
        doc_filter: Optional[str] = None,
    ) -> BackendResponse:
        """
        Search for similar chunks (``POST /api/search``).

        Args:
            query: Search query
            top_k: Number of results
            doc_filter: Optional document ID filter

        Returns:
            BackendResponse with similar chunks
        """
        payload = {
            "query": query,
            "top_k": top_k,
            "doc_filter": doc_filter,
        }
        return self._request("POST", "/api/search", json=payload)

    def list_documents(self) -> BackendResponse:
        """List all indexed documents (``GET /api/documents``).

        Returns:
            BackendResponse whose ``data`` is ``{"documents": <body>}``.
        """
        return self._request("GET", "/api/documents", wrap_key="documents")

    def delete_document(self, doc_id: str) -> BackendResponse:
        """Delete a document from the index (``DELETE /api/documents/<id>``).

        Args:
            doc_id: Document identifier; percent-encoded before it is placed
                in the URL path.

        Returns:
            BackendResponse with the backend's reply.
        """
        from urllib.parse import quote

        # Encode every reserved character (safe="") so an ID containing
        # "/", "?" or "#" stays a single path segment instead of changing
        # the route or being cut off as a query string / fragment.
        encoded_id = quote(doc_id, safe="")
        return self._request("DELETE", f"/api/documents/{encoded_id}")
|
|
|
|
| |
# Module-wide client instance; stays None until get_backend_client() is
# first called.
_backend_client: Optional[BackendClient] = None


def get_backend_client() -> BackendClient:
    """Return the shared ``BackendClient``, building it on first call.

    Returns:
        The module-level client instance; the same object is handed back
        on every subsequent call.
    """
    global _backend_client
    if _backend_client is not None:
        return _backend_client
    _backend_client = BackendClient()
    return _backend_client
|
|
|
|
def check_backend_available() -> Tuple[bool, Dict[str, Any]]:
    """
    Probe the backend and report whether it can be used.

    The checks run in order and stop at the first failure: a URL must be
    configured, the health check must succeed, and the status call must
    succeed.

    Returns:
        ``(True, status_data)`` when every check passes, otherwise
        ``(False, {"error": <message>})`` describing the failed step.
    """
    backend = get_backend_client()
    if not backend.is_configured:
        return False, {"error": "Backend URL not configured"}

    # Reachability first, so an unreachable backend is reported as such.
    probe = backend.health_check()
    if not probe.success:
        return False, {"error": f"Backend not reachable: {probe.error}"}

    report = backend.get_status()
    if report.success:
        return True, report.data
    return False, {"error": f"Failed to get status: {report.error}"}
|
|