import json import logging from typing import Any, Dict from agents.geometry_agent import GeometryAgent from agents.knowledge_agent import KnowledgeAgent from agents.ocr_agent import OCRAgent from agents.parser_agent import ParserAgent from agents.solver_agent import SolverAgent from app.logutil import log_step from app.ocr_celery import ocr_from_image_url from solver.dsl_parser import DSLParser from solver.engine import GeometryEngine logger = logging.getLogger(__name__) _CLIP = 2000 def _clip(val: Any, n: int = _CLIP) -> str | None: if val is None: return None if isinstance(val, str): s = val else: s = json.dumps(val, ensure_ascii=False, default=str) return s if len(s) <= n else s[:n] + "…" def _step_io(step: str, input_val: Any = None, output_val: Any = None) -> None: """Debug: chỉ input/output (đã cắt), tránh dump dài dòng không cần thiết.""" log_step(step, input=_clip(input_val), output=_clip(output_val)) class Orchestrator: def __init__(self): self.parser_agent = ParserAgent() self.geometry_agent = GeometryAgent() self.ocr_agent = OCRAgent() self.knowledge_agent = KnowledgeAgent() self.solver_agent = SolverAgent() self.solver_engine = GeometryEngine() self.dsl_parser = DSLParser() def _generate_step_description(self, semantic_json: Dict[str, Any], engine_result: Dict[str, Any]) -> str: """Tạo mô tả từng bước vẽ dựa trên kết quả của engine.""" analysis = semantic_json.get("analysis", "") if not analysis: analysis = f"Giải bài toán về {semantic_json.get('type', 'hình học')}." steps = ["\n\n**Các bước dựng hình:**"] drawing_phases = engine_result.get("drawing_phases", []) for phase in drawing_phases: label = phase.get("label", f"Giai đoạn {phase['phase']}") points = ", ".join(phase.get("points", [])) segments = ", ".join([f"{s[0]}{s[1]}" for s in phase.get("segments", [])]) step_text = f"- **{label}**:" if points: step_text += f" Xác định các điểm {points}." if segments: step_text += f" Vẽ các đoạn thẳng {segments}." steps.append(step_text) circles = engine_result.get("circles", []) for c in circles: steps.append(f"- **Đường tròn**: Vẽ đường tròn tâm {c['center']} bán kính {c['radius']}.") return analysis + "\n".join(steps) async def run( self, text: str, image_url: str = None, job_id: str = None, session_id: str = None, status_callback=None, history: list = None, ) -> Dict[str, Any]: """ Run the full pipeline. Optional history allows context-aware solving. """ _step_io( "orchestrate_start", input_val={ "job_id": job_id, "text_len": len(text or ""), "image_url": image_url, "history_len": len(history or []), }, output_val=None, ) if status_callback: await status_callback("processing") # 1. Extract context from history (if any) previous_context = None if history: # Look for the last assistant message with geometry data for msg in reversed(history): if msg.get("role") == "assistant" and msg.get("metadata", {}).get("geometry_dsl"): previous_context = { "geometry_dsl": msg["metadata"]["geometry_dsl"], "coordinates": msg["metadata"].get("coordinates", {}), "analysis": msg.get("content", ""), } break if previous_context: _step_io("context_found", input_val=None, output_val={"dsl_len": len(previous_context["geometry_dsl"])}) # 2. Gather input text (OCR or direct) input_text = text if image_url: input_text = await ocr_from_image_url(image_url, self.ocr_agent) _step_io("step1_ocr", input_val=image_url, output_val=input_text) else: _step_io("step1_ocr", input_val="(no image)", output_val=text) feedback = None MAX_RETRIES = 2 for attempt in range(MAX_RETRIES + 1): _step_io( "attempt", input_val=f"{attempt + 1}/{MAX_RETRIES + 1}", output_val=None, ) if status_callback: await status_callback("solving") # Parser with context _step_io("step2_parse", input_val=f"{input_text[:50]}...", output_val=None) semantic_json = await self.parser_agent.process(input_text, feedback=feedback, context=previous_context) semantic_json["input_text"] = input_text _step_io("step2_parse", input_val=None, output_val=semantic_json) # Knowledge augmentation _step_io("step3_knowledge", input_val=semantic_json, output_val=None) semantic_json = self.knowledge_agent.augment_semantic_data(semantic_json) _step_io("step3_knowledge", input_val=None, output_val=semantic_json) # Geometry DSL with context (passing previous DSL to guide generation) _step_io("step4_geometry_dsl", input_val=semantic_json, output_val=None) dsl_code = await self.geometry_agent.generate_dsl( semantic_json, previous_dsl=previous_context["geometry_dsl"] if previous_context else None ) _step_io("step4_geometry_dsl", input_val=None, output_val=dsl_code) _step_io("step5_dsl_parse", input_val=dsl_code, output_val=None) points, constraints, is_3d = self.dsl_parser.parse(dsl_code) _step_io( "step5_dsl_parse", input_val=None, output_val={ "points": len(points), "constraints": len(constraints), "is_3d": is_3d, }, ) _step_io("step6_solve", input_val=f"{len(points)} pts / {len(constraints)} cons (is_3d={is_3d})", output_val=None) import anyio engine_result = await anyio.to_thread.run_sync(self.solver_engine.solve, points, constraints, is_3d) if engine_result: coordinates = engine_result.get("coordinates") _step_io("step6_solve", input_val=None, output_val=coordinates) logger.info( "[Orchestrator] geometry solved job_id=%s is_3d=%s n_coords=%d", job_id, is_3d, len(coordinates) if isinstance(coordinates, dict) else 0, ) break feedback = "Geometry solver failed to find a valid solution for the given constraints. Parallelism or lengths might be inconsistent." _step_io( "step6_solve", input_val=f"attempt {attempt + 1}", output_val=feedback, ) if attempt == MAX_RETRIES: _step_io( "orchestrate_abort", input_val=None, output_val="solver_exhausted_retries", ) return { "error": "Solver failed after multiple attempts.", "last_dsl": dsl_code, } _step_io("orchestrate_done", input_val=job_id, output_val="success") # 8. Solution calculation (New in v5.1) solution = None if engine_result: _step_io("step8_solve_math", input_val=semantic_json.get("target_question"), output_val=None) solution = await self.solver_agent.solve(semantic_json, engine_result) _step_io("step8_solve_math", input_val=None, output_val=solution.get("answer")) final_analysis = self._generate_step_description(semantic_json, engine_result) status = "success" return { "status": status, "job_id": job_id, "geometry_dsl": dsl_code, "coordinates": coordinates, "polygon_order": engine_result.get("polygon_order", []), "circles": engine_result.get("circles", []), "lines": engine_result.get("lines", []), "rays": engine_result.get("rays", []), "drawing_phases": engine_result.get("drawing_phases", []), "semantic": semantic_json, "semantic_analysis": final_analysis, "solution": solution, "is_3d": is_3d, }