import os import json import time import logging import asyncio import threading import re import uuid from typing import Any, Dict, List, Optional, Tuple from collections import OrderedDict from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse, JSONResponse, Response from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import g4f # ===================================================== # LOGGING # ===================================================== logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("g4f-execution-engine") # ===================================================== # COOKIES # ===================================================== def _load_cookies_raw() -> Dict[str, Any]: raw_env = (os.getenv("COOKIES_JSON") or "").strip() if raw_env: try: return json.loads(raw_env) except Exception as e: logger.warning(f"Failed to load cookies from env: {e}") try: if os.path.exists("cookies.json"): with open("cookies.json", "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load cookies from file: {e}") return {} def load_cookies() -> str: data = _load_cookies_raw() if not data: return "⚠️ No Cookies" try: from g4f.cookies import set_cookies except Exception: return "⚠️ Cookies Found" for domain, vals in data.items(): try: dom = domain if "." in domain else f".{domain}.com" if isinstance(vals, list): vals = {x["name"]: x["value"] for x in vals if isinstance(x, dict)} if isinstance(vals, dict): set_cookies(dom, vals) except Exception as e: logger.warning(f"Cookie error for {domain}: {e}") return "✅ Cookies Loaded" COOKIE_STATUS = load_cookies() # ===================================================== # EXECUTION STATE MACHINE # ===================================================== class ExecutionState: IDLE = "idle" PLANNING = "planning" TOOL_PENDING = "tool_pending" TOOL_EXECUTING = "tool_executing" TOOL_DONE = "tool_done" AWAITING_NEXT = "awaiting_next" COMPLETED = "completed" class PendingToolCall: def __init__(self, name: str, arguments: Dict, tool_id: str, reason: str = ""): self.name = name self.arguments = arguments self.tool_id = tool_id self.reason = reason self.created_at = time.time() self.attempts = 0 self.max_attempts = 3 def to_dict(self) -> Dict: return { "name": self.name, "arguments": self.arguments, "tool_id": self.tool_id, "reason": self.reason, "created_at": self.created_at, "attempts": self.attempts } # ===================================================== # PERSISTENT CONTEXT STORE # ===================================================== class PersistentContextStore: def __init__(self): self._store: Dict[str, Dict] = {} self._lock = threading.Lock() def get_or_create_session(self, session_id: str) -> Dict: with self._lock: if session_id not in self._store: self._store[session_id] = { "original_task": "", "task_breakdown": [], "completed_steps": [], "tool_results": [], "pending_tool_calls": [], "executed_tool_calls": [], "current_state": ExecutionState.IDLE, "next_required_action": "", "created_at": time.time(), "last_updated": time.time(), "message_count": 0, "provider_history": [], "last_model_response": "", "described_but_not_executed": [], "consecutive_description_count": 0, } return self._store[session_id] def update_session(self, session_id: str, **kwargs): with self._lock: if session_id not in self._store: self.get_or_create_session(session_id) self._store[session_id].update(kwargs) self._store[session_id]["last_updated"] = time.time() def add_tool_result(self, session_id: str, tool_name: str, tool_id: str, result: str): with self._lock: if session_id not in self._store: self.get_or_create_session(session_id) # إزالة من المعلّقة self._store[session_id]["pending_tool_calls"] = [ p for p in self._store[session_id].get("pending_tool_calls", []) if p.get("tool_id") != tool_id ] # إضافة للمنفَّذة self._store[session_id]["executed_tool_calls"].append({ "tool_name": tool_name, "tool_id": tool_id, "timestamp": time.time() }) # حفظ النتيجة self._store[session_id]["tool_results"].append({ "tool_name": tool_name, "tool_id": tool_id, "result": result, "timestamp": time.time() }) # إعادة ضبط عداد الوصف self._store[session_id]["consecutive_description_count"] = 0 self._store[session_id]["described_but_not_executed"] = [] self._store[session_id]["last_updated"] = time.time() if self._store[session_id].get("current_state") == ExecutionState.TOOL_EXECUTING: self._store[session_id]["current_state"] = ExecutionState.TOOL_DONE def add_completed_step(self, session_id: str, step: str): with self._lock: if session_id not in self._store: self.get_or_create_session(session_id) self._store[session_id]["completed_steps"].append({ "step": step, "timestamp": time.time() }) self._store[session_id]["last_updated"] = time.time() def register_pending_tool(self, session_id: str, tool_call: PendingToolCall): with self._lock: if session_id not in self._store: self.get_or_create_session(session_id) existing_ids = [ p.get("tool_id") for p in self._store[session_id].get("pending_tool_calls", []) ] if tool_call.tool_id not in existing_ids: self._store[session_id]["pending_tool_calls"].append(tool_call.to_dict()) self._store[session_id]["current_state"] = ExecutionState.TOOL_PENDING self._store[session_id]["last_updated"] = time.time() def get_pending_tools(self, session_id: str) -> List[Dict]: with self._lock: if session_id not in self._store: return [] return self._store[session_id].get("pending_tool_calls", []) def increment_description_count(self, session_id: str, response_text: str) -> int: """ يزيد عداد "الوصف بدون تنفيذ" ويعيد العدد الحالي. هذا يُصلح مشكلة "المشاهد الذكي" بتتبع دقيق لعدد المرات. """ with self._lock: if session_id not in self._store: self.get_or_create_session(session_id) self._store[session_id]["described_but_not_executed"].append({ "response_preview": response_text[:300], "timestamp": time.time() }) count = self._store[session_id].get("consecutive_description_count", 0) + 1 self._store[session_id]["consecutive_description_count"] = count self._store[session_id]["last_updated"] = time.time() return count def get_execution_directive(self, session_id: str) -> str: """ *** القلب الحقيقي للحل *** يولّد توجيهاً إجبارياً يحدد ما يجب فعله الآن. هذا يحل مشكلة "غياب الدفعة الأخيرة". """ with self._lock: if session_id not in self._store: return "" ctx = self._store[session_id] parts = [] state = ctx.get("current_state", ExecutionState.IDLE) pending = ctx.get("pending_tool_calls", []) executed = ctx.get("executed_tool_calls", []) desc_count = ctx.get("consecutive_description_count", 0) tool_results = ctx.get("tool_results", []) # ===== الحالة 1: أداة معلّقة - أعلى أولوية ===== if pending and state == ExecutionState.TOOL_PENDING: pending_tool = pending[0] tool_name = pending_tool.get("name", "unknown") tool_args = pending_tool.get("arguments", {}) tool_id = pending_tool.get("tool_id", "") reason = pending_tool.get("reason", "") parts.append("## ⚡ MANDATORY IMMEDIATE ACTION") parts.append("") parts.append( f"Tool `{tool_name}` was registered as pending but NOT executed." ) if reason: parts.append(f"Reason: {reason}") parts.append("") parts.append("**COPY THIS EXACTLY INTO YOUR RESPONSE:**") parts.append("") parts.append("") call_obj = {"name": tool_name, "arguments": tool_args} if tool_id: call_obj["id"] = tool_id parts.append(json.dumps(call_obj, ensure_ascii=False, indent=2)) parts.append("") parts.append("") parts.append("NO other text is needed. Just the tool_call block above.") return "\n".join(parts) # ===== الحالة 2: وصف متكرر بدون تنفيذ ===== if desc_count >= 1: parts.append("## 🚨 CRITICAL: EXECUTION FAILURE PATTERN DETECTED") parts.append("") parts.append( f"You have described actions {desc_count} time(s) without executing them." ) parts.append("") parts.append("**THE ONLY ACCEPTABLE RESPONSE FORMAT:**") parts.append("") parts.append("```") parts.append("") parts.append('{"name": "TOOL_NAME", "arguments": {ACTUAL_ARGUMENTS}}') parts.append("") parts.append("```") parts.append("") parts.append("**FORBIDDEN PATTERNS (will be rejected):**") parts.append('❌ "I will create..." → FORBIDDEN') parts.append('❌ "I\'m going to use..." → FORBIDDEN') parts.append('❌ "Let me write..." → FORBIDDEN') parts.append('❌ "I\'ll use the tool..." → FORBIDDEN') parts.append('❌ "سأقوم بـ..." → FORBIDDEN') parts.append('❌ "سأستخدم أداة..." → FORBIDDEN') parts.append("") parts.append("**EXECUTE NOW. No descriptions. No explanations.**") return "\n".join(parts) # ===== الحالة 3: أداة منفَّذة، تحقق من الاكتمال ===== if tool_results and state == ExecutionState.TOOL_DONE: last_result = tool_results[-1] tool_name = last_result.get("tool_name", "") next_action = ctx.get("next_required_action", "") parts.append(f"## ✅ Tool `{tool_name}` Completed Successfully") parts.append("") if next_action: parts.append(f"**Next Required Action:** {next_action}") parts.append("Execute it now using the appropriate tool.") else: parts.append( "Determine if the original task is now complete. " "If more tools are needed, call them now. " "If done, provide a brief confirmation." ) return "\n".join(parts) return "" def get_context_summary(self, session_id: str) -> str: with self._lock: if session_id not in self._store: return "" ctx = self._store[session_id] parts = [] if ctx.get("original_task"): parts.append(f"## Original Task\n{ctx['original_task']}") if ctx.get("completed_steps"): steps_text = "\n".join( f" ✅ {s['step']}" for s in ctx["completed_steps"][-10:] ) parts.append(f"## Completed Steps\n{steps_text}") if ctx.get("executed_tool_calls"): exec_text = "\n".join( f" 🔧 {e['tool_name']} (id: {e['tool_id'][:12]}...)" for e in ctx["executed_tool_calls"][-5:] ) parts.append(f"## Tools Already Executed\n{exec_text}") if ctx.get("tool_results"): recent_results = ctx["tool_results"][-3:] results_text = "\n".join( f" - [{r['tool_name']}]: " f"{r['result'][:300]}{'...' if len(r['result']) > 300 else ''}" for r in recent_results ) parts.append(f"## Tool Results Available\n{results_text}") if ctx.get("pending_tool_calls"): pending_text = "\n".join( f" ⏳ {p['name']} (NOT YET EXECUTED)" for p in ctx["pending_tool_calls"] ) parts.append(f"## ⚠️ PENDING TOOLS (Must Execute)\n{pending_text}") # التوجيه التنفيذي - الأهم دائماً directive = self.get_execution_directive(session_id) if directive: parts.append(directive) if not parts: return "" return ( "# Task Context & Execution State\n" "# (This is your memory - use it to continue the task)\n\n" + "\n\n".join(parts) ) def cleanup_old_sessions(self, max_age_hours: int = 24): with self._lock: now = time.time() old_sessions = [ sid for sid, data in self._store.items() if now - data.get("last_updated", 0) > max_age_hours * 3600 ] for sid in old_sessions: del self._store[sid] if old_sessions: logger.info(f"[Context] Cleaned {len(old_sessions)} old sessions") def extract_task_from_messages(self, messages: List[Dict]) -> str: for msg in messages: if msg.get("role") == "user": content = msg.get("content", "") if isinstance(content, str) and len(content) > 10: return content[:500] elif isinstance(content, list): for item in content: if isinstance(item, dict) and item.get("type") == "text": text = item.get("text", "") if len(text) > 10: return text[:500] return "" CONTEXT_STORE = PersistentContextStore() # ===================================================== # CACHE # ===================================================== class TTLCache: def __init__(self, max_size: int = 100, ttl_seconds: int = 300): self.cache: OrderedDict = OrderedDict() self.max_size = max_size self.ttl = ttl_seconds self._lock = threading.Lock() self._last_cleanup = time.time() self._cleanup_interval = 60 def _clean_expired(self): now = time.time() if now - self._last_cleanup < self._cleanup_interval: return self._last_cleanup = now expired = [k for k, (_, ts) in self.cache.items() if now - ts > self.ttl] for k in expired: del self.cache[k] def get(self, key: str) -> Optional[str]: with self._lock: if key in self.cache: value, _ = self.cache[key] self.cache.move_to_end(key) return value return None def set(self, key: str, value: str): with self._lock: self._clean_expired() if len(self.cache) >= self.max_size: self.cache.popitem(last=False) self.cache[key] = (value, time.time()) CACHE = TTLCache(max_size=100, ttl_seconds=300) # ===================================================== # PROVIDERS # ===================================================== def get_provider(name: str): try: return getattr(g4f.Provider, name) except Exception: return None REAL_PROVIDERS = { "Blackbox": get_provider("Blackbox"), "DeepSeek": get_provider("DeepSeek"), "Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"), "Copilot": get_provider("Copilot"), "You": get_provider("You"), "Bing": get_provider("Bing"), "Qwen": get_provider("Qwen"), } REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v} PROVIDER_MODELS_FALLBACK = { "Blackbox": ["gpt-4o", "claude-3.5-sonnet", "llama-3.1-70b", "gemini-pro"], "DeepSeek": ["deepseek-chat", "deepseek-coder"], "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"], "Copilot": ["gpt-4o", "claude-3.5-sonnet"], "You": ["gpt-4o", "claude-3.5-sonnet"], "Bing": ["gpt-4o"], "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo"], } _PROVIDER_MODEL_CACHE = {} def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]: if provider_name in _PROVIDER_MODEL_CACHE: return _PROVIDER_MODEL_CACHE[provider_name] candidates = [] for attr in ("models", "model", "default_model", "available_models", "supported_models"): try: if hasattr(provider_obj, attr): val = getattr(provider_obj, attr) if isinstance(val, dict): candidates.extend(str(k) for k in val.keys()) elif isinstance(val, (list, tuple, set)): candidates.extend(str(i) for i in val) elif val: candidates.append(str(val)) except Exception: pass if not candidates: candidates = PROVIDER_MODELS_FALLBACK.get(provider_name, ["gpt-4o"]) seen = set() unique = [m for m in candidates if not (m in seen or seen.add(m))] _PROVIDER_MODEL_CACHE[provider_name] = unique return unique # ===================================================== # STREAM CLEANER # ===================================================== def clean_stream(chunk): try: if isinstance(chunk, dict): if 'choices' in chunk and chunk['choices']: delta = chunk['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] if 'text' in delta: return delta['text'] return chunk.get('content') or chunk.get('text') or "" if isinstance(chunk, str): if chunk and chunk[0] == '{' and chunk[-1] == '}': try: data = json.loads(chunk) if 'choices' in data and data['choices']: delta = data['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] return data.get('content') or data.get('text') or "" except Exception: pass if '\\' in chunk: chunk = chunk.replace('\\n', '\n') if '\\r' in chunk: chunk = chunk.replace('\\r', '\r') if '\\t' in chunk: chunk = chunk.replace('\\t', ' ') return chunk return str(chunk) except Exception as e: logger.warning(f"clean_stream error: {e}") return "" # ===================================================== # استخراج النص # ===================================================== def extract_text_from_content(content) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for item in content: if isinstance(item, str): parts.append(item) elif isinstance(item, dict): if item.get("type") == "text": parts.append(item.get("text", "")) elif item.get("type") == "tool_result": tool_content = item.get("content", "") tool_use_id = item.get("tool_use_id", "") if isinstance(tool_content, list): result_texts = [] for tc in tool_content: if isinstance(tc, dict) and tc.get("type") == "text": result_texts.append(tc.get("text", "")) elif isinstance(tc, str): result_texts.append(tc) result_str = "\n".join(result_texts) elif isinstance(tool_content, str): result_str = tool_content else: result_str = str(tool_content) parts.append(f"[Tool Result for {tool_use_id}]:\n{result_str}") elif item.get("type") == "tool_use": tool_name = item.get("name", "unknown") tool_input = item.get("input", {}) tool_id = item.get("id", "") parts.append( f"[Tool Called: {tool_name} (id:{tool_id}) " f"with: {json.dumps(tool_input, ensure_ascii=False)}]" ) elif "text" in item: parts.append(item["text"]) elif "content" in item: parts.append(str(item["content"])) return "\n".join(parts) if content is not None: return str(content) return "" # ===================================================== # RESPONSE ANALYZER - محسّن جذرياً # ===================================================== class ResponseAnalyzer: """ يحلل رد النموذج لاكتشاف: 1. هل وصف فعلاً بدون تنفيذ؟ 2. هل يحتوي على tool call فعلي؟ 3. ما هي الأداة المقصودة؟ """ # أنماط "الوصف بدون تنفيذ" - شاملة ودقيقة DESCRIPTION_PHRASES = [ # عربي r"سأقوم\s+ب", r"سأستخدم\s+(?:أداة|الأداة|tool)", r"سأكتب\s+(?:الكود|الملف|الآن)", r"سأنشئ\s+(?:ملف|الملف|كود)", r"سأحفظ\s+(?:الملف|الكود|الآن)", r"سأبدأ\s+(?:الآن|بـ)", r"دعني\s+(?:أنشئ|أكتب|أستخدم|أحفظ)", # إنجليزي - I will r"I\s+will\s+(?:now\s+)?(?:create|write|save|use|call|make|build|generate|execute)", r"I\s+will\s+(?:now\s+)?(?:use|call)\s+the\s+\w+\s+tool", # إنجليزي - I'm going to r"I(?:'m|'m)\s+going\s+to\s+(?:create|write|save|use|call|make|build|generate)", # إنجليزي - Let me r"Let\s+me\s+(?:create|write|save|use|call|make|build|generate|now)", # إنجليزي - Now I'll r"Now\s+I(?:'ll|'m\s+going\s+to)\s+(?:create|write|save|use|call|make|build)", # إنجليزي - I'll r"I(?:'ll|'ll)\s+(?:now\s+)?(?:use|call)\s+the\s+\w+\s+(?:tool|function)", r"I(?:'ll|'ll)\s+(?:now\s+)?(?:create|write|save|make|build|generate)", # إنجليزي - using the tool r"using\s+the\s+(?:write_file|create_file|save_file|search|browse|read_file)\s+tool", # إنجليزي - To create/write r"To\s+(?:create|write|save)\s+this\s+(?:file|code|content)", # إنجليزي - I need to use r"I\s+need\s+to\s+(?:use|call)\s+the\s+\w+\s+(?:tool|function)", # الوصف في code block r"```json\n\{[^}]*\"name\"\s*:", ] @classmethod def analyze(cls, response: str, available_tools: List[Dict] = None) -> Dict: result = { "has_actual_tool_call": False, "has_description_without_execution": False, "described_actions": [], "needs_execution_push": False, "tool_calls_found": [], "intended_tool_name": None, } # التحقق من وجود tool call فعلي clean_text, tool_calls = ToolCallParser.parse_tool_calls(response, available_tools) result["has_actual_tool_call"] = len(tool_calls) > 0 result["tool_calls_found"] = tool_calls # البحث عن عبارات الوصف descriptions_found = [] for pattern in cls.DESCRIPTION_PHRASES: matches = re.findall(pattern, response, re.IGNORECASE) descriptions_found.extend(matches) result["described_actions"] = descriptions_found # الحكم النهائي if descriptions_found and not result["has_actual_tool_call"]: result["has_description_without_execution"] = True result["needs_execution_push"] = True # محاولة استخراج اسم الأداة المقصودة if available_tools: for tool in available_tools: tool_name = tool.get("name", "") if tool_name and tool_name.lower() in response.lower(): result["intended_tool_name"] = tool_name break return result # ===================================================== # TOOL CALL PARSER - محسّن مع تسامح أعلى # ===================================================== class ToolCallParser: """ يكتشف ويحلل وسوم Tool Call. محسّن للتعامل مع التنسيقات المختلفة والأخطاء الطفيفة. """ START_MARKERS = [ '', '```tool_call', '```json', ' str: return f"toolu_{uuid.uuid4().hex[:24]}" @classmethod def might_contain_tool_call(cls, text: str) -> bool: text_stripped = text.strip().lower() for marker in cls.START_MARKERS: if marker.lower() in text_stripped: return True if text_stripped.startswith('{') and '"name"' in text_stripped: return True return False @classmethod def parse_tool_calls( cls, text: str, available_tools: Optional[List[Dict]] = None ) -> Tuple[str, List[Dict]]: tool_calls = [] clean_text = text # النمط 1: JSON # مرن جداً - يتحمل مسافات وأسطر إضافية pattern1 = re.compile( r'\s*\s*(\{.*?\})\s*\s*', re.DOTALL | re.IGNORECASE ) for match in pattern1.finditer(text): try: raw_json = match.group(1).strip() # تنظيف JSON من الأخطاء الشائعة raw_json = cls._clean_json_string(raw_json) parsed = json.loads(raw_json) tool_call = cls._normalize_tool_call(parsed, available_tools) if tool_call: tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError as e: logger.warning(f"[Parser] JSON parse error: {e}") fixed = cls._try_fix_json(match.group(1).strip()) if fixed: tool_call = cls._normalize_tool_call(fixed, available_tools) if tool_call: tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) # النمط 2: ```tool_call\nJSON\n``` if not tool_calls: pattern2 = re.compile( r'```(?:tool_call|json)?\s*\n?\s*(\{.*?"(?:name|function)".*?\})\s*\n?\s*```', re.DOTALL | re.IGNORECASE ) for match in pattern2.finditer(text): try: raw_json = match.group(1).strip() raw_json = cls._clean_json_string(raw_json) parsed = json.loads(raw_json) tool_call = cls._normalize_tool_call(parsed, available_tools) if tool_call: tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError: pass # النمط 3: JSON if not tool_calls: pattern3 = re.compile( r'\s*\s*(\{.*?\})\s*\s*', re.DOTALL | re.IGNORECASE ) for match in pattern3.finditer(text): try: func_name = match.group(1) args_json = json.loads(match.group(2).strip()) tool_call = { "type": "tool_use", "id": cls.generate_tool_id(), "name": func_name, "input": args_json } if available_tools: tool_call = cls._validate_against_tools(tool_call, available_tools) if tool_call: tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError: pass # النمط 4: [TOOL_CALL]...[/TOOL_CALL] if not tool_calls: pattern4 = re.compile( r'\s*\[TOOL_CALL\]\s*(.*?)\s*\[/TOOL_CALL\]\s*', re.DOTALL | re.IGNORECASE ) for match in pattern4.finditer(text): content_inner = match.group(1).strip() try: parsed = json.loads(content_inner) tool_call = cls._normalize_tool_call(parsed, available_tools) if tool_call: tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError: func_match = re.match(r'(\w+)\s*\((.*)\)', content_inner, re.DOTALL) if func_match: try: func_name = func_match.group(1) args_str = func_match.group(2).strip() args = json.loads(args_str) if args_str else {} tool_call = { "type": "tool_use", "id": cls.generate_tool_id(), "name": func_name, "input": args } tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError: pass # النمط 5: ✿FUNCTION✿ if not tool_calls: pattern5 = re.compile( r'✿FUNCTION✿:\s*(\w+)\s*\n✿ARGS✿:\s*(\{.*?\})\s*(?:\n✿RESULT✿)?', re.DOTALL ) for match in pattern5.finditer(text): try: func_name = match.group(1) args_json = json.loads(match.group(2).strip()) tool_call = { "type": "tool_use", "id": cls.generate_tool_id(), "name": func_name, "input": args_json } tool_calls.append(tool_call) clean_text = clean_text.replace(match.group(0), " ", 1) except json.JSONDecodeError: pass # النمط 6: JSON مباشر يحتوي tool_calls if not tool_calls: try: json_pattern = re.compile( r'\{[^{}]*"tool_calls"[^{}]*\[.*?\]\s*\}', re.DOTALL ) for jm in json_pattern.finditer(text): try: parsed = json.loads(jm.group(0)) if "tool_calls" in parsed: for tc in parsed["tool_calls"]: func_data = tc.get("function", tc) name = func_data.get("name", "") args = func_data.get( "arguments", func_data.get("input", {}) ) if isinstance(args, str): try: args = json.loads(args) except json.JSONDecodeError: args = {"raw": args} if name: tool_call = { "type": "tool_use", "id": cls.generate_tool_id(), "name": name, "input": args } tool_calls.append(tool_call) clean_text = clean_text.replace(jm.group(0), " ", 1) except json.JSONDecodeError: pass except Exception: pass clean_text = clean_text.strip() clean_text = re.sub(r'\n{3,}', '\n\n', clean_text) clean_text = re.sub(r' +', ' ', clean_text) return clean_text, tool_calls @classmethod def _clean_json_string(cls, json_str: str) -> str: """ تنظيف JSON من الأخطاء الشائعة. هذا يحل مشكلة "ضياع الأوامر في الترجمة". """ # إزالة trailing commas json_str = re.sub(r',\s*([}\]])', r'\1', json_str) # إصلاح علامات الاقتباس المنحنية json_str = json_str.replace('\u201c', '"').replace('\u201d', '"') json_str = json_str.replace('\u2018', "'").replace('\u2019', "'") # إزالة BOM json_str = json_str.lstrip('\ufeff') return json_str @classmethod def _normalize_tool_call( cls, parsed: Dict, available_tools: Optional[List[Dict]] = None ) -> Optional[Dict]: name = None arguments = {} if "function" in parsed and isinstance(parsed["function"], dict): name = parsed["function"].get("name") arguments = parsed["function"].get("arguments", {}) elif "name" in parsed: name = parsed["name"] arguments = ( parsed.get("arguments") or parsed.get("parameters") or parsed.get("input") or parsed.get("args") or {} ) elif "tool" in parsed: name = parsed["tool"] arguments = parsed.get("args", parsed.get("arguments", parsed.get("input", {}))) if not name: return None if isinstance(arguments, str): try: arguments = json.loads(arguments) except json.JSONDecodeError: arguments = {"raw_input": arguments} tool_call = { "type": "tool_use", "id": cls.generate_tool_id(), "name": str(name), "input": arguments if isinstance(arguments, dict) else {"value": arguments} } if available_tools: tool_call = cls._validate_against_tools(tool_call, available_tools) return tool_call @classmethod def _validate_against_tools( cls, tool_call: Dict, available_tools: List[Dict] ) -> Optional[Dict]: requested_name = tool_call["name"] tool_names = [] for tool in available_tools: tool_name = tool.get("name", "") if not tool_name and "function" in tool: tool_name = tool["function"].get("name", "") tool_names.append(tool_name) if tool_name == requested_name: return tool_call requested_lower = requested_name.lower().replace("_", "").replace("-", "") for tool_name in tool_names: tool_lower = tool_name.lower().replace("_", "").replace("-", "") if requested_lower == tool_lower: tool_call["name"] = tool_name return tool_call if requested_lower in tool_lower or tool_lower in requested_lower: tool_call["name"] = tool_name return tool_call logger.warning(f"[Validator] Tool '{requested_name}' not in: {tool_names}") return tool_call @classmethod def _try_fix_json(cls, broken_json: str) -> Optional[Dict]: # تنظيف أولي fixed = cls._clean_json_string(broken_json) # إغلاق الأقواس المفتوحة open_braces = fixed.count('{') - fixed.count('}') if open_braces > 0: fixed += '}' * open_braces open_brackets = fixed.count('[') - fixed.count(']') if open_brackets > 0: fixed += ']' * open_brackets try: return json.loads(fixed) except json.JSONDecodeError: pass # محاولة استخراج JSON من وسط النص try: start = fixed.index('{') depth = 0 for i in range(start, len(fixed)): if fixed[i] == '{': depth += 1 elif fixed[i] == '}': depth -= 1 if depth == 0: return json.loads(fixed[start:i + 1]) except (ValueError, json.JSONDecodeError): pass return None # ===================================================== # STREAM TOOL BUFFER # ===================================================== class StreamToolBuffer: def __init__(self, available_tools: Optional[List[Dict]] = None): self.buffer = "" self.in_tool_call = False self.tool_call_buffer = "" self.available_tools = available_tools or [] self.pending_text = "" self.tool_call_depth = 0 self._active_start_marker = "" self._active_end_marker = "" self._stream_started_at = time.time() def feed(self, chunk: str) -> List[Dict]: events = [] self.buffer += chunk while self.buffer: if self.in_tool_call: events.extend(self._process_tool_call_mode()) if self.in_tool_call: break else: events.extend(self._process_text_mode()) if self.in_tool_call: continue break return events def _process_text_mode(self) -> List[Dict]: events = [] buffer_lower = self.buffer.lower() earliest_pos = -1 earliest_marker = "" for marker in ToolCallParser.START_MARKERS: pos = buffer_lower.find(marker.lower()) if pos != -1 and (earliest_pos == -1 or pos < earliest_pos): earliest_pos = pos earliest_marker = marker if earliest_pos == -1: safe_length = len(self.buffer) - 100 if safe_length > 0: text_to_send = self.buffer[:safe_length] self.buffer = self.buffer[safe_length:] if text_to_send: events.append({"type": "text", "text": text_to_send}) else: text_before = self.buffer[:earliest_pos] if text_before and not text_before.isspace(): events.append({"type": "text", "text": text_before}) elif text_before and text_before.isspace(): events.append({"type": "text", "text": " "}) self.buffer = self.buffer[earliest_pos:] self.in_tool_call = True self.tool_call_buffer = "" self._active_start_marker = earliest_marker self._set_active_end_marker(earliest_marker) return events def _set_active_end_marker(self, start_marker: str): start_lower = start_marker.lower() if start_lower == '': self._active_end_marker = '' elif start_lower in ('```tool_call', '```json', '```'): self._active_end_marker = '```' elif start_lower.startswith(' List[Dict]: events = [] end_markers_pairs = [ ('', ''), ('```', '```tool_call'), ('```', '```json'), ('', ' 5000: logger.warning( "[Buffer] Tool call buffer too large, treating as text" ) events.append({"type": "text", "text": self.buffer}) self.buffer = "" self.in_tool_call = False self._active_start_marker = "" self._active_end_marker = "" return events def flush(self) -> List[Dict]: events = [] if self.in_tool_call and self.buffer: logger.warning( f"[Buffer] Stream ended with unclosed tool_call. " f"Attempting manual closure..." ) end_marker = self._active_end_marker or "" buffer_with_close = self.buffer + end_marker clean_text, tool_calls = ToolCallParser.parse_tool_calls( buffer_with_close, self.available_tools ) if tool_calls: logger.info( f"[Buffer] Manual closure SUCCESS: {len(tool_calls)} tool call(s)" ) for tc in tool_calls: if not tc.get("id"): tc["id"] = ToolCallParser.generate_tool_id() events.append(tc) if clean_text.strip(): events.append({"type": "text", "text": clean_text}) else: raw_buffer = self.buffer if self._active_start_marker: start_lower = self._active_start_marker.lower() raw_lower = raw_buffer.lower() if raw_lower.startswith(start_lower): raw_buffer = raw_buffer[len(self._active_start_marker):] json_start = raw_buffer.find('{') if json_start != -1: json_fragment = raw_buffer[json_start:].strip() open_braces = json_fragment.count('{') - json_fragment.count('}') if open_braces > 0: json_fragment += '}' * open_braces open_brackets = json_fragment.count('[') - json_fragment.count(']') if open_brackets > 0: json_fragment += ']' * open_brackets repaired_text = ( self._active_start_marker + json_fragment + end_marker ) clean_text2, tool_calls2 = ToolCallParser.parse_tool_calls( repaired_text, self.available_tools ) if tool_calls2: for tc in tool_calls2: if not tc.get("id"): tc["id"] = ToolCallParser.generate_tool_id() events.append(tc) if clean_text2.strip(): events.append({"type": "text", "text": clean_text2}) else: if self.buffer.strip(): events.append({"type": "text", "text": self.buffer}) else: if self.buffer.strip(): events.append({"type": "text", "text": self.buffer}) elif self.buffer: if self.buffer.strip(): events.append({"type": "text", "text": self.buffer}) self.buffer = "" self.in_tool_call = False self._active_start_marker = "" self._active_end_marker = "" return events # ===================================================== # TOOLS FORMATTER - محسّن # ===================================================== class ToolsFormatter: @staticmethod def format_tools_for_prompt(tools: List[Dict], tool_choice: Any = None) -> str: if not tools: return "" lines = [] lines.append("# Tools Available") lines.append("") lines.append( "**CRITICAL RULE**: You are an EXECUTOR, not a DESCRIBER. " "When you decide to use a tool, you MUST call it immediately " "using the format below. " "NEVER write 'I will use...' or 'I'm going to...' - just DO IT." ) lines.append("") lines.append("## ✅ CORRECT - How to call a tool:") lines.append("") lines.append('{"name": "tool_name", "arguments": {"param": "value"}}') lines.append("") lines.append("") lines.append("## ❌ WRONG - What NOT to do:") lines.append(' "I will now use the write_file tool to..." → FORBIDDEN') lines.append(' "Let me create the file..." → FORBIDDEN') lines.append(' "I\'ll use the tool to..." → FORBIDDEN') lines.append("") lines.append("---") lines.append("") lines.append("## Available Tools:") lines.append("") for i, tool in enumerate(tools, 1): name = tool.get("name", "unknown") description = tool.get("description", "No description") input_schema = tool.get("input_schema", {}) lines.append(f"### {i}. `{name}`") lines.append(f"**Purpose:** {description}") properties = input_schema.get("properties", {}) required = input_schema.get("required", []) example_args = {} if properties: lines.append("**Parameters:**") for param_name, param_info in properties.items(): param_type = param_info.get("type", "any") param_desc = param_info.get("description", "") is_required = param_name in required req_marker = " *(required)*" if is_required else " *(optional)*" lines.append( f" - `{param_name}` ({param_type}{req_marker}): {param_desc}" ) if "enum" in param_info: lines.append( f" Values: {', '.join(str(v) for v in param_info['enum'])}" ) if param_name in required: if param_type == "string": example_args[param_name] = f"<{param_name}_value>" elif param_type == "integer": example_args[param_name] = 0 elif param_type == "boolean": example_args[param_name] = True elif param_type == "array": example_args[param_name] = [] else: example_args[param_name] = f"<{param_name}>" lines.append(f"**Call it like this:**") lines.append("") example = {"name": name, "arguments": example_args} lines.append(json.dumps(example, ensure_ascii=False, indent=2)) lines.append("") lines.append("") # معالجة tool_choice if tool_choice: if isinstance(tool_choice, dict): if tool_choice.get("type") == "tool": forced_tool = tool_choice.get("name", "") if forced_tool: lines.append("---") lines.append( f"**⚡ MANDATORY:** You MUST call `{forced_tool}` right now. " f"Use the format above." ) elif tool_choice.get("type") == "any": lines.append("---") lines.append( "**⚡ MANDATORY:** You MUST call at least one tool. " "Use the format above." ) elif tool_choice == "any": lines.append("---") lines.append("**⚡ MANDATORY:** Use at least one tool now.") lines.append("") return "\n".join(lines) @staticmethod def format_tool_result_for_message(tool_use_id: str, content: Any) -> str: result_text = "" if isinstance(content, str): result_text = content elif isinstance(content, list): parts = [] for item in content: if isinstance(item, dict): if item.get("type") == "text": parts.append(item.get("text", "")) elif item.get("type") == "image": parts.append("[Image content]") else: parts.append(json.dumps(item, ensure_ascii=False)) elif isinstance(item, str): parts.append(item) result_text = "\n".join(parts) elif isinstance(content, dict): result_text = json.dumps(content, ensure_ascii=False, indent=2) else: result_text = str(content) if content else "" return ( f"[Tool Result - ID: {tool_use_id}]\n" f"{result_text}\n" f"[End Tool Result]" ) # ===================================================== # MESSAGE CONVERTER # ===================================================== class MessageConverter: @staticmethod def convert_messages( messages: List[Dict], system_prompt: str = "", tools: Optional[List[Dict]] = None, tool_choice: Any = None, session_id: str = "" ) -> Tuple[str, List[Dict]]: history = [] full_system = system_prompt if system_prompt else "" # إضافة تعريف الأدوات if tools: tools_text = ToolsFormatter.format_tools_for_prompt(tools, tool_choice) if full_system: full_system = f"{full_system}\n\n{tools_text}" else: full_system = tools_text # حقن سياق المهمة مع التوجيه التنفيذي if session_id: context_summary = CONTEXT_STORE.get_context_summary(session_id) if context_summary: if full_system: full_system = f"{full_system}\n\n{context_summary}" else: full_system = context_summary logger.info( f"[Context] Injected context+directive for session {session_id[:8]}..." ) # معالجة الرسائل for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if role == "system": sys_text = extract_text_from_content(content) if full_system: full_system = f"{full_system}\n\n{sys_text}" else: full_system = sys_text continue converted_text = MessageConverter._convert_content(content, role) if converted_text: g4f_role = "user" if role == "user" else "assistant" history.append({"role": g4f_role, "content": converted_text}) # استخراج الرسالة الأخيرة if history: last_msg = history.pop() user_message = last_msg["content"] else: user_message = "" # بناء الرسالة الكاملة if full_system: full_message = ( f"[System Instructions]\n{full_system}\n" f"[/System Instructions]\n\n{user_message}" ) else: full_message = user_message return full_message, history @staticmethod def _convert_content(content: Any, role: str) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for block in content: if isinstance(block, str): parts.append(block) elif isinstance(block, dict): block_type = block.get("type", "") if block_type == "text": parts.append(block.get("text", "")) elif block_type == "tool_use": name = block.get("name", "unknown") input_data = block.get("input", {}) tool_id = block.get("id", "") parts.append( f"" f'{{"name": "{name}", ' f'"id": "{tool_id}", ' f'"arguments": {json.dumps(input_data, ensure_ascii=False)}}}' f"" ) elif block_type == "tool_result": tool_use_id = block.get("tool_use_id", "") result_content = block.get("content", "") is_error = block.get("is_error", False) result_text = ToolsFormatter.format_tool_result_for_message( tool_use_id, result_content ) if is_error: result_text = f"[ERROR] {result_text}" parts.append(result_text) elif block_type == "image": parts.append("[Image content - not supported in text mode]") else: if "text" in block: parts.append(block["text"]) else: parts.append(json.dumps(block, ensure_ascii=False)) return "\n".join(parts) if content is not None: return str(content) return "" # ===================================================== # SMART HISTORY MANAGER # ===================================================== class SmartHistoryManager: FIRST_MESSAGES_TO_KEEP = 5 LAST_MESSAGES_TO_KEEP = 20 MAX_TOTAL_MESSAGES = 30 @classmethod def process_history( cls, history: List[Dict], session_id: str = "" ) -> List[Dict]: if not history: return [] if session_id: cls._extract_and_store_tool_results(history, session_id) total = len(history) if total <= cls.MAX_TOTAL_MESSAGES: logger.info(f"[History] Full history: {total} messages") return history first_part = history[:cls.FIRST_MESSAGES_TO_KEEP] last_part = history[-cls.LAST_MESSAGES_TO_KEEP:] middle_part = history[cls.FIRST_MESSAGES_TO_KEEP:-cls.LAST_MESSAGES_TO_KEEP] if middle_part: summary = cls._summarize_middle(middle_part) if summary: summary_msg = { "role": "assistant", "content": ( f"[Context Summary - {len(middle_part)} messages compressed]\n" f"{summary}" ) } smart_history = first_part + [summary_msg] + last_part else: smart_history = first_part + last_part else: smart_history = first_part + last_part logger.info( f"[History] Smart compression: {total} → {len(smart_history)} messages" ) return smart_history @classmethod def _extract_and_store_tool_results( cls, history: List[Dict], session_id: str ): for msg in history: content = msg.get("content", "") if isinstance(content, str): if "[Tool Result" in content: tool_match = re.search( r'\[Tool Result - ID: ([^\]]+)\]\n(.*?)\n\[End Tool Result\]', content, re.DOTALL ) if tool_match: tool_id = tool_match.group(1) result = tool_match.group(2)[:500] CONTEXT_STORE.add_tool_result( session_id, "tool", tool_id, result ) @classmethod def _summarize_middle(cls, messages: List[Dict]) -> str: if not messages: return "" summary_parts = [] for msg in messages: role = msg.get("role", "") content = str(msg.get("content", ""))[:200] summary_parts.append(f"[{role}]: {content}...") return "\n".join(summary_parts[:10]) # ===================================================== # EXECUTION ENFORCER - القلب الحقيقي للحل # ===================================================== class ExecutionEnforcer: """ المحرك التنفيذي الإجباري. يحل المشاكل الأربع: 1. "المشاهد الذكي" → يكتشف الوصف ويُجبر على التنفيذ 2. "ضياع الأوامر" → يُعيد الطلب بتنسيق صحيح ومضمون 3. "غياب الدفعة الأخيرة" → يُرسل رسالة إجبارية للتنفيذ 4. "تشتت الهوية" → يُعرّف النموذج كـ EXECUTOR لا DESCRIBER """ MAX_EXECUTION_RETRIES = 2 @classmethod def build_forced_execution_message( cls, tool_name: str, tool_args: Dict, reason: str = "" ) -> str: """ يبني رسالة تُجبر النموذج على تنفيذ أداة محددة. هذا هو "الدفعة الأخيرة" المفقودة. """ tool_call_json = json.dumps( {"name": tool_name, "arguments": tool_args}, ensure_ascii=False, indent=2 ) msg_parts = [ "EXECUTE THIS TOOL CALL NOW.", "", ] if reason: msg_parts.append(f"Reason: {reason}") msg_parts.append("") msg_parts.extend([ "Copy this EXACTLY:", "", "", tool_call_json, "", "", "No other text needed. Just the tool_call block." ]) return "\n".join(msg_parts) @classmethod def pre_process_request( cls, message: str, session_id: str, tools: List[Dict] ) -> str: """ معالجة الطلب قبل إرساله. يضيف توجيهات إجبارية إذا كانت هناك أداة معلّقة. """ if not session_id or not tools: return message ctx = CONTEXT_STORE.get_or_create_session(session_id) state = ctx.get("current_state", ExecutionState.IDLE) pending = ctx.get("pending_tool_calls", []) desc_count = ctx.get("consecutive_description_count", 0) # إذا كانت هناك أداة معلّقة if pending and state == ExecutionState.TOOL_PENDING: pending_tool = pending[0] tool_name = pending_tool.get("name", "unknown") tool_args = pending_tool.get("arguments", {}) reminder = ( f"\n\n[EXECUTION REMINDER]: " f"You must call `{tool_name}` using:\n" f"\n" f"{json.dumps({'name': tool_name, 'arguments': tool_args}, ensure_ascii=False, indent=2)}\n" f"" ) return message + reminder # إذا وصف بدون تنفيذ مرات متعددة if desc_count >= 2: warning = ( f"\n\n[SYSTEM WARNING - EXECUTION REQUIRED]: " f"You have described actions {desc_count} times without executing them. " f"Your response MUST contain a block. " f"Descriptions are not acceptable." ) return message + warning return message @classmethod def post_process_response( cls, response: str, session_id: str, tools: List[Dict], tool_calls_found: List[Dict] ) -> Dict: """ تحليل الرد بعد استلامه. يقرر هل يحتاج "دفعة" أخرى. """ if not session_id: return {"needs_retry": False, "retry_message": "", "analysis": {}} analysis = ResponseAnalyzer.analyze(response, tools) # إذا وجد tool calls فعلية - نجاح if tool_calls_found: for tc in tool_calls_found: CONTEXT_STORE.add_completed_step( session_id, f"Executed tool: {tc.get('name', 'unknown')} " f"with args: {json.dumps(tc.get('input', {}))[:100]}" ) # إعادة ضبط عداد الوصف CONTEXT_STORE.update_session( session_id, described_but_not_executed=[], consecutive_description_count=0, current_state=ExecutionState.TOOL_EXECUTING ) return {"needs_retry": False, "retry_message": "", "analysis": analysis} # إذا وصف بدون تنفيذ if analysis["has_description_without_execution"]: desc_count = CONTEXT_STORE.increment_description_count(session_id, response) if desc_count <= cls.MAX_EXECUTION_RETRIES: intended_tool = analysis.get("intended_tool_name") if intended_tool: # بناء رسالة إجبارية للأداة المقصودة tool_def = next( (t for t in tools if t.get("name") == intended_tool), None ) example_args = {} if tool_def: schema = tool_def.get("input_schema", {}) for param_name in schema.get("required", []): param_info = schema.get("properties", {}).get(param_name, {}) param_type = param_info.get("type", "string") if param_type == "string": example_args[param_name] = f"<{param_name}>" elif param_type == "integer": example_args[param_name] = 0 elif param_type == "boolean": example_args[param_name] = True else: example_args[param_name] = None retry_message = cls.build_forced_execution_message( intended_tool, example_args, f"You mentioned using {intended_tool} but didn't call it" ) else: retry_message = ( "You described an action without executing it. " "Use a block now:\n\n" "\n" '{"name": "TOOL_NAME", "arguments": {ARGUMENTS}}\n' "" ) return { "needs_retry": True, "retry_message": retry_message, "analysis": analysis } return {"needs_retry": False, "retry_message": "", "analysis": analysis} # ===================================================== # CHAT LOGIC - مع محرك التنفيذ الكامل # ===================================================== def ask( message: str, history: List, provider_name: str, model_name: str, stop_flag=None, tools: Optional[List[Dict]] = None, session_id: str = "" ): """ دالة الدردشة الرئيسية مع محرك التنفيذ الإجباري. """ message = (message or "").strip() if not message: yield "" return # تطبيق المعالجة المسبقة للتنفيذ if tools and session_id: message = ExecutionEnforcer.pre_process_request(message, session_id, tools) # لا كاش عند وجود أدوات if not tools: key = f"{provider_name}|{model_name}|{message[:200]}" cached = CACHE.get(key) if cached: yield cached return else: key = None # معالجة التاريخ msgs = [] try: if history: if isinstance(history, list) and len(history) > 0: if isinstance(history[0], dict): smart_history = SmartHistoryManager.process_history( history, session_id ) for item in smart_history: role = item.get("role") content = item.get("content") if role and content: text = ( extract_text_from_content(content) if not isinstance(content, str) else content ) if text: msgs.append({"role": str(role), "content": text}) else: if len(history) > 30: smart_history_tuples = history[:5] + history[-25:] else: smart_history_tuples = history for item in smart_history_tuples: if isinstance(item, (list, tuple)) and len(item) == 2: if item[0]: msgs.append({"role": "user", "content": str(item[0])}) if item[1]: msgs.append({"role": "assistant", "content": str(item[1])}) except Exception as e: logger.warning(f"[History] Error: {e}") msgs.append({"role": "user", "content": message}) # تحديث المخزن الدائم if session_id: ctx = CONTEXT_STORE.get_or_create_session(session_id) if not ctx.get("original_task") and len(msgs) <= 2: CONTEXT_STORE.update_session( session_id, original_task=message[:500], current_state=ExecutionState.PLANNING ) ctx["message_count"] = ctx.get("message_count", 0) + 1 # قائمة المزودين all_provider_names = list(REAL_PROVIDERS.keys()) if provider_name in all_provider_names: fallback_providers = [provider_name] + [ p for p in all_provider_names if p != provider_name ] else: fallback_providers = all_provider_names used = [] for pname in fallback_providers: if pname in used: continue used.append(pname) pobj = REAL_PROVIDERS.get(pname) if not pobj: continue models_list = discover_provider_models(pobj, pname) if not models_list: continue if model_name in models_list: model_candidates = [model_name] + [m for m in models_list if m != model_name] else: model_candidates = models_list for m in model_candidates[:5]: try: logger.info(f"[Fallback] Trying {pname}/{m}") if session_id: provider_history = CONTEXT_STORE._store.get( session_id, {} ).get("provider_history", []) provider_history.append({ "provider": pname, "model": m, "timestamp": time.time() }) CONTEXT_STORE._store[session_id]["provider_history"] = ( provider_history[-20:] ) stream = g4f.ChatCompletion.create( model=m, provider=pobj, messages=msgs, stream=True, timeout=60 ) buffer = [] got_response = False for chunk in stream: if stop_flag and stop_flag.is_set(): return c = clean_stream(chunk) if not c: continue got_response = True buffer.append(c) yield c full = "".join(buffer) if full.strip() and got_response: if key: CACHE.set(key, full) if session_id: CONTEXT_STORE.update_session( session_id, last_model_response=full[:500], current_state=ExecutionState.AWAITING_NEXT ) return except Exception as e: logger.warning( f"[Fallback] {pname}/{m} failed: {str(e)[:200]}" ) continue yield "❌ فشلت جميع المزودات. تأكد من اتصال الإنترنت أو حاول لاحقاً." # ===================================================== # SESSION ID EXTRACTOR # ===================================================== def extract_session_id(request: Request, body: Dict) -> str: session_id = ( request.headers.get("X-Session-ID", "") or request.headers.get("x-session-id", "") or request.headers.get("X-Conversation-ID", "") or "" ) if session_id: return session_id metadata = body.get("metadata", {}) if isinstance(metadata, dict): session_id = ( metadata.get("session_id", "") or metadata.get("conversation_id", "") or metadata.get("user_id", "") or "" ) if session_id: return session_id messages = body.get("messages", []) if messages: first_msg = messages[0] content = str(first_msg.get("content", ""))[:100] import hashlib hash_val = hashlib.md5(content.encode()).hexdigest()[:16] return f"auto_{hash_val}" return f"rand_{uuid.uuid4().hex[:16]}" # ===================================================== # FASTAPI # ===================================================== app = FastAPI( title="G4F Execution Engine", description="AI Gateway with Execution Enforcement" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) API_KEY = os.getenv("API_KEY", "mysecretkey123") class ChatRequest(BaseModel): message: str provider: str = "Blackbox" model: str = "gpt-4o" history: List[Any] = [] session_id: str = "" def verify_api_key(request: Request): auth = request.headers.get("Authorization", "").strip() x_key = request.headers.get("X-API-Key", "").strip() x_api_key = request.headers.get("x-api-key", "").strip() if auth.startswith("Bearer "): key = auth[7:].strip() if key and key == API_KEY: return True if x_key and x_key == API_KEY: return True if x_api_key and x_api_key == API_KEY: return True raise HTTPException( status_code=401, detail="Invalid API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'" ) # ===================================================== # دعم HEAD # ===================================================== @app.head("/") async def head_root(): return Response(status_code=200) @app.head("/health") async def head_health(): return Response(status_code=200) @app.head("/v1/models") async def head_models(): return Response(status_code=200) @app.head("/v1/messages") async def head_messages(): return Response(status_code=200) @app.head("/v1/chat/completions") async def head_chat_completions(): return Response(status_code=200) # ===================================================== # نقاط نهاية Anthropic API # ===================================================== @app.get("/v1/models") async def v1_models(request: Request): models = [] seen_ids = set() for pname, pobj in REAL_PROVIDERS.items(): models_list = discover_provider_models(pobj, pname) for model in models_list[:5]: if model not in seen_ids: seen_ids.add(model) models.append({ "id": model, "object": "model", "created": int(time.time()), "owned_by": pname, "type": "model", "display_name": f"{pname} - {model}" }) if not models: models = [{ "id": "gpt-4o", "object": "model", "created": int(time.time()), "owned_by": "system", "type": "model", "display_name": "Default" }] return {"object": "list", "data": models} @app.post("/v1/messages") async def v1_messages(request: Request): verify_api_key(request) body = await request.json() session_id = extract_session_id(request, body) logger.info( f"[API] /v1/messages - model={body.get('model')}, " f"stream={body.get('stream')}, " f"tools_count={len(body.get('tools', []))}, " f"session={session_id[:8]}..." ) messages = body.get("messages", []) if not messages: raise HTTPException(status_code=400, detail="No messages provided") model = body.get("model", "gpt-4o") system_prompt = body.get("system", "") if isinstance(system_prompt, list): sys_parts = [] for sp in system_prompt: if isinstance(sp, dict) and sp.get("type") == "text": sys_parts.append(sp.get("text", "")) elif isinstance(sp, str): sys_parts.append(sp) system_prompt = "\n".join(sys_parts) is_stream = body.get("stream", False) max_tokens = body.get("max_tokens", 4096) tools = body.get("tools", []) tool_choice = body.get("tool_choice", "auto") metadata = body.get("metadata", {}) # حفظ المهمة الأصلية ctx = CONTEXT_STORE.get_or_create_session(session_id) if not ctx.get("original_task"): original_task = CONTEXT_STORE.extract_task_from_messages(messages) if original_task: CONTEXT_STORE.update_session(session_id, original_task=original_task) # تحويل الرسائل full_message, history = MessageConverter.convert_messages( messages, system_prompt, tools, tool_choice, session_id ) logger.info( f"[API] Message length: {len(full_message)}, " f"history: {len(history)}, " f"tools: {[t.get('name', '') for t in tools]}" ) if is_stream: return await _handle_anthropic_stream( full_message, history, model, max_tokens, tools, tool_choice, metadata, session_id ) # Non-stream full_response = "" for chunk in ask( full_message, history, "Blackbox", model, tools=tools, session_id=session_id ): full_response += chunk clean_text, tool_calls = ToolCallParser.parse_tool_calls(full_response, tools) # تطبيق محرك التنفيذ if tools and session_id: enforcement = ExecutionEnforcer.post_process_response( full_response, session_id, tools, tool_calls ) if enforcement["needs_retry"]: logger.info( f"[Enforcer] Retry needed: {enforcement['retry_message'][:100]}" ) retry_response = "" for chunk in ask( enforcement["retry_message"], [], "Blackbox", model, tools=tools, session_id=session_id ): retry_response += chunk if retry_response: clean_text2, tool_calls2 = ToolCallParser.parse_tool_calls( retry_response, tools ) if tool_calls2: tool_calls = tool_calls2 clean_text = clean_text2 logger.info( f"[Enforcer] Retry SUCCESS: {len(tool_calls2)} tool calls" ) message_id = f"msg_{int(time.time())}_{os.urandom(4).hex()}" input_tokens = max(1, len(full_message) // 4) output_tokens = max(1, len(full_response) // 4) content_blocks = [] if clean_text.strip(): content_blocks.append({"type": "text", "text": clean_text.strip()}) for tc in tool_calls: tool_id = tc.get("id") or ToolCallParser.generate_tool_id() content_blocks.append({ "type": "tool_use", "id": tool_id, "name": tc["name"], "input": tc.get("input", {}) }) logger.info(f"[API] Tool call: {tc['name']} (id: {tool_id})") if session_id: CONTEXT_STORE.add_completed_step( session_id, f"Called tool: {tc['name']} with {json.dumps(tc.get('input', {}))[:100]}" ) if not content_blocks: content_blocks.append({"type": "text", "text": full_response or ""}) stop_reason = "tool_use" if tool_calls else "end_turn" return { "id": message_id, "type": "message", "role": "assistant", "content": content_blocks, "model": model, "stop_reason": stop_reason, "stop_sequence": None, "usage": { "input_tokens": input_tokens, "output_tokens": output_tokens } } # ===================================================== # معالج الـ streaming # ===================================================== async def _handle_anthropic_stream( full_message: str, history: list, model: str, max_tokens: int, tools: List[Dict] = None, tool_choice: Any = None, metadata: Dict = None, session_id: str = "" ): async def generate_stream(): message_id = f"msg_{int(time.time())}_{os.urandom(4).hex()}" tools_list = tools or [] msg_start = { "type": "message_start", "message": { "id": message_id, "type": "message", "role": "assistant", "content": [], "model": model, "stop_reason": None, "stop_sequence": None, "usage": { "input_tokens": max(1, len(full_message) // 4), "output_tokens": 0 } } } yield ( f"event: message_start\n" f"data: {json.dumps(msg_start, ensure_ascii=False)}\n\n" ) tool_buffer = StreamToolBuffer(available_tools=tools_list) current_block_index = 0 text_block_open = False output_tokens = 0 has_tool_calls = False detected_tool_calls = [] full_response_buffer = [] def make_text_block_start(index: int) -> str: block_start = { "type": "content_block_start", "index": index, "content_block": {"type": "text", "text": ""} } return ( f"event: content_block_start\n" f"data: {json.dumps(block_start, ensure_ascii=False)}\n\n" ) def make_text_delta(index: int, text: str) -> str: delta_event = { "type": "content_block_delta", "index": index, "delta": {"type": "text_delta", "text": text} } return ( f"event: content_block_delta\n" f"data: {json.dumps(delta_event, ensure_ascii=False)}\n\n" ) def make_block_stop(index: int) -> str: block_stop = {"type": "content_block_stop", "index": index} return ( f"event: content_block_stop\n" f"data: {json.dumps(block_stop, ensure_ascii=False)}\n\n" ) def make_tool_use_events(index: int, tool_call: Dict) -> str: events_str = "" tool_id = tool_call.get("id") or ToolCallParser.generate_tool_id() tool_name = tool_call.get("name", "unknown") tool_input = tool_call.get("input", {}) logger.info(f"[Stream] Sending tool_use: {tool_name} (id: {tool_id})") block_start = { "type": "content_block_start", "index": index, "content_block": { "type": "tool_use", "id": tool_id, "name": tool_name, "input": {} } } events_str += ( f"event: content_block_start\n" f"data: {json.dumps(block_start, ensure_ascii=False)}\n\n" ) input_json = json.dumps(tool_input, ensure_ascii=False) chunk_size = 100 for i in range(0, len(input_json), chunk_size): json_chunk = input_json[i:i + chunk_size] delta_event = { "type": "content_block_delta", "index": index, "delta": { "type": "input_json_delta", "partial_json": json_chunk } } events_str += ( f"event: content_block_delta\n" f"data: {json.dumps(delta_event, ensure_ascii=False)}\n\n" ) events_str += make_block_stop(index) return events_str # معالجة الـ Stream try: for chunk in ask( full_message, history, "Blackbox", model, tools=tools_list, session_id=session_id ): if not chunk: continue full_response_buffer.append(chunk) output_tokens += max(1, len(chunk) // 4) if not tools_list: if not text_block_open: yield make_text_block_start(current_block_index) text_block_open = True yield make_text_delta(current_block_index, chunk) continue events = tool_buffer.feed(chunk) for event in events: if event.get("type") == "text": text = event["text"] if text: if not text_block_open: yield make_text_block_start(current_block_index) text_block_open = True yield make_text_delta(current_block_index, text) elif event.get("type") == "tool_use": if text_block_open: yield make_block_stop(current_block_index) current_block_index += 1 text_block_open = False has_tool_calls = True detected_tool_calls.append(event) yield make_tool_use_events(current_block_index, event) current_block_index += 1 # تفريغ الـ buffer logger.info("[Stream] Flushing buffer...") remaining_events = tool_buffer.flush() for event in remaining_events: if event.get("type") == "text": text = event["text"] if text: if not text_block_open: yield make_text_block_start(current_block_index) text_block_open = True yield make_text_delta(current_block_index, text) elif event.get("type") == "tool_use": if text_block_open: yield make_block_stop(current_block_index) current_block_index += 1 text_block_open = False has_tool_calls = True detected_tool_calls.append(event) logger.info( f"[Stream] Recovered tool call: {event.get('name')}" ) yield make_tool_use_events(current_block_index, event) current_block_index += 1 # ===== تطبيق محرك التنفيذ على الـ stream ===== if tools_list and session_id and not has_tool_calls: full_response = "".join(full_response_buffer) enforcement = ExecutionEnforcer.post_process_response( full_response, session_id, tools_list, detected_tool_calls ) if enforcement["needs_retry"]: logger.info( f"[Stream Enforcer] Sending execution push..." ) retry_tool_buffer = StreamToolBuffer(available_tools=tools_list) for retry_chunk in ask( enforcement["retry_message"], [], "Blackbox", model, tools=tools_list, session_id=session_id ): if not retry_chunk: continue retry_events = retry_tool_buffer.feed(retry_chunk) for event in retry_events: if event.get("type") == "text": text = event["text"] if text: if not text_block_open: yield make_text_block_start(current_block_index) text_block_open = True yield make_text_delta(current_block_index, text) elif event.get("type") == "tool_use": if text_block_open: yield make_block_stop(current_block_index) current_block_index += 1 text_block_open = False has_tool_calls = True detected_tool_calls.append(event) logger.info( f"[Stream Enforcer] Tool call in retry: " f"{event.get('name')}" ) yield make_tool_use_events(current_block_index, event) current_block_index += 1 # تفريغ retry buffer retry_remaining = retry_tool_buffer.flush() for event in retry_remaining: if event.get("type") == "tool_use": if text_block_open: yield make_block_stop(current_block_index) current_block_index += 1 text_block_open = False has_tool_calls = True detected_tool_calls.append(event) yield make_tool_use_events(current_block_index, event) current_block_index += 1 except Exception as e: logger.error(f"[Stream] Error: {str(e)}") if not text_block_open: yield make_text_block_start(current_block_index) text_block_open = True yield make_text_delta(current_block_index, f"\n\n[Error: {str(e)}]") # إغلاق الـ blocks المفتوحة if text_block_open: yield make_block_stop(current_block_index) elif current_block_index == 0 and not has_tool_calls: yield make_text_block_start(0) yield make_text_delta(0, "") yield make_block_stop(0) # حفظ الأدوات المكتشفة if session_id and detected_tool_calls: for tc in detected_tool_calls: CONTEXT_STORE.add_completed_step( session_id, f"Called tool: {tc.get('name', 'unknown')} " f"with {json.dumps(tc.get('input', {}))[:100]}" ) stop_reason = "tool_use" if has_tool_calls else "end_turn" msg_delta = { "type": "message_delta", "delta": { "stop_reason": stop_reason, "stop_sequence": None }, "usage": {"output_tokens": output_tokens} } yield ( f"event: message_delta\n" f"data: {json.dumps(msg_delta, ensure_ascii=False)}\n\n" ) yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) # ===================================================== # نقطة /v1/messages/stream # ===================================================== @app.post("/v1/messages/stream") async def v1_messages_stream(request: Request): verify_api_key(request) body = await request.json() session_id = extract_session_id(request, body) messages = body.get("messages", []) if not messages: raise HTTPException(status_code=400, detail="No messages provided") model = body.get("model", "gpt-4o") system_prompt = body.get("system", "") if isinstance(system_prompt, list): sys_parts = [] for sp in system_prompt: if isinstance(sp, dict) and sp.get("type") == "text": sys_parts.append(sp.get("text", "")) elif isinstance(sp, str): sys_parts.append(sp) system_prompt = "\n".join(sys_parts) max_tokens = body.get("max_tokens", 4096) tools = body.get("tools", []) tool_choice = body.get("tool_choice", "auto") full_message, history = MessageConverter.convert_messages( messages, system_prompt, tools, tool_choice, session_id ) return await _handle_anthropic_stream( full_message, history, model, max_tokens, tools, tool_choice, {}, session_id ) # ===================================================== # نقطة /v1/chat/completions (OpenAI) # ===================================================== @app.post("/v1/chat/completions") async def v1_chat_completions(request: Request): verify_api_key(request) body = await request.json() session_id = extract_session_id(request, body) messages = body.get("messages", []) if not messages: raise HTTPException(status_code=400, detail="No messages provided") model = body.get("model", "gpt-4o") is_stream = body.get("stream", False) last_message = messages[-1] user_message = extract_text_from_content(last_message.get("content", "")) history = [] for msg in messages[:-1]: role = msg.get("role", "user") content = extract_text_from_content(msg.get("content", "")) if role and content: history.append({"role": role, "content": content}) history = SmartHistoryManager.process_history(history, session_id) completion_id = f"chatcmpl-{int(time.time())}_{os.urandom(4).hex()}" if is_stream: async def openai_stream(): for chunk in ask( user_message, history, "Blackbox", model, session_id=session_id ): if chunk: data = { "id": completion_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "delta": {"content": chunk}, "finish_reason": None }] } yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n" final_data = { "id": completion_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "delta": {}, "finish_reason": "stop" }] } yield f"data: {json.dumps(final_data, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( openai_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) full_response = "" for chunk in ask( user_message, history, "Blackbox", model, session_id=session_id ): full_response += chunk return { "id": completion_id, "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "message": { "role": "assistant", "content": full_response }, "finish_reason": "stop" }], "usage": { "prompt_tokens": max(1, len(user_message) // 4), "completion_tokens": max(1, len(full_response) // 4), "total_tokens": max(1, (len(user_message) + len(full_response)) // 4) } } # ===================================================== # نقاط نهاية إضافية # ===================================================== @app.get("/") async def root(): return { "message": "G4F Execution Engine", "version": "6.0.0", "problems_solved": { "1_smart_watcher_syndrome": ( "ResponseAnalyzer detects 'I will...' patterns " "and flags them for immediate retry" ), "2_lost_in_translation": ( "ToolCallParser._clean_json_string() fixes common JSON errors " "before parsing - tolerant of extra spaces and characters" ), "3_missing_final_push": ( "ExecutionEnforcer.build_forced_execution_message() " "sends a mandatory execution message when description detected" ), "4_identity_confusion": ( "ToolsFormatter defines the model as EXECUTOR not DESCRIBER " "with clear forbidden patterns list" ), }, "providers": list(REAL_PROVIDERS.keys()), "cookies": COOKIE_STATUS, "status": "✅ Execution Engine v6 Active" } @app.get("/health") async def health(): return { "status": "ok", "version": "6.0.0", "cookies": COOKIE_STATUS, "providers": list(REAL_PROVIDERS.keys()), "provider_count": len(REAL_PROVIDERS), "context_store": { "active_sessions": len(CONTEXT_STORE._store), }, "execution_engine": { "response_analyzer": True, "execution_enforcer": True, "stream_retry": True, "json_cleaner": True, "description_patterns": len(ResponseAnalyzer.DESCRIPTION_PHRASES), }, "timestamp": int(time.time()) } @app.get("/providers") async def get_providers(request: Request): verify_api_key(request) result = {} for pname, pobj in REAL_PROVIDERS.items(): models = discover_provider_models(pobj, pname) result[pname] = {"available": True, "models": models} return {"providers": result} @app.get("/context/{session_id}") async def get_session_context(session_id: str, request: Request): verify_api_key(request) ctx = CONTEXT_STORE.get_or_create_session(session_id) return { "session_id": session_id, "context": ctx, "summary": CONTEXT_STORE.get_context_summary(session_id), "execution_directive": CONTEXT_STORE.get_execution_directive(session_id) } @app.delete("/context/{session_id}") async def clear_session_context(session_id: str, request: Request): verify_api_key(request) with CONTEXT_STORE._lock: if session_id in CONTEXT_STORE._store: del CONTEXT_STORE._store[session_id] return {"message": f"Session {session_id} cleared"} return {"message": f"Session {session_id} not found"} @app.get("/context") async def list_sessions(request: Request): verify_api_key(request) sessions = [] with CONTEXT_STORE._lock: for sid, ctx in CONTEXT_STORE._store.items(): sessions.append({ "session_id": sid, "message_count": ctx.get("message_count", 0), "original_task_preview": ctx.get("original_task", "")[:100], "current_state": ctx.get("current_state", ExecutionState.IDLE), "completed_steps": len(ctx.get("completed_steps", [])), "tool_results": len(ctx.get("tool_results", [])), "pending_tools": len(ctx.get("pending_tool_calls", [])), "executed_tools": len(ctx.get("executed_tool_calls", [])), "description_count": ctx.get("consecutive_description_count", 0), "last_updated": ctx.get("last_updated", 0), }) return {"sessions": sessions, "total": len(sessions)} @app.post("/debug/test-execution-enforcer") async def debug_test_execution_enforcer(request: Request): verify_api_key(request) test_cases = [ { "name": "Arabic description without execution", "response": "سأقوم الآن بإنشاء الملف باستخدام أداة write_file.", "expected_needs_retry": True }, { "name": "English description without execution", "response": "I will now create the file using the write_file tool.", "expected_needs_retry": True }, { "name": "Actual tool call", "response": ( "Creating the file now.\n" "\n" '{"name": "write_file", "arguments": {"path": "test.py", "content": "print(1)"}}\n' "" ), "expected_needs_retry": False }, { "name": "Let me create pattern", "response": "Let me create a Python file with the requested functionality.", "expected_needs_retry": True }, { "name": "Regular response (no tools needed)", "response": "The answer to your question is 42.", "expected_needs_retry": False }, { "name": "I'll use the tool pattern", "response": "I'll use the write_file tool to save your code.", "expected_needs_retry": True }, ] test_tools = [{ "name": "write_file", "description": "Write content to a file", "input_schema": { "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "content": {"type": "string", "description": "File content"} }, "required": ["path", "content"] } }] results = [] for tc in test_cases: analysis = ResponseAnalyzer.analyze(tc["response"], test_tools) test_session = f"test_{uuid.uuid4().hex[:8]}" enforcement = ExecutionEnforcer.post_process_response( tc["response"], test_session, test_tools, [] ) # تنظيف الجلسة التجريبية with CONTEXT_STORE._lock: if test_session in CONTEXT_STORE._store: del CONTEXT_STORE._store[test_session] results.append({ "test_name": tc["name"], "response_preview": tc["response"][:100], "expected_needs_retry": tc["expected_needs_retry"], "actual_needs_retry": enforcement["needs_retry"], "passed": enforcement["needs_retry"] == tc["expected_needs_retry"], "analysis": { "has_actual_tool_call": analysis["has_actual_tool_call"], "has_description_without_execution": ( analysis["has_description_without_execution"] ), "intended_tool": analysis.get("intended_tool_name"), }, "retry_message_preview": ( enforcement["retry_message"][:150] if enforcement["retry_message"] else "" ) }) passed = sum(1 for r in results if r["passed"]) total = len(results) return { "test_results": results, "summary": { "passed": passed, "failed": total - passed, "total": total, "success_rate": f"{passed/total*100:.1f}%" } } @app.get("/debug/test-tool-parse") async def debug_test_tool_parse(request: Request): verify_api_key(request) test_cases = [ { "name": "Standard tool_call tags", "input": ( 'I will create the file now.\n' '{"name": "write_file", ' '"arguments": {"path": "test.py", "content": "print(\'hello\')"}}' '' ), }, { "name": "Tool call with whitespace", "input": ( 'Let me help you.\n\n\n' ' \n' '{"name": "read_file", "arguments": {"path": "config.json"}}\n' '' ), }, { "name": "Code block format", "input": ( 'Let me write that.\n' '```tool_call\n' '{"name": "read_file", "arguments": {"path": "config.json"}}\n' '```' ), }, { "name": "JSON with trailing comma (common error)", "input": ( '' '{"name": "write_file", "arguments": {"path": "test.py", "content": "data",}}' '' ), }, { "name": "Multiple tool calls", "input": ( 'First read, then write.\n' '{"name": "read_file", "arguments": {"path": "old.txt"}}' '\n' 'Now writing:\n' '{"name": "write_file", "arguments": {"path": "new.txt", "content": "data"}}' '' ), }, { "name": "No tool calls (plain text)", "input": "This is just a regular response with no tool calls.", } ] results = [] for tc in test_cases: clean_text, tool_calls = ToolCallParser.parse_tool_calls(tc["input"]) results.append({ "test_name": tc["name"], "input_preview": ( tc["input"][:100] + "..." if len(tc["input"]) > 100 else tc["input"] ), "clean_text": clean_text, "tool_calls_found": len(tool_calls), "tool_calls": tool_calls, "has_tool_call_marker": ToolCallParser.might_contain_tool_call(tc["input"]), }) return {"test_results": results} @app.post("/debug/test-stream-buffer") async def debug_test_stream_buffer(request: Request): verify_api_key(request) body = await request.json() text = body.get("text", "") tools = body.get("tools", []) chunk_size = body.get("chunk_size", 10) buffer = StreamToolBuffer(available_tools=tools) all_events = [] for i in range(0, len(text), chunk_size): chunk = text[i:i + chunk_size] events = buffer.feed(chunk) for event in events: all_events.append({"chunk_index": i // chunk_size, "event": event}) remaining = buffer.flush() for event in remaining: all_events.append({"chunk_index": "flush", "event": event}) return { "input_length": len(text), "chunk_size": chunk_size, "total_chunks": (len(text) + chunk_size - 1) // chunk_size, "total_events": len(all_events), "events": all_events } # ===================================================== # نقطة الدردشة البسيطة # ===================================================== @app.post("/chat") async def simple_chat(chat_req: ChatRequest, request: Request): verify_api_key(request) session_id = chat_req.session_id or extract_session_id(request, {}) full_response = "" for chunk in ask( chat_req.message, chat_req.history, chat_req.provider, chat_req.model, session_id=session_id ): full_response += chunk return { "response": full_response, "session_id": session_id, "provider": chat_req.provider, "model": chat_req.model } @app.post("/chat/stream") async def simple_chat_stream(chat_req: ChatRequest, request: Request): verify_api_key(request) session_id = chat_req.session_id or extract_session_id(request, {}) async def generate(): for chunk in ask( chat_req.message, chat_req.history, chat_req.provider, chat_req.model, session_id=session_id ): if chunk: yield f"data: {json.dumps({'text': chunk}, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) # ===================================================== # BACKGROUND CLEANUP TASK # ===================================================== async def cleanup_task(): """مهمة تنظيف دورية للجلسات القديمة""" while True: try: await asyncio.sleep(3600) # كل ساعة CONTEXT_STORE.cleanup_old_sessions(max_age_hours=24) logger.info("[Cleanup] Old sessions cleaned") except Exception as e: logger.error(f"[Cleanup] Error: {e}") @app.on_event("startup") async def startup_event(): """تشغيل المهام عند بدء التطبيق""" asyncio.create_task(cleanup_task()) logger.info("=" * 60) logger.info("🚀 G4F Execution Engine v6.0.0 Started") logger.info(f"🍪 Cookies: {COOKIE_STATUS}") logger.info(f"🔌 Providers: {list(REAL_PROVIDERS.keys())}") logger.info("🔧 Execution Engine: ACTIVE") logger.info("✅ Problems Fixed:") logger.info(" 1. Smart Watcher Syndrome → ResponseAnalyzer") logger.info(" 2. Lost in Translation → JSON Cleaner") logger.info(" 3. Missing Final Push → ExecutionEnforcer") logger.info(" 4. Identity Confusion → EXECUTOR mode") logger.info("=" * 60) @app.on_event("shutdown") async def shutdown_event(): """تنظيف عند إغلاق التطبيق""" logger.info("[Shutdown] G4F Execution Engine stopping...") active_sessions = len(CONTEXT_STORE._store) logger.info(f"[Shutdown] Active sessions at shutdown: {active_sessions}") # ===================================================== # نقطة تشخيص: عرض حالة النظام الكاملة # ===================================================== @app.get("/debug/system-status") async def debug_system_status(request: Request): """عرض حالة النظام الكاملة للتشخيص""" verify_api_key(request) # إحصائيات الجلسات session_stats = { "total_sessions": 0, "sessions_with_pending_tools": 0, "sessions_with_description_failures": 0, "sessions_completed": 0, "total_tool_executions": 0, "total_description_failures": 0, } with CONTEXT_STORE._lock: session_stats["total_sessions"] = len(CONTEXT_STORE._store) for sid, ctx in CONTEXT_STORE._store.items(): pending = len(ctx.get("pending_tool_calls", [])) desc_failures = ctx.get("consecutive_description_count", 0) executed = len(ctx.get("executed_tool_calls", [])) state = ctx.get("current_state", "") if pending > 0: session_stats["sessions_with_pending_tools"] += 1 if desc_failures > 0: session_stats["sessions_with_description_failures"] += 1 if state == ExecutionState.COMPLETED: session_stats["sessions_completed"] += 1 session_stats["total_tool_executions"] += executed session_stats["total_description_failures"] += desc_failures # إحصائيات المزودين provider_stats = {} for pname, pobj in REAL_PROVIDERS.items(): models = discover_provider_models(pobj, pname) provider_stats[pname] = { "available": True, "model_count": len(models), "models": models[:3], # أول 3 نماذج فقط } # إحصائيات محرك التنفيذ engine_stats = { "description_patterns_count": len(ResponseAnalyzer.DESCRIPTION_PHRASES), "tool_call_start_markers": len(ToolCallParser.START_MARKERS), "max_execution_retries": ExecutionEnforcer.MAX_EXECUTION_RETRIES, "cache_size": len(CACHE.cache), "cache_max_size": CACHE.max_size, "cache_ttl_seconds": CACHE.ttl, } return { "status": "operational", "version": "6.0.0", "uptime_info": { "cookie_status": COOKIE_STATUS, "providers_loaded": len(REAL_PROVIDERS), }, "session_statistics": session_stats, "provider_statistics": provider_stats, "execution_engine_statistics": engine_stats, "execution_states": { "IDLE": ExecutionState.IDLE, "PLANNING": ExecutionState.PLANNING, "TOOL_PENDING": ExecutionState.TOOL_PENDING, "TOOL_EXECUTING": ExecutionState.TOOL_EXECUTING, "TOOL_DONE": ExecutionState.TOOL_DONE, "AWAITING_NEXT": ExecutionState.AWAITING_NEXT, "COMPLETED": ExecutionState.COMPLETED, }, "timestamp": int(time.time()) } # ===================================================== # نقطة تشخيص: اختبار مزود محدد # ===================================================== @app.post("/debug/test-provider") async def debug_test_provider(request: Request): """اختبار مزود محدد بشكل مباشر""" verify_api_key(request) body = await request.json() provider_name = body.get("provider", "Blackbox") model_name = body.get("model", "gpt-4o") test_message = body.get("message", "Say 'hello' in one word.") timeout_seconds = body.get("timeout", 30) pobj = REAL_PROVIDERS.get(provider_name) if not pobj: return { "success": False, "error": f"Provider '{provider_name}' not found", "available_providers": list(REAL_PROVIDERS.keys()) } start_time = time.time() response_chunks = [] error_msg = None try: msgs = [{"role": "user", "content": test_message}] stream = g4f.ChatCompletion.create( model=model_name, provider=pobj, messages=msgs, stream=True, timeout=timeout_seconds ) for chunk in stream: c = clean_stream(chunk) if c: response_chunks.append(c) # توقف بعد 500 حرف للاختبار if sum(len(r) for r in response_chunks) > 500: break except Exception as e: error_msg = str(e) elapsed = time.time() - start_time full_response = "".join(response_chunks) return { "success": bool(full_response and not error_msg), "provider": provider_name, "model": model_name, "test_message": test_message, "response_preview": full_response[:300] if full_response else "", "response_length": len(full_response), "elapsed_seconds": round(elapsed, 2), "error": error_msg, "timestamp": int(time.time()) } # ===================================================== # نقطة تشخيص: اختبار دورة كاملة مع أداة # ===================================================== @app.post("/debug/test-full-tool-cycle") async def debug_test_full_tool_cycle(request: Request): """ اختبار دورة كاملة: رسالة → كشف أداة → تنفيذ → نتيجة يُظهر بوضوح هل المحرك يعمل أم لا """ verify_api_key(request) body = await request.json() test_message = body.get( "message", "Create a file called 'hello.py' with content: print('Hello World')" ) model = body.get("model", "gpt-4o") test_tools = [ { "name": "write_file", "description": "Write content to a file on disk", "input_schema": { "type": "object", "properties": { "path": { "type": "string", "description": "The file path to write to" }, "content": { "type": "string", "description": "The content to write to the file" } }, "required": ["path", "content"] } } ] test_session = f"test_cycle_{uuid.uuid4().hex[:8]}" results = { "test_session": test_session, "test_message": test_message, "model": model, "steps": [], "final_tool_calls": [], "success": False, "description_attempts": 0, "execution_attempts": 0, } # المرحلة 1: تحويل الرسالة messages = [{"role": "user", "content": test_message}] full_message, history = MessageConverter.convert_messages( messages, "", test_tools, "auto", test_session ) results["steps"].append({ "step": 1, "name": "Message Conversion", "message_length": len(full_message), "has_tool_instructions": "tool_call" in full_message.lower(), "has_executor_rule": "EXECUTOR" in full_message, }) # المرحلة 2: إرسال للنموذج step2_start = time.time() full_response = "" try: for chunk in ask( full_message, history, "Blackbox", model, tools=test_tools, session_id=test_session ): full_response += chunk except Exception as e: results["steps"].append({ "step": 2, "name": "Model Request", "error": str(e) }) return results step2_elapsed = time.time() - step2_start # المرحلة 3: تحليل الرد analysis = ResponseAnalyzer.analyze(full_response, test_tools) clean_text, tool_calls = ToolCallParser.parse_tool_calls(full_response, test_tools) results["steps"].append({ "step": 2, "name": "Model Request", "response_length": len(full_response), "response_preview": full_response[:200], "elapsed_seconds": round(step2_elapsed, 2), "has_actual_tool_call": analysis["has_actual_tool_call"], "has_description_without_execution": analysis["has_description_without_execution"], "tool_calls_found": len(tool_calls), "described_actions": analysis["described_actions"][:3], "intended_tool": analysis.get("intended_tool_name"), }) if analysis["has_description_without_execution"]: results["description_attempts"] += 1 # المرحلة 4: تطبيق محرك التنفيذ إذا لزم if not tool_calls and analysis["has_description_without_execution"]: enforcement = ExecutionEnforcer.post_process_response( full_response, test_session, test_tools, tool_calls ) results["steps"].append({ "step": 3, "name": "Execution Enforcer", "needs_retry": enforcement["needs_retry"], "retry_message_preview": enforcement["retry_message"][:200], }) if enforcement["needs_retry"]: results["execution_attempts"] += 1 retry_response = "" retry_start = time.time() try: for chunk in ask( enforcement["retry_message"], [], "Blackbox", model, tools=test_tools, session_id=test_session ): retry_response += chunk except Exception as e: results["steps"].append({ "step": 4, "name": "Retry Request", "error": str(e) }) retry_elapsed = time.time() - retry_start clean_text2, tool_calls2 = ToolCallParser.parse_tool_calls( retry_response, test_tools ) results["steps"].append({ "step": 4, "name": "Retry Request", "response_length": len(retry_response), "response_preview": retry_response[:200], "elapsed_seconds": round(retry_elapsed, 2), "tool_calls_found": len(tool_calls2), }) if tool_calls2: tool_calls = tool_calls2 # النتيجة النهائية results["final_tool_calls"] = tool_calls results["success"] = len(tool_calls) > 0 # تنظيف الجلسة التجريبية with CONTEXT_STORE._lock: if test_session in CONTEXT_STORE._store: del CONTEXT_STORE._store[test_session] results["summary"] = { "total_steps": len(results["steps"]), "description_attempts": results["description_attempts"], "execution_attempts": results["execution_attempts"], "tool_calls_extracted": len(results["final_tool_calls"]), "verdict": ( "✅ SUCCESS: Tool was actually called" if results["success"] else "❌ FAILURE: Model described but didn't execute" ) } return results # ===================================================== # نقطة: مسح كل الجلسات (للتطوير فقط) # ===================================================== @app.delete("/context/all/clear") async def clear_all_sessions(request: Request): """مسح جميع الجلسات - للتطوير فقط""" verify_api_key(request) with CONTEXT_STORE._lock: count = len(CONTEXT_STORE._store) CONTEXT_STORE._store.clear() return { "message": f"Cleared {count} sessions", "timestamp": int(time.time()) } # ===================================================== # نقطة: إضافة نتيجة أداة يدوياً (للاختبار) # ===================================================== @app.post("/context/{session_id}/add-tool-result") async def add_tool_result_manually( session_id: str, request: Request ): """إضافة نتيجة أداة يدوياً - مفيد للاختبار""" verify_api_key(request) body = await request.json() tool_name = body.get("tool_name", "unknown") tool_id = body.get("tool_id", ToolCallParser.generate_tool_id()) result = body.get("result", "") CONTEXT_STORE.add_tool_result(session_id, tool_name, tool_id, result) return { "message": f"Tool result added for session {session_id}", "tool_name": tool_name, "tool_id": tool_id, "result_preview": result[:100], "session_state": CONTEXT_STORE.get_or_create_session(session_id).get( "current_state", ExecutionState.IDLE ) } # ===================================================== # نقطة: تحديث حالة الجلسة يدوياً # ===================================================== @app.post("/context/{session_id}/set-state") async def set_session_state(session_id: str, request: Request): """تحديث حالة الجلسة يدوياً""" verify_api_key(request) body = await request.json() new_state = body.get("state", ExecutionState.IDLE) next_action = body.get("next_required_action", "") valid_states = [ ExecutionState.IDLE, ExecutionState.PLANNING, ExecutionState.TOOL_PENDING, ExecutionState.TOOL_EXECUTING, ExecutionState.TOOL_DONE, ExecutionState.AWAITING_NEXT, ExecutionState.COMPLETED, ] if new_state not in valid_states: raise HTTPException( status_code=400, detail=f"Invalid state. Valid states: {valid_states}" ) CONTEXT_STORE.update_session( session_id, current_state=new_state, next_required_action=next_action ) return { "message": f"Session {session_id} state updated", "new_state": new_state, "next_required_action": next_action, "execution_directive": CONTEXT_STORE.get_execution_directive(session_id) } # ===================================================== # نقطة: تسجيل أداة معلّقة يدوياً # ===================================================== @app.post("/context/{session_id}/register-pending-tool") async def register_pending_tool_manually( session_id: str, request: Request ): """تسجيل أداة معلّقة يدوياً - لإجبار البوت على تنفيذها""" verify_api_key(request) body = await request.json() tool_name = body.get("name", "") tool_args = body.get("arguments", {}) reason = body.get("reason", "Manually registered") if not tool_name: raise HTTPException(status_code=400, detail="Tool name is required") tool_id = ToolCallParser.generate_tool_id() pending_tool = PendingToolCall( name=tool_name, arguments=tool_args, tool_id=tool_id, reason=reason ) CONTEXT_STORE.register_pending_tool(session_id, pending_tool) return { "message": f"Pending tool '{tool_name}' registered for session {session_id}", "tool_id": tool_id, "directive": CONTEXT_STORE.get_execution_directive(session_id) } # ===================================================== # نقطة: اختبار JSON cleaner # ===================================================== @app.post("/debug/test-json-cleaner") async def debug_test_json_cleaner(request: Request): """اختبار مُصلح JSON على حالات شائعة""" verify_api_key(request) test_cases = [ { "name": "Trailing comma", "input": '{"name": "write_file", "arguments": {"path": "test.py",}}', "expected_valid": True }, { "name": "Curly quotes", "input": '{\u201cname\u201d: \u201cwrite_file\u201d, \u201carguments\u201d: {}}', "expected_valid": True }, { "name": "Missing closing brace", "input": '{"name": "write_file", "arguments": {"path": "test.py"', "expected_valid": True }, { "name": "Valid JSON", "input": '{"name": "read_file", "arguments": {"path": "config.json"}}', "expected_valid": True }, { "name": "Double trailing commas", "input": '{"name": "tool",, "arguments": {}}', "expected_valid": False # هذا صعب الإصلاح }, ] results = [] for tc in test_cases: cleaned = ToolCallParser._clean_json_string(tc["input"]) parse_success = False parse_result = None error_msg = None try: parse_result = json.loads(cleaned) parse_success = True except json.JSONDecodeError as e: error_msg = str(e) # محاولة الإصلاح التلقائي fixed = ToolCallParser._try_fix_json(tc["input"]) if fixed: parse_result = fixed parse_success = True error_msg = None results.append({ "test_name": tc["name"], "original_input": tc["input"], "cleaned_output": cleaned, "parse_success": parse_success, "parse_result": parse_result, "error": error_msg, "expected_valid": tc["expected_valid"], "passed": parse_success == tc["expected_valid"] }) passed = sum(1 for r in results if r["passed"]) return { "test_results": results, "summary": { "passed": passed, "failed": len(results) - passed, "total": len(results) } } # ===================================================== # نقطة: اختبار ResponseAnalyzer على نص مخصص # ===================================================== @app.post("/debug/analyze-response") async def debug_analyze_response(request: Request): """تحليل نص مخصص بواسطة ResponseAnalyzer""" verify_api_key(request) body = await request.json() response_text = body.get("response", "") tools = body.get("tools", []) if not response_text: raise HTTPException(status_code=400, detail="response field is required") analysis = ResponseAnalyzer.analyze(response_text, tools) # محاكاة قرار المحرك test_session = f"analyze_{uuid.uuid4().hex[:8]}" enforcement = ExecutionEnforcer.post_process_response( response_text, test_session, tools, analysis["tool_calls_found"] ) # تنظيف with CONTEXT_STORE._lock: if test_session in CONTEXT_STORE._store: del CONTEXT_STORE._store[test_session] return { "input_response_preview": response_text[:300], "analysis": { "has_actual_tool_call": analysis["has_actual_tool_call"], "has_description_without_execution": ( analysis["has_description_without_execution"] ), "needs_execution_push": analysis["needs_execution_push"], "described_actions_found": analysis["described_actions"], "intended_tool_name": analysis.get("intended_tool_name"), "tool_calls_extracted": len(analysis["tool_calls_found"]), "tool_calls_details": analysis["tool_calls_found"], }, "enforcer_decision": { "needs_retry": enforcement["needs_retry"], "retry_message": enforcement["retry_message"], }, "verdict": ( "✅ EXECUTOR: Model actually called a tool" if analysis["has_actual_tool_call"] else ( "⚠️ DESCRIBER: Model described without executing" if analysis["has_description_without_execution"] else "ℹ️ NEUTRAL: Regular text response (no tools involved)" ) ) } # ===================================================== # نقطة: مراقبة الكاش # ===================================================== @app.get("/debug/cache-stats") async def debug_cache_stats(request: Request): """إحصائيات الكاش""" verify_api_key(request) with CACHE._lock: cache_size = len(CACHE.cache) now = time.time() expired_count = sum( 1 for _, ts in CACHE.cache.values() if now - ts > CACHE.ttl ) entries = [] for key, (value, ts) in list(CACHE.cache.items())[:10]: entries.append({ "key_preview": key[:50], "value_length": len(value), "age_seconds": round(now - ts), "expired": now - ts > CACHE.ttl }) return { "cache_size": cache_size, "max_size": CACHE.max_size, "ttl_seconds": CACHE.ttl, "expired_entries": expired_count, "active_entries": cache_size - expired_count, "sample_entries": entries, "timestamp": int(time.time()) } # ===================================================== # نقطة: تصدير سياق جلسة كاملة # ===================================================== @app.get("/context/{session_id}/export") async def export_session_context(session_id: str, request: Request): """تصدير سياق الجلسة كاملاً بصيغة JSON""" verify_api_key(request) ctx = CONTEXT_STORE.get_or_create_session(session_id) summary = CONTEXT_STORE.get_context_summary(session_id) directive = CONTEXT_STORE.get_execution_directive(session_id) export_data = { "session_id": session_id, "exported_at": int(time.time()), "context": { "original_task": ctx.get("original_task", ""), "current_state": ctx.get("current_state", ExecutionState.IDLE), "message_count": ctx.get("message_count", 0), "completed_steps": ctx.get("completed_steps", []), "tool_results": ctx.get("tool_results", []), "pending_tool_calls": ctx.get("pending_tool_calls", []), "executed_tool_calls": ctx.get("executed_tool_calls", []), "described_but_not_executed": ctx.get("described_but_not_executed", []), "consecutive_description_count": ctx.get( "consecutive_description_count", 0 ), "next_required_action": ctx.get("next_required_action", ""), "provider_history": ctx.get("provider_history", [])[-5:], "created_at": ctx.get("created_at", 0), "last_updated": ctx.get("last_updated", 0), }, "human_readable_summary": summary, "current_execution_directive": directive, } return export_data # ===================================================== # نقطة: استيراد سياق جلسة # ===================================================== @app.post("/context/{session_id}/import") async def import_session_context(session_id: str, request: Request): """استيراد سياق جلسة من بيانات JSON""" verify_api_key(request) body = await request.json() context_data = body.get("context", {}) if not context_data: raise HTTPException(status_code=400, detail="context field is required") # التحقق من الحقول المطلوبة وتعيين القيم الافتراضية safe_context = { "original_task": context_data.get("original_task", ""), "task_breakdown": context_data.get("task_breakdown", []), "completed_steps": context_data.get("completed_steps", []), "tool_results": context_data.get("tool_results", []), "pending_tool_calls": context_data.get("pending_tool_calls", []), "executed_tool_calls": context_data.get("executed_tool_calls", []), "current_state": context_data.get("current_state", ExecutionState.IDLE), "next_required_action": context_data.get("next_required_action", ""), "created_at": context_data.get("created_at", time.time()), "last_updated": time.time(), "message_count": context_data.get("message_count", 0), "provider_history": context_data.get("provider_history", []), "last_model_response": context_data.get("last_model_response", ""), "described_but_not_executed": context_data.get("described_but_not_executed", []), "consecutive_description_count": context_data.get( "consecutive_description_count", 0 ), } with CONTEXT_STORE._lock: CONTEXT_STORE._store[session_id] = safe_context return { "message": f"Context imported for session {session_id}", "session_id": session_id, "state": safe_context["current_state"], "directive": CONTEXT_STORE.get_execution_directive(session_id) } # ===================================================== # نقطة: اختبار pattern matching للأنماط المحظورة # ===================================================== @app.post("/debug/test-patterns") async def debug_test_patterns(request: Request): """اختبار أنماط كشف الوصف بدون تنفيذ""" verify_api_key(request) body = await request.json() test_texts = body.get("texts", [ "سأقوم بإنشاء الملف الآن", "I will create the file", "Let me write the code", "I'm going to use write_file", "Here is the result: 42", "سأستخدم أداة write_file", "Now I'll create the Python file", "The file has been created successfully", "I'll use the write_file tool to save your code", "Using the write_file tool to create the file", ]) results = [] for text in test_texts: matched_patterns = [] for pattern in ResponseAnalyzer.DESCRIPTION_PHRASES: matches = re.findall(pattern, text, re.IGNORECASE) if matches: matched_patterns.append({ "pattern": pattern, "matches": matches }) is_description = len(matched_patterns) > 0 results.append({ "text": text, "is_description_without_execution": is_description, "matched_patterns_count": len(matched_patterns), "matched_patterns": matched_patterns[:3], # أول 3 فقط "verdict": ( "⚠️ DESCRIPTION (needs enforcement)" if is_description else "✅ OK (not a description pattern)" ) }) description_count = sum(1 for r in results if r["is_description_without_execution"]) return { "test_results": results, "summary": { "total_texts": len(results), "description_patterns_found": description_count, "clean_texts": len(results) - description_count, "total_patterns_available": len(ResponseAnalyzer.DESCRIPTION_PHRASES), } } # ===================================================== # نقطة: إحصائيات المزودين مع الأداء # ===================================================== @app.get("/providers/stats") async def get_provider_stats(request: Request): """إحصائيات المزودين مع معلومات الأداء من التاريخ""" verify_api_key(request) provider_usage = {} for pname in REAL_PROVIDERS.keys(): provider_usage[pname] = { "total_uses": 0, "recent_uses": 0, "models_used": set(), } # تحليل تاريخ الاستخدام من الجلسات with CONTEXT_STORE._lock: for sid, ctx in CONTEXT_STORE._store.items(): for ph in ctx.get("provider_history", []): pname = ph.get("provider", "") model = ph.get("model", "") ts = ph.get("timestamp", 0) if pname in provider_usage: provider_usage[pname]["total_uses"] += 1 if model: provider_usage[pname]["models_used"].add(model) # استخدام خلال آخر ساعة if time.time() - ts < 3600: provider_usage[pname]["recent_uses"] += 1 # تحويل sets إلى lists للـ JSON result = {} for pname, stats in provider_usage.items(): pobj = REAL_PROVIDERS.get(pname) available_models = discover_provider_models(pobj, pname) if pobj else [] result[pname] = { "available": True, "total_uses": stats["total_uses"], "recent_uses_last_hour": stats["recent_uses"], "models_actually_used": list(stats["models_used"]), "all_available_models": available_models, } return { "providers": result, "timestamp": int(time.time()) } # ===================================================== # نقطة: اختبار stream buffer مع أداة كاملة # ===================================================== @app.post("/debug/test-stream-with-tool") async def debug_test_stream_with_tool(request: Request): """اختبار stream buffer مع أداة كاملة مقسّمة على chunks""" verify_api_key(request) body = await request.json() chunk_size = body.get("chunk_size", 5) test_tools = [{ "name": "write_file", "description": "Write content to a file", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"} }, "required": ["path", "content"] } }] # نص يحتوي على tool call كاملة test_text = ( 'I will create the file for you.\n' '\n' '{\n' ' "name": "write_file",\n' ' "arguments": {\n' ' "path": "hello.py",\n' ' "content": "print(\'Hello World\')"\n' ' }\n' '}\n' '\n' 'The file has been created.' ) buffer = StreamToolBuffer(available_tools=test_tools) all_events = [] text_events = [] tool_events = [] # محاكاة الـ streaming chunk by chunk for i in range(0, len(test_text), chunk_size): chunk = test_text[i:i + chunk_size] events = buffer.feed(chunk) for event in events: all_events.append({ "chunk_index": i // chunk_size, "chunk_preview": chunk[:10], "event_type": event.get("type", "tool_use"), "event_preview": ( event.get("text", "")[:30] if event.get("type") == "text" else f"tool: {event.get('name', '')}" ) }) if event.get("type") == "text": text_events.append(event["text"]) else: tool_events.append(event) # تفريغ ما تبقى remaining = buffer.flush() for event in remaining: all_events.append({ "chunk_index": "flush", "event_type": event.get("type", "tool_use"), "event_preview": ( event.get("text", "")[:30] if event.get("type") == "text" else f"tool: {event.get('name', '')}" ) }) if event.get("type") == "text": text_events.append(event["text"]) else: tool_events.append(event) return { "input_text_length": len(test_text), "chunk_size": chunk_size, "total_chunks": (len(test_text) + chunk_size - 1) // chunk_size, "total_events": len(all_events), "text_events_count": len(text_events), "tool_events_count": len(tool_events), "reconstructed_text": "".join(text_events), "tool_calls_detected": tool_events, "success": len(tool_events) > 0, "verdict": ( "✅ Stream buffer correctly detected tool call" if len(tool_events) > 0 else "❌ Stream buffer missed the tool call" ), "all_events_sample": all_events[:20] } # ===================================================== # MAIN ENTRY POINT # ===================================================== if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", 8000)) host = os.getenv("HOST", "0.0.0.0") workers = int(os.getenv("WORKERS", 1)) log_level = os.getenv("LOG_LEVEL", "info").lower() logger.info(f"Starting G4F Execution Engine v6.0.0") logger.info(f"Host: {host}, Port: {port}") logger.info(f"Workers: {workers}") logger.info(f"Cookie Status: {COOKIE_STATUS}") logger.info(f"Providers: {list(REAL_PROVIDERS.keys())}") uvicorn.run( "app:app", host=host, port=port, workers=workers, log_level=log_level, reload=False, access_log=True, )