"""Mission Control AI — multi-agent orchestration service.

A FastAPI app that routes a user task through a "manager" LLM agent, which
delegates to specialist agents (writer, analyst, image_agent, ...). When a
writer is involved, the results are assembled into a styled .docx report
(optionally illustrated via Pexels search or HuggingFace image generation).
"""

import os
import asyncio
import httpx
import json
import re
import uuid
import base64
from io import BytesIO
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from pathlib import Path

# python-docx imports
from docx import Document as DocxDocument
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from docx.oxml import OxmlElement

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# Provider credentials come from the environment; missing keys degrade
# gracefully (the provider is skipped or reported "missing" by /api/health).
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")  # free at pexels.com/api
HF_API_KEY = os.getenv("HF_TOKEN", "")  # HuggingFace token for image gen

# Generated .docx reports are stored here and served by /api/docs/{filename}.
DOCS_DIR = Path("docs")
DOCS_DIR.mkdir(exist_ok=True)

# LLM provider registry. "type" selects the caller: native Gemini REST API
# vs. OpenAI-compatible chat-completions endpoints (OpenRouter, Groq).
PROVIDERS = {
    "gemini": {
        "name": "Google Gemini",
        "type": "gemini",
        "key": GOOGLE_API_KEY,
    },
    "openrouter": {
        "name": "OpenRouter",
        "type": "openai_compat",
        "key": OPENROUTER_API_KEY,
        "base_url": "https://openrouter.ai/api/v1/chat/completions",
        "headers": {
            "HTTP-Referer": "https://huggingface.co/spaces/vfven/mission-control-ui",
            "X-Title": "Mission Control AI",
        },
    },
    "groq": {
        "name": "Groq",
        "type": "openai_compat",
        "key": GROQ_API_KEY,
        "base_url": "https://api.groq.com/openai/v1/chat/completions",
        "headers": {},
    },
}

# ── DEFAULT AGENTS (can be extended via UI) ───────────────────────────────
# "models" is an ordered fallback list tried in sequence by call_agent_llm.
DEFAULT_AGENTS = [
    {
        "key": "manager",
        "name": "Manager",
        "provider": "gemini",
        "role": (
            "Gerente de proyecto experto en coordinar equipos y planificar estrategias. "
            "Cuando el usuario pide un documento o informe, DEBES indicar en tu respuesta "
            "qué agentes necesitas activar usando el formato JSON: "
            "{\"delegate\": [\"writer\", \"analyst\"]} al final de tu respuesta."
        ),
        "models": ["gemini-2.5-flash-preview-04-17", "gemini-2.0-flash", "gemini-1.5-flash"],
    },
    {
        "key": "developer",
        "name": "Developer",
        "provider": "openrouter",
        "role": "Programador senior especialista en crear aplicaciones y soluciones técnicas.",
        "models": ["qwen/qwen3-4b:free", "meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "google/gemma-3-12b-it:free"],
    },
    {
        "key": "analyst",
        "name": "Analyst",
        "provider": "openrouter",
        "role": "Analista de negocios experto en evaluar viabilidad, riesgos y oportunidades. También revisa y critica documentos formales.",
        "models": ["meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "google/gemma-3-27b-it:free", "qwen/qwen3-4b:free"],
    },
    {
        "key": "writer",
        "name": "Writer",
        "provider": "openrouter",
        "role": "Especialista en redacción de documentos formales, informes ejecutivos y reportes técnicos. Escribe en formato estructurado con secciones claras usando ### para títulos y ** para subtítulos.",
        "models": ["meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "qwen/qwen3-4b:free", "google/gemma-3-12b-it:free"],
    },
    {
        "key": "image_agent",
        "name": "ImageAgent",
        "provider": "gemini",
        "role": (
            "Agente especializado en buscar y proveer imágenes relevantes. "
            "Cuando se te pida imágenes sobre un tema, responde con una lista JSON de "
            "términos de búsqueda en inglés: {\"image_queries\": [\"term1\", \"term2\", \"term3\"]}"
        ),
        "models": ["gemini-2.0-flash", "gemini-1.5-flash"],
    },
]

# Last-resort models used when an agent's own provider/model list fails.
SUBSTITUTE_MODELS = {
    "groq": ["llama-3.3-70b-versatile", "llama3-70b-8192", "gemma2-9b-it"],
}

# Runtime agent registry (can be extended via /api/agents/add).
agent_registry = {a["key"]: dict(a) for a in DEFAULT_AGENTS}
# In-memory mission log; /api/history exposes the most recent 20 entries.
mission_history = []


# ── LLM CALLERS ───────────────────────────────────────────────────────────
async def call_gemini(model: str, system: str, user: str, key: str) -> str:
    """Call the Gemini generateContent REST API and return the text reply.

    Gemini has no separate system role here, so the system prompt is
    prepended to the user message. Raises httpx.HTTPStatusError on non-2xx.
    """
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}"
    payload = {
        "contents": [{"role": "user", "parts": [{"text": f"{system}\n\n{user}"}]}],
        "generationConfig": {"maxOutputTokens": 2048, "temperature": 0.7},
    }
    async with httpx.AsyncClient(timeout=90) as client:
        r = await client.post(url, json=payload)
        r.raise_for_status()
        return r.json()["candidates"][0]["content"]["parts"][0]["text"]


async def call_openai_compat(base_url: str, model: str, system: str, user: str, key: str, extra_headers: dict) -> str:
    """Call an OpenAI-compatible chat-completions endpoint and return the reply text.

    Used for both OpenRouter and Groq; provider-specific headers (e.g.
    OpenRouter attribution) are passed via *extra_headers*.
    """
    headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra_headers}
    payload = {
        "model": model,
        "messages": [{"role": "system", "content": system}, {"role": "user", "content": user}],
        "max_tokens": 2048,
        "temperature": 0.7,
    }
    async with httpx.AsyncClient(timeout=90) as client:
        r = await client.post(base_url, json=payload, headers=headers)
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]


async def call_agent_llm(agent: dict, task: str) -> str:
    """Run *task* through *agent*, trying its model list in order.

    Falls back to Groq substitute models if every configured model fails.
    Raises Exception with the last provider error when all options fail.
    """
    prov_key = agent["provider"]
    provider = PROVIDERS[prov_key]
    system = f"Eres {agent['name']}. {agent['role']} Responde en español. Sé conciso y profesional."
    last_err = None
    for model in agent["models"]:
        try:
            if provider["type"] == "gemini":
                return await call_gemini(model, system, task, provider["key"])
            else:
                return await call_openai_compat(
                    provider["base_url"], model, system, task, provider["key"], provider.get("headers", {}))
        except Exception as e:
            last_err = str(e)
    # Groq fallback
    if GROQ_API_KEY:
        groq = PROVIDERS["groq"]
        for m in SUBSTITUTE_MODELS["groq"]:
            try:
                return await call_openai_compat(
                    groq["base_url"], m, system, task, groq["key"], {})
            except Exception as e:
                last_err = str(e)
    raise Exception(f"All providers failed: {last_err}")


# ── IMAGE FETCHING ─────────────────────────────────────────────────────────
async def fetch_pexels_image(query: str) -> bytes | None:
    """Return the bytes of the first Pexels search hit for *query*, or None.

    Best-effort: any network/API error (or a missing API key) yields None.
    """
    if not PEXELS_API_KEY:
        return None
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(
                "https://api.pexels.com/v1/search",
                params={"query": query, "per_page": 1, "orientation": "landscape"},
                headers={"Authorization": PEXELS_API_KEY}
            )
            data = r.json()
            if data.get("photos"):
                img_url = data["photos"][0]["src"]["medium"]
                img_r = await client.get(img_url)
                return img_r.content
    except Exception:
        pass
    return None


async def generate_hf_image(prompt: str) -> bytes | None:
    """Generate an image for *prompt* via the HF inference API, or None.

    Best-effort: only returns bytes on a 200 response whose content-type is
    an image (the HF API returns JSON for queue/error states).
    """
    if not HF_API_KEY:
        return None
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            r = await client.post(
                "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-xl-base-1.0",
                headers={"Authorization": f"Bearer {HF_API_KEY}"},
                json={"inputs": prompt, "parameters": {"width": 512, "height": 384}},
            )
            if r.status_code == 200 and r.headers.get("content-type", "").startswith("image"):
                return r.content
    except Exception:
        pass
    return None


async def get_image_for_query(query: str) -> bytes | None:
    """Fetch an illustration for *query*: Pexels first, then HF generation."""
    img = await fetch_pexels_image(query)
    if img:
        return img
    return await generate_hf_image(query)


# ── DOCX BUILDER ──────────────────────────────────────────────────────────
def build_docx(title: str, sections: dict, images: list[bytes], analyst_review: str) -> bytes:
    """Assemble a styled Word document and return it as bytes.

    *sections["writer"]* is lightweight markdown: "## " → level-1 heading,
    "### " → level-2 heading (an image from *images* is inserted after each,
    while any remain), "**bold**" lines → bold paragraphs, "- "/"* " → bullet
    items; other lines accumulate into paragraphs separated by blank lines.
    *analyst_review*, if non-empty, is appended on a new page.
    """
    doc = DocxDocument()
    # Page margins
    for section in doc.sections:
        section.top_margin = Inches(1)
        section.bottom_margin = Inches(1)
        section.left_margin = Inches(1.2)
        section.right_margin = Inches(1.2)
    # Title
    title_para = doc.add_heading(title, 0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title_para.runs[0]
    run.font.color.rgb = RGBColor(0x1a, 0x56, 0xdb)
    # Date + subtitle
    sub = doc.add_paragraph()
    sub.alignment = WD_ALIGN_PARAGRAPH.CENTER
    sub.add_run(f"Mission Control AI — {datetime.now().strftime('%B %d, %Y')}").italic = True
    doc.add_paragraph()
    # Horizontal rule (raw OOXML: bottom border on an empty paragraph)
    p = doc.add_paragraph()
    pPr = p._p.get_or_add_pPr()
    pBdr = OxmlElement("w:pBdr")
    bottom = OxmlElement("w:bottom")
    bottom.set(qn("w:val"), "single")
    bottom.set(qn("w:sz"), "6")
    bottom.set(qn("w:color"), "1a56db")
    pBdr.append(bottom)
    pPr.append(pBdr)

    img_index = 0
    # Writer content sections
    writer_text = sections.get("writer", "")
    current_lines = []

    def flush_lines():
        # Emit accumulated plain lines as a single paragraph.
        nonlocal current_lines
        if current_lines:
            para_text = " ".join(current_lines).strip()
            if para_text:
                p = doc.add_paragraph(para_text)
                p.paragraph_format.space_after = Pt(6)
            current_lines = []

    for line in writer_text.split("\n"):
        stripped = line.strip()
        if not stripped:
            flush_lines()
            continue
        if stripped.startswith("### "):
            flush_lines()
            h = doc.add_heading(stripped[4:], level=2)
            # Insert image after each major section heading if available
            if img_index < len(images) and images[img_index]:
                try:
                    img_stream = BytesIO(images[img_index])
                    doc.add_picture(img_stream, width=Inches(5))
                    last_para = doc.paragraphs[-1]
                    last_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    cap = doc.add_paragraph(f"Figure {img_index + 1}")
                    cap.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    cap.runs[0].italic = True
                    cap.runs[0].font.size = Pt(9)
                    img_index += 1
                except Exception:
                    pass  # unreadable image bytes — skip, keep the heading
        elif stripped.startswith("## "):
            flush_lines()
            doc.add_heading(stripped[3:], level=1)
        elif stripped.startswith("**") and stripped.endswith("**"):
            flush_lines()
            p = doc.add_paragraph()
            run = p.add_run(stripped.strip("**"))
            run.bold = True
        elif stripped.startswith("- ") or stripped.startswith("* "):
            flush_lines()
            doc.add_paragraph(stripped[2:], style="List Bullet")
        else:
            current_lines.append(stripped)
    flush_lines()

    # Remaining images not consumed by section headings
    while img_index < len(images):
        if images[img_index]:
            try:
                img_stream = BytesIO(images[img_index])
                doc.add_picture(img_stream, width=Inches(5))
                last_para = doc.paragraphs[-1]
                last_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                img_index += 1
            except Exception:
                img_index += 1
        else:
            img_index += 1

    # Analyst review section
    if analyst_review:
        doc.add_page_break()
        doc.add_heading("Análisis y Revisión", level=1)
        for line in analyst_review.split("\n"):
            if line.strip():
                doc.add_paragraph(line.strip())

    # Footer
    doc.add_paragraph()
    footer_p = doc.add_paragraph()
    footer_p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    footer_run = footer_p.add_run("— Generado por Mission Control AI —")
    footer_run.italic = True
    footer_run.font.size = Pt(9)
    footer_run.font.color.rgb = RGBColor(0x6b, 0x72, 0x80)

    buf = BytesIO()
    doc.save(buf)
    buf.seek(0)
    return buf.read()


# ── PARSE MANAGER DELEGATION ───────────────────────────────────────────────
def parse_delegation(manager_text: str) -> list[str]:
    """Extract delegated agent keys from the manager's reply.

    Prefers an explicit {"delegate": [...]} JSON fragment; otherwise falls
    back to Spanish/English keyword heuristics. Result is de-duplicated,
    order preserved.
    """
    match = re.search(r'\{[^}]*"delegate"\s*:\s*\[([^\]]*)\][^}]*\}', manager_text)
    if match:
        raw = match.group(1)
        keys = re.findall(r'"(\w+)"', raw)
        return keys
    # Heuristic fallback: detect keywords in manager response
    delegates = []
    lower = manager_text.lower()
    if any(w in lower for w in ["informe", "documento", "redact", "escrib", "report", "word"]):
        delegates.extend(["writer"])
    if any(w in lower for w in ["imagen", "image", "foto", "visual", "ilustr"]):
        delegates.extend(["image_agent"])
    if any(w in lower for w in ["analiz", "revisar", "evalua", "critic"]):
        delegates.extend(["analyst"])
    return list(dict.fromkeys(delegates))  # deduplicate


def parse_image_queries(image_agent_text: str) -> list[str]:
    """Extract the {"image_queries": [...]} search terms, or [] if absent."""
    match = re.search(r'"image_queries"\s*:\s*\[([^\]]*)\]', image_agent_text)
    if match:
        return re.findall(r'"([^"]+)"', match.group(1))
    return []


def clean_text(text: str) -> str:
    """Strip machine-readable JSON directives from text shown to the user."""
    # Remove JSON blocks from display text
    text = re.sub(r'\{[^}]*"delegate"[^}]*\}', '', text)
    text = re.sub(r'\{[^}]*"image_queries"[^}]*\}', '', text)
    return text.strip()


# ── ROUTES ─────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page UI."""
    return HTMLResponse(Path("templates/index.html").read_text())


@app.get("/api/agents")
async def get_agents():
    """List registered agents (key, name, role)."""
    return {"agents": [
        {"key": a["key"], "name": a["name"], "role": a["role"]}
        for a in agent_registry.values()
    ]}


@app.post("/api/agents/add")
async def add_agent(request: Request):
    """Register (or overwrite) a custom agent from the request JSON body."""
    body = await request.json()
    key = re.sub(r'\W+', '_', body.get("key", "").lower().strip())
    if not key:
        return JSONResponse({"error": "key required"}, status_code=400)
    agent_registry[key] = {
        "key": key,
        "name": body.get("name", key.capitalize()),
        "role": body.get("role", "Agente de propósito general."),
        "provider": body.get("provider", "openrouter"),
        "models": body.get("models", ["meta-llama/llama-3.3-70b-instruct:free"]),
    }
    return {"success": True, "agent": agent_registry[key]}


@app.delete("/api/agents/{key}")
async def delete_agent(key: str):
    """Remove a non-core agent from the registry."""
    if key in ("manager", "developer", "analyst"):
        return JSONResponse({"error": "Cannot delete core agents"}, status_code=400)
    if key in agent_registry:
        del agent_registry[key]
    return {"success": True}


@app.get("/api/history")
async def get_history():
    """Return the 20 most recent mission entries."""
    return {"history": mission_history[-20:]}


# BUG FIX: the route path was corrupted and did not declare the {filename}
# path parameter, so the download endpoint could never match/bind.
@app.get("/api/docs/{filename}")
async def download_doc(filename: str):
    """Serve a previously generated .docx report from DOCS_DIR."""
    # Reject path traversal (e.g. "../secret") — only bare names are valid.
    if Path(filename).name != filename:
        return JSONResponse({"error": "File not found"}, status_code=404)
    path = DOCS_DIR / filename
    if not path.exists() or not filename.endswith(".docx"):
        return JSONResponse({"error": "File not found"}, status_code=404)
    return FileResponse(
        path,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        filename=filename,
    )


@app.post("/api/mission")
async def run_mission(request: Request):
    """Orchestrate a mission: manager plans, delegates run, report is built.

    Steps: (1) the manager agent analyzes the task and names delegates;
    (2) writer/image_agent (and generic agents) run concurrently, then the
    analyst reviews; (3) if the writer produced text, a .docx is assembled
    and saved to DOCS_DIR. Every step appends to an event log returned to
    the client, and the whole run is appended to mission_history.
    """
    body = await request.json()
    task = body.get("task", "").strip()
    if not task:
        return JSONResponse({"error": "No task provided"}, status_code=400)

    started_at = datetime.now().isoformat()
    results = {}
    doc_file = None
    events = []  # orchestration event log

    def log(msg: str):
        events.append({"time": datetime.now().strftime("%H:%M:%S"), "msg": msg})

    # ── STEP 1: Manager plans ──────────────────────────────────────────────
    log("Manager analyzing task...")
    manager_agent = agent_registry["manager"]
    # BUG FIX: manager_msg must exist even when the manager call fails,
    # because the analyst branch below reads it as a fallback.
    manager_msg = ""
    try:
        manager_raw = await call_agent_llm(manager_agent, task)
        delegates = parse_delegation(manager_raw)
        manager_msg = clean_text(manager_raw)
        results["manager"] = {
            "status": "active",
            "message": manager_msg,
            "model": manager_agent["models"][0],
            "delegates": delegates,
        }
        log(f"Manager delegated to: {delegates or 'none'}")
    except Exception as e:
        results["manager"] = {"status": "resting", "message": str(e), "model": ""}
        delegates = []
        log(f"Manager failed: {e}")

    # ── STEP 2: Run delegated agents ───────────────────────────────────────
    image_bytes = []
    writer_text = ""
    analyst_text = ""

    async def run_delegated(key: str):
        nonlocal writer_text, analyst_text
        if key not in agent_registry:
            log(f"Agent '{key}' not found in registry")
            return
        agent = agent_registry[key]
        log(f"{agent['name']} starting...")
        if key == "image_agent":
            try:
                img_prompt = f"Proporciona consultas de búsqueda de imágenes en inglés para ilustrar: {task}"
                raw = await call_agent_llm(agent, img_prompt)
                queries = parse_image_queries(raw)
                if not queries:
                    queries = [task[:40]]
                log(f"ImageAgent searching: {queries}")
                imgs = await asyncio.gather(*[get_image_for_query(q) for q in queries[:3]])
                for img in imgs:
                    if img:
                        image_bytes.append(img)
                results["image_agent"] = {
                    "status": "active",
                    "message": f"Encontradas {len(image_bytes)} imágenes para: {', '.join(queries[:3])}",
                    "model": agent["models"][0],
                }
                log(f"ImageAgent: {len(image_bytes)} images found")
            except Exception as e:
                results["image_agent"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"ImageAgent failed: {e}")
        elif key == "writer":
            try:
                writer_prompt = (
                    f"Redacta un informe formal y completo sobre: {task}\n\n"
                    "Usa este formato:\n"
                    "## Resumen Ejecutivo\n[contenido]\n\n"
                    "### Introducción\n[contenido]\n\n"
                    "### Desarrollo\n[contenido con subsecciones]\n\n"
                    "### Conclusiones\n[contenido]\n\n"
                    "### Recomendaciones\n[contenido]\n\n"
                    "Sé formal, detallado y profesional."
                )
                writer_text = await call_agent_llm(agent, writer_prompt)
                results["writer"] = {
                    "status": "active",
                    "message": writer_text[:200] + "..." if len(writer_text) > 200 else writer_text,
                    "model": agent["models"][0],
                }
                log("Writer completed document")
            except Exception as e:
                results["writer"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"Writer failed: {e}")
        elif key == "analyst":
            try:
                content_to_review = writer_text or manager_msg
                analyst_prompt = (
                    f"Revisa y analiza el siguiente contenido sobre: {task}\n\n"
                    f"Contenido:\n{content_to_review[:1000]}\n\n"
                    "Proporciona: 1) Evaluación de calidad, 2) Puntos fuertes, "
                    "3) Áreas de mejora, 4) Conclusión final."
                )
                analyst_text = await call_agent_llm(agent, analyst_prompt)
                results["analyst"] = {
                    "status": "active",
                    "message": analyst_text[:200] + "..." if len(analyst_text) > 200 else analyst_text,
                    "model": agent["models"][0],
                }
                log("Analyst review completed")
            except Exception as e:
                results["analyst"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"Analyst failed: {e}")
        else:
            # Generic agent
            try:
                raw = await call_agent_llm(agent, task)
                results[key] = {"status": "active", "message": raw, "model": agent["models"][0]}
                log(f"{agent['name']} completed")
            except Exception as e:
                results[key] = {"status": "resting", "message": str(e), "model": ""}
                # CONSISTENCY FIX: every other failure branch logs; this one
                # previously did not, leaving the event log silent on failure.
                log(f"{agent['name']} failed: {e}")

    # Run image_agent and writer in parallel, then analyst (it reviews the
    # writer's output, so it must run after).
    parallel_first = [k for k in delegates if k in ("writer", "image_agent")]
    sequential_after = [k for k in delegates if k == "analyst"]
    other = [k for k in delegates if k not in ("writer", "image_agent", "analyst")]
    if parallel_first or other:
        await asyncio.gather(*[run_delegated(k) for k in parallel_first + other])
    if sequential_after:
        for k in sequential_after:
            await run_delegated(k)

    # ── STEP 3: Build .docx if writer was involved ─────────────────────────
    if writer_text:
        log("Assembling Word document...")
        try:
            doc_bytes = build_docx(task, {"writer": writer_text}, image_bytes, analyst_text)
            safe_name = re.sub(r'[^\w\-]', '_', task[:40])
            doc_filename = f"{safe_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
            (DOCS_DIR / doc_filename).write_bytes(doc_bytes)
            doc_file = doc_filename
            results["manager"]["doc_file"] = doc_filename
            log(f"Document ready: {doc_filename}")
        except Exception as e:
            log(f"Document build failed: {e}")

    # Mark idle agents
    for key in agent_registry:
        if key not in results:
            results[key] = {"status": "idle", "message": "", "model": ""}

    final = results.get("manager", {}).get("message", "")[:300]
    entry = {
        "id": len(mission_history) + 1,
        "task": task,
        "started_at": started_at,
        "ended_at": datetime.now().isoformat(),
        "results": results,
        "final": final,
        "doc_file": doc_file,
        "events": events,
    }
    mission_history.append(entry)
    return JSONResponse({
        "success": True,
        "task": task,
        "results": results,
        "final": final,
        "doc_file": doc_file,
        "events": events,
        "mission_id": entry["id"],
    })


@app.get("/api/health")
async def health():
    """Report which provider credentials are configured."""
    return {
        "status": "ok",
        "providers": {
            "gemini": "ok" if GOOGLE_API_KEY else "missing",
            "openrouter": "ok" if OPENROUTER_API_KEY else "missing",
            "groq": "ok" if GROQ_API_KEY else "missing",
            "pexels": "ok" if PEXELS_API_KEY else "missing (optional)",
            "hf_images": "ok" if HF_API_KEY else "missing (optional)",
        }
    }