| """ |
| SIMPLIFIED VERSION - No monkey patching, just clean type hints |
| """ |
| import os |
| import re |
| import shutil |
| import traceback |
| import gradio as gr |
| from pathlib import Path |
| from typing import List, Dict, Any, Tuple, Optional |
| from histopath.agent import A1 |
| from dotenv import load_dotenv |
|
|
| |
| if os.path.exists(".env"): |
| load_dotenv() |
|
|
| PASSCODE = os.getenv("GRADIO_PASSWORD", "TESTING") |
| agent = None |
|
|
|
|
| def check_for_output_files(): |
| """Check output directory for files.""" |
| output_dir = Path("./output") |
| if not output_dir.exists(): |
| return [], [] |
| |
| images = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}] |
| data = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".csv", ".txt", ".json", ".npy"}] |
| |
| return images, data |
|
|
|
|
| def preview_file(file): |
| """Preview uploaded file.""" |
| if file is None: |
| return None, None, "No file" |
| |
| path = Path(file.name) |
| if path.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}: |
| return file.name, None, f"Preview: {path.name}" |
| else: |
| size = path.stat().st_size / 1024 |
| return None, file.name, f"File: {path.name} ({size:.1f} KB)" |
|
|
|
|
| def parse_output(text): |
| """Parse agent output.""" |
| text = re.sub(r'={30,}.*?={30,}', '', text, flags=re.DOTALL).strip() |
| |
| result = {"type": "text", "content": text, "code": None, "obs": None, "think": None} |
| |
| code_match = re.search(r'<execute>(.*?)</execute>', text, re.DOTALL) |
| if code_match: |
| result["type"] = "code" |
| result["code"] = code_match.group(1).strip() |
| before = text[:code_match.start()].strip() |
| before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
| result["think"] = before or None |
| return result |
| |
| obs_match = re.search(r'<observation>(.*?)</observation>', text, re.DOTALL) |
| if obs_match: |
| result["type"] = "obs" |
| result["obs"] = obs_match.group(1).strip() |
| before = text[:obs_match.start()].strip() |
| before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
| result["think"] = before or None |
| return result |
| |
| sol_match = re.search(r'<solution>(.*?)</solution>', text, re.DOTALL) |
| if sol_match: |
| result["type"] = "solution" |
| result["content"] = sol_match.group(1).strip() |
| before = text[:sol_match.start()].strip() |
| before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
| result["think"] = before or None |
| return result |
| |
| result["content"] = re.sub(r'<think>(.*?)</think>', r'\1', text, flags=re.DOTALL) |
| return result |
|
|
|
|
| def format_display(parsed): |
| """Format for display.""" |
| parts = [] |
| |
| if parsed.get("think"): |
| parts.append(parsed["think"]) |
| |
| if parsed["type"] == "code": |
| if parsed.get("think"): |
| parts.append("\n---\n") |
| parts.append("### π» Code\n") |
| parts.append(f"```python\n{parsed['code']}\n```") |
| elif parsed["type"] == "obs": |
| if parsed.get("think"): |
| parts.append("\n---\n") |
| parts.append("### π Output\n") |
| parts.append(f"```\n{parsed['obs']}\n```") |
| elif parsed["type"] == "solution": |
| if parsed.get("think"): |
| parts.append("\n---\n") |
| parts.append("### β
Solution\n") |
| parts.append(parsed['content']) |
| else: |
| if not parsed.get("think"): |
| parts.append(parsed["content"]) |
| |
| return "\n\n".join(parts) |
|
|
|
|
| |
| def process_query(prompt: str, file: Any, history: List[Dict[str, str]]): |
| """Process user query - SIMPLE TYPES ONLY.""" |
| global agent |
| |
| |
| if history is None: |
| history = [] |
| |
| |
| if agent is None: |
| history.append({"role": "assistant", "content": "β οΈ Enter passcode first"}) |
| yield history, None, None, None, None, "Not initialized" |
| return |
| |
| |
| if not prompt.strip() and file is None: |
| history.append({"role": "assistant", "content": "β οΈ Provide prompt or file"}) |
| yield history, None, None, None, None, "No input" |
| return |
| |
| |
| if file is not None: |
| try: |
| Path("./data").mkdir(exist_ok=True) |
| fname = Path(file.name).name |
| fpath = Path("./data") / fname |
| shutil.copy(file.name, fpath) |
| prompt = f"{prompt}\n\nFile: {fpath}" if prompt.strip() else f"File at: {fpath}" |
| except Exception as e: |
| history.append({"role": "assistant", "content": f"β File error: {e}"}) |
| yield history, None, None, None, None, str(e) |
| return |
| |
| |
| history.append({"role": "user", "content": prompt}) |
| yield history, None, None, None, None, "Processing..." |
| |
| |
| try: |
| outputs = [] |
| for step in agent.go_stream(prompt): |
| outputs.append(step.get("output", "")) |
| |
| |
| for out in outputs: |
| if not out.strip(): |
| continue |
| parsed = parse_output(out) |
| msg = format_display(parsed) |
| if msg.strip(): |
| history.append({"role": "assistant", "content": msg}) |
| |
| |
| imgs, data = check_for_output_files() |
| yield history, imgs, data, None, None, f"β
Done ({len(outputs)} steps)" |
| |
| except Exception as e: |
| err = f"β Error:\n```\n{traceback.format_exc()}\n```" |
| history.append({"role": "assistant", "content": err}) |
| yield history, None, None, None, None, str(e) |
|
|
|
|
| def validate_pass(pwd: str): |
| """Validate passcode.""" |
| global agent |
| |
| if pwd == PASSCODE: |
| try: |
| agent = A1( |
| path="./data", |
| llm="claude-sonnet-4-20250514", |
| source="Anthropic", |
| use_tool_retriever=True, |
| timeout_seconds=600 |
| ) |
| return gr.update(visible=False), gr.update(visible=True), "β
Authenticated" |
| except Exception as e: |
| return gr.update(visible=True), gr.update(visible=False), f"β Init error: {e}" |
| else: |
| return gr.update(visible=True), gr.update(visible=False), "β Invalid passcode" |
|
|
|
|
| def clear_all(): |
| """Clear everything.""" |
| out_dir = Path("./output") |
| if out_dir.exists(): |
| for f in out_dir.iterdir(): |
| if f.is_file(): |
| f.unlink() |
| return [], None, None, None, None, "Cleared" |
|
|
|
|
| |
| with gr.Blocks(title="HistoPath") as demo: |
| gr.HTML("<h1 style='text-align:center'>π¬ HistoPath Agent</h1>") |
| |
| with gr.Group(visible=True) as pass_section: |
| gr.Markdown("### π Enter Passcode") |
| with gr.Row(): |
| pass_input = gr.Textbox(label="Passcode", type="password", scale=3) |
| pass_btn = gr.Button("Unlock", variant="primary", scale=1) |
| pass_status = gr.Textbox(label="Status", interactive=False) |
| |
| with gr.Group(visible=False) as main_section: |
| with gr.Row(): |
| with gr.Column(scale=3): |
| chat = gr.Chatbot(type="messages", height=500) |
| with gr.Row(): |
| msg = gr.Textbox(label="Query", placeholder="Enter query...", scale=4) |
| upload = gr.File(label="Upload", scale=1) |
| with gr.Row(): |
| send = gr.Button("Send", variant="primary", scale=2) |
| clear = gr.Button("Clear", scale=1) |
| status = gr.Textbox(value="Ready", interactive=False, show_label=False) |
| |
| with gr.Column(scale=2): |
| with gr.Tabs(): |
| with gr.Tab("Input"): |
| in_img = gr.Image(height=300) |
| in_file = gr.File(interactive=False) |
| in_stat = gr.Textbox(value="No file", interactive=False, show_label=False) |
| with gr.Tab("Images"): |
| out_imgs = gr.Gallery(height=500) |
| with gr.Tab("Data"): |
| out_data = gr.File(file_count="multiple", interactive=False) |
| |
| |
| pass_btn.click(validate_pass, [pass_input], [pass_section, main_section, pass_status]) |
| upload.change(preview_file, [upload], [in_img, in_file, in_stat]) |
| send.click(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) |
| clear.click(clear_all, None, [chat, out_imgs, out_data, in_img, in_file, status]) |
| msg.submit(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) |
|
|
|
|
| if __name__ == "__main__": |
| Path("./data").mkdir(exist_ok=True) |
| Path("./output").mkdir(exist_ok=True) |
| |
| print("=" * 50) |
| print("π¬ HistoPath Agent - Simplified") |
| print("=" * 50) |
| |
| demo.launch() |