test / main.py
vfven's picture
Update main.py
1462674 verified
import os, asyncio, httpx, re, json as _json
from io import BytesIO
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from pathlib import Path
from docx import Document as DocxDocument
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY","")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY","")
GROQ_API_KEY = os.getenv("GROQ_API_KEY","")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY","")
HF_API_KEY = os.getenv("HF_TOKEN","")
DOCS_DIR = Path("docs")
DOCS_DIR.mkdir(exist_ok=True)
PROVIDERS = {
"gemini": {"name":"Google Gemini","type":"gemini","key":GOOGLE_API_KEY},
"openrouter": {"name":"OpenRouter","type":"openai_compat","key":OPENROUTER_API_KEY,
"base_url":"https://openrouter.ai/api/v1/chat/completions",
"headers":{"HTTP-Referer":"https://huggingface.co/spaces/vfven/mission-control-ui","X-Title":"Mission Control AI"}},
"groq": {"name":"Groq","type":"openai_compat","key":GROQ_API_KEY,
"base_url":"https://api.groq.com/openai/v1/chat/completions","headers":{}},
}
DEFAULT_AGENTS = [
{"key":"manager","name":"Manager","provider":"gemini",
"role":(
"Eres el gerente de proyecto. Analiza la solicitud y decide qué agentes trabajarán. "
"NUNCA hagas el trabajo tú mismo. Saluda en 1 línea y delega siempre con JSON al final: "
'{"delegate":["key1","key2"]}\n'
"REGLAS:\n"
"- imagen/foto/gato/perro/dibujo → image_agent\n"
"- informe/reporte/word/documento → writer + analyst\n"
"- excel/planilla/hoja/spreadsheet/registrar → backend_dev\n"
"- python/script/groovy/jenkins/api/devops → backend_dev\n"
"- html/css/web/interfaz/frontend → frontend_dev\n"
"- app completa full-stack → backend_dev + frontend_dev\n"
"- analisis/viabilidad/evaluar → analyst"
),
"models":["gemini-2.5-flash-preview-04-17","gemini-2.0-flash","gemini-1.5-flash"]},
{"key":"backend_dev","name":"Backend","provider":"openrouter",
"role":(
"Eres programador backend senior. REGLAS ABSOLUTAS:\n"
"1. Entrega SOLO el código pedido, sin explicaciones innecesarias.\n"
"2. Excel/planilla → responde con EXCEL_TEMPLATE:{\"title\":\"...\",\"sheet_name\":\"...\","
"\"headers\":[...],\"sample_rows\":[[...],[...]]}\n"
"3. Python → entrega código Python puro y funcional.\n"
"4. Groovy/Jenkins → entrega el script completo.\n"
"5. Si hay frontend_dev en el equipo, TÚ haces servidor/backend, él hace HTML.\n"
"6. Si la tarea no requiere backend → responde: {\"skip\":\"no backend needed\"}"
),
"models":["google/gemma-3-27b-it:free","google/gemma-3-12b-it:free",
"meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]},
{"key":"frontend_dev","name":"Frontend","provider":"openrouter",
"role":(
"Eres desarrollador frontend senior. REGLAS ABSOLUTAS:\n"
"1. Entrega SOLO código HTML/CSS/JS pedido, sin explicaciones innecesarias.\n"
"2. Si hay backend_dev, TÚ haces HTML/interfaz, él hace servidor/lógica.\n"
"3. Si la tarea NO requiere frontend → responde: {\"skip\":\"no frontend needed\"}\n"
"4. Entrega siempre HTML completo y funcional con los estilos incluidos."
),
"models":["google/gemma-3-12b-it:free","google/gemma-3-27b-it:free",
"meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]},
{"key":"analyst","name":"Analyst","provider":"openrouter",
"role":(
"Eres analista de negocios. REGLAS:\n"
"1. Solo haz lo que el manager delegó: revisar documentos, evaluar viabilidad, analizar riesgos.\n"
"2. NUNCA describas imágenes ni hagas trabajo de otros agentes.\n"
"3. Si la tarea no requiere análisis → responde: {\"skip\":\"no analysis needed\"}"
),
"models":["google/gemma-3-27b-it:free","google/gemma-3-12b-it:free",
"meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]},
{"key":"writer","name":"Writer","provider":"openrouter",
"role":(
"Eres redactor experto. Escribe SOLO contenido real y extenso (500+ palabras). "
"Sin placeholders. Usa ## para secciones y ### para subsecciones. "
"Secciones: ## Resumen Ejecutivo, ### Introducción, ### Desarrollo, "
"### Hallazgos, ### Conclusiones, ### Recomendaciones"
),
"models":["google/gemma-3-12b-it:free","google/gemma-3-27b-it:free",
"meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]},
{"key":"image_agent","name":"ImageAgent","provider":"gemini",
"role":(
"Cuando se te pida imágenes, responde SOLO con: "
"{\"image_queries\":[\"english term 1\",\"english term 2\",\"english term 3\"]} "
"Los términos deben ser específicos en inglés para encontrar buenas imágenes."
),
"models":["gemini-2.0-flash","gemini-1.5-flash"]},
]
# ── CHAT ROLES — conversational versions for direct chat (not mission mode) ──
CHAT_ROLES = {
"manager": (
"Eres el Manager de Mission Control AI, gerente de proyectos con acceso real a un equipo de agentes IA. "
"Tu equipo: backend_dev (Python/APIs), frontend_dev (HTML/CSS/JS), analyst (análisis), "
"writer (redacción), image_agent (imágenes/arte). "
"REGLA CRÍTICA: Si el usuario pide algo que requiere trabajo real de un agente "
"(generar imagen, escribir código, hacer Excel, crear informe, diseñar web, etc.), "
"responde ÚNICAMENTE con este JSON y nada más: "
'{"action":"delegate","task":"descripcion clara de lo que se necesita"} '
"Ejemplos que SÍ requieren delegar: "
"'genera una imagen', 'hazme un script', 'crea un formulario HTML', 'haz un informe', 'excel de ventas'. "
"Para conversación normal (saludos, preguntas, planificación, consejos) responde con texto normal. "
"Recuerda el historial y refiérete a él."
),
"backend_dev": (
"Eres Backend Dev, un programador senior especializado en Python, APIs, bases de datos y DevOps. "
"Puedes conversar libremente, explicar código, debuggear problemas, sugerir arquitecturas. "
"Cuando el usuario pide código, entrégalo limpio y funcional. "
"Recuerda el contexto de la conversación — si antes hablaron de un proyecto, continúa desde ahí."
),
"frontend_dev": (
"Eres Frontend Dev, un desarrollador web senior especializado en HTML, CSS, JavaScript y UX. "
"Puedes conversar libremente, revisar diseños, sugerir mejoras visuales, escribir código frontend. "
"Recuerda el historial — si el usuario ya te mostró algo, refiérete a ello."
),
"analyst": (
"Eres el Analyst, un analista de negocios y datos experimentado. "
"Puedes analizar situaciones, evaluar riesgos, hacer proyecciones, discutir estrategias. "
"Conversa de forma natural. Usa los datos y contexto que el usuario te ha dado antes en esta conversación."
),
"writer": (
"Eres Writer, un redactor y escritor creativo experto. "
"Puedes redactar textos, corregir gramática, brainstormear ideas, adaptar tonos y estilos. "
"Habla de forma fluida y creativa. Si el usuario ya compartió algo antes, úsalo como base."
),
"image_agent": (
"Eres ImageAgent, especialista en imágenes y arte visual con IA. "
"REGLA CRÍTICA: Si el usuario pide generar, crear, buscar, dibujar o mostrar UNA imagen o foto de CUALQUIER cosa, "
"responde ÚNICAMENTE con este JSON sin ningún texto adicional: "
'{"action":"generate_image","queries":["detailed english prompt 1","detailed english prompt 2","english prompt 3"]} '
"Los queries deben ser DESCRIPTIVOS en inglés (colores, estilo, composición). Ej: "
'{"action":"generate_image","queries":["flying cat with angel wings pastel colors","cute cat flying through clouds digital art","cat with wings fantasy illustration"]} '
"Para preguntas sobre fotografía, diseño o arte SIN pedir imágenes, conversa normalmente."
),
}
def get_chat_role(agent_key: str, agent: dict) -> str:
"""Return conversational role for chat mode."""
base = CHAT_ROLES.get(agent_key, "")
if base:
return base
# Custom agents fallback
return (
f"Eres {agent['name']}, un asistente especializado. "
f"Tu especialidad: {agent.get('role','propósito general')}. "
"Conversa de forma natural y recuerda el historial de esta conversación."
)
agent_registry = {a["key"]: dict(a) for a in DEFAULT_AGENTS}
mission_history = []
async def call_gemini(model,system,user,key):
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}"
async with httpx.AsyncClient(timeout=90) as c:
r = await c.post(url,json={"contents":[{"role":"user","parts":[{"text":f"{system}\n\n{user}"}]}],"generationConfig":{"maxOutputTokens":2048,"temperature":0.4}})
r.raise_for_status()
return r.json()["candidates"][0]["content"]["parts"][0]["text"]
async def call_compat(base_url,model,system,user,key,headers):
h = {"Authorization":f"Bearer {key}","Content-Type":"application/json",**headers}
async with httpx.AsyncClient(timeout=90) as c:
r = await c.post(base_url,json={"model":model,"messages":[{"role":"system","content":system},{"role":"user","content":user}],"max_tokens":2048,"temperature":0.4},headers=h)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
async def call_compat_multiturn(base_url, model, system, messages, key, extra_headers):
"""OpenAI-compatible chat with full message history for multi-turn conversations."""
h = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra_headers}
payload = {
"model": model,
"messages": [{"role": "system", "content": system}] + messages,
"max_tokens": 2048,
"temperature": 0.6,
}
async with httpx.AsyncClient(timeout=90) as c:
r = await c.post(base_url, json=payload, headers=h)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
async def call_gemini_multiturn(model, system, messages, key):
"""Gemini multi-turn conversation."""
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}"
# Convert messages to Gemini format
contents = []
for m in messages:
role = "user" if m["role"] == "user" else "model"
contents.append({"role": role, "parts": [{"text": m["content"]}]})
# Prepend system as first user message if contents start with model
full_system = system + "\n\n" + (contents[0]["parts"][0]["text"] if contents and contents[0]["role"] == "user" else "")
if contents and contents[0]["role"] == "user":
contents[0]["parts"][0]["text"] = full_system
payload = {
"contents": contents,
"generationConfig": {"maxOutputTokens": 2048, "temperature": 0.6},
}
async with httpx.AsyncClient(timeout=90) as c:
r = await c.post(url, json=payload)
r.raise_for_status()
return r.json()["candidates"][0]["content"]["parts"][0]["text"]
async def call_llm_multiturn(agent, messages):
"""Multi-turn LLM call with full conversation history. Cascades through providers."""
system = agent["role"]
last_err = None
# 1. Primary provider
p = PROVIDERS[agent["provider"]]
for m in agent["models"]:
try:
if p["type"] == "gemini":
return await call_gemini_multiturn(m, system, messages, p["key"])
else:
return await call_compat_multiturn(p["base_url"], m, system, messages,
p["key"], p.get("headers", {}))
except Exception as e:
last_err = str(e)
if is_rate_limit(last_err):
break
# 2. OpenRouter fallback (Gemma 3 first)
if OPENROUTER_API_KEY and agent["provider"] != "openrouter":
or_prov = PROVIDERS["openrouter"]
for m in ["google/gemma-3-27b-it:free", "google/gemma-3-12b-it:free",
"meta-llama/llama-3.3-70b-instruct:free",
"mistralai/mistral-small-3.1-24b-instruct:free"]:
try:
return await call_compat_multiturn(or_prov["base_url"], m, system, messages,
or_prov["key"], or_prov.get("headers", {}))
except Exception as e:
last_err = str(e)
if is_rate_limit(last_err):
break
# 3. Groq fallback
if GROQ_API_KEY and agent["provider"] != "groq":
groq = PROVIDERS["groq"]
for m in ["llama-3.1-8b-instant", "gemma2-9b-it"]:
try:
return await call_compat_multiturn(groq["base_url"], m, system, messages,
GROQ_API_KEY, {})
except Exception as e:
last_err = str(e)
raise Exception(f"All providers exhausted. Last: {last_err}")
def is_rate_limit(err: str) -> bool:
e = err.lower()
return any(x in e for x in ["429","rate limit","quota","resource exhausted","too many requests","ratelimit"])
async def call_llm(agent, task):
"""Provider cascade: Primary → OpenRouter → Groq"""
system = agent["role"]
last_err = None
# 1. Primary provider
p = PROVIDERS[agent["provider"]]
for m in agent["models"]:
try:
if p["type"] == "gemini":
return await call_gemini(m, system, task, p["key"])
else:
return await call_compat(p["base_url"], m, system, task,
p["key"], p.get("headers", {}))
except Exception as e:
last_err = str(e)
if is_rate_limit(last_err):
break # rate limited — skip to next provider immediately
# 2. OpenRouter fallback
if OPENROUTER_API_KEY and agent["provider"] != "openrouter":
or_prov = PROVIDERS["openrouter"]
for m in [
"google/gemma-3-27b-it:free",
"google/gemma-3-12b-it:free",
"meta-llama/llama-3.3-70b-instruct:free",
"mistralai/mistral-small-3.1-24b-instruct:free",
"qwen/qwen3-4b:free",
"qwen/qwen-2.5-72b-instruct:free",
"deepseek/deepseek-r1-distill-llama-70b:free",
]:
try:
return await call_compat(or_prov["base_url"], m, system, task,
or_prov["key"], or_prov.get("headers", {}))
except Exception as e:
last_err = str(e)
if is_rate_limit(last_err):
break
# 3. Groq fallback
if GROQ_API_KEY and agent["provider"] != "groq":
for m in ["llama-3.1-8b-instant", "gemma2-9b-it"]:
try:
return await call_compat(PROVIDERS["groq"]["base_url"], m, system, task,
GROQ_API_KEY, {})
except Exception as e:
last_err = str(e)
raise Exception(f"All providers exhausted. Last: {last_err}")
async def fetch_pexels(q):
if not PEXELS_API_KEY: return None
try:
async with httpx.AsyncClient(timeout=20) as c:
r = await c.get("https://api.pexels.com/v1/search",params={"query":q,"per_page":1,"orientation":"landscape"},headers={"Authorization":PEXELS_API_KEY})
d = r.json()
if d.get("photos"):
ir = await c.get(d["photos"][0]["src"]["medium"])
return ir.content
except: pass
return None
async def gen_hf_image(prompt: str) -> bytes | None:
"""Try HuggingFace free image generation models. FLUX first (best quality)."""
if not HF_API_KEY:
return None
# Ordered by quality/availability — FLUX.1 schnell is fastest free model
models = [
("black-forest-labs/FLUX.1-schnell", {"inputs": prompt}),
("black-forest-labs/FLUX.1-dev", {"inputs": prompt}),
("stabilityai/stable-diffusion-xl-base-1.0", {"inputs": prompt, "parameters": {"width": 512, "height": 512}}),
("stabilityai/stable-diffusion-2-1", {"inputs": prompt, "parameters": {"width": 512, "height": 512}}),
]
headers = {"Authorization": f"Bearer {HF_API_KEY}"}
for model_id, payload in models:
try:
async with httpx.AsyncClient(timeout=120) as c:
r = await c.post(
f"https://api-inference.huggingface.co/models/{model_id}",
headers=headers,
json=payload,
)
ct = r.headers.get("content-type", "")
if r.status_code == 200 and ("image" in ct or r.content[:4] in (b"\xff\xd8\xff\xe0", b"\x89PNG")):
return r.content
# 503/loading → try next model immediately
if r.status_code in (503, 500):
continue
# 429 rate limit → stop trying HF
if r.status_code == 429:
break
except Exception:
continue
return None
CREATIVE_KEYWORDS = [
"flying","volador","volando","astronaut","astronauta","dragon","dragón",
"fantasy","fantasia","magical","mágico","cartoon","anime","pixel art",
"robot","alien","extraterrestre","superhero","unicorn","unicornio",
"watercolor","illustration","ilustración","3d","render","sci-fi",
"futuristic","futurista","cyberpunk","cute","kawaii",
"gato volador","flying cat","space cat","gato espacial",
"imagina","genera","crea","diseña","dibuja",
]
def is_creative(q: str) -> bool:
return any(k in q.lower() for k in CREATIVE_KEYWORDS)
async def get_image(q: str, force_generate: bool = False):
"""Fetch or generate an image for query q.
force_generate=True: skip Pexels, go straight to HF image generation.
"""
if force_generate or is_creative(q):
result = await gen_hf_image(q)
if result: return result
# Real-world → Pexels first, AI fallback
return await fetch_pexels(q) or await gen_hf_image(q)
def classify(task):
lo = task.lower()
return {
"img": any(w in lo for w in ["imagen","image","foto","picture","gato","cat","dog","perro","dibuja","genera una imagen","crea una imagen","ilustra"]),
"excel": any(w in lo for w in ["excel","xlsx","planilla","hoja de calculo","hoja de cálculo","spreadsheet","registro de","registrar alumnos","tabla de alumnos","plantilla de"]),
"word": any(w in lo for w in ["informe","reporte","report","documento word","docx","redacta un informe","escribe un informe"]),
"anal": any(w in lo for w in ["analiza","evalua","evalúa","viabilidad","riesgo","revisar"]),
"back": any(w in lo for w in ["python","script","groovy","jenkins","api","backend","fastapi","flask","devops","pipeline",".py"]),
"front": any(w in lo for w in ["html","css","frontend","web page","página web","pagina web","sitio web","interfaz web"]),
"both": any(w in lo for w in ["hola mundo","full stack","fullstack","app completa"]) and any(w in lo for w in ["python","backend",".py"]) and any(w in lo for w in ["html","web","ver en","interfaz"]),
}
def parse_delegates(text):
m = re.search(r'\{"delegate"\s*:\s*\[([^\]]*)\]\}',text)
return re.findall(r'"(\w+)"',m.group(1)) if m else []
def clean(text):
text = re.sub(r'\{"delegate"[^}]*\}','',text)
text = re.sub(r'\{"image_queries"[^}]*\}','',text)
return text.strip()
def is_skip(text): return '"skip"' in text.lower()
def build_excel(task,backend_text):
m = re.search(r'EXCEL_TEMPLATE:\s*(\{.*?\})\s*$',backend_text,re.DOTALL|re.MULTILINE)
if not m:
m = re.search(r'EXCEL_TEMPLATE:\s*(\{.*)',backend_text,re.DOTALL)
if not m: return None
try: structure = _json.loads(m.group(1))
except: return None
wb = openpyxl.Workbook(); ws = wb.active
ws.title = structure.get("sheet_name","Datos")[:31]
headers = structure.get("headers",[])
rows = structure.get("sample_rows",[])
title = structure.get("title",task[:50])
thin = Side(style="thin",color="D1D5DB")
bdr = Border(left=thin,right=thin,top=thin,bottom=thin)
row_off = 1
if title and headers:
ws.merge_cells(f"A1:{chr(64+len(headers))}1")
c = ws["A1"]; c.value=title
c.font=Font(bold=True,size=13,color="FFFFFF")
c.fill=PatternFill("solid",fgColor="1a56db")
c.alignment=Alignment(horizontal="center",vertical="center")
ws.row_dimensions[1].height=28; row_off=2
for col,h in enumerate(headers,1):
c=ws.cell(row=row_off,column=col,value=h)
c.font=Font(bold=True,color="FFFFFF",size=10)
c.fill=PatternFill("solid",fgColor="2563eb")
c.alignment=Alignment(horizontal="center",vertical="center")
c.border=bdr
ws.column_dimensions[c.column_letter].width=max(len(str(h))+6,14)
ws.row_dimensions[row_off].height=20
alt=PatternFill("solid",fgColor="EFF6FF")
for ri,row in enumerate(rows,row_off+1):
for col,val in enumerate(row,1):
c=ws.cell(row=ri,column=col,value=val)
c.border=bdr; c.alignment=Alignment(vertical="center")
if ri%2==0: c.fill=alt
ws.row_dimensions[ri].height=18
ws.freeze_panes=f"A{row_off+1}"
buf=BytesIO(); wb.save(buf); buf.seek(0)
return buf.read()
def build_docx(title,writer_text,images,analyst_text):
doc=DocxDocument()
for s in doc.sections: s.top_margin=s.bottom_margin=Inches(1); s.left_margin=s.right_margin=Inches(1.2)
tp=doc.add_heading(title,0); tp.alignment=WD_ALIGN_PARAGRAPH.CENTER
if tp.runs: tp.runs[0].font.color.rgb=RGBColor(0x1a,0x56,0xdb)
sub=doc.add_paragraph(); sub.alignment=WD_ALIGN_PARAGRAPH.CENTER
sub.add_run(f"Mission Control AI — {datetime.now().strftime('%B %d, %Y')}").italic=True
doc.add_paragraph()
p=doc.add_paragraph(); pPr=p._p.get_or_add_pPr(); pBdr=OxmlElement("w:pBdr")
bot=OxmlElement("w:bottom"); bot.set(qn("w:val"),"single"); bot.set(qn("w:sz"),"6"); bot.set(qn("w:color"),"1a56db")
pBdr.append(bot); pPr.append(pBdr)
ii=0; pending=[]
def flush():
nonlocal pending
t=" ".join(pending).strip()
if t: p2=doc.add_paragraph(t); p2.paragraph_format.space_after=Pt(6)
pending.clear()
for line in writer_text.split("\n"):
s=line.strip()
if not s: flush(); continue
if s.startswith("## "):
flush(); doc.add_heading(s[3:],level=1)
if ii<len(images) and images[ii]:
try: doc.add_picture(BytesIO(images[ii]),width=Inches(5)); doc.paragraphs[-1].alignment=WD_ALIGN_PARAGRAPH.CENTER; ii+=1
except: pass
elif s.startswith("### "): flush(); doc.add_heading(s[4:],level=2)
elif s.startswith("- ") or s.startswith("* "): flush(); doc.add_paragraph(s[2:],style="List Bullet")
else: pending.append(s)
flush()
while ii<len(images):
if images[ii]:
try: doc.add_picture(BytesIO(images[ii]),width=Inches(5)); doc.paragraphs[-1].alignment=WD_ALIGN_PARAGRAPH.CENTER
except: pass
ii+=1
if analyst_text and not is_skip(analyst_text):
doc.add_page_break(); doc.add_heading("Análisis y Revisión",level=1)
for line in analyst_text.split("\n"):
if line.strip(): doc.add_paragraph(line.strip())
fp=doc.add_paragraph(); fp.alignment=WD_ALIGN_PARAGRAPH.CENTER
fr=fp.add_run("— Generado por Mission Control AI —"); fr.italic=True; fr.font.size=Pt(9); fr.font.color.rgb=RGBColor(0x6b,0x72,0x80)
buf=BytesIO(); doc.save(buf); buf.seek(0)
return buf.read()
@app.get("/",response_class=HTMLResponse)
async def root(): return HTMLResponse(Path("templates/index.html").read_text())
@app.get("/api/agents")
async def get_agents(): return {"agents":[{"key":a["key"],"name":a["name"],"role":a["role"]} for a in agent_registry.values()]}
@app.post("/api/agents/add")
async def add_agent(request:Request):
body=await request.json(); key=re.sub(r'\W+','_',body.get("key","").lower().strip())
if not key: return JSONResponse({"error":"key required"},status_code=400)
agent_registry[key]={"key":key,"name":body.get("name",key.capitalize()),"role":body.get("role","General purpose agent."),"provider":body.get("provider","openrouter"),"models":body.get("models",["meta-llama/llama-3.3-70b-instruct:free"])}
return {"success":True,"agent":agent_registry[key]}
@app.delete("/api/agents/{key}")
async def del_agent(key:str):
if key in {"manager","backend_dev","frontend_dev","analyst"}: return JSONResponse({"error":"Cannot delete core agents"},status_code=400)
agent_registry.pop(key,None); return {"success":True}
@app.get("/api/history")
async def get_history(): return {"history":mission_history[-20:]}
@app.get("/api/docs/{filename}")
async def dl_doc(filename:str):
path=DOCS_DIR/filename
if not path.exists(): return JSONResponse({"error":"not found"},status_code=404)
ext=Path(filename).suffix.lower()
mt={".docx":"application/vnd.openxmlformats-officedocument.wordprocessingml.document",".xlsx":"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",".html":"text/html",".py":"text/plain",".groovy":"text/plain",".jpg":"image/jpeg",".png":"image/png"}.get(ext,"application/octet-stream")
return FileResponse(path,media_type=mt,filename=filename)
@app.post("/api/mission")
async def run_mission(request: Request):
body = await request.json()
task = body.get("task", "").strip()
if not task:
return JSONResponse({"error": "No task"}, status_code=400)
started = datetime.now().isoformat()
results = {}
events = []
doc_file = None
tc = classify(task)
def log(m):
events.append({"time": datetime.now().strftime("%H:%M:%S"), "msg": m})
# ── Simple mission detector — bypass Manager for obvious single-agent tasks ──
SIMPLE_RULES = [
(["imagen","image","foto","gato","cat","dog","perro","dibuja",
"genera imagen","crea imagen","genera una foto"], ["image_agent"]),
(["excel","xlsx","planilla","hoja de calculo","hoja de cálculo",
"registrar alumnos","registro de","tabla de"], ["backend_dev"]),
(["informe","reporte","redacta un informe","escribe un informe",
"escribe un reporte"], ["writer","analyst"]),
(["hola mundo","hello world"], ["backend_dev","frontend_dev"]),
]
def detect_simple(t):
lo = t.lower()
for keywords, agents in SIMPLE_RULES:
if any(k in lo for k in keywords):
return agents
return None
simple = detect_simple(task)
# ── Manager plans (only for complex tasks) ──────────────────────────────
_today = datetime.now().strftime("%A %d de %B de %Y, %H:%M")
if simple:
delegates = simple
results["manager"] = {
"status": "active",
"message": "Tarea directa → " + ", ".join(simple),
"model": "detector",
"delegates": delegates,
}
log("Simple detect → " + str(delegates))
else:
log("Manager planning...")
try:
mgr = dict(agent_registry["manager"])
mgr["role"] = "HOY ES: " + _today + ". " + mgr["role"]
mgr_raw = await call_llm(mgr, task)
mgr_del = parse_delegates(mgr_raw)
results["manager"] = {
"status": "active",
"message": clean(mgr_raw),
"model": agent_registry["manager"]["models"][0],
"delegates": mgr_del,
}
delegates = mgr_del
log("Manager delegates: " + str(delegates))
except Exception as e:
results["manager"] = {"status": "resting", "message": str(e), "model": ""}
delegates = []
log("Manager error: " + str(e))
# Safety net: add agents the classifier detects even if manager missed them
safety = [
("img", "image_agent"),
("excel", "backend_dev"),
("word", "writer"),
("anal", "analyst"),
("back", "backend_dev"),
("front", "frontend_dev"),
]
for flag, key in safety:
if tc[flag] and key not in delegates:
delegates.append(key)
if tc["both"]:
for k in ["backend_dev", "frontend_dev"]:
if k not in delegates:
delegates.append(k)
delegates = [k for k in delegates if k in agent_registry]
log("Final: " + str(delegates))
# ── Run agents ───────────────────────────────────────────────────────────
image_bytes = []
writer_text = ""
analyst_text = ""
backend_text = ""
frontend_text = ""
async def run_one(key):
nonlocal writer_text, analyst_text, backend_text, frontend_text
agent = agent_registry[key]
log(agent["name"] + " working...")
try:
if key == "image_agent":
p = (
"The user wants: " + task + ". "
"Give 3 specific English image search terms. "
"If creative/fantastical make descriptive AI generation prompts. "
'Respond ONLY: {"image_queries":["t1","t2","t3"]}'
)
raw = await call_llm(agent, p)
m2 = re.search(r'"image_queries"\s*:\s*\[([^\]]*)\]', raw)
queries = re.findall(r'"([^"]+)"', m2.group(1)) if m2 else [task[:50]]
imgs = await asyncio.gather(*[get_image(q) for q in queries[:3]])
for img in imgs:
if img: image_bytes.append(img)
safe2 = re.sub(r"[^\w]", "_", task[:30])
if image_bytes:
for i, ib in enumerate(image_bytes):
(DOCS_DIR / (safe2 + "_img" + str(i+1) + ".jpg")).write_bytes(ib)
results["image_agent"] = {
"status": "active",
"message": str(len(image_bytes)) + " image(s): " + ", ".join(queries[:3]),
"model": agent["models"][0],
"doc_file": (safe2 + "_img1.jpg") if image_bytes else None,
"img_count": len(image_bytes),
"img_base": safe2,
}
log("ImageAgent: " + str(len(image_bytes)))
elif key == "writer":
ctx = results.get("manager", {}).get("message", "")[:300]
p = (
"Escribe informe formal completo sobre: " + task + "\n"
"Contexto del Manager: " + ctx + "\n"
"500+ palabras reales. Secciones: "
"## Resumen Ejecutivo, ### Introduccion, ### Desarrollo, "
"### Hallazgos, ### Conclusiones, ### Recomendaciones"
)
writer_text = await call_llm(agent, p)
results["writer"] = {
"status": "active",
"message": writer_text[:200] + "...",
"model": agent["models"][0],
}
log("Writer done")
elif key == "analyst":
content = writer_text or results.get("manager", {}).get("message", task)
p = (
"Tarea: " + task + "\n"
"Contenido:\n" + content[:1500] + "\n"
"Evalua: 1) Calidad, 2) Puntos fuertes, 3) Areas de mejora, 4) Conclusion."
)
analyst_text = await call_llm(agent, p)
if is_skip(analyst_text):
results["analyst"] = {"status": "idle", "message": "", "model": ""}
else:
results["analyst"] = {
"status": "active",
"message": analyst_text[:200] + "...",
"model": agent["models"][0],
}
log("Analyst done")
elif key == "backend_dev":
has_fe = "frontend_dev" in delegates
collab = " El Frontend Dev hara la interfaz. Tu solo backend/logica." if has_fe else ""
if tc["excel"]:
p = (
"Crea plantilla Excel para: " + task + "\n"
"Responde SOLO con:\n"
'EXCEL_TEMPLATE:{"title":"...","sheet_name":"...",'
'"headers":["Col1","Col2"],"sample_rows":[["v1","v2"],["v1","v2"],["v1","v2"]]}'
)
else:
p = "Tarea: " + task + collab + "\nEntrega SOLO el codigo solicitado."
backend_text = await call_llm(agent, p)
if is_skip(backend_text):
results["backend_dev"] = {"status": "idle", "message": "", "model": ""}
else:
results["backend_dev"] = {
"status": "active",
"message": backend_text[:300] + ("..." if len(backend_text) > 300 else ""),
"model": agent["models"][0],
}
log("Backend done")
elif key == "frontend_dev":
ctx = backend_text[:500] if backend_text else ""
p = (
"Tarea: " + task + "\n"
+ ("Backend hizo:\n" + ctx + "\n" if ctx else "")
+ "Entrega SOLO HTML/CSS/JS completo. "
+ 'Si no se necesita frontend: {"skip":"no frontend needed"}'
)
frontend_text = await call_llm(agent, p)
if is_skip(frontend_text):
results["frontend_dev"] = {"status": "idle", "message": "", "model": ""}
else:
results["frontend_dev"] = {
"status": "active",
"message": frontend_text[:300] + ("..." if len(frontend_text) > 300 else ""),
"model": agent["models"][0],
}
log("Frontend done")
else:
raw = await call_llm(agent, task)
results[key] = {"status": "active", "message": raw, "model": agent["models"][0]}
log(agent["name"] + " done")
except Exception as e:
results[key] = {"status": "resting", "message": str(e), "model": ""}
log(key + " error: " + str(e))
# Execution order
be = [k for k in delegates if k == "backend_dev"]
par = [k for k in delegates if k in ("image_agent", "writer")]
seq = [k for k in delegates if k in ("frontend_dev", "analyst")]
oth = [k for k in delegates if k not in be + par + seq]
for k in be:
await run_one(k)
if par + oth:
await asyncio.gather(*[run_one(k) for k in par + oth])
for k in seq:
await run_one(k)
# ── Build output files ───────────────────────────────────────────────────
safe = re.sub(r"[^\w\-]", "_", task[:40])
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
if writer_text:
try:
db = build_docx(task, writer_text, image_bytes, analyst_text)
fn = safe + "_" + ts + ".docx"
(DOCS_DIR / fn).write_bytes(db)
doc_file = fn
results["manager"]["doc_file"] = fn
log("Docx: " + fn)
except Exception as e:
log("Docx error: " + str(e))
if backend_text and tc["excel"]:
try:
xb = build_excel(task, backend_text)
if xb:
fn = safe + "_" + ts + ".xlsx"
(DOCS_DIR / fn).write_bytes(xb)
doc_file = fn
results["backend_dev"]["doc_file"] = fn
log("Excel: " + fn)
except Exception as e:
log("Excel error: " + str(e))
if backend_text and not tc["excel"] and not is_skip(backend_text):
try:
code = re.sub(r"```\w*\n?", "", backend_text)
code = re.sub(r"```", "", code).strip()
if len(code) > 30:
ext = ".groovy" if ("groovy" in backend_text.lower() or "jenkins" in task.lower()) else ".py"
fn = safe + "_" + ts + ext
(DOCS_DIR / fn).write_text(code, encoding="utf-8")
results["backend_dev"]["doc_file"] = fn
log("Code: " + fn)
except Exception as e:
log("Code error: " + str(e))
if frontend_text and not is_skip(frontend_text):
try:
html = re.sub(r"```\w*\n?", "", frontend_text)
html = re.sub(r"```", "", html).strip()
if len(html) > 30:
fn = safe + "_frontend_" + ts + ".html"
(DOCS_DIR / fn).write_text(html, encoding="utf-8")
results["frontend_dev"]["doc_file"] = fn
if not doc_file:
doc_file = fn
log("HTML: " + fn)
except Exception as e:
log("HTML error: " + str(e))
for k in agent_registry:
if k not in results:
results[k] = {"status": "idle", "message": "", "model": ""}
final = results.get("manager", {}).get("message", "")[:300]
entry = {
"id": len(mission_history) + 1,
"task": task,
"started_at": started,
"ended_at": datetime.now().isoformat(),
"results": results,
"final": final,
"doc_file": doc_file,
"events": events,
}
mission_history.append(entry)
# Cache agent outputs so direct chat can reference them
for _k, _r in results.items():
if _r.get("status") == "active" and _r.get("message"):
mission_context_cache[_k] = (
"En una mision reciente trabajaste en: " + task[:80] + "\n"
+ _r["message"][:600]
)
return JSONResponse({
"success": True,
"task": task,
"results": results,
"final": final,
"doc_file": doc_file,
"events": events,
"mission_id": entry["id"],
})
# ── SHARED MISSION CONTEXT (injected into chat after missions) ─────────────
# Stores last mission output per agent so chat knows what was done
mission_context_cache: dict = {} # {agent_key: "last output summary"}
chat_sessions: dict = {} # {session_id: [{role, content}]}
@app.post("/api/chat")
async def chat_with_agent(request: Request):
body = await request.json()
agent_key = body.get("agent", "").strip()
message = body.get("message", "").strip()
session_id = body.get("session_id", agent_key)
clear = body.get("clear", False)
if not agent_key or not message:
return JSONResponse({"error": "agent and message required"}, status_code=400)
if agent_key not in agent_registry:
return JSONResponse({"error": f"Agent '{agent_key}' not found"}, status_code=404)
if clear:
chat_sessions[session_id] = []
if session_id not in chat_sessions:
chat_sessions[session_id] = []
# Build message history (last 20 turns)
history = list(chat_sessions[session_id][-40:])
history.append({"role": "user", "content": message})
# Build system role: conversational + inject mission context if available
_today = datetime.now().strftime("%A %d de %B de %Y, %H:%M")
base_role = get_chat_role(agent_key, agent_registry[agent_key])
prev_work = mission_context_cache.get(agent_key, "")
if prev_work:
base_role += (
"\n\n--- CONTEXTO DE MISIONES ANTERIORES ---\n"
+ prev_work +
"\nPuedes referenciar este trabajo cuando el usuario haga preguntas o pida ajustes."
)
agent = dict(agent_registry[agent_key])
agent["role"] = f"HOY ES: {_today}.\n{base_role}"
try:
response = await call_llm_multiturn(agent, history)
# Intercept special actions from agents
img_result = None
img_files = []
delegate_result = None
import json as _json2
try:
parsed = _json2.loads(response.strip())
# Manager delegates to team
if isinstance(parsed, dict) and parsed.get("action") == "delegate" and agent_key == "manager":
delegate_task = parsed.get("task", message)
# Run as a collab mission
sub_results = {}
sub_context = "TAREA: " + delegate_task + "\n"
sub_today = datetime.now().strftime("%A %d de %B de %Y, %H:%M")
# Detect which agents to use
lo = delegate_task.lower()
sub_delegates = []
if any(w in lo for w in ["imagen","image","foto","dibujo","genera","crea una imagen"]):
sub_delegates = ["image_agent"]
elif any(w in lo for w in ["excel","planilla","spreadsheet"]):
sub_delegates = ["backend_dev"]
elif any(w in lo for w in ["informe","reporte","documento"]):
sub_delegates = ["writer","analyst"]
elif any(w in lo for w in ["html","web","formulario","interfaz"]) and any(w in lo for w in ["api","backend","python"]):
sub_delegates = ["backend_dev","frontend_dev"]
elif any(w in lo for w in ["html","web","formulario","interfaz"]):
sub_delegates = ["frontend_dev"]
elif any(w in lo for w in ["python","api","script","backend","codigo","código"]):
sub_delegates = ["backend_dev"]
else:
sub_delegates = ["analyst"]
# Execute each sub-agent
sub_imgs = []
for sub_key in sub_delegates:
if sub_key not in agent_registry: continue
sub_agent = dict(agent_registry[sub_key])
sub_role = get_chat_role(sub_key, sub_agent)
sub_agent["role"] = "HOY ES: " + sub_today + ".\n" + sub_role + "\n\nContexto: " + sub_context
try:
if sub_key == "image_agent":
ip = ("Find images for: " + delegate_task +
' Respond ONLY: {"image_queries":["t1","t2","t3"]}')
sub_raw = await call_llm(sub_agent, ip)
m3 = re.search(r'"image_queries"\s*:\s*\[([^\]]*)\]', sub_raw)
qs = re.findall(r'"([^"]+)"', m3.group(1)) if m3 else [delegate_task[:40]]
sub_imgs_r = await asyncio.gather(*[get_image(q, force_generate=True) for q in qs[:3]])
_safe2 = re.sub(r"[^\w]","_",delegate_task[:28])
_ts2 = datetime.now().strftime("%H%M%S")
_base2 = "mgr_" + _safe2 + "_" + _ts2
for idx2, im2 in enumerate(sub_imgs_r):
if im2:
fn2 = _base2 + "_img" + str(idx2+1) + ".jpg"
(DOCS_DIR / fn2).write_bytes(im2)
sub_imgs.append(fn2)
sub_results[sub_key] = {
"status": "active",
"message": str(len(sub_imgs)) + " imagen(es) para: " + ", ".join(qs[:3]),
"img_base": _base2 if sub_imgs else None,
"img_count": len(sub_imgs),
}
sub_context += "\n=== IMAGE AGENT ===\n" + str(len(sub_imgs)) + " images\n"
else:
sub_prompt = sub_context + "\nINSTRUCCION: " + delegate_task
sub_resp = await call_llm_multiturn(sub_agent, [{"role":"user","content":sub_prompt}])
sub_results[sub_key] = {"status":"active","message":sub_resp,"model":sub_agent["models"][0]}
sub_context += "\n=== " + sub_key.upper() + " ===\n" + sub_resp[:500] + "\n"
# Save files
_ts3 = datetime.now().strftime("%Y%m%d_%H%M%S")
_safe3 = re.sub(r"[^\w\-]","_",delegate_task[:35])
if sub_key == "frontend_dev" and not is_skip(sub_resp):
html3 = re.sub(r"```\w*\n?","",sub_resp); html3 = re.sub(r"```","",html3).strip()
if len(html3)>80:
fn3 = _safe3+"_frontend_"+_ts3+".html"
(DOCS_DIR/fn3).write_text(html3,encoding="utf-8")
sub_results[sub_key]["doc_file"]=fn3
elif sub_key == "backend_dev" and not is_skip(sub_resp):
code3 = re.sub(r"```\w*\n?","",sub_resp); code3 = re.sub(r"```","",code3).strip()
if len(code3)>80:
fn3 = _safe3+"_backend_"+_ts3+".py"
(DOCS_DIR/fn3).write_text(code3,encoding="utf-8")
sub_results[sub_key]["doc_file"]=fn3
mission_context_cache[sub_key] = "En tarea reciente: " + delegate_task[:60] + "\n" + sub_resp[:400]
except Exception as sub_e:
sub_results[sub_key] = {"status":"resting","message":str(sub_e)}
# Build manager response text
parts = ["Delegué la tarea a mi equipo:"]
for k,r in sub_results.items():
parts.append("• " + k + ": " + (r.get("message","")[:120] or "done"))
response = "\n".join(parts)
delegate_result = {"sub_results": sub_results, "sub_imgs": sub_imgs,
"img_base": sub_results.get("image_agent",{}).get("img_base"),
"img_count": sub_results.get("image_agent",{}).get("img_count",0)}
# ImageAgent: generate image
elif isinstance(parsed, dict) and parsed.get("action") == "generate_image":
queries = parsed.get("queries", [message])[:3]
_safe = re.sub(r"[^\w]", "_", message[:28])
_ts = datetime.now().strftime("%H%M%S")
_base = "chat_" + _safe + "_" + _ts
imgs = await asyncio.gather(*[get_image(q, force_generate=True) for q in queries])
for i, img in enumerate(imgs):
if img:
fname = _base + "_img" + str(i+1) + ".jpg"
(DOCS_DIR / fname).write_bytes(img)
img_files.append(fname)
if img_files:
response = "✓ " + str(len(img_files)) + " imagen(es) generada(s) — haz clic para ver"
img_result = {"img_base": _base, "img_count": len(img_files)}
else:
response = "No pude generar la imagen. Verifica HF_TOKEN en los Secrets del Space."
except (ValueError, TypeError, KeyError):
pass
# Save turn
chat_sessions[session_id].append({"role": "user", "content": message})
chat_sessions[session_id].append({"role": "assistant", "content": response})
if len(chat_sessions[session_id]) > 100:
chat_sessions[session_id] = chat_sessions[session_id][-80:]
used_model = agent["models"][0] if agent.get("models") else ""
result = {
"success": True,
"agent": agent_key,
"response": response,
"model": used_model,
"turn": len(chat_sessions[session_id]) // 2,
}
if img_result:
result.update(img_result)
if delegate_result:
result["delegate_result"] = delegate_result
if delegate_result.get("img_base"):
result["img_base"] = delegate_result["img_base"]
result["img_count"] = delegate_result["img_count"]
return JSONResponse(result)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
@app.delete("/api/chat/{session_id}")
async def clear_chat_session(session_id: str):
chat_sessions.pop(session_id, None)
return {"success": True}
@app.post("/api/collab")
async def collaborative_chat(request: Request):
"""
Multi-agent collaborative task from chat.
Manager reads the request and coordinates Backend + Frontend (or any combo)
in a pipeline with shared context. Returns each agent's output separately.
"""
body = await request.json()
task = body.get("task", "").strip()
agents = body.get("agents", []) # optional override: ["backend_dev","frontend_dev"]
if not task:
return JSONResponse({"error": "task required"}, status_code=400)
_today = datetime.now().strftime("%A %d de %B de %Y, %H:%M")
events = []
results = {}
def log(m):
events.append({"time": datetime.now().strftime("%H:%M:%S"), "msg": m})
# Manager decides which agents to use (unless caller specified them)
if agents:
delegates = [a for a in agents if a in agent_registry]
agent_tasks = {a: task for a in delegates}
log("Caller-specified agents: " + str(delegates))
else:
log("Manager planning collab task...")
mgr = dict(agent_registry["manager"])
mgr["role"] = (
"HOY ES: " + _today + ". " + mgr["role"] +
"\nResponde SOLO con JSON: "
'{"plan":[{"agent":"key","task":"instruccion especifica"}],"summary":"resumen"}'
"\nAgentes: " + ", ".join(agent_registry.keys())
)
try:
mgr_raw = await call_llm(mgr, task)
mgr_clean = re.sub(r"```json|```", "", mgr_raw).strip()
m = re.search(r"\{.*\}", mgr_clean, re.DOTALL)
plan_data = {}
if m:
try:
import json as _jj
plan_data = _jj.loads(m.group())
except Exception:
pass
plan_steps = plan_data.get("plan", [])
delegates = [s["agent"] for s in plan_steps if s.get("agent") in agent_registry]
agent_tasks = {s["agent"]: s.get("task", task) for s in plan_steps}
if not delegates:
delegates = parse_delegates(mgr_raw)
agent_tasks = {a: task for a in delegates}
results["manager"] = {
"status": "active",
"message": plan_data.get("summary", clean(mgr_raw)),
"model": agent_registry["manager"]["models"][0],
}
log("Manager plan: " + str(delegates))
except Exception as e:
log("Manager error: " + str(e))
delegates = []
agent_tasks = {}
if not delegates:
return JSONResponse({"error": "No agents to run"}, status_code=400)
# Run pipeline with shared context
shared_ctx = "TAREA: " + task + "\n"
for key in delegates:
if key not in agent_registry:
continue
agent = dict(agent_registry[key])
specific = agent_tasks.get(key, task)
chat_role = get_chat_role(key, agent)
# Inject previous context so agents can collaborate
agent["role"] = (
"HOY ES: " + _today + ".\n" + chat_role +
"\n\n--- CONTEXTO DEL EQUIPO ---\n" + shared_ctx +
"\nTu instruccion especifica: " + specific
)
log(key + " working...")
try:
agent_history = [{"role": "user", "content": specific}]
response = await call_llm_multiturn(agent, agent_history)
results[key] = {
"status": "active",
"message": response,
"model": agent["models"][0] if agent.get("models") else "",
}
# Accumulate context for next agents
shared_ctx += "\n=== " + key.upper() + " ===\n" + response[:800] + "\n"
# Save to mission cache so individual chat can reference it
mission_context_cache[key] = (
"En una tarea reciente hiciste lo siguiente para: " + task[:80] +
"\n" + response[:600]
)
log(key + " done")
# Save files if code was generated
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
safe = re.sub(r"[^\w\-]", "_", task[:35])
if key == "frontend_dev" and not is_skip(response):
html = re.sub(r"```\w*\n?", "", response)
html = re.sub(r"```", "", html).strip()
if len(html) > 80:
fn = safe + "_frontend_" + ts + ".html"
(DOCS_DIR / fn).write_text(html, encoding="utf-8")
results[key]["doc_file"] = fn
elif key == "backend_dev" and not is_skip(response):
code = re.sub(r"```\w*\n?", "", response)
code = re.sub(r"```", "", code).strip()
if len(code) > 80:
fn = safe + "_backend_" + ts + ".py"
(DOCS_DIR / fn).write_text(code, encoding="utf-8")
results[key]["doc_file"] = fn
except Exception as e:
results[key] = {"status": "resting", "message": str(e), "model": ""}
log(key + " error: " + str(e))
return JSONResponse({
"success": True,
"task": task,
"delegates": delegates,
"results": results,
"events": events,
})
@app.get("/api/archive")
async def list_archive():
files = []
for f in sorted(DOCS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if f.is_file() and not f.name.startswith('.'):
files.append({
"name": f.name,
"size": f.stat().st_size,
"modified": datetime.fromtimestamp(f.stat().st_mtime).strftime("%Y-%m-%d %H:%M"),
"ext": f.suffix.lower().lstrip('.'),
})
return {"files": files}
@app.delete("/api/archive/{filename}")
async def delete_archive_file(filename: str):
path = DOCS_DIR / filename
if path.exists() and path.is_file():
path.unlink()
return {"success": True}
@app.get("/api/health")
async def health():
return {"status":"ok","providers":{"gemini":"ok" if GOOGLE_API_KEY else "missing","openrouter":"ok" if OPENROUTER_API_KEY else "missing","groq":"ok" if GROQ_API_KEY else "missing","pexels":"ok" if PEXELS_API_KEY else "optional","hf_images":"ok" if HF_API_KEY else "optional"}}