| """ |
| Unified State Manager for SPARKNET Demo |
| |
| Enhanced state management for cross-module communication (Phase 1B): |
| - Document processing state tracking |
| - Indexed documents registry |
| - Cross-module event system (pub/sub) |
| - Real-time status updates |
| - Evidence highlighting synchronization |
| - Document selection synchronization |
| - Query/response sharing between modules |
| """ |
|
|
import hashlib
import json
import sys
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from threading import Lock
from typing import Dict, List, Any, Optional, Callable, Set

import streamlit as st
|
|
| PROJECT_ROOT = Path(__file__).parent.parent |
| sys.path.insert(0, str(PROJECT_ROOT)) |
|
|
|
|
| |
| |
| |
|
|
class EventType(str, Enum):
    """Cross-module event types for synchronization.

    Subclassing ``str`` lets event types compare equal to their string
    values and serialize cleanly (e.g. into st.session_state).
    """
    DOCUMENT_SELECTED = "document_selected"
    DOCUMENT_PROCESSED = "document_processed"
    DOCUMENT_INDEXED = "document_indexed"
    DOCUMENT_REMOVED = "document_removed"
    CHUNK_SELECTED = "chunk_selected"
    EVIDENCE_HIGHLIGHT = "evidence_highlight"
    RAG_QUERY_STARTED = "rag_query_started"
    RAG_QUERY_COMPLETED = "rag_query_completed"
    PAGE_CHANGED = "page_changed"
    PROCESSING_STARTED = "processing_started"
    PROCESSING_COMPLETED = "processing_completed"
    SYSTEM_STATUS_CHANGED = "system_status_changed"


@dataclass
class Event:
    """Cross-module event for synchronization.

    Attributes:
        event_type: Kind of event (see EventType).
        source_module: Name of the module that published the event.
        payload: Arbitrary event data.
        timestamp: Creation time (local clock).
        event_id: Short unique identifier for the event.
    """
    event_type: EventType
    source_module: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    # Bug fix: the previous scheme hashed time.time() alone, so two events
    # created within the same clock tick received identical IDs. uuid4 is
    # collision-free for practical purposes.
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
|
|
|
|
| @dataclass |
| class EvidenceHighlight: |
| """Evidence highlight for cross-module visualization.""" |
| doc_id: str |
| chunk_id: str |
| page: int |
| bbox: tuple |
| text_snippet: str |
| confidence: float |
| source_query: Optional[str] = None |
| highlight_color: str = "#FFE082" |
|
|
|
|
@dataclass
class ProcessedDocument:
    """A processed document together with everything extracted from it.

    Heavy payloads (raw text, page images, OCR regions) live on the
    instance; to_dict() exposes only a lightweight, JSON-friendly summary.
    """
    doc_id: str
    filename: str
    file_type: str
    raw_text: str
    chunks: List[Dict[str, Any]]
    page_count: int = 1
    page_images: List[bytes] = field(default_factory=list)
    ocr_regions: List[Dict[str, Any]] = field(default_factory=list)
    layout_data: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    indexed: bool = False
    indexed_chunks: int = 0
    processing_time: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable summary (counts and flags, not raw data)."""
        summary = dict(
            doc_id=self.doc_id,
            filename=self.filename,
            file_type=self.file_type,
            text_length=len(self.raw_text),
            chunk_count=len(self.chunks),
            page_count=self.page_count,
            ocr_region_count=len(self.ocr_regions),
            indexed=self.indexed,
            indexed_chunks=self.indexed_chunks,
            processing_time=self.processing_time,
            created_at=self.created_at.isoformat(),
        )
        return summary
|
|
|
|
| @dataclass |
| class ProcessingStatus: |
| """Tracks processing status for a document.""" |
| doc_id: str |
| stage: str |
| progress: float |
| message: str |
| started_at: datetime = field(default_factory=datetime.now) |
| completed_at: Optional[datetime] = None |
| error: Optional[str] = None |
|
|
|
|
class UnifiedStateManager:
    """
    Central state manager for the SPARKNET demo.

    All shared state lives in ``st.session_state["unified_state"]`` so it
    survives Streamlit reruns; event handlers are held on the instance.

    Phase 1B features:
    - Document processing state tracking
    - Indexed documents registry
    - Cross-module event system (pub/sub)
    - Real-time status updates
    - Evidence highlighting sync
    - Query/response sharing
    """

    def __init__(self):
        self._ensure_session_state()
        # event type -> list of callbacks registered via subscribe()
        self._event_handlers: Dict[EventType, List[Callable]] = {}

    # ------------------------------------------------------------------
    # Session-state bootstrap
    # ------------------------------------------------------------------

    @staticmethod
    def _initial_state() -> Dict[str, Any]:
        """Return a pristine unified-state dict.

        Single source of truth for the state shape, shared by
        _ensure_session_state() and reset() (previously duplicated).
        """
        return {
            "documents": {},          # doc_id -> ProcessedDocument
            "processing_status": {},  # doc_id -> ProcessingStatus
            "indexed_doc_ids": set(),
            "active_doc_id": None,
            "active_page": 0,
            "active_chunk_id": None,
            "notifications": [],
            "rag_ready": False,
            "total_indexed_chunks": 0,
            "last_update": datetime.now().isoformat(),
            # Phase 1B cross-module sync fields
            "event_queue": [],
            "evidence_highlights": [],
            "last_rag_query": None,
            "last_rag_response": None,
            "selected_sources": [],
            "module_states": {},
            "sync_version": 0,
        }

    def _ensure_session_state(self):
        """Initialize session state if it does not exist yet."""
        if "unified_state" not in st.session_state:
            st.session_state.unified_state = self._initial_state()

    @property
    def state(self) -> Dict:
        """The unified state dict (created on first access)."""
        self._ensure_session_state()
        return st.session_state.unified_state

    # ------------------------------------------------------------------
    # Document management
    # ------------------------------------------------------------------

    def add_document(self, doc: ProcessedDocument) -> str:
        """Add a processed document to the state and return its ID."""
        self.state["documents"][doc.doc_id] = doc
        self._notify(f"Document '{doc.filename}' added", "info")
        self._update_timestamp()
        return doc.doc_id

    def get_document(self, doc_id: str) -> Optional[ProcessedDocument]:
        """Get a document by ID, or None if unknown."""
        return self.state["documents"].get(doc_id)

    def get_all_documents(self) -> List[ProcessedDocument]:
        """Get all documents."""
        return list(self.state["documents"].values())

    def get_indexed_documents(self) -> List[ProcessedDocument]:
        """Get only documents that have been indexed into RAG."""
        return [d for d in self.state["documents"].values() if d.indexed]

    def remove_document(self, doc_id: str):
        """Remove a document (and its index bookkeeping) from state."""
        if doc_id in self.state["documents"]:
            doc = self.state["documents"].pop(doc_id)
            self.state["indexed_doc_ids"].discard(doc_id)
            if doc.indexed:
                # Bug fix: the global chunk counter previously leaked when
                # an indexed document was removed.
                self.state["total_indexed_chunks"] = max(
                    0, self.state["total_indexed_chunks"] - doc.indexed_chunks
                )
            self._notify(f"Document '{doc.filename}' removed", "warning")
            self._update_timestamp()

    def set_active_document(self, doc_id: Optional[str]):
        """Set the currently active document (None to clear)."""
        self.state["active_doc_id"] = doc_id
        self._update_timestamp()

    def get_active_document(self) -> Optional[ProcessedDocument]:
        """Get the currently active document, or None."""
        if self.state["active_doc_id"]:
            return self.get_document(self.state["active_doc_id"])
        return None

    # ------------------------------------------------------------------
    # Processing status
    # ------------------------------------------------------------------

    def start_processing(self, doc_id: str, filename: str):
        """Create a ProcessingStatus entry for a document at stage 'loading'."""
        status = ProcessingStatus(
            doc_id=doc_id,
            stage="loading",
            progress=0.0,
            # Bug fix: the message previously contained the literal
            # "(unknown)" and ignored the filename argument.
            message=f"Loading {filename}...",
        )
        self.state["processing_status"][doc_id] = status
        self._update_timestamp()

    def update_processing(self, doc_id: str, stage: str, progress: float, message: str):
        """Update processing status for a known document; no-op otherwise."""
        if doc_id in self.state["processing_status"]:
            status = self.state["processing_status"][doc_id]
            status.stage = stage
            status.progress = progress
            status.message = message
            self._update_timestamp()

    def complete_processing(self, doc_id: str, success: bool = True, error: Optional[str] = None):
        """Mark processing as complete (or failed) and emit a notification."""
        if doc_id in self.state["processing_status"]:
            status = self.state["processing_status"][doc_id]
            status.stage = "complete" if success else "error"
            # On failure, keep the progress reached so far for display.
            status.progress = 1.0 if success else status.progress
            status.completed_at = datetime.now()
            status.error = error
            status.message = "Processing complete!" if success else f"Error: {error}"

            if success:
                self._notify("Document processed successfully!", "success")
            else:
                self._notify(f"Processing failed: {error}", "error")

            self._update_timestamp()

    def get_processing_status(self, doc_id: str) -> Optional[ProcessingStatus]:
        """Get processing status for a document, or None."""
        return self.state["processing_status"].get(doc_id)

    def is_processing(self, doc_id: str) -> bool:
        """True while a document has a status that is not terminal."""
        status = self.get_processing_status(doc_id)
        return status is not None and status.stage not in ["complete", "error"]

    # ------------------------------------------------------------------
    # Index registry
    # ------------------------------------------------------------------

    def mark_indexed(self, doc_id: str, chunk_count: int):
        """Mark a document as indexed to RAG with *chunk_count* chunks."""
        if doc_id in self.state["documents"]:
            doc = self.state["documents"][doc_id]
            if doc.indexed:
                # Bug fix: re-indexing previously added to the total again,
                # double-counting the document's chunks.
                self.state["total_indexed_chunks"] -= doc.indexed_chunks
            doc.indexed = True
            doc.indexed_chunks = chunk_count
            self.state["indexed_doc_ids"].add(doc_id)
            self.state["total_indexed_chunks"] += chunk_count
            self._notify(f"Indexed {chunk_count} chunks from '{doc.filename}'", "success")
            self._update_timestamp()

    def is_indexed(self, doc_id: str) -> bool:
        """Check if document is indexed."""
        return doc_id in self.state["indexed_doc_ids"]

    def get_total_indexed_chunks(self) -> int:
        """Get total number of indexed chunks across all documents."""
        return self.state["total_indexed_chunks"]

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    def _notify(self, message: str, level: str = "info"):
        """Append a notification; keeps only the most recent 50."""
        self.state["notifications"].append({
            "message": message,
            "level": level,
            "timestamp": datetime.now().isoformat(),
        })
        if len(self.state["notifications"]) > 50:
            self.state["notifications"] = self.state["notifications"][-50:]

    def get_notifications(self, limit: int = 10) -> List[Dict]:
        """Get the most recent *limit* notifications (oldest first)."""
        return self.state["notifications"][-limit:]

    def clear_notifications(self):
        """Clear all notifications."""
        self.state["notifications"] = []

    # ------------------------------------------------------------------
    # RAG readiness
    # ------------------------------------------------------------------

    def set_rag_ready(self, ready: bool):
        """Set RAG system ready status."""
        self.state["rag_ready"] = ready
        self._update_timestamp()

    def is_rag_ready(self) -> bool:
        """Check if RAG is ready."""
        return self.state["rag_ready"]

    # ------------------------------------------------------------------
    # Bookkeeping
    # ------------------------------------------------------------------

    def _update_timestamp(self):
        """Record a state change: refresh timestamp and bump sync_version."""
        self.state["last_update"] = datetime.now().isoformat()
        self.state["sync_version"] += 1

    def get_summary(self) -> Dict[str, Any]:
        """Get a lightweight summary of the current state for status UIs."""
        return {
            "total_documents": len(self.state["documents"]),
            "indexed_documents": len(self.state["indexed_doc_ids"]),
            "total_indexed_chunks": self.state["total_indexed_chunks"],
            "active_doc_id": self.state["active_doc_id"],
            "active_page": self.state.get("active_page", 0),
            "rag_ready": self.state["rag_ready"],
            "last_update": self.state["last_update"],
            "sync_version": self.state.get("sync_version", 0),
            "processing_count": sum(
                1 for s in self.state["processing_status"].values()
                if s.stage not in ["complete", "error"]
            ),
            "evidence_count": len(self.state.get("evidence_highlights", [])),
        }

    def reset(self):
        """Reset all state to a pristine snapshot."""
        st.session_state.unified_state = self._initial_state()

    # ------------------------------------------------------------------
    # Event system (pub/sub)
    # ------------------------------------------------------------------

    def publish_event(
        self,
        event_type: EventType,
        source_module: str,
        payload: Dict[str, Any]
    ) -> Event:
        """
        Publish an event for cross-module synchronization.

        Args:
            event_type: Type of event
            source_module: Name of module publishing the event
            payload: Event data

        Returns:
            The created Event object
        """
        event = Event(
            event_type=event_type,
            source_module=source_module,
            payload=payload
        )

        # Queue for polling consumers; bounded to the last 100 events.
        self.state["event_queue"].append(event)
        if len(self.state["event_queue"]) > 100:
            self.state["event_queue"] = self.state["event_queue"][-100:]

        # Synchronously invoke registered handlers; a failing handler is
        # reported but must not break the publisher.
        if event_type in self._event_handlers:
            for handler in self._event_handlers[event_type]:
                try:
                    handler(event)
                except Exception as e:
                    self._notify(f"Event handler error: {e}", "error")

        self._update_timestamp()
        return event

    def subscribe(self, event_type: EventType, handler: Callable[[Event], None]):
        """
        Subscribe to an event type.

        Args:
            event_type: Type of event to subscribe to
            handler: Callback function to handle the event
        """
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    def unsubscribe(self, event_type: EventType, handler: Callable[[Event], None]):
        """Unsubscribe a previously registered handler from an event type."""
        if event_type in self._event_handlers:
            self._event_handlers[event_type] = [
                h for h in self._event_handlers[event_type] if h != handler
            ]

    def get_recent_events(
        self,
        event_type: Optional[EventType] = None,
        limit: int = 10
    ) -> List[Event]:
        """Get recent events, optionally filtered by type."""
        events = self.state.get("event_queue", [])
        if event_type:
            events = [e for e in events if e.event_type == event_type]
        return events[-limit:]

    # ------------------------------------------------------------------
    # Evidence highlights
    # ------------------------------------------------------------------

    def add_evidence_highlight(self, highlight: EvidenceHighlight):
        """
        Add an evidence highlight for cross-module visualization.

        Used when RAG finds relevant evidence that should be displayed
        in the Document Viewer or Evidence Viewer.
        """
        self.state["evidence_highlights"].append(highlight)

        self.publish_event(
            EventType.EVIDENCE_HIGHLIGHT,
            source_module="rag",
            payload={
                "doc_id": highlight.doc_id,
                "chunk_id": highlight.chunk_id,
                "page": highlight.page,
                "bbox": highlight.bbox,
                # Truncated: the payload only needs a preview.
                "text_snippet": highlight.text_snippet[:100],
            }
        )

        self._update_timestamp()

    def clear_evidence_highlights(self, doc_id: Optional[str] = None):
        """Clear evidence highlights, optionally for a specific document."""
        if doc_id:
            self.state["evidence_highlights"] = [
                h for h in self.state["evidence_highlights"]
                if h.doc_id != doc_id
            ]
        else:
            self.state["evidence_highlights"] = []

        self._update_timestamp()

    def get_evidence_highlights(
        self,
        doc_id: Optional[str] = None,
        page: Optional[int] = None
    ) -> List[EvidenceHighlight]:
        """Get evidence highlights, optionally filtered by doc_id and page."""
        highlights = self.state.get("evidence_highlights", [])
        if doc_id:
            highlights = [h for h in highlights if h.doc_id == doc_id]
        if page is not None:
            highlights = [h for h in highlights if h.page == page]
        return highlights

    # ------------------------------------------------------------------
    # Navigation sync
    # ------------------------------------------------------------------

    def select_page(self, page: int, source_module: str = "unknown"):
        """
        Set the active page and notify other modules.

        Used for synchronized scrolling between Document Viewer and
        Evidence Viewer. Publishes PAGE_CHANGED only on an actual change.
        """
        old_page = self.state.get("active_page", 0)
        self.state["active_page"] = page

        if old_page != page:
            self.publish_event(
                EventType.PAGE_CHANGED,
                source_module=source_module,
                payload={"page": page, "previous_page": old_page}
            )

    def get_active_page(self) -> int:
        """Get the currently active page (zero-based)."""
        return self.state.get("active_page", 0)

    def select_chunk(
        self,
        chunk_id: str,
        doc_id: str,
        source_module: str = "unknown"
    ):
        """
        Select a chunk and navigate to its location.

        Looks the chunk up in the document, jumps to its page, and
        publishes CHUNK_SELECTED to trigger synchronized navigation.
        """
        self.state["active_chunk_id"] = chunk_id

        doc = self.get_document(doc_id)
        if doc:
            for chunk in doc.chunks:
                if chunk.get("chunk_id") == chunk_id:
                    page = chunk.get("page", 0)
                    self.select_page(page, source_module)

                    self.publish_event(
                        EventType.CHUNK_SELECTED,
                        source_module=source_module,
                        payload={
                            "chunk_id": chunk_id,
                            "doc_id": doc_id,
                            "page": page,
                            "bbox": chunk.get("bbox"),
                        }
                    )
                    break

    def get_active_chunk_id(self) -> Optional[str]:
        """Get the currently selected chunk ID, or None."""
        return self.state.get("active_chunk_id")

    # ------------------------------------------------------------------
    # RAG query/response sharing
    # ------------------------------------------------------------------

    def store_rag_query(
        self,
        query: str,
        response: Dict[str, Any],
        sources: List[Dict[str, Any]]
    ):
        """
        Store the last RAG query and response for cross-module access.

        Allows Evidence Viewer to display sources from Interactive RAG.
        Replaces all existing evidence highlights with ones derived from
        *sources*, then publishes RAG_QUERY_COMPLETED.
        """
        self.state["last_rag_query"] = query
        self.state["last_rag_response"] = response
        self.state["selected_sources"] = sources

        # Stale highlights belong to the previous query.
        self.clear_evidence_highlights()

        for source in sources:
            # Only sources with full location info can be highlighted.
            if all(k in source for k in ["doc_id", "chunk_id", "page"]):
                bbox = source.get("bbox", (0, 0, 1, 1))
                if isinstance(bbox, dict):
                    # Normalize dict-form bboxes to the tuple form.
                    bbox = (bbox.get("x_min", 0), bbox.get("y_min", 0),
                            bbox.get("x_max", 1), bbox.get("y_max", 1))

                highlight = EvidenceHighlight(
                    doc_id=source["doc_id"],
                    chunk_id=source["chunk_id"],
                    page=source["page"],
                    bbox=bbox,
                    text_snippet=source.get("text", "")[:200],
                    confidence=source.get("score", 0.0),
                    source_query=query,
                )
                self.add_evidence_highlight(highlight)

        self.publish_event(
            EventType.RAG_QUERY_COMPLETED,
            source_module="rag",
            payload={
                "query": query,
                "source_count": len(sources),
                "response_length": len(str(response)),
            }
        )

        self._update_timestamp()

    def get_last_rag_query(self) -> Optional[str]:
        """Get the last RAG query."""
        return self.state.get("last_rag_query")

    def get_last_rag_response(self) -> Optional[Dict[str, Any]]:
        """Get the last RAG response."""
        return self.state.get("last_rag_response")

    def get_selected_sources(self) -> List[Dict[str, Any]]:
        """Get the sources from the last RAG query."""
        return self.state.get("selected_sources", [])

    # ------------------------------------------------------------------
    # Per-module custom state
    # ------------------------------------------------------------------

    def set_module_state(self, module_name: str, state: Dict[str, Any]):
        """
        Store custom state for a specific module.

        Allows modules to persist their own state across reruns; an
        ``updated_at`` timestamp is stamped onto the stored dict.
        """
        self.state["module_states"][module_name] = {
            **state,
            "updated_at": datetime.now().isoformat()
        }

    def get_module_state(self, module_name: str) -> Dict[str, Any]:
        """Get custom state for a specific module (empty dict if none)."""
        return self.state.get("module_states", {}).get(module_name, {})

    def get_sync_version(self) -> int:
        """
        Get the current sync version.

        Modules can use this to detect if state has changed since last check.
        """
        return self.state.get("sync_version", 0)
|
|
|
|
def generate_doc_id(filename: str, content_hash: str = None) -> str:
    """Generate a 12-character hex document ID.

    The ID is an md5 digest of "<filename>_<timestamp>[_<hash prefix>]".

    Bug fix: the base string previously used the literal "(unknown)"
    instead of the filename, so two different files uploaded within the
    same second (with no content hash) collided on the same ID.

    Args:
        filename: Original file name; distinguishes same-second uploads.
        content_hash: Optional content digest; its first 8 chars are mixed in.

    Returns:
        A 12-character lowercase hex string.
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    base = f"{filename}_{timestamp}"
    if content_hash:
        base = f"{base}_{content_hash[:8]}"
    return hashlib.md5(base.encode()).hexdigest()[:12]
|
|
|
|
| def get_state_manager() -> UnifiedStateManager: |
| """Get or create the unified state manager.""" |
| if "state_manager_instance" not in st.session_state: |
| st.session_state.state_manager_instance = UnifiedStateManager() |
| return st.session_state.state_manager_instance |
|
|
|
|
| |
|
|
def render_global_status_bar():
    """Render a global status bar showing system state.

    Draws six columns: LLM availability, RAG status, model/GPU label,
    document count, indexed-chunk count, and processing activity.
    Degrades to safe "error/offline" defaults when the rag_config module
    or its backends are unavailable.
    """
    manager = get_state_manager()
    summary = manager.get_summary()

    # Query the RAG subsystem; any failure here must not crash the UI.
    try:
        from rag_config import get_unified_rag_system, check_ollama, check_cloud_providers
        rag_system = get_unified_rag_system()
        ollama_ok, models = check_ollama()
        cloud_providers = check_cloud_providers()
        rag_status = rag_system["status"]
        rag_mode = rag_system.get("mode", "error")
        llm_model = rag_system.get("llm_model", "N/A")
        gpu_available = rag_system.get("gpu_available", False)
        gpu_name = rag_system.get("gpu_name", "")
    except Exception:
        # Bug fix: was a bare `except:`, which also swallows SystemExit and
        # KeyboardInterrupt. Also define rag_system so later .get() calls
        # can never hit an undefined name.
        rag_system = {}
        ollama_ok = False
        cloud_providers = {}
        rag_status = "error"
        rag_mode = "error"
        llm_model = "N/A"
        models = []
        gpu_available = False
        gpu_name = ""

    cols = st.columns(6)

    with cols[0]:
        # LLM availability: backend > ollama > cloud > demo
        if rag_mode == "backend":
            if gpu_available:
                st.success("GPU Backend")
            else:
                st.success("Backend")
        elif ollama_ok:
            st.success(f"Ollama ({len(models)})")
        elif cloud_providers:
            st.info(f"Cloud ({len(cloud_providers)})")
        else:
            st.warning("Demo Mode")

    with cols[1]:
        # Overall RAG pipeline status
        if rag_status == "ready":
            if rag_mode == "backend":
                st.success("RAG (Backend)")
            else:
                st.success("RAG Ready")
        elif rag_mode == "cloud":
            st.info("Cloud LLM")
        elif rag_mode == "demo":
            st.warning("Demo Mode")
        else:
            st.error("RAG Error")

    with cols[2]:
        # Model / hardware label (truncated to fit the column)
        if rag_mode == "backend":
            if gpu_name:
                st.info(f"{gpu_name[:12]}")
            else:
                st.info(f"{llm_model.split(':')[0] if llm_model else 'Backend'}")
        elif rag_mode == "cloud" and cloud_providers:
            provider_name = list(cloud_providers.keys())[0].title()
            st.info(f"{provider_name}")
        elif llm_model != "N/A":
            st.info(f"{llm_model.split(':')[0]}")
        else:
            st.info("Offline")

    with cols[3]:
        st.info(f"{summary['total_documents']} Docs")

    with cols[4]:
        # Chunk counts come from the backend when it owns the index,
        # otherwise from the local registry.
        if rag_mode == "backend":
            indexed = rag_system.get("indexed_chunks", 0)
            if indexed > 0:
                st.success(f"{indexed} Chunks")
            else:
                st.info("0 Chunks")
        elif summary['indexed_documents'] > 0:
            st.success(f"{summary['total_indexed_chunks']} Chunks")
        else:
            st.warning("0 Chunks")

    with cols[5]:
        if summary['processing_count'] > 0:
            st.warning("Processing...")
        else:
            st.info("Idle")
|
|
|
|
def render_notifications():
    """Render the five most recent notifications, newest first."""
    manager = get_state_manager()
    recent = manager.get_notifications(5)
    if not recent:
        return

    # Map notification level to the matching Streamlit widget;
    # anything unrecognized falls back to st.info.
    renderers = {
        "success": st.success,
        "error": st.error,
        "warning": st.warning,
    }
    for notif in reversed(recent):
        render = renderers.get(notif["level"], st.info)
        render(notif["message"])
|
|
|
|
| |
|
|
def render_evidence_panel():
    """
    Render a panel showing current evidence highlights.

    Can be used in any module to show sources from RAG queries. Each
    source gets an expander with its metadata and a jump-to-document
    button that syncs the active document, page, and chunk.
    """
    manager = get_state_manager()
    highlights = manager.get_evidence_highlights()

    if not highlights:
        st.info("No evidence highlights. Run a RAG query to see sources.")
        return

    st.subheader(f"Evidence Sources ({len(highlights)})")

    for idx, highlight in enumerate(highlights):
        label = f"Source {idx+1}: Page {highlight.page + 1} ({highlight.confidence:.0%})"
        with st.expander(label):
            st.markdown(f"**Document:** {highlight.doc_id}")
            st.markdown(f"**Text:** {highlight.text_snippet}")

            if highlight.source_query:
                st.markdown(f"**Query:** _{highlight.source_query}_")

            # Jump into the document viewer at this source's location.
            if st.button("View in Document", key=f"view_source_{idx}"):
                manager.set_active_document(highlight.doc_id)
                manager.select_page(highlight.page, "evidence_panel")
                manager.select_chunk(highlight.chunk_id, highlight.doc_id, "evidence_panel")
                st.rerun()
|
|
|
|
def render_sync_status():
    """Render a collapsed sync-status expander for debugging."""
    manager = get_state_manager()
    summary = manager.get_summary()

    with st.expander("Sync Status", expanded=False):
        debug_view = {
            "sync_version": summary["sync_version"],
            "active_doc": summary["active_doc_id"],
            "active_page": summary["active_page"],
            "evidence_count": summary["evidence_count"],
            "last_update": summary["last_update"],
        }
        st.json(debug_view)

        # Show the most recent events, newest first.
        recent = manager.get_recent_events(limit=5)
        if recent:
            st.subheader("Recent Events")
            for evt in reversed(recent):
                st.text(f"{evt.event_type.value}: {evt.source_module}")
|
|
|
|
def render_document_selector():
    """
    Render a document selector that syncs with the state manager.

    Returns the selected document ID, or None when no documents exist.
    Publishes DOCUMENT_SELECTED when the selection changes.
    """
    manager = get_state_manager()
    documents = manager.get_all_documents()

    if not documents:
        st.info("No documents uploaded. Upload a document to get started.")
        return None

    active_doc_id = manager.state.get("active_doc_id")

    # Build label lookup: doc_id -> "filename (N chunks)"
    labels = {d.doc_id: f"{d.filename} ({d.indexed_chunks} chunks)" for d in documents}
    doc_ids = list(labels)

    # Default to the active document when it is still present.
    try:
        default_index = doc_ids.index(active_doc_id)
    except ValueError:
        default_index = 0

    selected_id = st.selectbox(
        "Select Document",
        options=doc_ids,
        format_func=lambda doc_id: labels[doc_id],
        index=default_index,
        key="global_doc_selector",
    )

    # Propagate a changed selection to the shared state and subscribers.
    if selected_id != active_doc_id:
        manager.set_active_document(selected_id)
        manager.publish_event(
            EventType.DOCUMENT_SELECTED,
            source_module="selector",
            payload={"doc_id": selected_id},
        )

    return selected_id
|
|
|
|
def create_sync_callback(module_name: str) -> Callable:
    """
    Create a rerun callback for a module.

    Returns an event handler that marks *module_name* as needing a rerun
    (via a session-state flag) whenever an event from a different module
    arrives; the module can poll the flag and trigger st.rerun() itself.
    """
    flag_key = f"_{module_name}_needs_rerun"

    def _on_event(event: Event):
        # Ignore our own events to avoid rerun loops.
        if event.source_module == module_name:
            return
        st.session_state[flag_key] = True

    return _on_event
|
|