# caulde / app.py
# bahi-bh's picture
# Update app.py
# b6386e6 verified
import os
import json
import time
import logging
import asyncio
import threading
import re
import uuid
from typing import Any, Dict, List, Optional, Tuple
from collections import OrderedDict
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import g4f
# =====================================================
# LOGGING
# =====================================================
# Root logging configuration: INFO level with a
# "timestamp - logger name - level - message" layout.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-wide logger used by the rest of this file.
logger = logging.getLogger("g4f-execution-engine")
# =====================================================
# COOKIES
# =====================================================
def _load_cookies_raw() -> Dict[str, Any]:
    """Return the raw cookie mapping from the environment or from disk.

    Sources are tried in this order:

    1. The ``COOKIES_JSON`` environment variable (a JSON document).
    2. A ``cookies.json`` file in the current working directory.

    Only a JSON *object* is accepted from either source, because the caller
    iterates the result with ``.items()``. Anything else is logged and
    ignored.

    Returns:
        The decoded mapping, or ``{}`` when no usable source exists.
    """
    raw_env = (os.getenv("COOKIES_JSON") or "").strip()
    if raw_env:
        try:
            env_data = json.loads(raw_env)
        except Exception as e:
            logger.warning(f"Failed to load cookies from env: {e}")
        else:
            if isinstance(env_data, dict):
                return env_data
            logger.warning(
                "Failed to load cookies from env: expected a JSON object, "
                f"got {type(env_data).__name__}"
            )

    # EAFP: open directly instead of exists()+open(), which can race.
    try:
        with open("cookies.json", "r", encoding="utf-8") as f:
            file_data = json.load(f)
    except FileNotFoundError:
        # A missing file is the normal "no cookies configured" case.
        return {}
    except Exception as e:
        logger.warning(f"Failed to load cookies from file: {e}")
        return {}

    if isinstance(file_data, dict):
        return file_data
    logger.warning(
        "Failed to load cookies from file: expected a JSON object, "
        f"got {type(file_data).__name__}"
    )
    return {}
def load_cookies() -> str:
    """Register stored cookies with g4f and return a status message.

    Returns:
        ``"⚠️ No Cookies"`` when no usable cookie mapping is available,
        ``"⚠️ Cookies Found"`` when cookies exist but ``g4f.cookies`` cannot
        be imported, and ``"✅ Cookies Loaded"`` otherwise.
    """
    data = _load_cookies_raw()
    if not data:
        return "⚠️ No Cookies"
    if not isinstance(data, dict):
        # Guard against a non-mapping payload; ``.items()`` below would
        # otherwise raise while the module is being imported.
        logger.warning(
            f"Cookie data ignored: expected a mapping, got {type(data).__name__}"
        )
        return "⚠️ No Cookies"
    try:
        from g4f.cookies import set_cookies
    except Exception:
        return "⚠️ Cookies Found"

    for domain, vals in data.items():
        try:
            # Bare names such as "google" are expanded to ".google.com".
            dom = domain if "." in domain else f".{domain}.com"
            if isinstance(vals, list):
                # Browser-export style: a list of {"name": ..., "value": ...}.
                # Entries lacking either key are skipped individually so one
                # malformed entry does not discard the whole domain.
                vals = {
                    x["name"]: x["value"]
                    for x in vals
                    if isinstance(x, dict) and "name" in x and "value" in x
                }
            if isinstance(vals, dict):
                set_cookies(dom, vals)
        except Exception as e:
            logger.warning(f"Cookie error for {domain}: {e}")
    return "✅ Cookies Loaded"
# Cookies are loaded once at import time; the returned status message is kept here.
COOKIE_STATUS = load_cookies()
# =====================================================
# EXECUTION STATE MACHINE
# =====================================================
class ExecutionState:
    """Namespace of string constants naming the execution phases of a session.

    The values are plain strings, so they can be stored in and compared
    against the ``current_state`` field of a session dict directly.
    """

    IDLE = "idle"
    PLANNING = "planning"
    TOOL_PENDING = "tool_pending"
    TOOL_EXECUTING = "tool_executing"
    TOOL_DONE = "tool_done"
    AWAITING_NEXT = "awaiting_next"
    COMPLETED = "completed"
class PendingToolCall:
    """Record of a tool call that has been registered but not yet executed."""

    # Attributes exported by ``to_dict`` in this order; ``max_attempts`` is
    # intentionally not part of the exported form.
    _EXPORTED_FIELDS = (
        "name",
        "arguments",
        "tool_id",
        "reason",
        "created_at",
        "attempts",
    )

    def __init__(self, name: str, arguments: Dict, tool_id: str, reason: str = ""):
        """Store the call details and stamp the creation time.

        Args:
            name: Tool name.
            arguments: Arguments to pass to the tool.
            tool_id: Identifier of this call.
            reason: Optional free-text explanation of why the call is pending.
        """
        self.name = name
        self.arguments = arguments
        self.tool_id = tool_id
        self.reason = reason
        self.created_at = time.time()
        self.attempts = 0
        self.max_attempts = 3

    def to_dict(self) -> Dict:
        """Return the call as a plain dict of its exported fields."""
        return {field: getattr(self, field) for field in self._EXPORTED_FIELDS}
# =====================================================
# PERSISTENT CONTEXT STORE
# =====================================================
class PersistentContextStore:
    """Thread-safe, in-memory store of per-session task context.

    Each session is a plain dict keyed by ``session_id``. It records the
    original task, completed steps, pending/executed tool calls, tool
    results and the current ``ExecutionState`` value. The store also renders
    that state as text (``get_context_summary``) and produces a directive
    describing what should happen next (``get_execution_directive``).
    """

    def __init__(self):
        self._store: Dict[str, Dict] = {}
        # Re-entrant on purpose: the mutators below call
        # get_or_create_session(), and get_context_summary() calls
        # get_execution_directive(), while already holding the lock.
        # A plain Lock would deadlock in those paths.
        self._lock = threading.RLock()

    @staticmethod
    def _new_session() -> Dict:
        """Return the initial context dict for a brand-new session."""
        now = time.time()
        return {
            "original_task": "",
            "task_breakdown": [],
            "completed_steps": [],
            "tool_results": [],
            "pending_tool_calls": [],
            "executed_tool_calls": [],
            "current_state": ExecutionState.IDLE,
            "next_required_action": "",
            "created_at": now,
            "last_updated": now,
            "message_count": 0,
            "provider_history": [],
            "last_model_response": "",
            "described_but_not_executed": [],
            "consecutive_description_count": 0,
        }

    def get_or_create_session(self, session_id: str) -> Dict:
        """Return the live context dict for ``session_id``, creating it if needed."""
        with self._lock:
            if session_id not in self._store:
                self._store[session_id] = self._new_session()
            return self._store[session_id]

    def update_session(self, session_id: str, **kwargs):
        """Merge ``kwargs`` into the session and refresh ``last_updated``."""
        with self._lock:
            ctx = self.get_or_create_session(session_id)
            ctx.update(kwargs)
            ctx["last_updated"] = time.time()

    def add_tool_result(self, session_id: str, tool_name: str, tool_id: str, result: str):
        """Record the result of an executed tool call.

        The call is removed from the pending list, appended to the executed
        list and to ``tool_results``, the "described but not executed"
        tracking is reset, and a ``TOOL_EXECUTING`` state moves to
        ``TOOL_DONE``.
        """
        with self._lock:
            ctx = self.get_or_create_session(session_id)
            # Remove from the pending list.
            ctx["pending_tool_calls"] = [
                p for p in ctx.get("pending_tool_calls", [])
                if p.get("tool_id") != tool_id
            ]
            # Add to the executed list.
            ctx["executed_tool_calls"].append({
                "tool_name": tool_name,
                "tool_id": tool_id,
                "timestamp": time.time(),
            })
            # Store the result.
            ctx["tool_results"].append({
                "tool_name": tool_name,
                "tool_id": tool_id,
                "result": result,
                "timestamp": time.time(),
            })
            # Reset the description counter.
            ctx["consecutive_description_count"] = 0
            ctx["described_but_not_executed"] = []
            ctx["last_updated"] = time.time()
            if ctx.get("current_state") == ExecutionState.TOOL_EXECUTING:
                ctx["current_state"] = ExecutionState.TOOL_DONE

    def add_completed_step(self, session_id: str, step: str):
        """Append ``step`` to the session's list of completed steps."""
        with self._lock:
            ctx = self.get_or_create_session(session_id)
            ctx["completed_steps"].append({
                "step": step,
                "timestamp": time.time(),
            })
            ctx["last_updated"] = time.time()

    def register_pending_tool(self, session_id: str, tool_call: PendingToolCall):
        """Register ``tool_call`` as pending and set the state to ``TOOL_PENDING``.

        A call whose ``tool_id`` is already pending is not added twice; the
        state and timestamp are updated either way.
        """
        with self._lock:
            ctx = self.get_or_create_session(session_id)
            existing_ids = [
                p.get("tool_id") for p in ctx.get("pending_tool_calls", [])
            ]
            if tool_call.tool_id not in existing_ids:
                ctx["pending_tool_calls"].append(tool_call.to_dict())
            ctx["current_state"] = ExecutionState.TOOL_PENDING
            ctx["last_updated"] = time.time()

    def get_pending_tools(self, session_id: str) -> List[Dict]:
        """Return the session's pending tool calls (``[]`` for unknown sessions)."""
        with self._lock:
            if session_id not in self._store:
                return []
            return self._store[session_id].get("pending_tool_calls", [])

    def increment_description_count(self, session_id: str, response_text: str) -> int:
        """Count one more "described but did not execute" response.

        A 300-character preview of ``response_text`` is stored alongside.

        Returns:
            The updated consecutive-description count.
        """
        with self._lock:
            ctx = self.get_or_create_session(session_id)
            ctx["described_but_not_executed"].append({
                "response_preview": response_text[:300],
                "timestamp": time.time(),
            })
            count = ctx.get("consecutive_description_count", 0) + 1
            ctx["consecutive_description_count"] = count
            ctx["last_updated"] = time.time()
            return count

    @staticmethod
    def _pending_tool_directive(pending_tool: Dict) -> str:
        """Render the directive that demands execution of a pending tool call."""
        tool_name = pending_tool.get("name", "unknown")
        tool_args = pending_tool.get("arguments", {})
        tool_id = pending_tool.get("tool_id", "")
        reason = pending_tool.get("reason", "")

        parts = [
            "## ⚡ MANDATORY IMMEDIATE ACTION",
            "",
            f"Tool `{tool_name}` was registered as pending but NOT executed.",
        ]
        if reason:
            parts.append(f"Reason: {reason}")
        call_obj = {"name": tool_name, "arguments": tool_args}
        if tool_id:
            call_obj["id"] = tool_id
        parts.extend([
            "",
            "**COPY THIS EXACTLY INTO YOUR RESPONSE:**",
            "",
            "<tool_call>",
            json.dumps(call_obj, ensure_ascii=False, indent=2),
            "</tool_call>",
            "",
            "NO other text is needed. Just the tool_call block above.",
        ])
        return "\n".join(parts)

    @staticmethod
    def _description_directive(desc_count: int) -> str:
        """Render the directive used after descriptions without execution."""
        return "\n".join([
            "## 🚨 CRITICAL: EXECUTION FAILURE PATTERN DETECTED",
            "",
            f"You have described actions {desc_count} time(s) without executing them.",
            "",
            "**THE ONLY ACCEPTABLE RESPONSE FORMAT:**",
            "",
            "```",
            "<tool_call>",
            '{"name": "TOOL_NAME", "arguments": {ACTUAL_ARGUMENTS}}',
            "</tool_call>",
            "```",
            "",
            "**FORBIDDEN PATTERNS (will be rejected):**",
            '❌ "I will create..." → FORBIDDEN',
            '❌ "I\'m going to use..." → FORBIDDEN',
            '❌ "Let me write..." → FORBIDDEN',
            '❌ "I\'ll use the tool..." → FORBIDDEN',
            '❌ "سأقوم بـ..." → FORBIDDEN',
            '❌ "سأستخدم أداة..." → FORBIDDEN',
            "",
            "**EXECUTE NOW. No descriptions. No explanations.**",
        ])

    @staticmethod
    def _completion_directive(last_result: Dict, next_action: str) -> str:
        """Render the directive shown after a tool has finished."""
        tool_name = last_result.get("tool_name", "")
        parts = [f"## ✅ Tool `{tool_name}` Completed Successfully", ""]
        if next_action:
            parts.append(f"**Next Required Action:** {next_action}")
            parts.append("Execute it now using the appropriate tool.")
        else:
            parts.append(
                "Determine if the original task is now complete. "
                "If more tools are needed, call them now. "
                "If done, provide a brief confirmation."
            )
        return "\n".join(parts)

    def get_execution_directive(self, session_id: str) -> str:
        """Build the directive stating what must be done next.

        Cases are checked in priority order:

        1. A tool is pending and the state is ``TOOL_PENDING``.
        2. At least one response described an action without executing it.
        3. A tool result exists and the state is ``TOOL_DONE``.

        Returns:
            The directive text, or ``""`` when no case applies or the
            session is unknown.
        """
        with self._lock:
            if session_id not in self._store:
                return ""
            ctx = self._store[session_id]
            state = ctx.get("current_state", ExecutionState.IDLE)
            pending = ctx.get("pending_tool_calls", [])
            desc_count = ctx.get("consecutive_description_count", 0)
            tool_results = ctx.get("tool_results", [])

            # Case 1: pending tool - highest priority.
            if pending and state == ExecutionState.TOOL_PENDING:
                return self._pending_tool_directive(pending[0])
            # Case 2: repeated description without execution.
            if desc_count >= 1:
                return self._description_directive(desc_count)
            # Case 3: tool executed - check for completion.
            if tool_results and state == ExecutionState.TOOL_DONE:
                return self._completion_directive(
                    tool_results[-1], ctx.get("next_required_action", "")
                )
            return ""

    def get_context_summary(self, session_id: str) -> str:
        """Render the session context as a Markdown-style text block.

        Sections are included only when they have content: original task,
        last 10 completed steps, last 5 executed tools, last 3 tool results
        (each truncated to 300 characters), pending tools, and finally the
        execution directive.

        Returns:
            The summary text, or ``""`` for an unknown or empty session.
        """
        with self._lock:
            if session_id not in self._store:
                return ""
            ctx = self._store[session_id]
            parts = []

            if ctx.get("original_task"):
                parts.append(f"## Original Task\n{ctx['original_task']}")

            if ctx.get("completed_steps"):
                steps_text = "\n".join(
                    f" ✅ {s['step']}"
                    for s in ctx["completed_steps"][-10:]
                )
                parts.append(f"## Completed Steps\n{steps_text}")

            if ctx.get("executed_tool_calls"):
                exec_text = "\n".join(
                    f" 🔧 {e['tool_name']} (id: {e['tool_id'][:12]}...)"
                    for e in ctx["executed_tool_calls"][-5:]
                )
                parts.append(f"## Tools Already Executed\n{exec_text}")

            if ctx.get("tool_results"):
                recent_results = ctx["tool_results"][-3:]
                results_text = "\n".join(
                    f" - [{r['tool_name']}]: "
                    f"{r['result'][:300]}{'...' if len(r['result']) > 300 else ''}"
                    for r in recent_results
                )
                parts.append(f"## Tool Results Available\n{results_text}")

            if ctx.get("pending_tool_calls"):
                pending_text = "\n".join(
                    f" ⏳ {p['name']} (NOT YET EXECUTED)"
                    for p in ctx["pending_tool_calls"]
                )
                parts.append(f"## ⚠️ PENDING TOOLS (Must Execute)\n{pending_text}")

            # The execution directive always goes last.
            directive = self.get_execution_directive(session_id)
            if directive:
                parts.append(directive)

            if not parts:
                return ""
            return (
                "# Task Context & Execution State\n"
                "# (This is your memory - use it to continue the task)\n\n"
                + "\n\n".join(parts)
            )

    def cleanup_old_sessions(self, max_age_hours: int = 24):
        """Delete sessions not updated within the last ``max_age_hours`` hours."""
        with self._lock:
            now = time.time()
            old_sessions = [
                sid for sid, data in self._store.items()
                if now - data.get("last_updated", 0) > max_age_hours * 3600
            ]
            for sid in old_sessions:
                del self._store[sid]
            if old_sessions:
                logger.info(f"[Context] Cleaned {len(old_sessions)} old sessions")

    def extract_task_from_messages(self, messages: List[Dict]) -> str:
        """Return the first substantial user text in ``messages``.

        "Substantial" means longer than 10 characters. Content may be a
        plain string or a list of content parts, of which ``type == "text"``
        parts are inspected. The result is truncated to 500 characters.

        Returns:
            The task text, or ``""`` when none is found.
        """
        for msg in messages:
            if msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if isinstance(content, str):
                if len(content) > 10:
                    return content[:500]
            elif isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "text":
                        text = item.get("text", "")
                        # Non-string text (e.g. None) has no usable length.
                        if isinstance(text, str) and len(text) > 10:
                            return text[:500]
        return ""
# Module-level PersistentContextStore instance.
CONTEXT_STORE = PersistentContextStore()
# =====================================================
# CACHE
# =====================================================
class TTLCache:
    """Thread-safe LRU cache whose entries expire after ``ttl_seconds``.

    Entries are stored as ``key -> (value, stored_at)`` in an ``OrderedDict``
    ordered from least to most recently used.
    """

    def __init__(self, max_size: int = 100, ttl_seconds: int = 300):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries kept before the least
                recently used one is evicted.
            ttl_seconds: Age in seconds after which an entry is stale.
        """
        self.cache: OrderedDict = OrderedDict()
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._lock = threading.Lock()
        self._last_cleanup = time.time()
        self._cleanup_interval = 60

    def _clean_expired(self):
        """Drop every expired entry, at most once per cleanup interval.

        Must be called with ``self._lock`` held.
        """
        now = time.time()
        if now - self._last_cleanup < self._cleanup_interval:
            return
        self._last_cleanup = now
        expired = [k for k, (_, ts) in self.cache.items() if now - ts > self.ttl]
        for k in expired:
            del self.cache[k]

    def get(self, key: str) -> Optional[str]:
        """Return the cached value for ``key``, or ``None`` if absent or expired.

        A hit marks the entry as most recently used. An expired entry is
        removed on access instead of being served until the next sweep.
        """
        with self._lock:
            entry = self.cache.get(key)
            if entry is None:
                return None
            value, stored_at = entry
            if time.time() - stored_at > self.ttl:
                del self.cache[key]
                return None
            self.cache.move_to_end(key)
            return value

    def set(self, key: str, value: str):
        """Store ``value`` under ``key`` as the most recently used entry."""
        with self._lock:
            self._clean_expired()
            if key in self.cache:
                # Overwrite: remove first so the entry is re-inserted at the
                # most-recent end and no unrelated entry gets evicted.
                del self.cache[key]
            else:
                while self.cache and len(self.cache) >= self.max_size:
                    self.cache.popitem(last=False)
            self.cache[key] = (value, time.time())
# Module-level cache instance: at most 100 entries, 300-second TTL.
CACHE = TTLCache(max_size=100, ttl_seconds=300)
# =====================================================
# PROVIDERS
# =====================================================
def get_provider(name: str):
    """Look up ``name`` on ``g4f.Provider``; return ``None`` if the lookup fails."""
    try:
        provider = getattr(g4f.Provider, name)
    except Exception:
        provider = None
    return provider
# Provider objects resolved from g4f at import time, keyed by the name this
# module exposes. Names that could not be resolved are left out.
# "Perplexity" falls back to the attribute name "PerplexityAi".
REAL_PROVIDERS = {
    public_name: provider
    for public_name, provider in (
        ("Blackbox", get_provider("Blackbox")),
        ("DeepSeek", get_provider("DeepSeek")),
        ("Perplexity", get_provider("Perplexity") or get_provider("PerplexityAi")),
        ("Copilot", get_provider("Copilot")),
        ("You", get_provider("You")),
        ("Bing", get_provider("Bing")),
        ("Qwen", get_provider("Qwen")),
    )
    if provider
}

# Model names used when a provider object exposes no model list of its own.
PROVIDER_MODELS_FALLBACK = {
    "Blackbox": ["gpt-4o", "claude-3.5-sonnet", "llama-3.1-70b", "gemini-pro"],
    "DeepSeek": ["deepseek-chat", "deepseek-coder"],
    "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"],
    "Copilot": ["gpt-4o", "claude-3.5-sonnet"],
    "You": ["gpt-4o", "claude-3.5-sonnet"],
    "Bing": ["gpt-4o"],
    "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo"],
}

# Memo of discover_provider_models() results, keyed by provider name.
_PROVIDER_MODEL_CACHE: Dict[str, List[str]] = {}
def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]:
    """Return the de-duplicated model names advertised by a provider.

    Several well-known attribute names are probed on ``provider_obj``. When
    none of them yields anything, the static fallback table is used
    (defaulting to ``["gpt-4o"]``). Results are memoized per
    ``provider_name`` in ``_PROVIDER_MODEL_CACHE``.
    """
    try:
        return _PROVIDER_MODEL_CACHE[provider_name]
    except KeyError:
        pass

    found: List[str] = []
    probe_attrs = (
        "models",
        "model",
        "default_model",
        "available_models",
        "supported_models",
    )
    for attr_name in probe_attrs:
        try:
            if not hasattr(provider_obj, attr_name):
                continue
            value = getattr(provider_obj, attr_name)
            if isinstance(value, dict):
                found.extend(str(key) for key in value.keys())
            elif isinstance(value, (list, tuple, set)):
                found.extend(str(entry) for entry in value)
            elif value:
                found.append(str(value))
        except Exception:
            # Probing is best-effort: a misbehaving attribute is skipped.
            pass

    if not found:
        found = PROVIDER_MODELS_FALLBACK.get(provider_name, ["gpt-4o"])

    # dict.fromkeys drops duplicates while keeping first-seen order.
    models = list(dict.fromkeys(found))
    _PROVIDER_MODEL_CACHE[provider_name] = models
    return models
# =====================================================
# STREAM CLEANER
# =====================================================
def _text_from_payload(payload):
    """Extract the text carried by an OpenAI-style chunk mapping.

    ``choices[0].delta.content`` is preferred, then ``choices[0].delta.text``,
    then the top-level ``content`` / ``text`` keys. Returns ``""`` when the
    top-level fallback finds nothing.
    """
    if 'choices' in payload and payload['choices']:
        delta = payload['choices'][0].get('delta', {})
        if 'content' in delta:
            return delta['content']
        if 'text' in delta:
            return delta['text']
    return payload.get('content') or payload.get('text') or ""


def clean_stream(chunk):
    """Normalise one provider stream chunk to plain text.

    * ``dict`` chunks are read as OpenAI-style payloads.
    * ``str`` chunks that look like a JSON object are decoded and read the
      same way, so ``delta.text`` is honoured for them too. If decoding or
      extraction fails the chunk is treated as ordinary text.
    * Other strings get the literal two-character escapes backslash-n,
      backslash-r and backslash-t replaced by a newline, a carriage return
      and a space respectively.
    * Any other type is converted with ``str()``.

    Returns ``""`` if an unexpected error occurs; the error is logged.
    """
    try:
        if isinstance(chunk, dict):
            return _text_from_payload(chunk)
        if isinstance(chunk, str):
            if chunk and chunk[0] == '{' and chunk[-1] == '}':
                try:
                    return _text_from_payload(json.loads(chunk))
                except Exception:
                    # Not a decodable payload: fall through to text handling.
                    pass
            if '\\' in chunk:
                chunk = (
                    chunk.replace('\\n', '\n')
                    .replace('\\r', '\r')
                    .replace('\\t', ' ')
                )
            return chunk
        return str(chunk)
    except Exception as e:
        logger.warning(f"clean_stream error: {e}")
        return ""
# =====================================================
# TEXT EXTRACTION
# =====================================================
def _content_piece_to_str(value) -> str:
    """Return ``value`` as text; ``None`` becomes an empty string."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _tool_result_to_str(tool_content) -> str:
    """Flatten the ``content`` field of a ``tool_result`` part into text.

    In a list, only ``type == "text"`` dicts and plain strings are kept,
    joined by newlines.
    """
    if isinstance(tool_content, list):
        pieces = []
        for entry in tool_content:
            if isinstance(entry, dict) and entry.get("type") == "text":
                pieces.append(_content_piece_to_str(entry.get("text", "")))
            elif isinstance(entry, str):
                pieces.append(entry)
        return "\n".join(pieces)
    if isinstance(tool_content, str):
        return tool_content
    return str(tool_content)


def extract_text_from_content(content) -> str:
    """Flatten a message ``content`` value into a single string.

    Args:
        content: A string, a list of content parts (strings or dicts), or
            any other value.

    Returns:
        * the string itself for ``str`` input;
        * for a list, the newline-joined rendering of its parts: ``text``
          parts verbatim, ``tool_result`` parts as
          ``[Tool Result for <id>]:`` followed by the result, ``tool_use``
          parts as a ``[Tool Called: ...]`` line, and untyped dicts via
          their ``text`` or ``content`` key. Other list items are skipped.
          A ``None`` or non-string ``text`` value is coerced to text rather
          than making the join fail;
        * ``str(content)`` for any other non-``None`` value, else ``""``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
                continue
            if not isinstance(item, dict):
                continue
            kind = item.get("type")
            if kind == "text":
                parts.append(_content_piece_to_str(item.get("text", "")))
            elif kind == "tool_result":
                tool_use_id = item.get("tool_use_id", "")
                result_str = _tool_result_to_str(item.get("content", ""))
                parts.append(f"[Tool Result for {tool_use_id}]:\n{result_str}")
            elif kind == "tool_use":
                tool_name = item.get("name", "unknown")
                tool_input = item.get("input", {})
                tool_id = item.get("id", "")
                parts.append(
                    f"[Tool Called: {tool_name} (id:{tool_id}) "
                    f"with: {json.dumps(tool_input, ensure_ascii=False)}]"
                )
            elif "text" in item:
                parts.append(_content_piece_to_str(item["text"]))
            elif "content" in item:
                parts.append(str(item["content"]))
        return "\n".join(parts)
    if content is not None:
        return str(content)
    return ""
# =====================================================
# RESPONSE ANALYZER - substantially improved
# =====================================================
class ResponseAnalyzer:
    """Inspect a model response for tool usage.

    The analysis answers three questions:

    1. Did the model describe an action without executing it?
    2. Does the response contain an actual tool call?
    3. Which tool did the model apparently intend to use?
    """

    # Regexes for "description without execution" phrasing. Contractions
    # accept both the ASCII apostrophe and the typographic one (U+2019).
    DESCRIPTION_PHRASES = [
        # Arabic
        r"سأقوم\s+ب",
        r"سأستخدم\s+(?:أداة|الأداة|tool)",
        r"سأكتب\s+(?:الكود|الملف|الآن)",
        r"سأنشئ\s+(?:ملف|الملف|كود)",
        r"سأحفظ\s+(?:الملف|الكود|الآن)",
        r"سأبدأ\s+(?:الآن|بـ)",
        r"دعني\s+(?:أنشئ|أكتب|أستخدم|أحفظ)",
        # English - I will
        r"I\s+will\s+(?:now\s+)?(?:create|write|save|use|call|make|build|generate|execute)",
        r"I\s+will\s+(?:now\s+)?(?:use|call)\s+the\s+\w+\s+tool",
        # English - I'm going to
        r"I['\u2019]m\s+going\s+to\s+(?:create|write|save|use|call|make|build|generate)",
        # English - Let me
        r"Let\s+me\s+(?:create|write|save|use|call|make|build|generate|now)",
        # English - Now I'll
        r"Now\s+I(?:['\u2019]ll|['\u2019]m\s+going\s+to)\s+(?:create|write|save|use|call|make|build)",
        # English - I'll
        r"I['\u2019]ll\s+(?:now\s+)?(?:use|call)\s+the\s+\w+\s+(?:tool|function)",
        r"I['\u2019]ll\s+(?:now\s+)?(?:create|write|save|make|build|generate)",
        # English - using the tool
        r"using\s+the\s+(?:write_file|create_file|save_file|search|browse|read_file)\s+tool",
        # English - To create/write
        r"To\s+(?:create|write|save)\s+this\s+(?:file|code|content)",
        # English - I need to use
        r"I\s+need\s+to\s+(?:use|call)\s+the\s+\w+\s+(?:tool|function)",
        # A call described inside a JSON code block
        r"```json\n\{[^}]*\"name\"\s*:",
    ]

    @staticmethod
    def _declared_tool_name(tool: Any) -> str:
        """Return a tool definition's name, flat or OpenAI ``function`` style."""
        if not isinstance(tool, dict):
            return ""
        name = tool.get("name", "")
        if not name and isinstance(tool.get("function"), dict):
            name = tool["function"].get("name", "")
        return name if isinstance(name, str) else ""

    @classmethod
    def analyze(cls, response: str, available_tools: Optional[List[Dict]] = None) -> Dict:
        """Analyse ``response`` and report tool-call findings.

        Args:
            response: The model's response text.
            available_tools: Optional tool definitions, passed to the parser
                and used to guess the intended tool.

        Returns:
            A dict with the keys ``has_actual_tool_call``,
            ``has_description_without_execution``, ``described_actions``,
            ``needs_execution_push``, ``tool_calls_found`` and
            ``intended_tool_name``. The intended tool is the first available
            tool whose name occurs (case-insensitively) in the response, and
            is only looked up when an execution push is needed.
        """
        result = {
            "has_actual_tool_call": False,
            "has_description_without_execution": False,
            "described_actions": [],
            "needs_execution_push": False,
            "tool_calls_found": [],
            "intended_tool_name": None,
        }

        # Check for an actual tool call.
        _, tool_calls = ToolCallParser.parse_tool_calls(response, available_tools)
        result["has_actual_tool_call"] = len(tool_calls) > 0
        result["tool_calls_found"] = tool_calls

        # Collect description phrases.
        descriptions_found = []
        for pattern in cls.DESCRIPTION_PHRASES:
            descriptions_found.extend(re.findall(pattern, response, re.IGNORECASE))
        result["described_actions"] = descriptions_found

        # Final verdict.
        if descriptions_found and not result["has_actual_tool_call"]:
            result["has_description_without_execution"] = True
            result["needs_execution_push"] = True
            # Try to work out which tool was meant.
            if available_tools:
                response_lower = response.lower()
                for tool in available_tools:
                    tool_name = cls._declared_tool_name(tool)
                    if tool_name and tool_name.lower() in response_lower:
                        result["intended_tool_name"] = tool_name
                        break
        return result
# =====================================================
# TOOL CALL PARSER - improved, with higher tolerance
# =====================================================
class ToolCallParser:
    """Detect and parse tool calls embedded in model output.

    Several textual formats are recognised (``<tool_call>`` tags, fenced
    code blocks, ``<function=...>`` tags, ``[TOOL_CALL]`` blocks,
    ``✿FUNCTION✿`` blocks and raw ``{"tool_calls": [...]}`` JSON). Minor
    JSON slips are tolerated. Every parsed call is returned in the shape
    ``{"type": "tool_use", "id": ..., "name": ..., "input": {...}}``.
    """

    START_MARKERS = [
        '<tool_call>',
        '```tool_call',
        '```json',
        '<function=',
        '[tool_call]',
        '[TOOL_CALL]',
        '✿function✿',
        '✿FUNCTION✿',
        '{"tool_calls"',
        '"tool_calls":',
    ]

    @staticmethod
    def generate_tool_id() -> str:
        """Return a fresh ``toolu_``-prefixed identifier."""
        return f"toolu_{uuid.uuid4().hex[:24]}"

    @classmethod
    def might_contain_tool_call(cls, text: str) -> bool:
        """Cheap pre-check: does ``text`` contain anything resembling a tool call?"""
        lowered = text.strip().lower()
        if any(marker.lower() in lowered for marker in cls.START_MARKERS):
            return True
        return lowered.startswith('{') and '"name"' in lowered

    @classmethod
    def parse_tool_calls(
        cls,
        text: str,
        available_tools: Optional[List[Dict]] = None
    ) -> Tuple[str, List[Dict]]:
        """Extract tool calls from ``text``.

        Formats are tried in a fixed order; a later format is only tried if
        no call has been found yet.

        Args:
            text: Model output to scan.
            available_tools: Optional tool definitions used to correct tool
                names.

        Returns:
            ``(clean_text, tool_calls)``. If no call is found, ``text`` is
            returned unchanged so ordinary content keeps its whitespace and
            indentation. Otherwise ``clean_text`` is ``text`` with the call
            blocks removed and the whitespace around the gaps normalised.
        """
        tool_calls: List[Dict] = []
        clean_text = text

        # Format 1: <tool_call>JSON</tool_call> (tolerant of extra whitespace)
        pattern1 = re.compile(
            r'\s*<tool_call>\s*(\{.*?\})\s*</tool_call>\s*',
            re.DOTALL | re.IGNORECASE
        )
        for match in pattern1.finditer(text):
            raw_json = match.group(1).strip()
            try:
                parsed = cls._loads_tolerant(raw_json)
            except json.JSONDecodeError as e:
                logger.warning(f"[Parser] JSON parse error: {e}")
                parsed = cls._try_fix_json(raw_json)
            if parsed is None:
                continue
            tool_call = cls._normalize_tool_call(parsed, available_tools)
            if tool_call:
                tool_calls.append(tool_call)
                clean_text = clean_text.replace(match.group(0), " ", 1)

        # Format 2: ```tool_call\nJSON\n``` (also ```json and bare fences)
        if not tool_calls:
            pattern2 = re.compile(
                r'```(?:tool_call|json)?\s*\n?\s*(\{.*?"(?:name|function)".*?\})\s*\n?\s*```',
                re.DOTALL | re.IGNORECASE
            )
            for match in pattern2.finditer(text):
                try:
                    parsed = cls._loads_tolerant(match.group(1).strip())
                except json.JSONDecodeError:
                    continue
                tool_call = cls._normalize_tool_call(parsed, available_tools)
                if tool_call:
                    tool_calls.append(tool_call)
                    clean_text = clean_text.replace(match.group(0), " ", 1)

        # Format 3: <function=name>JSON</function>
        if not tool_calls:
            pattern3 = re.compile(
                r'\s*<function=(\w+)>\s*(\{.*?\})\s*</function>\s*',
                re.DOTALL | re.IGNORECASE
            )
            for match in pattern3.finditer(text):
                try:
                    args_json = json.loads(match.group(2).strip())
                except json.JSONDecodeError:
                    continue
                tool_call = {
                    "type": "tool_use",
                    "id": cls.generate_tool_id(),
                    "name": match.group(1),
                    "input": args_json
                }
                if available_tools:
                    tool_call = cls._validate_against_tools(tool_call, available_tools)
                if tool_call:
                    tool_calls.append(tool_call)
                    clean_text = clean_text.replace(match.group(0), " ", 1)

        # Format 4: [TOOL_CALL]...[/TOOL_CALL] holding JSON or name(JSON)
        if not tool_calls:
            pattern4 = re.compile(
                r'\s*\[TOOL_CALL\]\s*(.*?)\s*\[/TOOL_CALL\]\s*',
                re.DOTALL | re.IGNORECASE
            )
            for match in pattern4.finditer(text):
                content_inner = match.group(1).strip()
                try:
                    parsed = json.loads(content_inner)
                    tool_call = cls._normalize_tool_call(parsed, available_tools)
                    if tool_call:
                        tool_calls.append(tool_call)
                        clean_text = clean_text.replace(match.group(0), " ", 1)
                except json.JSONDecodeError:
                    func_match = re.match(r'(\w+)\s*\((.*)\)', content_inner, re.DOTALL)
                    if func_match:
                        try:
                            args_str = func_match.group(2).strip()
                            args = json.loads(args_str) if args_str else {}
                        except json.JSONDecodeError:
                            continue
                        tool_calls.append({
                            "type": "tool_use",
                            "id": cls.generate_tool_id(),
                            "name": func_match.group(1),
                            "input": args
                        })
                        clean_text = clean_text.replace(match.group(0), " ", 1)

        # Format 5: ✿FUNCTION✿ / ✿ARGS✿
        if not tool_calls:
            pattern5 = re.compile(
                r'✿FUNCTION✿:\s*(\w+)\s*\n✿ARGS✿:\s*(\{.*?\})\s*(?:\n✿RESULT✿)?',
                re.DOTALL
            )
            for match in pattern5.finditer(text):
                try:
                    args_json = json.loads(match.group(2).strip())
                except json.JSONDecodeError:
                    continue
                tool_calls.append({
                    "type": "tool_use",
                    "id": cls.generate_tool_id(),
                    "name": match.group(1),
                    "input": args_json
                })
                clean_text = clean_text.replace(match.group(0), " ", 1)

        # Format 6: raw JSON object containing "tool_calls"
        if not tool_calls:
            try:
                json_pattern = re.compile(
                    r'\{[^{}]*"tool_calls"[^{}]*\[.*?\]\s*\}',
                    re.DOTALL
                )
                for jm in json_pattern.finditer(text):
                    try:
                        parsed = json.loads(jm.group(0))
                    except json.JSONDecodeError:
                        continue
                    if "tool_calls" not in parsed:
                        continue
                    for tc in parsed["tool_calls"]:
                        func_data = tc.get("function", tc)
                        name = func_data.get("name", "")
                        args = func_data.get(
                            "arguments",
                            func_data.get("input", {})
                        )
                        if isinstance(args, str):
                            try:
                                args = json.loads(args)
                            except json.JSONDecodeError:
                                args = {"raw": args}
                        if name:
                            tool_calls.append({
                                "type": "tool_use",
                                "id": cls.generate_tool_id(),
                                "name": name,
                                "input": args
                            })
                            clean_text = clean_text.replace(jm.group(0), " ", 1)
            except Exception:
                # Best-effort format: malformed structures are ignored.
                logger.debug("[Parser] tool_calls JSON scan failed", exc_info=True)

        if not tool_calls:
            # Nothing was removed, so leave the text exactly as it was.
            # Collapsing spaces here would destroy code indentation.
            return text, []

        clean_text = clean_text.strip()
        clean_text = re.sub(r'\n{3,}', '\n\n', clean_text)
        clean_text = re.sub(r' +', ' ', clean_text)
        return clean_text, tool_calls

    @classmethod
    def _clean_json_string(cls, json_str: str) -> str:
        """Repair common JSON slips.

        Trailing commas are removed, typographic quotes are replaced by
        ASCII quotes, and a leading BOM is dropped. The replacements are
        textual and also touch string values, so this must only be applied
        to text that has already failed to parse.
        """
        # Remove trailing commas.
        json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
        # Replace curly quotes.
        json_str = json_str.replace('\u201c', '"').replace('\u201d', '"')
        json_str = json_str.replace('\u2018', "'").replace('\u2019', "'")
        # Remove BOM.
        json_str = json_str.lstrip('\ufeff')
        return json_str

    @classmethod
    def _loads_tolerant(cls, raw_json: str) -> Any:
        """Parse ``raw_json`` as-is; retry after cleaning only if that fails.

        Parsing the untouched text first keeps valid JSON, for example
        arguments that legitimately contain typographic quotes, from being
        corrupted by the cleaning step.

        Raises:
            json.JSONDecodeError: If neither attempt parses.
        """
        try:
            return json.loads(raw_json)
        except json.JSONDecodeError:
            return json.loads(cls._clean_json_string(raw_json))

    @classmethod
    def _normalize_tool_call(
        cls,
        parsed: Dict,
        available_tools: Optional[List[Dict]] = None
    ) -> Optional[Dict]:
        """Convert a parsed JSON object into the canonical ``tool_use`` dict.

        Accepted shapes are ``{"function": {"name", "arguments"}}``,
        ``{"name", "arguments"|"parameters"|"input"|"args"}`` and
        ``{"tool", "args"|"arguments"|"input"}``.

        Returns:
            The normalised call, or ``None`` if ``parsed`` is not a mapping
            or carries no tool name.
        """
        if not isinstance(parsed, dict):
            return None

        name = None
        arguments = {}
        if "function" in parsed and isinstance(parsed["function"], dict):
            name = parsed["function"].get("name")
            arguments = parsed["function"].get("arguments", {})
        elif "name" in parsed:
            name = parsed["name"]
            arguments = (
                parsed.get("arguments")
                or parsed.get("parameters")
                or parsed.get("input")
                or parsed.get("args")
                or {}
            )
        elif "tool" in parsed:
            name = parsed["tool"]
            arguments = parsed.get("args", parsed.get("arguments", parsed.get("input", {})))
        if not name:
            return None

        if isinstance(arguments, str):
            # Arguments are sometimes delivered as a JSON-encoded string.
            try:
                arguments = json.loads(arguments)
            except json.JSONDecodeError:
                arguments = {"raw_input": arguments}

        tool_call = {
            "type": "tool_use",
            "id": cls.generate_tool_id(),
            "name": str(name),
            "input": arguments if isinstance(arguments, dict) else {"value": arguments}
        }
        if available_tools:
            tool_call = cls._validate_against_tools(tool_call, available_tools)
        return tool_call

    @staticmethod
    def _declared_tool_name(tool: Any) -> str:
        """Return a tool definition's name, flat or OpenAI ``function`` style."""
        if not isinstance(tool, dict):
            return ""
        name = tool.get("name", "")
        if not name and isinstance(tool.get("function"), dict):
            name = tool["function"].get("name", "")
        return name if isinstance(name, str) else ""

    @staticmethod
    def _name_key(name: str) -> str:
        """Return ``name`` lower-cased with ``_`` and ``-`` removed."""
        return name.lower().replace("_", "").replace("-", "")

    @classmethod
    def _validate_against_tools(
        cls,
        tool_call: Dict,
        available_tools: List[Dict]
    ) -> Optional[Dict]:
        """Align the call's name with the declared tools where possible.

        Matching is tried from strictest to loosest across all tools: exact
        name, then equality ignoring case, ``_`` and ``-``, then substring
        containment. Tools without a name are ignored. An unmatched call is
        logged and returned unchanged.
        """
        requested_name = tool_call["name"]
        tool_names = [
            declared
            for declared in (cls._declared_tool_name(t) for t in available_tools)
            if declared
        ]
        if requested_name in tool_names:
            return tool_call

        requested_key = cls._name_key(requested_name)
        for tool_name in tool_names:
            if cls._name_key(tool_name) == requested_key:
                tool_call["name"] = tool_name
                return tool_call

        # An empty key would be a substring of every name, so it is skipped.
        if requested_key:
            for tool_name in tool_names:
                tool_key = cls._name_key(tool_name)
                if tool_key and (requested_key in tool_key or tool_key in requested_key):
                    tool_call["name"] = tool_name
                    return tool_call

        logger.warning(f"[Validator] Tool '{requested_name}' not in: {tool_names}")
        return tool_call

    @staticmethod
    def _close_open_delimiters(text: str) -> str:
        """Append the closers for every ``{`` / ``[`` still open in ``text``.

        Closers are appended in nesting order and delimiters inside JSON
        strings are ignored. If the text ends inside an unterminated string
        it is returned unchanged, since the missing content cannot be
        guessed.
        """
        closers: List[str] = []
        in_string = False
        escaped = False
        for ch in text:
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch == '{':
                closers.append('}')
            elif ch == '[':
                closers.append(']')
            elif ch in '}]' and closers and closers[-1] == ch:
                closers.pop()
        if in_string:
            return text
        return text + "".join(reversed(closers))

    @staticmethod
    def _first_balanced_object(text: str) -> Optional[str]:
        """Return the first brace-balanced ``{...}`` span in ``text``, if any."""
        start = text.find('{')
        if start == -1:
            return None
        depth = 0
        in_string = False
        escaped = False
        for index, ch in enumerate(text[start:], start):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    return text[start:index + 1]
        return None

    @classmethod
    def _try_fix_json(cls, broken_json: str) -> Optional[Dict]:
        """Attempt to repair and parse a broken JSON object.

        After cleaning, the text is tried with its open delimiters closed in
        nesting order, then with the simple count-based closing, and finally
        by extracting the first balanced object from either variant.

        Returns:
            The parsed dict, or ``None`` if no repair produced one.
        """
        cleaned = cls._clean_json_string(broken_json)

        candidates = [cls._close_open_delimiters(cleaned)]
        # Count-based closing is kept as a second attempt.
        counted = cleaned
        open_braces = counted.count('{') - counted.count('}')
        if open_braces > 0:
            counted += '}' * open_braces
        open_brackets = counted.count('[') - counted.count(']')
        if open_brackets > 0:
            counted += ']' * open_brackets
        if counted not in candidates:
            candidates.append(counted)

        for candidate in candidates:
            try:
                result = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict):
                return result

        # Last resort: pull the first balanced object out of the text.
        for candidate in candidates:
            fragment = cls._first_balanced_object(candidate)
            if fragment is None:
                continue
            try:
                result = json.loads(fragment)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict):
                return result
        return None
# =====================================================
# STREAM TOOL BUFFER
# =====================================================
class StreamToolBuffer:
def __init__(self, available_tools: Optional[List[Dict]] = None):
self.buffer = ""
self.in_tool_call = False
self.tool_call_buffer = ""
self.available_tools = available_tools or []
self.pending_text = ""
self.tool_call_depth = 0
self._active_start_marker = ""
self._active_end_marker = ""
self._stream_started_at = time.time()
def feed(self, chunk: str) -> List[Dict]:
events = []
self.buffer += chunk
while self.buffer:
if self.in_tool_call:
events.extend(self._process_tool_call_mode())
if self.in_tool_call:
break
else:
events.extend(self._process_text_mode())
if self.in_tool_call:
continue
break
return events
def _process_text_mode(self) -> List[Dict]:
events = []
buffer_lower = self.buffer.lower()
earliest_pos = -1
earliest_marker = ""
for marker in ToolCallParser.START_MARKERS:
pos = buffer_lower.find(marker.lower())
if pos != -1 and (earliest_pos == -1 or pos < earliest_pos):
earliest_pos = pos
earliest_marker = marker
if earliest_pos == -1:
safe_length = len(self.buffer) - 100
if safe_length > 0:
text_to_send = self.buffer[:safe_length]
self.buffer = self.buffer[safe_length:]
if text_to_send:
events.append({"type": "text", "text": text_to_send})
else:
text_before = self.buffer[:earliest_pos]
if text_before and not text_before.isspace():
events.append({"type": "text", "text": text_before})
elif text_before and text_before.isspace():
events.append({"type": "text", "text": " "})
self.buffer = self.buffer[earliest_pos:]
self.in_tool_call = True
self.tool_call_buffer = ""
self._active_start_marker = earliest_marker
self._set_active_end_marker(earliest_marker)
return events
def _set_active_end_marker(self, start_marker: str):
start_lower = start_marker.lower()
if start_lower == '<tool_call>':
self._active_end_marker = '</tool_call>'
elif start_lower in ('```tool_call', '```json', '```'):
self._active_end_marker = '```'
elif start_lower.startswith('<function='):
self._active_end_marker = '</function>'
elif start_lower in ('[tool_call]', '[tool_call]'):
self._active_end_marker = '[/TOOL_CALL]'
elif start_lower == '✿function✿':
self._active_end_marker = ''
else:
self._active_end_marker = '</tool_call>'
def _process_tool_call_mode(self) -> List[Dict]:
events = []
end_markers_pairs = [
('</tool_call>', '<tool_call>'),
('```', '```tool_call'),
('```', '```json'),
('</function>', '<function='),
('[/tool_call]', '[tool_call]'),
('[/TOOL_CALL]', '[TOOL_CALL]'),
]
buffer_lower = self.buffer.lower()
found_end = False
for end_marker, start_marker in end_markers_pairs:
end_lower = end_marker.lower()
start_lower = start_marker.lower()
if not buffer_lower.startswith(start_lower):
continue
search_from = len(start_marker)
end_pos = buffer_lower.find(end_lower, search_from)
if end_pos != -1:
full_tool_text = self.buffer[:end_pos + len(end_marker)]
remaining = self.buffer[end_pos + len(end_marker):]
clean_text, tool_calls = ToolCallParser.parse_tool_calls(
full_tool_text, self.available_tools
)
for tc in tool_calls:
if not tc.get("id"):
tc["id"] = ToolCallParser.generate_tool_id()
logger.info(
f"[Buffer] Tool call detected: {tc['name']} "
f"(id: {tc['id']})"
)
events.append(tc)
if clean_text.strip():
events.append({"type": "text", "text": clean_text})
self.buffer = remaining
self.in_tool_call = False
self._active_start_marker = ""
self._active_end_marker = ""
found_end = True
break
if not found_end:
if len(self.buffer) > 5000:
logger.warning(
"[Buffer] Tool call buffer too large, treating as text"
)
events.append({"type": "text", "text": self.buffer})
self.buffer = ""
self.in_tool_call = False
self._active_start_marker = ""
self._active_end_marker = ""
return events
def flush(self) -> List[Dict]:
    """Drain the buffer at end of stream and return the final events.

    If the stream stopped inside an unclosed tool call, two recoveries are
    attempted in order:

    1. append the expected closing marker and parse;
    2. take the JSON fragment after the opening marker, close any
       containers left open, re-wrap it in the markers and parse.

    If neither yields a tool call, the raw buffer is emitted as text.
    Outside a tool call, a non-blank buffer is emitted as text. All buffer
    state is reset before returning.

    Returns:
        Tool-call dicts and/or ``{"type": "text", "text": ...}`` events.
    """

    def close_open_containers(fragment: str) -> str:
        """Append the closers for every ``{`` / ``[`` still open in *fragment*.

        Closers are emitted in reverse nesting order, and brackets inside
        JSON string literals are ignored, so ``{"a": [1`` becomes
        ``{"a": [1]}`` rather than the invalid ``{"a": [1}]``.
        """
        closer_for = {'{': '}', '[': ']'}
        pending = []
        in_string = False
        escaped = False
        for ch in fragment:
            if in_string:
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch in closer_for:
                pending.append(closer_for[ch])
            elif pending and ch == pending[-1]:
                pending.pop()
        return fragment + ''.join(reversed(pending))

    events = []
    if self.in_tool_call and self.buffer:
        logger.warning(
            "[Buffer] Stream ended with unclosed tool_call. "
            "Attempting manual closure..."
        )
        end_marker = self._active_end_marker or "</tool_call>"
        clean_text, tool_calls = ToolCallParser.parse_tool_calls(
            self.buffer + end_marker, self.available_tools
        )
        if tool_calls:
            logger.info(
                f"[Buffer] Manual closure SUCCESS: {len(tool_calls)} tool call(s)"
            )
        else:
            # Second attempt: repair the truncated JSON payload itself.
            raw_buffer = self.buffer
            start_marker = self._active_start_marker
            if start_marker and raw_buffer.lower().startswith(start_marker.lower()):
                raw_buffer = raw_buffer[len(start_marker):]
            json_start = raw_buffer.find('{')
            if json_start != -1:
                repaired_text = (
                    start_marker
                    + close_open_containers(raw_buffer[json_start:].strip())
                    + end_marker
                )
                clean_text, tool_calls = ToolCallParser.parse_tool_calls(
                    repaired_text, self.available_tools
                )
        if tool_calls:
            for tc in tool_calls:
                if not tc.get("id"):
                    tc["id"] = ToolCallParser.generate_tool_id()
                events.append(tc)
            if clean_text.strip():
                events.append({"type": "text", "text": clean_text})
        elif self.buffer.strip():
            # Nothing recoverable: hand the raw text back to the client.
            events.append({"type": "text", "text": self.buffer})
    elif self.buffer.strip():
        events.append({"type": "text", "text": self.buffer})

    self.buffer = ""
    self.in_tool_call = False
    self._active_start_marker = ""
    self._active_end_marker = ""
    return events
# =====================================================
# TOOLS FORMATTER - محسّن
# =====================================================
class ToolsFormatter:
    """Render tool definitions and tool results as plain text for the prompt."""

    @staticmethod
    def format_tools_for_prompt(tools: List[Dict], tool_choice: Any = None) -> str:
        """Build the "# Tools Available" section of the system prompt.

        Args:
            tools: Tool definitions. Each one is read for ``name``,
                ``description`` and ``input_schema`` (``properties`` and
                ``required``). Missing or ``null`` fields are tolerated.
            tool_choice: ``{"type": "tool", "name": ...}``,
                ``{"type": "any"}`` or the string ``"any"`` add a mandatory
                instruction; any other value adds nothing.

        Returns:
            The prompt text, or ``""`` when ``tools`` is empty.
        """
        if not tools:
            return ""
        lines = [
            "# Tools Available",
            "",
            (
                "**CRITICAL RULE**: You are an EXECUTOR, not a DESCRIBER. "
                "When you decide to use a tool, you MUST call it immediately "
                "using the <tool_call> format below. "
                "NEVER write 'I will use...' or 'I'm going to...' - just DO IT."
            ),
            "",
            "## ✅ CORRECT - How to call a tool:",
            "<tool_call>",
            '{"name": "tool_name", "arguments": {"param": "value"}}',
            "</tool_call>",
            "",
            "## ❌ WRONG - What NOT to do:",
            ' "I will now use the write_file tool to..." → FORBIDDEN',
            ' "Let me create the file..." → FORBIDDEN',
            ' "I\'ll use the tool to..." → FORBIDDEN',
            "",
            "---",
            "",
            "## Available Tools:",
            "",
        ]
        for i, tool in enumerate(tools, 1):
            name = tool.get("name", "unknown")
            description = tool.get("description") or "No description"
            # ``or`` guards against an explicit ``null`` in the request,
            # which ``dict.get``'s default does not cover.
            input_schema = tool.get("input_schema") or {}
            lines.append(f"### {i}. `{name}`")
            lines.append(f"**Purpose:** {description}")
            properties = input_schema.get("properties") or {}
            required = input_schema.get("required") or []
            example_args = {}
            if properties:
                lines.append("**Parameters:**")
                for param_name, param_info in properties.items():
                    if not isinstance(param_info, dict):
                        param_info = {}
                    param_type = param_info.get("type", "any")
                    param_desc = param_info.get("description", "")
                    is_required = param_name in required
                    req_marker = " *(required)*" if is_required else " *(optional)*"
                    lines.append(
                        f" - `{param_name}` ({param_type}{req_marker}): {param_desc}"
                    )
                    if "enum" in param_info:
                        lines.append(
                            f" Values: {', '.join(str(v) for v in param_info['enum'])}"
                        )
                    if not is_required:
                        continue
                    # Only required parameters appear in the worked example.
                    if param_type == "string":
                        example_args[param_name] = f"<{param_name}_value>"
                    elif param_type == "integer":
                        example_args[param_name] = 0
                    elif param_type == "boolean":
                        example_args[param_name] = True
                    elif param_type == "array":
                        example_args[param_name] = []
                    else:
                        example_args[param_name] = f"<{param_name}>"
            lines.append("**Call it like this:**")
            lines.append("<tool_call>")
            example = {"name": name, "arguments": example_args}
            lines.append(json.dumps(example, ensure_ascii=False, indent=2))
            lines.append("</tool_call>")
            lines.append("")
        # tool_choice handling
        if tool_choice:
            if isinstance(tool_choice, dict):
                if tool_choice.get("type") == "tool":
                    forced_tool = tool_choice.get("name", "")
                    if forced_tool:
                        lines.append("---")
                        lines.append(
                            f"**⚡ MANDATORY:** You MUST call `{forced_tool}` right now. "
                            f"Use the <tool_call> format above."
                        )
                elif tool_choice.get("type") == "any":
                    lines.append("---")
                    lines.append(
                        "**⚡ MANDATORY:** You MUST call at least one tool. "
                        "Use the <tool_call> format above."
                    )
            elif tool_choice == "any":
                lines.append("---")
                lines.append("**⚡ MANDATORY:** Use at least one tool now.")
        lines.append("")
        return "\n".join(lines)

    @staticmethod
    def format_tool_result_for_message(tool_use_id: str, content: Any) -> str:
        """Wrap a tool result in ``[Tool Result - ID: ...]`` delimiters.

        Args:
            tool_use_id: Id of the tool call this result answers.
            content: A string, a list of content blocks / strings, a dict
                (rendered as indented JSON), or any other value (rendered
                with ``str``; ``None`` becomes an empty body).

        Returns:
            The delimited result text.
        """
        if isinstance(content, str):
            result_text = content
        elif isinstance(content, list):
            parts = []
            for item in content:
                if isinstance(item, dict):
                    if item.get("type") == "text":
                        parts.append(str(item.get("text") or ""))
                    elif item.get("type") == "image":
                        parts.append("[Image content]")
                    else:
                        parts.append(json.dumps(item, ensure_ascii=False))
                elif isinstance(item, str):
                    parts.append(item)
            result_text = "\n".join(parts)
        elif isinstance(content, dict):
            result_text = json.dumps(content, ensure_ascii=False, indent=2)
        else:
            # Only None means "no result"; 0 and False are real results
            # and must not be rendered as an empty body.
            result_text = "" if content is None else str(content)
        return (
            f"[Tool Result - ID: {tool_use_id}]\n"
            f"{result_text}\n"
            f"[End Tool Result]"
        )
# =====================================================
# MESSAGE CONVERTER
# =====================================================
class MessageConverter:
    """Flatten Anthropic-style messages into a prompt string plus history."""

    @staticmethod
    def convert_messages(
        messages: List[Dict],
        system_prompt: str = "",
        tools: Optional[List[Dict]] = None,
        tool_choice: Any = None,
        session_id: str = ""
    ) -> Tuple[str, List[Dict]]:
        """Convert request messages into ``(full_message, history)``.

        The system text is assembled from ``system_prompt``, the rendered
        tool definitions, the stored session context summary (when
        ``session_id`` is given) and any ``system``-role messages, in that
        order. The last non-system message becomes the outgoing message;
        everything before it is returned as history.

        Args:
            messages: Request messages with ``role`` and ``content``.
            system_prompt: Top-level system prompt text.
            tools: Tool definitions to describe in the system text.
            tool_choice: Forwarded to the tools formatter.
            session_id: Session whose stored context should be injected.

        Returns:
            ``full_message`` (wrapped in ``[System Instructions]`` when any
            system text exists) and the preceding history as
            ``{"role", "content"}`` dicts.
        """
        history = []
        full_system = system_prompt if system_prompt else ""
        # Add the tool definitions.
        if tools:
            tools_text = ToolsFormatter.format_tools_for_prompt(tools, tool_choice)
            if full_system:
                full_system = f"{full_system}\n\n{tools_text}"
            else:
                full_system = tools_text
        # Inject the task context together with the execution directive.
        if session_id:
            context_summary = CONTEXT_STORE.get_context_summary(session_id)
            if context_summary:
                if full_system:
                    full_system = f"{full_system}\n\n{context_summary}"
                else:
                    full_system = context_summary
                logger.info(
                    f"[Context] Injected context+directive for session {session_id[:8]}..."
                )
        # Process the messages.
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                sys_text = extract_text_from_content(content)
                if full_system:
                    full_system = f"{full_system}\n\n{sys_text}"
                else:
                    full_system = sys_text
                continue
            converted_text = MessageConverter._convert_content(content, role)
            if converted_text:
                # Every non-user role is sent to the provider as "assistant".
                g4f_role = "user" if role == "user" else "assistant"
                history.append({"role": g4f_role, "content": converted_text})
        # Split off the last message; it becomes the outgoing prompt.
        user_message = history.pop()["content"] if history else ""
        # Build the complete message.
        if full_system:
            full_message = (
                f"[System Instructions]\n{full_system}\n"
                f"[/System Instructions]\n\n{user_message}"
            )
        else:
            full_message = user_message
        return full_message, history

    @staticmethod
    def _convert_content(content: Any, role: str) -> str:
        """Render one message's content (string or block list) as text.

        ``tool_use`` blocks become ``<tool_call>{json}</tool_call>``,
        ``tool_result`` blocks are rendered by
        ``ToolsFormatter.format_tool_result_for_message`` (prefixed with
        ``[ERROR]`` when flagged), images become a placeholder and unknown
        blocks fall back to their ``text`` field or a JSON dump. Blocks are
        joined with newlines. ``role`` is accepted but not used.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict):
                    block_type = block.get("type", "")
                    if block_type == "text":
                        parts.append(block.get("text") or "")
                    elif block_type == "tool_use":
                        # Serialise the whole object with json.dumps so a
                        # quote or backslash in the name/id cannot produce
                        # invalid JSON. For ordinary values the output is
                        # identical to the hand-built string it replaces.
                        payload = {
                            "name": block.get("name", "unknown"),
                            "id": block.get("id", ""),
                            "arguments": block.get("input", {}),
                        }
                        parts.append(
                            f"<tool_call>"
                            f"{json.dumps(payload, ensure_ascii=False)}"
                            f"</tool_call>"
                        )
                    elif block_type == "tool_result":
                        result_text = ToolsFormatter.format_tool_result_for_message(
                            block.get("tool_use_id", ""), block.get("content", "")
                        )
                        if block.get("is_error", False):
                            result_text = f"[ERROR] {result_text}"
                        parts.append(result_text)
                    elif block_type == "image":
                        parts.append("[Image content - not supported in text mode]")
                    elif "text" in block:
                        parts.append(block["text"])
                    else:
                        parts.append(json.dumps(block, ensure_ascii=False))
            return "\n".join(parts)
        if content is not None:
            return str(content)
        return ""
# =====================================================
# SMART HISTORY MANAGER
# =====================================================
class SmartHistoryManager:
    """Bound the history sent to a provider.

    Histories longer than ``MAX_TOTAL_MESSAGES`` keep their first
    ``FIRST_MESSAGES_TO_KEEP`` and last ``LAST_MESSAGES_TO_KEEP`` messages;
    the messages in between are replaced by one summary message.
    """
    FIRST_MESSAGES_TO_KEEP = 5
    LAST_MESSAGES_TO_KEEP = 20
    MAX_TOTAL_MESSAGES = 30
    # Matches the delimiters written by
    # ToolsFormatter.format_tool_result_for_message.
    _TOOL_RESULT_RE = re.compile(
        r'\[Tool Result - ID: ([^\]]+)\]\n(.*?)\n\[End Tool Result\]',
        re.DOTALL
    )

    @classmethod
    def process_history(
        cls,
        history: List[Dict],
        session_id: str = ""
    ) -> List[Dict]:
        """Return ``history`` unchanged if short, compressed otherwise.

        When ``session_id`` is given, tool results found in the history are
        first recorded in the context store.

        Args:
            history: ``{"role", "content"}`` messages, oldest first.
            session_id: Session to record tool results under.

        Returns:
            The list to send; the input list itself when no compression
            was needed.
        """
        if not history:
            return []
        if session_id:
            cls._extract_and_store_tool_results(history, session_id)
        total = len(history)
        if total <= cls.MAX_TOTAL_MESSAGES:
            logger.info(f"[History] Full history: {total} messages")
            return history
        first_part = history[:cls.FIRST_MESSAGES_TO_KEEP]
        last_part = history[-cls.LAST_MESSAGES_TO_KEEP:]
        middle_part = history[cls.FIRST_MESSAGES_TO_KEEP:-cls.LAST_MESSAGES_TO_KEEP]
        bridge = []
        if middle_part:
            summary = cls._summarize_middle(middle_part)
            if summary:
                bridge.append({
                    "role": "assistant",
                    "content": (
                        f"[Context Summary - {len(middle_part)} messages compressed]\n"
                        f"{summary}"
                    )
                })
        smart_history = first_part + bridge + last_part
        # The two counts used to be printed back to back with nothing in
        # between, which made the log line unreadable.
        logger.info(
            f"[History] Smart compression: {total} -> {len(smart_history)} messages"
        )
        return smart_history

    @classmethod
    def _extract_and_store_tool_results(
        cls,
        history: List[Dict],
        session_id: str
    ):
        """Record the first tool result of each string message in the store.

        Each result is truncated to 500 characters before it is stored.
        """
        for msg in history:
            content = msg.get("content", "")
            if not isinstance(content, str) or "[Tool Result" not in content:
                continue
            tool_match = cls._TOOL_RESULT_RE.search(content)
            if tool_match:
                CONTEXT_STORE.add_tool_result(
                    session_id, "tool", tool_match.group(1), tool_match.group(2)[:500]
                )

    @classmethod
    def _summarize_middle(cls, messages: List[Dict]) -> str:
        """Summarise up to the first 10 messages, one line per message.

        Each line is ``[role]: <first 200 chars of content>...``.
        """
        if not messages:
            return ""
        # Only the first 10 entries are ever returned, so build only those.
        return "\n".join(
            f"[{msg.get('role', '')}]: {str(msg.get('content', ''))[:200]}..."
            for msg in messages[:10]
        )
# =====================================================
# EXECUTION ENFORCER - القلب الحقيقي للحل
# =====================================================
class ExecutionEnforcer:
    """
    Forced-execution engine.

    Targets four failure modes:
    1. "Smart spectator" -> detect a description and force execution
    2. "Lost commands" -> re-issue the request in a correct, reliable format
    3. "Missing final push" -> send a mandatory execution message
    4. "Identity drift" -> frame the model as an EXECUTOR, not a DESCRIBER
    """
    MAX_EXECUTION_RETRIES = 2

    @classmethod
    def build_forced_execution_message(
        cls,
        tool_name: str,
        tool_args: Dict,
        reason: str = ""
    ) -> str:
        """
        Build a message ordering the model to emit one specific tool call.

        The message embeds the exact ``<tool_call>`` block to copy; when
        ``reason`` is non-empty a ``Reason:`` line precedes it.
        """
        call_block = json.dumps(
            {"name": tool_name, "arguments": tool_args},
            ensure_ascii=False,
            indent=2
        )
        header = ["EXECUTE THIS TOOL CALL NOW.", ""]
        if reason:
            header += [f"Reason: {reason}", ""]
        footer = [
            "Copy this EXACTLY:",
            "",
            "<tool_call>",
            call_block,
            "</tool_call>",
            "",
            "No other text needed. Just the tool_call block."
        ]
        return "\n".join(header + footer)

    @classmethod
    def pre_process_request(
        cls,
        message: str,
        session_id: str,
        tools: List[Dict]
    ) -> str:
        """
        Amend the outgoing message before it is sent.

        Appends an execution reminder when the session has a pending tool
        call, or a warning when the model has described actions at least
        twice in a row without executing them. Returns the message
        unchanged when there is no session, no tools, or neither applies.
        """
        if not (session_id and tools):
            return message
        ctx = CONTEXT_STORE.get_or_create_session(session_id)
        state = ctx.get("current_state", ExecutionState.IDLE)
        pending = ctx.get("pending_tool_calls", [])
        desc_count = ctx.get("consecutive_description_count", 0)

        # A tool call is still pending: remind the model of that exact call.
        if pending and state == ExecutionState.TOOL_PENDING:
            first_pending = pending[0]
            tool_name = first_pending.get("name", "unknown")
            call_block = json.dumps(
                {'name': tool_name, 'arguments': first_pending.get("arguments", {})},
                ensure_ascii=False,
                indent=2
            )
            return message + (
                f"\n\n[EXECUTION REMINDER]: "
                f"You must call `{tool_name}` using:\n"
                f"<tool_call>\n"
                f"{call_block}\n"
                f"</tool_call>"
            )

        # Repeated descriptions without execution: escalate to a warning.
        if desc_count >= 2:
            return message + (
                f"\n\n[SYSTEM WARNING - EXECUTION REQUIRED]: "
                f"You have described actions {desc_count} times without executing them. "
                f"Your response MUST contain a <tool_call> block. "
                f"Descriptions are not acceptable."
            )
        return message

    @classmethod
    def _placeholder_args(cls, tool_def: Optional[Dict]) -> Dict:
        """Build placeholder values for the required parameters of a tool."""
        placeholders = {}
        if not tool_def:
            return placeholders
        schema = tool_def.get("input_schema", {})
        for param_name in schema.get("required", []):
            param_info = schema.get("properties", {}).get(param_name, {})
            param_type = param_info.get("type", "string")
            if param_type == "string":
                placeholders[param_name] = f"<{param_name}>"
            elif param_type == "integer":
                placeholders[param_name] = 0
            elif param_type == "boolean":
                placeholders[param_name] = True
            else:
                placeholders[param_name] = None
        return placeholders

    @classmethod
    def post_process_response(
        cls,
        response: str,
        session_id: str,
        tools: List[Dict],
        tool_calls_found: List[Dict]
    ) -> Dict:
        """
        Inspect a model response and decide whether another push is needed.

        Returns a dict with ``needs_retry``, ``retry_message`` and
        ``analysis``. Real tool calls are recorded as completed steps and
        reset the description counter. A description without execution
        requests a retry while the counter is within
        ``MAX_EXECUTION_RETRIES``.
        """
        if not session_id:
            return {"needs_retry": False, "retry_message": "", "analysis": {}}
        analysis = ResponseAnalyzer.analyze(response, tools)

        # Actual tool calls were found: success.
        if tool_calls_found:
            for call in tool_calls_found:
                CONTEXT_STORE.add_completed_step(
                    session_id,
                    f"Executed tool: {call.get('name', 'unknown')} "
                    f"with args: {json.dumps(call.get('input', {}))[:100]}"
                )
            # Reset the description counter.
            CONTEXT_STORE.update_session(
                session_id,
                described_but_not_executed=[],
                consecutive_description_count=0,
                current_state=ExecutionState.TOOL_EXECUTING
            )
            return {"needs_retry": False, "retry_message": "", "analysis": analysis}

        no_retry = {"needs_retry": False, "retry_message": "", "analysis": analysis}
        # Anything other than a description without execution needs no push.
        if not analysis["has_description_without_execution"]:
            return no_retry
        desc_count = CONTEXT_STORE.increment_description_count(session_id, response)
        if desc_count > cls.MAX_EXECUTION_RETRIES:
            return no_retry

        intended_tool = analysis.get("intended_tool_name")
        if intended_tool:
            # Build a mandatory message for the tool the model mentioned.
            matching_def = None
            for candidate in tools:
                if candidate.get("name") == intended_tool:
                    matching_def = candidate
                    break
            retry_message = cls.build_forced_execution_message(
                intended_tool,
                cls._placeholder_args(matching_def),
                f"You mentioned using {intended_tool} but didn't call it"
            )
        else:
            retry_message = (
                "You described an action without executing it. "
                "Use a <tool_call> block now:\n\n"
                "<tool_call>\n"
                '{"name": "TOOL_NAME", "arguments": {ARGUMENTS}}\n'
                "</tool_call>"
            )
        return {
            "needs_retry": True,
            "retry_message": retry_message,
            "analysis": analysis
        }
# =====================================================
# CHAT LOGIC - مع محرك التنفيذ الكامل
# =====================================================
def _ask_cache_key(provider_name: str, model_name: str, message: str, history: Any) -> str:
    """Build a response-cache key covering the whole message and history."""
    import hashlib

    digest = hashlib.sha256()
    digest.update(message.encode("utf-8", "surrogatepass"))
    digest.update(b"\x00")
    digest.update(repr(history).encode("utf-8", "surrogatepass"))
    return f"{provider_name}|{model_name}|{digest.hexdigest()}"


def ask(
    message: str,
    history: List,
    provider_name: str,
    model_name: str,
    stop_flag=None,
    tools: Optional[List[Dict]] = None,
    session_id: str = ""
):
    """
    Main chat generator with forced-execution pre-processing.

    Streams cleaned response chunks from the first provider/model pair that
    answers. The requested provider is tried first, then every other known
    provider; up to five models are tried per provider, the requested model
    first.

    Args:
        message: Prompt to send; blank input yields a single ``""``.
        history: Either ``{"role", "content"}`` dicts or ``(user, assistant)``
            pairs; long histories are shortened before sending.
        provider_name: Preferred provider name.
        model_name: Preferred model name.
        stop_flag: Optional object with ``is_set()``; generation stops as
            soon as it reports True.
        tools: Tool definitions. Their presence disables the response cache
            and, together with ``session_id``, enables the enforcer.
        session_id: Session used for context tracking.

    Yields:
        Response text chunks, a cached full response, or a final error
        message when every provider failed.
    """
    message = (message or "").strip()
    if not message:
        yield ""
        return
    # Apply the execution pre-processing.
    if tools and session_id:
        message = ExecutionEnforcer.pre_process_request(message, session_id, tools)
    # No cache when tools are present.
    key = None
    if not tools:
        # The key covers the complete message and history. Keying on only
        # the first 200 characters made every request sharing a long
        # system-prompt prefix collide and receive the wrong cached answer.
        key = _ask_cache_key(provider_name, model_name, message, history)
        cached = CACHE.get(key)
        if cached:
            yield cached
            return
    # Convert the history into provider messages.
    msgs = []
    try:
        if history and isinstance(history, list):
            if isinstance(history[0], dict):
                smart_history = SmartHistoryManager.process_history(
                    history, session_id
                )
                for item in smart_history:
                    role = item.get("role")
                    content = item.get("content")
                    if not (role and content):
                        continue
                    text = (
                        content
                        if isinstance(content, str)
                        else extract_text_from_content(content)
                    )
                    if text:
                        msgs.append({"role": str(role), "content": text})
            else:
                # Pair-style history: keep the first 5 and last 25 turns.
                if len(history) > 30:
                    smart_history_tuples = history[:5] + history[-25:]
                else:
                    smart_history_tuples = history
                for item in smart_history_tuples:
                    if isinstance(item, (list, tuple)) and len(item) == 2:
                        if item[0]:
                            msgs.append({"role": "user", "content": str(item[0])})
                        if item[1]:
                            msgs.append({"role": "assistant", "content": str(item[1])})
    except Exception as e:
        # Best effort: a malformed history must not block the request.
        logger.warning(f"[History] Error: {e}")
    msgs.append({"role": "user", "content": message})
    # Update the persistent store.
    if session_id:
        ctx = CONTEXT_STORE.get_or_create_session(session_id)
        if not ctx.get("original_task") and len(msgs) <= 2:
            CONTEXT_STORE.update_session(
                session_id,
                original_task=message[:500],
                current_state=ExecutionState.PLANNING
            )
        ctx["message_count"] = ctx.get("message_count", 0) + 1
    # Provider order: the requested one first, then all others.
    all_provider_names = list(REAL_PROVIDERS.keys())
    if provider_name in all_provider_names:
        fallback_providers = [provider_name] + [
            p for p in all_provider_names if p != provider_name
        ]
    else:
        fallback_providers = all_provider_names
    used = []
    for pname in fallback_providers:
        if pname in used:
            continue
        used.append(pname)
        pobj = REAL_PROVIDERS.get(pname)
        if not pobj:
            continue
        models_list = discover_provider_models(pobj, pname)
        if not models_list:
            continue
        if model_name in models_list:
            model_candidates = [model_name] + [m for m in models_list if m != model_name]
        else:
            model_candidates = models_list
        for m in model_candidates[:5]:
            try:
                logger.info(f"[Fallback] Trying {pname}/{m}")
                if session_id:
                    # Keep a rolling log of the last 20 attempts.
                    provider_history = CONTEXT_STORE._store.get(
                        session_id, {}
                    ).get("provider_history", [])
                    provider_history.append({
                        "provider": pname,
                        "model": m,
                        "timestamp": time.time()
                    })
                    CONTEXT_STORE._store[session_id]["provider_history"] = (
                        provider_history[-20:]
                    )
                stream = g4f.ChatCompletion.create(
                    model=m,
                    provider=pobj,
                    messages=msgs,
                    stream=True,
                    timeout=60
                )
                buffer = []
                got_response = False
                for chunk in stream:
                    if stop_flag and stop_flag.is_set():
                        return
                    c = clean_stream(chunk)
                    if not c:
                        continue
                    got_response = True
                    buffer.append(c)
                    yield c
                full = "".join(buffer)
                if full.strip() and got_response:
                    if key:
                        CACHE.set(key, full)
                    if session_id:
                        CONTEXT_STORE.update_session(
                            session_id,
                            last_model_response=full[:500],
                            current_state=ExecutionState.AWAITING_NEXT
                        )
                    return
            except Exception as e:
                # NOTE(review): chunks already yielded by a provider that
                # fails mid-stream stay in the output, and the next
                # provider's answer is appended after them.
                logger.warning(
                    f"[Fallback] {pname}/{m} failed: {str(e)[:200]}"
                )
                continue
    yield "❌ فشلت جميع المزودات. تأكد من اتصال الإنترنت أو حاول لاحقاً."
# =====================================================
# SESSION ID EXTRACTOR
# =====================================================
def extract_session_id(request: Request, body: Dict) -> str:
    """Work out a session id for the request.

    Sources, in priority order:

    1. the ``X-Session-ID`` / ``X-Conversation-ID`` request headers;
    2. ``session_id``, ``conversation_id`` or ``user_id`` in
       ``body["metadata"]``;
    3. ``auto_`` plus a hash of the first 100 characters of the first
       message's content;
    4. ``rand_`` plus a random id when there are no messages.
    """
    for header_name in ("X-Session-ID", "x-session-id", "X-Conversation-ID"):
        from_header = request.headers.get(header_name, "")
        if from_header:
            return from_header

    metadata = body.get("metadata", {})
    if isinstance(metadata, dict):
        for field_name in ("session_id", "conversation_id", "user_id"):
            from_metadata = metadata.get(field_name, "")
            if from_metadata:
                return from_metadata

    messages = body.get("messages", [])
    if not messages:
        return f"rand_{uuid.uuid4().hex[:16]}"

    # Derive a stable id from the opening message so follow-up requests of
    # the same conversation map to the same session.
    import hashlib
    opening_text = str(messages[0].get("content", ""))[:100]
    fingerprint = hashlib.md5(opening_text.encode()).hexdigest()[:16]
    return f"auto_{fingerprint}"
# =====================================================
# FASTAPI
# =====================================================
# Application object; the route decorators below register handlers on it.
app = FastAPI(
    title="G4F Execution Engine",
    description="AI Gateway with Execution Enforcement"
)
# NOTE(review): every origin, method and header is allowed while
# credentials are enabled. Confirm this is intended for a key-protected API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Shared secret that verify_api_key compares request credentials against.
# NOTE(review): when the API_KEY environment variable is unset this falls
# back to a hard-coded default that is visible in the source.
API_KEY = os.getenv("API_KEY", "mysecretkey123")
class ChatRequest(BaseModel):
    """Request body schema for a simple chat call.

    NOTE(review): no endpoint in the visible part of the file uses this
    model; presumably it is consumed elsewhere, to be confirmed.
    """
    # Prompt text; the only field without a default.
    message: str
    # Default provider name.
    provider: str = "Blackbox"
    # Default model name.
    model: str = "gpt-4o"
    # Prior conversation turns; element shape is not constrained here.
    history: List[Any] = []
    # Optional session identifier; empty by default.
    session_id: str = ""
def verify_api_key(request: Request):
    """Authorise a request against ``API_KEY``.

    The key is accepted from ``Authorization: Bearer <key>`` or from the
    ``X-API-Key`` header.

    Returns:
        True when a presented key matches.

    Raises:
        HTTPException: 401 when no presented key matches.
    """
    import hmac

    expected = API_KEY.encode("utf-8")

    def matches(candidate: str) -> bool:
        # Constant-time comparison: ``==`` on the secret would leak, via
        # response timing, how long a prefix of a guess is correct.
        return bool(candidate) and hmac.compare_digest(
            candidate.encode("utf-8"), expected
        )

    auth = request.headers.get("Authorization", "").strip()
    candidates = []
    if auth.startswith("Bearer "):
        candidates.append(auth[7:].strip())
    candidates.append(request.headers.get("X-API-Key", "").strip())
    candidates.append(request.headers.get("x-api-key", "").strip())
    if any(matches(candidate) for candidate in candidates):
        return True
    raise HTTPException(
        status_code=401,
        detail="Invalid API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'"
    )
# =====================================================
# دعم HEAD
# =====================================================
@app.head("/")
async def head_root():
    """Answer ``HEAD /`` with an empty 200 response."""
    return Response(status_code=200)
@app.head("/health")
async def head_health():
    """Answer ``HEAD /health`` with an empty 200 response."""
    return Response(status_code=200)
@app.head("/v1/models")
async def head_models():
    """Answer ``HEAD /v1/models`` with an empty 200 response."""
    return Response(status_code=200)
@app.head("/v1/messages")
async def head_messages():
    """Answer ``HEAD /v1/messages`` with an empty 200 response (no API key check)."""
    return Response(status_code=200)
@app.head("/v1/chat/completions")
async def head_chat_completions():
    """Answer ``HEAD /v1/chat/completions`` with an empty 200 response (no API key check)."""
    return Response(status_code=200)
# =====================================================
# نقاط نهاية Anthropic API
# =====================================================
@app.get("/v1/models")
async def v1_models(request: Request):
    """List available models.

    Takes up to five models from each provider. A model id offered by
    several providers is listed once, attributed to the first provider
    that offers it. When nothing is discovered, a single default
    ``gpt-4o`` entry is returned. ``request`` is not used.
    """
    catalog = {}
    for provider_name, provider in REAL_PROVIDERS.items():
        for model_id in discover_provider_models(provider, provider_name)[:5]:
            if model_id in catalog:
                # Already listed under an earlier provider.
                continue
            catalog[model_id] = {
                "id": model_id,
                "object": "model",
                "created": int(time.time()),
                "owned_by": provider_name,
                "type": "model",
                "display_name": f"{provider_name} - {model_id}"
            }
    entries = list(catalog.values())
    if not entries:
        entries = [{
            "id": "gpt-4o",
            "object": "model",
            "created": int(time.time()),
            "owned_by": "system",
            "type": "model",
            "display_name": "Default"
        }]
    return {"object": "list", "data": entries}
@app.post("/v1/messages")
async def v1_messages(request: Request):
    """Anthropic-style Messages endpoint.

    Validates the API key, converts the request into a single prompt plus
    history, and either delegates to the streaming handler or collects the
    whole answer, parses tool calls from it and returns one message object.
    With tools and a session, a response that only describes an action
    triggers one forced-execution retry.

    Raises:
        HTTPException: 401 for a bad key, 400 for a non-object body or an
            empty ``messages`` list.
    """
    verify_api_key(request)
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    session_id = extract_session_id(request, body)
    # An explicit ``"tools": null`` must behave like a missing key; the
    # plain ``.get`` default returned None and crashed on ``len()``.
    tools = body.get("tools") or []
    logger.info(
        f"[API] /v1/messages - model={body.get('model')}, "
        f"stream={body.get('stream')}, "
        f"tools_count={len(tools)}, "
        f"session={session_id[:8]}..."
    )
    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="No messages provided")
    model = body.get("model", "gpt-4o")
    system_prompt = body.get("system", "")
    if isinstance(system_prompt, list):
        # A block-list system prompt is flattened to newline-joined text.
        sys_parts = []
        for sp in system_prompt:
            if isinstance(sp, dict) and sp.get("type") == "text":
                sys_parts.append(sp.get("text", ""))
            elif isinstance(sp, str):
                sys_parts.append(sp)
        system_prompt = "\n".join(sys_parts)
    is_stream = body.get("stream", False)
    max_tokens = body.get("max_tokens", 4096)
    tool_choice = body.get("tool_choice", "auto")
    metadata = body.get("metadata", {})
    # Remember the original task for this session.
    ctx = CONTEXT_STORE.get_or_create_session(session_id)
    if not ctx.get("original_task"):
        original_task = CONTEXT_STORE.extract_task_from_messages(messages)
        if original_task:
            CONTEXT_STORE.update_session(session_id, original_task=original_task)
    # Convert the messages.
    full_message, history = MessageConverter.convert_messages(
        messages, system_prompt, tools, tool_choice, session_id
    )
    logger.info(
        f"[API] Message length: {len(full_message)}, "
        f"history: {len(history)}, "
        f"tools: {[t.get('name', '') for t in tools]}"
    )
    if is_stream:
        return await _handle_anthropic_stream(
            full_message, history, model, max_tokens,
            tools, tool_choice, metadata, session_id
        )
    # Non-stream.
    # NOTE(review): ask() is a blocking generator consumed on the event
    # loop; other requests wait while the provider answers.
    full_response = "".join(
        ask(
            full_message, history, "Blackbox", model,
            tools=tools, session_id=session_id
        )
    )
    clean_text, tool_calls = ToolCallParser.parse_tool_calls(full_response, tools)
    # Apply the execution enforcer.
    if tools and session_id:
        enforcement = ExecutionEnforcer.post_process_response(
            full_response, session_id, tools, tool_calls
        )
        if enforcement["needs_retry"]:
            logger.info(
                f"[Enforcer] Retry needed: {enforcement['retry_message'][:100]}"
            )
            retry_response = "".join(
                ask(
                    enforcement["retry_message"], [], "Blackbox", model,
                    tools=tools, session_id=session_id
                )
            )
            if retry_response:
                clean_text2, tool_calls2 = ToolCallParser.parse_tool_calls(
                    retry_response, tools
                )
                # Only a retry that produced tool calls replaces the answer.
                if tool_calls2:
                    tool_calls = tool_calls2
                    clean_text = clean_text2
                    logger.info(
                        f"[Enforcer] Retry SUCCESS: {len(tool_calls2)} tool calls"
                    )
    message_id = f"msg_{int(time.time())}_{os.urandom(4).hex()}"
    # Token counts are estimated at roughly four characters per token.
    input_tokens = max(1, len(full_message) // 4)
    output_tokens = max(1, len(full_response) // 4)
    content_blocks = []
    if clean_text.strip():
        content_blocks.append({"type": "text", "text": clean_text.strip()})
    for tc in tool_calls:
        tool_id = tc.get("id") or ToolCallParser.generate_tool_id()
        content_blocks.append({
            "type": "tool_use",
            "id": tool_id,
            "name": tc["name"],
            "input": tc.get("input", {})
        })
        logger.info(f"[API] Tool call: {tc['name']} (id: {tool_id})")
        if session_id:
            CONTEXT_STORE.add_completed_step(
                session_id,
                f"Called tool: {tc['name']} with {json.dumps(tc.get('input', {}))[:100]}"
            )
    if not content_blocks:
        content_blocks.append({"type": "text", "text": full_response or ""})
    stop_reason = "tool_use" if tool_calls else "end_turn"
    return {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "content": content_blocks,
        "model": model,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens
        }
    }
# =====================================================
# معالج الـ streaming
# =====================================================
async def _handle_anthropic_stream(
    full_message: str,
    history: list,
    model: str,
    max_tokens: int,
    tools: List[Dict] = None,
    tool_choice: Any = None,
    metadata: Dict = None,
    session_id: str = ""
):
    """Stream a response as Anthropic-style server-sent events.

    Emits ``message_start``, then text and ``tool_use`` content blocks as
    the tool buffer recognises them, then ``message_delta`` and
    ``message_stop``. With tools and a session, a response that contains no
    tool call is checked by the execution enforcer, and a forced-execution
    retry may be streamed into the same message.

    Args:
        full_message: Prompt to send.
        history: Prior turns passed to ``ask``.
        model: Model name; also echoed in ``message_start``.
        max_tokens: Accepted for interface compatibility; not used.
        tools: Tool definitions; ``None`` is treated as no tools.
        tool_choice: Accepted for interface compatibility; not used.
        metadata: Accepted for interface compatibility; not used.
        session_id: Session used for enforcement and step tracking.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    def sse(event_name: str, payload: Dict) -> str:
        """Serialise one server-sent event frame."""
        return (
            f"event: {event_name}\n"
            f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
        )

    async def generate_stream():
        message_id = f"msg_{int(time.time())}_{os.urandom(4).hex()}"
        tools_list = tools or []
        yield sse("message_start", {
            "type": "message_start",
            "message": {
                "id": message_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {
                    "input_tokens": max(1, len(full_message) // 4),
                    "output_tokens": 0
                }
            }
        })
        tool_buffer = StreamToolBuffer(available_tools=tools_list)
        current_block_index = 0
        text_block_open = False
        output_tokens = 0
        has_tool_calls = False
        detected_tool_calls = []
        full_response_buffer = []

        def make_text_block_start(index: int) -> str:
            return sse("content_block_start", {
                "type": "content_block_start",
                "index": index,
                "content_block": {"type": "text", "text": ""}
            })

        def make_text_delta(index: int, text: str) -> str:
            return sse("content_block_delta", {
                "type": "content_block_delta",
                "index": index,
                "delta": {"type": "text_delta", "text": text}
            })

        def make_block_stop(index: int) -> str:
            return sse("content_block_stop", {"type": "content_block_stop", "index": index})

        def make_tool_use_events(index: int, tool_call: Dict) -> str:
            """Return the start, input-delta and stop frames of one tool_use block."""
            tool_id = tool_call.get("id") or ToolCallParser.generate_tool_id()
            tool_name = tool_call.get("name", "unknown")
            tool_input = tool_call.get("input", {})
            logger.info(f"[Stream] Sending tool_use: {tool_name} (id: {tool_id})")
            frames = [sse("content_block_start", {
                "type": "content_block_start",
                "index": index,
                "content_block": {
                    "type": "tool_use",
                    "id": tool_id,
                    "name": tool_name,
                    "input": {}
                }
            })]
            # The input JSON is delivered in 100-character slices.
            input_json = json.dumps(tool_input, ensure_ascii=False)
            chunk_size = 100
            for i in range(0, len(input_json), chunk_size):
                frames.append(sse("content_block_delta", {
                    "type": "content_block_delta",
                    "index": index,
                    "delta": {
                        "type": "input_json_delta",
                        "partial_json": input_json[i:i + chunk_size]
                    }
                }))
            frames.append(make_block_stop(index))
            return "".join(frames)

        def emit_events(events, tool_log_prefix=None):
            """Turn buffer events into SSE frames, tracking the open block.

            Text goes into the currently open text block (opened on
            demand). A tool call closes any open text block and gets a
            block index of its own.
            """
            nonlocal current_block_index, text_block_open, has_tool_calls
            for event in events:
                kind = event.get("type")
                if kind == "text":
                    text = event["text"]
                    if text:
                        if not text_block_open:
                            yield make_text_block_start(current_block_index)
                            text_block_open = True
                        yield make_text_delta(current_block_index, text)
                elif kind == "tool_use":
                    if text_block_open:
                        yield make_block_stop(current_block_index)
                        current_block_index += 1
                        text_block_open = False
                    has_tool_calls = True
                    detected_tool_calls.append(event)
                    if tool_log_prefix:
                        logger.info(f"{tool_log_prefix}{event.get('name')}")
                    yield make_tool_use_events(current_block_index, event)
                    current_block_index += 1

        # Process the stream.
        # NOTE(review): ask() is a blocking generator; while it waits on
        # the provider the event loop cannot serve other requests.
        try:
            for chunk in ask(
                full_message, history, "Blackbox", model,
                tools=tools_list, session_id=session_id
            ):
                if not chunk:
                    continue
                full_response_buffer.append(chunk)
                output_tokens += max(1, len(chunk) // 4)
                if not tools_list:
                    # Without tools there is nothing to detect: pass through.
                    if not text_block_open:
                        yield make_text_block_start(current_block_index)
                        text_block_open = True
                    yield make_text_delta(current_block_index, chunk)
                    continue
                for frame in emit_events(tool_buffer.feed(chunk)):
                    yield frame
            # Flush the buffer.
            logger.info("[Stream] Flushing buffer...")
            for frame in emit_events(
                tool_buffer.flush(), "[Stream] Recovered tool call: "
            ):
                yield frame
            # ===== Apply the execution enforcer to the stream =====
            if tools_list and session_id and not has_tool_calls:
                full_response = "".join(full_response_buffer)
                enforcement = ExecutionEnforcer.post_process_response(
                    full_response, session_id, tools_list, detected_tool_calls
                )
                if enforcement["needs_retry"]:
                    logger.info("[Stream Enforcer] Sending execution push...")
                    retry_tool_buffer = StreamToolBuffer(available_tools=tools_list)
                    for retry_chunk in ask(
                        enforcement["retry_message"], [], "Blackbox", model,
                        tools=tools_list, session_id=session_id
                    ):
                        if not retry_chunk:
                            continue
                        for frame in emit_events(
                            retry_tool_buffer.feed(retry_chunk),
                            "[Stream Enforcer] Tool call in retry: "
                        ):
                            yield frame
                    # Flush the retry buffer. Text events are emitted too;
                    # they used to be dropped here, which lost whatever
                    # text the buffer was still holding when the retry ended.
                    for frame in emit_events(retry_tool_buffer.flush()):
                        yield frame
        except Exception as e:
            # Report the failure inside the stream, since the response
            # has already started.
            logger.error(f"[Stream] Error: {str(e)}")
            if not text_block_open:
                yield make_text_block_start(current_block_index)
                text_block_open = True
            yield make_text_delta(current_block_index, f"\n\n[Error: {str(e)}]")
        # Close any block that is still open.
        if text_block_open:
            yield make_block_stop(current_block_index)
        elif current_block_index == 0 and not has_tool_calls:
            # Nothing was produced at all: send one empty text block.
            yield make_text_block_start(0)
            yield make_text_delta(0, "")
            yield make_block_stop(0)
        # Record the detected tool calls.
        if session_id and detected_tool_calls:
            for tc in detected_tool_calls:
                CONTEXT_STORE.add_completed_step(
                    session_id,
                    f"Called tool: {tc.get('name', 'unknown')} "
                    f"with {json.dumps(tc.get('input', {}))[:100]}"
                )
        stop_reason = "tool_use" if has_tool_calls else "end_turn"
        yield sse("message_delta", {
            "type": "message_delta",
            "delta": {
                "stop_reason": stop_reason,
                "stop_sequence": None
            },
            "usage": {"output_tokens": output_tokens}
        })
        yield sse("message_stop", {"type": "message_stop"})

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
# =====================================================
# نقطة /v1/messages/stream
# =====================================================
@app.post("/v1/messages/stream")
async def v1_messages_stream(request: Request):
    """Anthropic-style streaming endpoint.

    Authenticates the caller, flattens a list-form ``system`` prompt into a
    single newline-joined string, converts the incoming messages with
    ``MessageConverter.convert_messages`` and delegates to
    ``_handle_anthropic_stream``.

    Raises:
        HTTPException: 400 when the body carries no messages.
    """
    verify_api_key(request)
    payload = await request.json()
    session_id = extract_session_id(request, payload)

    incoming = payload.get("messages", [])
    if not incoming:
        raise HTTPException(status_code=400, detail="No messages provided")

    system_prompt = payload.get("system", "")
    if isinstance(system_prompt, list):
        # Keep the text of {"type": "text"} blocks and any bare strings;
        # every other entry is ignored.
        system_prompt = "\n".join(
            part.get("text", "") if isinstance(part, dict) else part
            for part in system_prompt
            if (isinstance(part, dict) and part.get("type") == "text")
            or isinstance(part, str)
        )

    model = payload.get("model", "gpt-4o")
    max_tokens = payload.get("max_tokens", 4096)
    tools = payload.get("tools", [])
    tool_choice = payload.get("tool_choice", "auto")

    full_message, history = MessageConverter.convert_messages(
        incoming, system_prompt, tools, tool_choice, session_id
    )
    return await _handle_anthropic_stream(
        full_message, history, model, max_tokens,
        tools, tool_choice, {}, session_id
    )
# =====================================================
# نقطة /v1/chat/completions (OpenAI)
# =====================================================
@app.post("/v1/chat/completions")
async def v1_chat_completions(request: Request):
    """OpenAI-compatible chat completion endpoint (streaming or not).

    The last message is sent as the prompt. Earlier messages that have both a
    role and non-empty text become the history, which is then passed through
    ``SmartHistoryManager.process_history``. Generation always goes through
    ``ask`` with the "Blackbox" provider.

    Returns:
        A ``StreamingResponse`` of ``chat.completion.chunk`` SSE events when
        ``stream`` is truthy; otherwise a ``chat.completion`` dict whose usage
        numbers are estimated at one token per four characters.

    Raises:
        HTTPException: 400 when the body carries no messages.
    """
    verify_api_key(request)
    body = await request.json()
    session_id = extract_session_id(request, body)
    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="No messages provided")
    model = body.get("model", "gpt-4o")
    is_stream = body.get("stream", False)

    user_message = extract_text_from_content(messages[-1].get("content", ""))
    history = []
    for msg in messages[:-1]:
        role = msg.get("role", "user")
        content = extract_text_from_content(msg.get("content", ""))
        if role and content:
            history.append({"role": role, "content": content})
    history = SmartHistoryManager.process_history(history, session_id)
    completion_id = f"chatcmpl-{int(time.time())}_{os.urandom(4).hex()}"

    if is_stream:
        def chunk_event(delta, finish_reason):
            # Serialize one chat.completion.chunk as an SSE "data:" frame.
            data = {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model,
                "choices": [{
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason
                }]
            }
            return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

        async def openai_stream():
            for chunk in ask(
                user_message, history, "Blackbox", model,
                session_id=session_id
            ):
                if chunk:
                    yield chunk_event({"content": chunk}, None)
            yield chunk_event({}, "stop")
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            openai_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            }
        )

    # NOTE(review): ask() is iterated synchronously inside this coroutine; if
    # it blocks on network I/O the event loop is blocked meanwhile - confirm.
    # Falsy chunks are skipped here exactly as the streaming branch does, so a
    # None/empty chunk cannot break the concatenation.
    full_response = "".join(
        chunk
        for chunk in ask(
            user_message, history, "Blackbox", model,
            session_id=session_id
        )
        if chunk
    )

    prompt_tokens = max(1, len(user_message) // 4)
    completion_tokens = max(1, len(full_response) // 4)
    return {
        "id": completion_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": full_response
            },
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            # Must be the sum of the two figures above; computing it
            # separately let the three numbers disagree.
            "total_tokens": prompt_tokens + completion_tokens
        }
    }
# =====================================================
# نقاط نهاية إضافية
# =====================================================
@app.get("/")
async def root():
    """Return a static description of the service.

    Includes the version, a textual summary of the four behaviours the engine
    addresses, the names of the loaded providers and the cookie status.
    """
    problems_solved = {
        "1_smart_watcher_syndrome": (
            "ResponseAnalyzer detects 'I will...' patterns "
            "and flags them for immediate retry"
        ),
        "2_lost_in_translation": (
            "ToolCallParser._clean_json_string() fixes common JSON errors "
            "before parsing - tolerant of extra spaces and characters"
        ),
        "3_missing_final_push": (
            "ExecutionEnforcer.build_forced_execution_message() "
            "sends a mandatory execution message when description detected"
        ),
        "4_identity_confusion": (
            "ToolsFormatter defines the model as EXECUTOR not DESCRIBER "
            "with clear forbidden patterns list"
        ),
    }
    provider_names = list(REAL_PROVIDERS.keys())
    return {
        "message": "G4F Execution Engine",
        "version": "6.0.0",
        "problems_solved": problems_solved,
        "providers": provider_names,
        "cookies": COOKIE_STATUS,
        "status": "✅ Execution Engine v6 Active"
    }
@app.get("/health")
async def health():
    """Return a health snapshot.

    Reports the version, cookie status, provider names and count, the number
    of sessions currently in the context store, execution-engine feature
    flags and the current Unix timestamp.
    """
    provider_names = list(REAL_PROVIDERS.keys())
    engine_flags = {
        "response_analyzer": True,
        "execution_enforcer": True,
        "stream_retry": True,
        "json_cleaner": True,
        "description_patterns": len(ResponseAnalyzer.DESCRIPTION_PHRASES),
    }
    store_info = {
        "active_sessions": len(CONTEXT_STORE._store),
    }
    return {
        "status": "ok",
        "version": "6.0.0",
        "cookies": COOKIE_STATUS,
        "providers": provider_names,
        "provider_count": len(REAL_PROVIDERS),
        "context_store": store_info,
        "execution_engine": engine_flags,
        "timestamp": int(time.time())
    }
@app.get("/providers")
async def get_providers(request: Request):
    """List every loaded provider together with its discovered models."""
    verify_api_key(request)
    catalog = {
        name: {
            "available": True,
            "models": discover_provider_models(provider, name),
        }
        for name, provider in REAL_PROVIDERS.items()
    }
    return {"providers": catalog}
@app.get("/context/{session_id}")
async def get_session_context(session_id: str, request: Request):
    """Return the stored context, summary and execution directive of a session.

    The context is obtained through ``CONTEXT_STORE.get_or_create_session``.
    """
    verify_api_key(request)
    context = CONTEXT_STORE.get_or_create_session(session_id)
    summary = CONTEXT_STORE.get_context_summary(session_id)
    directive = CONTEXT_STORE.get_execution_directive(session_id)
    return {
        "session_id": session_id,
        "context": context,
        "summary": summary,
        "execution_directive": directive
    }
@app.delete("/context/{session_id}")
async def clear_session_context(session_id: str, request: Request):
    """Delete one session from the context store, if it exists.

    Returns a message saying whether the session was cleared or not found.
    """
    verify_api_key(request)
    with CONTEXT_STORE._lock:
        existed = session_id in CONTEXT_STORE._store
        if existed:
            del CONTEXT_STORE._store[session_id]
    if existed:
        return {"message": f"Session {session_id} cleared"}
    return {"message": f"Session {session_id} not found"}
@app.get("/context")
async def list_sessions(request: Request):
    """Return a compact overview of every session in the context store.

    Each entry holds counters, the current state and the first 100 characters
    of the original task. The store is read while holding its lock.
    """
    verify_api_key(request)
    with CONTEXT_STORE._lock:
        overview = [
            {
                "session_id": sid,
                "message_count": ctx.get("message_count", 0),
                "original_task_preview": ctx.get("original_task", "")[:100],
                "current_state": ctx.get("current_state", ExecutionState.IDLE),
                "completed_steps": len(ctx.get("completed_steps", [])),
                "tool_results": len(ctx.get("tool_results", [])),
                "pending_tools": len(ctx.get("pending_tool_calls", [])),
                "executed_tools": len(ctx.get("executed_tool_calls", [])),
                "description_count": ctx.get("consecutive_description_count", 0),
                "last_updated": ctx.get("last_updated", 0),
            }
            for sid, ctx in CONTEXT_STORE._store.items()
        ]
    return {"sessions": overview, "total": len(overview)}
@app.post("/debug/test-execution-enforcer")
async def debug_test_execution_enforcer(request: Request):
    """Run canned responses through the analyzer and the execution enforcer.

    Each scenario is analysed with ``ResponseAnalyzer.analyze`` and then passed
    to ``ExecutionEnforcer.post_process_response`` under a throw-away session
    id; the enforcer's ``needs_retry`` flag is compared with the expectation.
    """
    verify_api_key(request)

    # (scenario name, model response, expected needs_retry)
    scenarios = [
        (
            "Arabic description without execution",
            "سأقوم الآن بإنشاء الملف باستخدام أداة write_file.",
            True,
        ),
        (
            "English description without execution",
            "I will now create the file using the write_file tool.",
            True,
        ),
        (
            "Actual tool call",
            (
                "Creating the file now.\n"
                "<tool_call>\n"
                '{"name": "write_file", "arguments": {"path": "test.py", "content": "print(1)"}}\n'
                "</tool_call>"
            ),
            False,
        ),
        (
            "Let me create pattern",
            "Let me create a Python file with the requested functionality.",
            True,
        ),
        (
            "Regular response (no tools needed)",
            "The answer to your question is 42.",
            False,
        ),
        (
            "I'll use the tool pattern",
            "I'll use the write_file tool to save your code.",
            True,
        ),
    ]
    test_tools = [{
        "name": "write_file",
        "description": "Write content to a file",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path"},
                "content": {"type": "string", "description": "File content"}
            },
            "required": ["path", "content"]
        }
    }]

    results = []
    for name, response_text, expected in scenarios:
        analysis = ResponseAnalyzer.analyze(response_text, test_tools)
        test_session = f"test_{uuid.uuid4().hex[:8]}"
        enforcement = ExecutionEnforcer.post_process_response(
            response_text, test_session, test_tools, []
        )
        # Drop the temporary session again.
        with CONTEXT_STORE._lock:
            if test_session in CONTEXT_STORE._store:
                del CONTEXT_STORE._store[test_session]

        actual = enforcement["needs_retry"]
        retry_message = enforcement["retry_message"]
        results.append({
            "test_name": name,
            "response_preview": response_text[:100],
            "expected_needs_retry": expected,
            "actual_needs_retry": actual,
            "passed": actual == expected,
            "analysis": {
                "has_actual_tool_call": analysis["has_actual_tool_call"],
                "has_description_without_execution": (
                    analysis["has_description_without_execution"]
                ),
                "intended_tool": analysis.get("intended_tool_name"),
            },
            "retry_message_preview": retry_message[:150] if retry_message else ""
        })

    total = len(results)
    passed = len([entry for entry in results if entry["passed"]])
    return {
        "test_results": results,
        "summary": {
            "passed": passed,
            "failed": total - passed,
            "total": total,
            "success_rate": f"{passed/total*100:.1f}%"
        }
    }
@app.get("/debug/test-tool-parse")
async def debug_test_tool_parse(request: Request):
    """Feed canned texts to ``ToolCallParser.parse_tool_calls`` and report results.

    For every sample the response lists the cleaned text, the extracted tool
    calls and whether ``ToolCallParser.might_contain_tool_call`` fired.
    """
    verify_api_key(request)

    # (sample name, raw model output)
    samples = [
        (
            "Standard tool_call tags",
            (
                'I will create the file now.\n'
                '<tool_call>{"name": "write_file", '
                '"arguments": {"path": "test.py", "content": "print(\'hello\')"}}'
                '</tool_call>'
            ),
        ),
        (
            "Tool call with whitespace",
            (
                'Let me help you.\n\n\n'
                ' <tool_call>\n'
                '{"name": "read_file", "arguments": {"path": "config.json"}}\n'
                '</tool_call>'
            ),
        ),
        (
            "Code block format",
            (
                'Let me write that.\n'
                '```tool_call\n'
                '{"name": "read_file", "arguments": {"path": "config.json"}}\n'
                '```'
            ),
        ),
        (
            "JSON with trailing comma (common error)",
            (
                '<tool_call>'
                '{"name": "write_file", "arguments": {"path": "test.py", "content": "data",}}'
                '</tool_call>'
            ),
        ),
        (
            "Multiple tool calls",
            (
                'First read, then write.\n'
                '<tool_call>{"name": "read_file", "arguments": {"path": "old.txt"}}'
                '</tool_call>\n'
                'Now writing:\n'
                '<tool_call>{"name": "write_file", "arguments": {"path": "new.txt", "content": "data"}}'
                '</tool_call>'
            ),
        ),
        (
            "No tool calls (plain text)",
            "This is just a regular response with no tool calls.",
        ),
    ]

    report = []
    for name, raw in samples:
        clean_text, tool_calls = ToolCallParser.parse_tool_calls(raw)
        # Previews longer than 100 characters are truncated with an ellipsis.
        if len(raw) > 100:
            preview = raw[:100] + "..."
        else:
            preview = raw
        report.append({
            "test_name": name,
            "input_preview": preview,
            "clean_text": clean_text,
            "tool_calls_found": len(tool_calls),
            "tool_calls": tool_calls,
            "has_tool_call_marker": ToolCallParser.might_contain_tool_call(raw),
        })
    return {"test_results": report}
@app.post("/debug/test-stream-buffer")
async def debug_test_stream_buffer(request: Request):
    """Replay a text through ``StreamToolBuffer`` in fixed-size chunks.

    Request body fields:
        text: the string to split (default ``""``).
        tools: tool definitions handed to the buffer (default ``[]``).
        chunk_size: number of characters per chunk (default 10).

    Returns:
        The events produced by each ``feed`` call, tagged with the chunk
        index, followed by the events from the final ``flush`` (tagged
        ``"flush"``), plus basic counters.

    Raises:
        HTTPException: 400 when ``text`` is not a string or ``chunk_size`` is
            not a positive integer.
    """
    verify_api_key(request)
    body = await request.json()
    text = body.get("text", "")
    tools = body.get("tools", [])
    chunk_size = body.get("chunk_size", 10)

    if not isinstance(text, str):
        raise HTTPException(status_code=400, detail="text must be a string")
    # range() rejects a zero step and non-integers, and a negative step would
    # silently produce no chunks; reject all of these with a clear 400.
    if not isinstance(chunk_size, int) or chunk_size <= 0:
        raise HTTPException(
            status_code=400,
            detail="chunk_size must be a positive integer"
        )

    buffer = StreamToolBuffer(available_tools=tools)
    all_events = []
    for chunk_index, start in enumerate(range(0, len(text), chunk_size)):
        for event in buffer.feed(text[start:start + chunk_size]):
            all_events.append({"chunk_index": chunk_index, "event": event})
    for event in buffer.flush():
        all_events.append({"chunk_index": "flush", "event": event})

    return {
        "input_length": len(text),
        "chunk_size": chunk_size,
        "total_chunks": (len(text) + chunk_size - 1) // chunk_size,
        "total_events": len(all_events),
        "events": all_events
    }
# =====================================================
# نقطة الدردشة البسيطة
# =====================================================
@app.post("/chat")
async def simple_chat(chat_req: ChatRequest, request: Request):
    """Answer a single chat request and return the complete reply at once.

    The session id comes from the request model when set, otherwise from
    ``extract_session_id``. All chunks yielded by ``ask`` are concatenated.
    """
    verify_api_key(request)
    session_id = chat_req.session_id or extract_session_id(request, {})
    reply = "".join(
        ask(
            chat_req.message,
            chat_req.history,
            chat_req.provider,
            chat_req.model,
            session_id=session_id
        )
    )
    return {
        "response": reply,
        "session_id": session_id,
        "provider": chat_req.provider,
        "model": chat_req.model
    }
@app.post("/chat/stream")
async def simple_chat_stream(chat_req: ChatRequest, request: Request):
    """Stream a chat reply as server-sent events.

    Every non-empty chunk from ``ask`` is emitted as ``data: {"text": ...}``;
    the stream ends with ``data: [DONE]``.
    """
    verify_api_key(request)
    session_id = chat_req.session_id or extract_session_id(request, {})

    async def generate():
        chunks = ask(
            chat_req.message,
            chat_req.history,
            chat_req.provider,
            chat_req.model,
            session_id=session_id
        )
        for piece in chunks:
            if not piece:
                continue
            yield f"data: {json.dumps({'text': piece}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers=sse_headers
    )
# =====================================================
# BACKGROUND CLEANUP TASK
# =====================================================
async def cleanup_task():
    """Periodically purge old sessions from the context store.

    Loops forever: sleeps for one hour, then calls
    ``CONTEXT_STORE.cleanup_old_sessions(max_age_hours=24)``. An ``Exception``
    raised during a cycle is logged and the loop carries on.
    """
    interval_seconds = 3600  # one hour between runs
    retention_hours = 24
    while True:
        try:
            await asyncio.sleep(interval_seconds)
            CONTEXT_STORE.cleanup_old_sessions(max_age_hours=retention_hours)
            logger.info("[Cleanup] Old sessions cleaned")
        except Exception as exc:
            logger.error(f"[Cleanup] Error: {exc}")
@app.on_event("startup")
async def startup_event():
    """Start the background cleanup task and log the startup banner."""
    # The event loop keeps only weak references to tasks. Store a strong
    # reference on the application so the periodic cleanup task cannot be
    # garbage-collected while it is still running.
    app.state.cleanup_task = asyncio.create_task(cleanup_task())
    logger.info("=" * 60)
    logger.info("🚀 G4F Execution Engine v6.0.0 Started")
    logger.info(f"🍪 Cookies: {COOKIE_STATUS}")
    logger.info(f"🔌 Providers: {list(REAL_PROVIDERS.keys())}")
    logger.info("🔧 Execution Engine: ACTIVE")
    logger.info("✅ Problems Fixed:")
    logger.info(" 1. Smart Watcher Syndrome → ResponseAnalyzer")
    logger.info(" 2. Lost in Translation → JSON Cleaner")
    logger.info(" 3. Missing Final Push → ExecutionEnforcer")
    logger.info(" 4. Identity Confusion → EXECUTOR mode")
    logger.info("=" * 60)
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the engine is stopping and how many sessions are stored."""
    logger.info("[Shutdown] G4F Execution Engine stopping...")
    session_total = len(CONTEXT_STORE._store)
    logger.info(f"[Shutdown] Active sessions at shutdown: {session_total}")
# =====================================================
# نقطة تشخيص: عرض حالة النظام الكاملة
# =====================================================
@app.get("/debug/system-status")
async def debug_system_status(request: Request):
    """Return a full diagnostic snapshot of the system.

    Aggregates per-session counters (read under the store lock), per-provider
    model listings (first three models each), execution-engine and cache
    settings, and the string values of the execution states.
    """
    verify_api_key(request)

    # Session statistics.
    with_pending = 0
    with_failures = 0
    completed = 0
    tool_executions = 0
    description_failures = 0
    with CONTEXT_STORE._lock:
        total_sessions = len(CONTEXT_STORE._store)
        for ctx in CONTEXT_STORE._store.values():
            pending_count = len(ctx.get("pending_tool_calls", []))
            failure_count = ctx.get("consecutive_description_count", 0)
            executed_count = len(ctx.get("executed_tool_calls", []))
            if pending_count > 0:
                with_pending += 1
            if failure_count > 0:
                with_failures += 1
            if ctx.get("current_state", "") == ExecutionState.COMPLETED:
                completed += 1
            tool_executions += executed_count
            description_failures += failure_count
    session_stats = {
        "total_sessions": total_sessions,
        "sessions_with_pending_tools": with_pending,
        "sessions_with_description_failures": with_failures,
        "sessions_completed": completed,
        "total_tool_executions": tool_executions,
        "total_description_failures": description_failures,
    }

    # Provider statistics.
    provider_stats = {}
    for name, provider in REAL_PROVIDERS.items():
        discovered = discover_provider_models(provider, name)
        provider_stats[name] = {
            "available": True,
            "model_count": len(discovered),
            "models": discovered[:3],  # only the first three models
        }

    # Execution-engine statistics.
    engine_stats = {
        "description_patterns_count": len(ResponseAnalyzer.DESCRIPTION_PHRASES),
        "tool_call_start_markers": len(ToolCallParser.START_MARKERS),
        "max_execution_retries": ExecutionEnforcer.MAX_EXECUTION_RETRIES,
        "cache_size": len(CACHE.cache),
        "cache_max_size": CACHE.max_size,
        "cache_ttl_seconds": CACHE.ttl,
    }

    state_names = (
        "IDLE", "PLANNING", "TOOL_PENDING", "TOOL_EXECUTING",
        "TOOL_DONE", "AWAITING_NEXT", "COMPLETED",
    )
    return {
        "status": "operational",
        "version": "6.0.0",
        "uptime_info": {
            "cookie_status": COOKIE_STATUS,
            "providers_loaded": len(REAL_PROVIDERS),
        },
        "session_statistics": session_stats,
        "provider_statistics": provider_stats,
        "execution_engine_statistics": engine_stats,
        "execution_states": {
            label: getattr(ExecutionState, label) for label in state_names
        },
        "timestamp": int(time.time())
    }
# =====================================================
# نقطة تشخيص: اختبار مزود محدد
# =====================================================
@app.post("/debug/test-provider")
async def debug_test_provider(request: Request):
    """Send one test message straight to a named provider.

    Streams from ``g4f.ChatCompletion.create`` and stops reading once more
    than 500 characters have been collected. Any exception is captured and
    reported in the ``error`` field rather than raised.
    """
    verify_api_key(request)
    body = await request.json()
    provider_name = body.get("provider", "Blackbox")
    model_name = body.get("model", "gpt-4o")
    test_message = body.get("message", "Say 'hello' in one word.")
    timeout_seconds = body.get("timeout", 30)

    provider = REAL_PROVIDERS.get(provider_name)
    if not provider:
        return {
            "success": False,
            "error": f"Provider '{provider_name}' not found",
            "available_providers": list(REAL_PROVIDERS.keys())
        }

    started = time.time()
    pieces = []
    collected = 0
    error_msg = None
    try:
        stream = g4f.ChatCompletion.create(
            model=model_name,
            provider=provider,
            messages=[{"role": "user", "content": test_message}],
            stream=True,
            timeout=timeout_seconds
        )
        for raw in stream:
            cleaned = clean_stream(raw)
            if cleaned:
                pieces.append(cleaned)
                collected += len(cleaned)
            # Stop after 500 characters; this is only a smoke test.
            if collected > 500:
                break
    except Exception as exc:
        error_msg = str(exc)
    elapsed = time.time() - started

    full_response = "".join(pieces)
    preview = full_response[:300] if full_response else ""
    return {
        "success": bool(full_response and not error_msg),
        "provider": provider_name,
        "model": model_name,
        "test_message": test_message,
        "response_preview": preview,
        "response_length": len(full_response),
        "elapsed_seconds": round(elapsed, 2),
        "error": error_msg,
        "timestamp": int(time.time())
    }
# =====================================================
# نقطة تشخيص: اختبار دورة كاملة مع أداة
# =====================================================
@app.post("/debug/test-full-tool-cycle")
async def debug_test_full_tool_cycle(request: Request):
    """Run one full cycle: message -> tool detection -> enforcement -> result.

    Steps recorded in ``results["steps"]``:
        1. Convert the message with ``MessageConverter.convert_messages``.
        2. Send it through ``ask`` ("Blackbox" provider) and analyse the reply.
        3. If the reply only describes an action, ask ``ExecutionEnforcer``
           whether a retry is needed.
        4. Send the enforcer's retry message and parse the new reply.

    If the first model request raises, the results gathered so far are
    returned without a ``summary``. The temporary session is removed from the
    context store on every exit path.
    """
    verify_api_key(request)
    body = await request.json()
    test_message = body.get(
        "message",
        "Create a file called 'hello.py' with content: print('Hello World')"
    )
    model = body.get("model", "gpt-4o")
    test_tools = [
        {
            "name": "write_file",
            "description": "Write content to a file on disk",
            "input_schema": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "The file path to write to"
                    },
                    "content": {
                        "type": "string",
                        "description": "The content to write to the file"
                    }
                },
                "required": ["path", "content"]
            }
        }
    ]
    test_session = f"test_cycle_{uuid.uuid4().hex[:8]}"
    results = {
        "test_session": test_session,
        "test_message": test_message,
        "model": model,
        "steps": [],
        "final_tool_calls": [],
        "success": False,
        "description_attempts": 0,
        "execution_attempts": 0,
    }

    try:
        # Stage 1: convert the message.
        messages = [{"role": "user", "content": test_message}]
        full_message, history = MessageConverter.convert_messages(
            messages, "", test_tools, "auto", test_session
        )
        results["steps"].append({
            "step": 1,
            "name": "Message Conversion",
            "message_length": len(full_message),
            "has_tool_instructions": "tool_call" in full_message.lower(),
            "has_executor_rule": "EXECUTOR" in full_message,
        })

        # Stage 2: send to the model.
        step2_start = time.time()
        full_response = ""
        try:
            for chunk in ask(
                full_message, history, "Blackbox", model,
                tools=test_tools, session_id=test_session
            ):
                full_response += chunk
        except Exception as e:
            results["steps"].append({
                "step": 2,
                "name": "Model Request",
                "error": str(e)
            })
            # The finally clause below still removes the test session.
            return results
        step2_elapsed = time.time() - step2_start

        # Stage 3: analyse the reply.
        analysis = ResponseAnalyzer.analyze(full_response, test_tools)
        clean_text, tool_calls = ToolCallParser.parse_tool_calls(
            full_response, test_tools
        )
        results["steps"].append({
            "step": 2,
            "name": "Model Request",
            "response_length": len(full_response),
            "response_preview": full_response[:200],
            "elapsed_seconds": round(step2_elapsed, 2),
            "has_actual_tool_call": analysis["has_actual_tool_call"],
            "has_description_without_execution": (
                analysis["has_description_without_execution"]
            ),
            "tool_calls_found": len(tool_calls),
            "described_actions": analysis["described_actions"][:3],
            "intended_tool": analysis.get("intended_tool_name"),
        })
        if analysis["has_description_without_execution"]:
            results["description_attempts"] += 1

        # Stage 4: apply the execution enforcer when needed.
        if not tool_calls and analysis["has_description_without_execution"]:
            enforcement = ExecutionEnforcer.post_process_response(
                full_response, test_session, test_tools, tool_calls
            )
            results["steps"].append({
                "step": 3,
                "name": "Execution Enforcer",
                "needs_retry": enforcement["needs_retry"],
                # Guard against a falsy retry message, as the
                # /debug/test-execution-enforcer endpoint does.
                "retry_message_preview": (
                    enforcement["retry_message"] or ""
                )[:200],
            })
            if enforcement["needs_retry"]:
                results["execution_attempts"] += 1
                retry_response = ""
                retry_start = time.time()
                try:
                    for chunk in ask(
                        enforcement["retry_message"], [], "Blackbox", model,
                        tools=test_tools, session_id=test_session
                    ):
                        retry_response += chunk
                except Exception as e:
                    results["steps"].append({
                        "step": 4,
                        "name": "Retry Request",
                        "error": str(e)
                    })
                retry_elapsed = time.time() - retry_start
                # Whatever was received before a failure is still parsed.
                clean_text2, tool_calls2 = ToolCallParser.parse_tool_calls(
                    retry_response, test_tools
                )
                results["steps"].append({
                    "step": 4,
                    "name": "Retry Request",
                    "response_length": len(retry_response),
                    "response_preview": retry_response[:200],
                    "elapsed_seconds": round(retry_elapsed, 2),
                    "tool_calls_found": len(tool_calls2),
                })
                if tool_calls2:
                    tool_calls = tool_calls2

        # Final outcome.
        results["final_tool_calls"] = tool_calls
        results["success"] = len(tool_calls) > 0
        results["summary"] = {
            "total_steps": len(results["steps"]),
            "description_attempts": results["description_attempts"],
            "execution_attempts": results["execution_attempts"],
            "tool_calls_extracted": len(results["final_tool_calls"]),
            "verdict": (
                "✅ SUCCESS: Tool was actually called"
                if results["success"]
                else "❌ FAILURE: Model described but didn't execute"
            )
        }
        return results
    finally:
        # Remove the temporary session on every exit path, including the
        # early return and unexpected exceptions.
        with CONTEXT_STORE._lock:
            CONTEXT_STORE._store.pop(test_session, None)
# =====================================================
# نقطة: مسح كل الجلسات (للتطوير فقط)
# =====================================================
@app.delete("/context/all/clear")
async def clear_all_sessions(request: Request):
    """Remove every session from the context store (development only).

    Returns how many sessions were cleared and the current Unix timestamp.
    """
    verify_api_key(request)
    with CONTEXT_STORE._lock:
        removed = len(CONTEXT_STORE._store)
        CONTEXT_STORE._store.clear()
    response = {
        "message": f"Cleared {removed} sessions",
        "timestamp": int(time.time())
    }
    return response
# =====================================================
# نقطة: إضافة نتيجة أداة يدوياً (للاختبار)
# =====================================================
@app.post("/context/{session_id}/add-tool-result")
async def add_tool_result_manually(
    session_id: str,
    request: Request
):
    """Record a tool result for a session by hand (useful for testing).

    Request body fields:
        tool_name: name of the tool (default ``"unknown"``).
        tool_id: id of the call; generated with
            ``ToolCallParser.generate_tool_id`` when absent.
        result: the tool output (default ``""``), passed to the store as-is.

    Returns:
        A confirmation with the first 100 characters of the result and the
        session's current state.
    """
    verify_api_key(request)
    body = await request.json()
    tool_name = body.get("tool_name", "unknown")
    tool_id = body.get("tool_id", ToolCallParser.generate_tool_id())
    result = body.get("result", "")
    CONTEXT_STORE.add_tool_result(session_id, tool_name, tool_id, result)

    # A JSON number, object or null cannot be sliced as text; slicing it
    # directly raised only after the store had already been updated. Build
    # the preview from a string form instead.
    if isinstance(result, str):
        result_text = result
    else:
        result_text = json.dumps(result, ensure_ascii=False)

    return {
        "message": f"Tool result added for session {session_id}",
        "tool_name": tool_name,
        "tool_id": tool_id,
        "result_preview": result_text[:100],
        "session_state": CONTEXT_STORE.get_or_create_session(session_id).get(
            "current_state", ExecutionState.IDLE
        )
    }
# =====================================================
# نقطة: تحديث حالة الجلسة يدوياً
# =====================================================
@app.post("/context/{session_id}/set-state")
async def set_session_state(session_id: str, request: Request):
    """Set a session's execution state by hand.

    Request body fields:
        state: one of the ``ExecutionState`` values (default ``IDLE``).
        next_required_action: free text stored with the session (default "").

    Raises:
        HTTPException: 400 when ``state`` is not a known execution state.
    """
    verify_api_key(request)
    body = await request.json()
    requested_state = body.get("state", ExecutionState.IDLE)
    next_action = body.get("next_required_action", "")

    valid_states = [
        ExecutionState.IDLE,
        ExecutionState.PLANNING,
        ExecutionState.TOOL_PENDING,
        ExecutionState.TOOL_EXECUTING,
        ExecutionState.TOOL_DONE,
        ExecutionState.AWAITING_NEXT,
        ExecutionState.COMPLETED,
    ]
    is_known = requested_state in valid_states
    if not is_known:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid state. Valid states: {valid_states}"
        )

    CONTEXT_STORE.update_session(
        session_id,
        current_state=requested_state,
        next_required_action=next_action
    )
    directive = CONTEXT_STORE.get_execution_directive(session_id)
    return {
        "message": f"Session {session_id} state updated",
        "new_state": requested_state,
        "next_required_action": next_action,
        "execution_directive": directive
    }
# =====================================================
# نقطة: تسجيل أداة معلّقة يدوياً
# =====================================================
@app.post("/context/{session_id}/register-pending-tool")
async def register_pending_tool_manually(
    session_id: str,
    request: Request
):
    """Register a pending tool call for a session by hand.

    Builds a ``PendingToolCall`` from the body's ``name``, ``arguments`` and
    ``reason`` fields with a freshly generated tool id and stores it through
    ``CONTEXT_STORE.register_pending_tool``.

    Raises:
        HTTPException: 400 when ``name`` is missing or empty.
    """
    verify_api_key(request)
    body = await request.json()
    name = body.get("name", "")
    arguments = body.get("arguments", {})
    reason = body.get("reason", "Manually registered")
    if not name:
        raise HTTPException(status_code=400, detail="Tool name is required")

    tool_id = ToolCallParser.generate_tool_id()
    CONTEXT_STORE.register_pending_tool(
        session_id,
        PendingToolCall(
            name=name,
            arguments=arguments,
            tool_id=tool_id,
            reason=reason
        )
    )
    directive = CONTEXT_STORE.get_execution_directive(session_id)
    return {
        "message": f"Pending tool '{name}' registered for session {session_id}",
        "tool_id": tool_id,
        "directive": directive
    }
# =====================================================
# نقطة: اختبار JSON cleaner
# =====================================================
@app.post("/debug/test-json-cleaner")
async def debug_test_json_cleaner(request: Request):
    """Exercise the JSON repair helpers on a set of common malformed inputs.

    Each input is cleaned with ``ToolCallParser._clean_json_string`` and
    parsed; if parsing fails, ``ToolCallParser._try_fix_json`` is tried on the
    original input. A case passes when the outcome matches its expectation.
    """
    verify_api_key(request)

    # (case name, raw input, whether a successful parse is expected)
    cases = [
        (
            "Trailing comma",
            '{"name": "write_file", "arguments": {"path": "test.py",}}',
            True,
        ),
        (
            "Curly quotes",
            '{\u201cname\u201d: \u201cwrite_file\u201d, \u201carguments\u201d: {}}',
            True,
        ),
        (
            "Missing closing brace",
            '{"name": "write_file", "arguments": {"path": "test.py"',
            True,
        ),
        (
            "Valid JSON",
            '{"name": "read_file", "arguments": {"path": "config.json"}}',
            True,
        ),
        (
            "Double trailing commas",
            '{"name": "tool",, "arguments": {}}',
            False,  # hard to repair
        ),
    ]

    results = []
    for name, raw, expected_valid in cases:
        cleaned = ToolCallParser._clean_json_string(raw)
        parse_result = None
        error_msg = None
        try:
            parse_result = json.loads(cleaned)
        except json.JSONDecodeError as exc:
            parse_success = False
            error_msg = str(exc)
            # Fall back to the automatic repair on the original input.
            repaired = ToolCallParser._try_fix_json(raw)
            if repaired:
                parse_result = repaired
                parse_success = True
                error_msg = None
        else:
            parse_success = True
        results.append({
            "test_name": name,
            "original_input": raw,
            "cleaned_output": cleaned,
            "parse_success": parse_success,
            "parse_result": parse_result,
            "error": error_msg,
            "expected_valid": expected_valid,
            "passed": parse_success == expected_valid
        })

    passed = len([entry for entry in results if entry["passed"]])
    return {
        "test_results": results,
        "summary": {
            "passed": passed,
            "failed": len(results) - passed,
            "total": len(results)
        }
    }
# =====================================================
# نقطة: اختبار ResponseAnalyzer على نص مخصص
# =====================================================
@app.post("/debug/analyze-response")
async def debug_analyze_response(request: Request):
    """Analyse a caller-supplied text with ``ResponseAnalyzer``.

    Also simulates the enforcer's decision under a throw-away session id,
    which is removed from the context store afterwards.

    Raises:
        HTTPException: 400 when the ``response`` field is missing or empty.
    """
    verify_api_key(request)
    body = await request.json()
    response_text = body.get("response", "")
    tools = body.get("tools", [])
    if not response_text:
        raise HTTPException(status_code=400, detail="response field is required")

    analysis = ResponseAnalyzer.analyze(response_text, tools)

    # Simulate the engine's decision.
    scratch_session = f"analyze_{uuid.uuid4().hex[:8]}"
    enforcement = ExecutionEnforcer.post_process_response(
        response_text, scratch_session, tools, analysis["tool_calls_found"]
    )
    # Clean up the scratch session.
    with CONTEXT_STORE._lock:
        if scratch_session in CONTEXT_STORE._store:
            del CONTEXT_STORE._store[scratch_session]

    called_tool = analysis["has_actual_tool_call"]
    described_only = analysis["has_description_without_execution"]
    analysis_view = {
        "has_actual_tool_call": called_tool,
        "has_description_without_execution": described_only,
        "needs_execution_push": analysis["needs_execution_push"],
        "described_actions_found": analysis["described_actions"],
        "intended_tool_name": analysis.get("intended_tool_name"),
        "tool_calls_extracted": len(analysis["tool_calls_found"]),
        "tool_calls_details": analysis["tool_calls_found"],
    }
    decision_view = {
        "needs_retry": enforcement["needs_retry"],
        "retry_message": enforcement["retry_message"],
    }

    if called_tool:
        verdict = "✅ EXECUTOR: Model actually called a tool"
    elif described_only:
        verdict = "⚠️ DESCRIBER: Model described without executing"
    else:
        verdict = "ℹ️ NEUTRAL: Regular text response (no tools involved)"

    return {
        "input_response_preview": response_text[:300],
        "analysis": analysis_view,
        "enforcer_decision": decision_view,
        "verdict": verdict
    }
# =====================================================
# نقطة: مراقبة الكاش
# =====================================================
@app.get("/debug/cache-stats")
async def debug_cache_stats(request: Request):
    """Return cache statistics.

    Reports the total, expired and active entry counts plus a sample of the
    first ten entries. The cache is read while holding ``CACHE._lock``; each
    cache value is unpacked as a ``(value, timestamp)`` pair.
    """
    verify_api_key(request)
    with CACHE._lock:
        cache_size = len(CACHE.cache)
        now = time.time()
        expired_count = len([
            stamp for _, stamp in CACHE.cache.values()
            if now - stamp > CACHE.ttl
        ])
        sample = [
            {
                "key_preview": key[:50],
                "value_length": len(value),
                "age_seconds": round(now - stamp),
                "expired": now - stamp > CACHE.ttl
            }
            for key, (value, stamp) in list(CACHE.cache.items())[:10]
        ]
    return {
        "cache_size": cache_size,
        "max_size": CACHE.max_size,
        "ttl_seconds": CACHE.ttl,
        "expired_entries": expired_count,
        "active_entries": cache_size - expired_count,
        "sample_entries": sample,
        "timestamp": int(time.time())
    }
# =====================================================
# نقطة: تصدير سياق جلسة كاملة
# =====================================================
@app.get("/context/{session_id}/export")
async def export_session_context(session_id: str, request: Request):
    """Export a session's full context as JSON.

    The context comes from ``CONTEXT_STORE.get_or_create_session``. Only the
    last five ``provider_history`` entries are included. The summary and the
    current execution directive are returned alongside.
    """
    verify_api_key(request)
    ctx = CONTEXT_STORE.get_or_create_session(session_id)
    summary = CONTEXT_STORE.get_context_summary(session_id)
    directive = CONTEXT_STORE.get_execution_directive(session_id)

    # (field name, default used when the field is absent), in output order.
    exported_fields = (
        ("original_task", ""),
        ("current_state", ExecutionState.IDLE),
        ("message_count", 0),
        ("completed_steps", []),
        ("tool_results", []),
        ("pending_tool_calls", []),
        ("executed_tool_calls", []),
        ("described_but_not_executed", []),
        ("consecutive_description_count", 0),
        ("next_required_action", ""),
        ("provider_history", []),
        ("created_at", 0),
        ("last_updated", 0),
    )
    context_view = {
        field: ctx.get(field, default) for field, default in exported_fields
    }
    # Keep only the five most recent provider entries.
    context_view["provider_history"] = context_view["provider_history"][-5:]

    return {
        "session_id": session_id,
        "exported_at": int(time.time()),
        "context": context_view,
        "human_readable_summary": summary,
        "current_execution_directive": directive,
    }
# =====================================================
# نقطة: استيراد سياق جلسة
# =====================================================
@app.post("/context/{session_id}/import")
async def import_session_context(session_id: str, request: Request):
    """Import a session context from JSON, replacing any stored one.

    Only known fields are copied from the body's ``context`` object; missing
    fields get defaults and ``last_updated`` is always set to the current
    time. The resulting context is written to the store under its lock.

    Raises:
        HTTPException: 400 when ``context`` is missing, empty, or not a JSON
            object.
    """
    verify_api_key(request)
    body = await request.json()
    context_data = body.get("context", {})
    if not context_data:
        raise HTTPException(status_code=400, detail="context field is required")
    # A non-empty list or string is truthy but has no .get(); reject it here
    # instead of failing with an AttributeError below.
    if not isinstance(context_data, dict):
        raise HTTPException(
            status_code=400,
            detail="context must be a JSON object"
        )

    # Copy the expected fields and fill in defaults.
    now = time.time()
    safe_context = {
        "original_task": context_data.get("original_task", ""),
        "task_breakdown": context_data.get("task_breakdown", []),
        "completed_steps": context_data.get("completed_steps", []),
        "tool_results": context_data.get("tool_results", []),
        "pending_tool_calls": context_data.get("pending_tool_calls", []),
        "executed_tool_calls": context_data.get("executed_tool_calls", []),
        "current_state": context_data.get("current_state", ExecutionState.IDLE),
        "next_required_action": context_data.get("next_required_action", ""),
        "created_at": context_data.get("created_at", now),
        "last_updated": now,
        "message_count": context_data.get("message_count", 0),
        "provider_history": context_data.get("provider_history", []),
        "last_model_response": context_data.get("last_model_response", ""),
        "described_but_not_executed": context_data.get(
            "described_but_not_executed", []
        ),
        "consecutive_description_count": context_data.get(
            "consecutive_description_count", 0
        ),
    }
    with CONTEXT_STORE._lock:
        CONTEXT_STORE._store[session_id] = safe_context
    return {
        "message": f"Context imported for session {session_id}",
        "session_id": session_id,
        "state": safe_context["current_state"],
        "directive": CONTEXT_STORE.get_execution_directive(session_id)
    }
# =====================================================
# Endpoint: test pattern matching for the forbidden (description-only) patterns
# =====================================================
@app.post("/debug/test-patterns")
async def debug_test_patterns(request: Request):
    """Check sample texts against the "description without execution" patterns.

    Each text is matched case-insensitively against every regex in
    ``ResponseAnalyzer.DESCRIPTION_PHRASES``; a text with at least one match
    is reported as a description. The body may carry ``{"texts": [...]}``;
    when ``texts`` is absent a built-in sample list is used.

    Returns:
        A dict with one result per text (at most the first three matching
        patterns are included) and an overall summary.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or ``texts`` is not a list of strings.
    """
    verify_api_key(request)

    # Malformed input must be reported as a client error (400), not surface
    # as an unhandled exception (500).
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError / UnicodeDecodeError
        raise HTTPException(
            status_code=400, detail="request body must be valid JSON"
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=400, detail="request body must be a JSON object"
        )

    test_texts = body.get("texts", [
        "سأقوم بإنشاء الملف الآن",
        "I will create the file",
        "Let me write the code",
        "I'm going to use write_file",
        "Here is the result: 42",
        "سأستخدم أداة write_file",
        "Now I'll create the Python file",
        "The file has been created successfully",
        "I'll use the write_file tool to save your code",
        "Using the write_file tool to create the file",
    ])
    # A bare string would be iterated character by character, and non-string
    # items would make re.findall raise; reject both up front.
    if not isinstance(test_texts, list) or not all(
        isinstance(text, str) for text in test_texts
    ):
        raise HTTPException(
            status_code=400, detail="texts must be a list of strings"
        )

    patterns = ResponseAnalyzer.DESCRIPTION_PHRASES
    results = []
    for text in test_texts:
        matched_patterns = []
        for pattern in patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                matched_patterns.append({"pattern": pattern, "matches": matches})

        is_description = bool(matched_patterns)
        results.append({
            "text": text,
            "is_description_without_execution": is_description,
            "matched_patterns_count": len(matched_patterns),
            "matched_patterns": matched_patterns[:3],  # first three only
            "verdict": (
                "⚠️ DESCRIPTION (needs enforcement)"
                if is_description
                else "✅ OK (not a description pattern)"
            ),
        })

    description_count = sum(
        1 for result in results if result["is_description_without_execution"]
    )
    return {
        "test_results": results,
        "summary": {
            "total_texts": len(results),
            "description_patterns_found": description_count,
            "clean_texts": len(results) - description_count,
            "total_patterns_available": len(patterns),
        },
    }
# =====================================================
# Endpoint: provider statistics with performance information
# =====================================================
@app.get("/providers/stats")
async def get_provider_stats(request: Request):
    """Report per-provider usage statistics derived from session history.

    Every provider in ``REAL_PROVIDERS`` gets an entry. Usage is counted from
    the ``provider_history`` lists of all sessions in ``CONTEXT_STORE``; an
    entry counts as "recent" when its ``timestamp`` is less than an hour old.
    History entries naming a provider that is not in ``REAL_PROVIDERS`` are
    ignored.

    Returns:
        A dict with a ``providers`` mapping (usage counters, the models seen
        in history and the models reported by ``discover_provider_models``)
        and the current ``timestamp`` in epoch seconds.
    """
    verify_api_key(request)

    provider_usage = {
        pname: {"total_uses": 0, "recent_uses": 0, "models_used": set()}
        for pname in REAL_PROVIDERS
    }

    # One reference time for the whole scan, so all entries are judged
    # against the same instant.
    now = time.time()

    # Aggregate usage history from every stored session.
    with CONTEXT_STORE._lock:
        for ctx in CONTEXT_STORE._store.values():
            # History can be client-supplied (context import), so tolerate a
            # null history and skip entries that are not objects.
            for ph in ctx.get("provider_history") or []:
                if not isinstance(ph, dict):
                    continue
                pname = ph.get("provider", "")
                if pname not in provider_usage:
                    continue
                usage = provider_usage[pname]
                usage["total_uses"] += 1
                model = ph.get("model", "")
                if model:
                    usage["models_used"].add(model)
                # Used within the last hour; a non-numeric timestamp is
                # treated as "not recent" instead of raising.
                ts = ph.get("timestamp", 0)
                if isinstance(ts, (int, float)) and now - ts < 3600:
                    usage["recent_uses"] += 1

    # Convert the sets to lists so the payload is JSON-serializable.
    result = {}
    for pname, stats in provider_usage.items():
        pobj = REAL_PROVIDERS.get(pname)
        available_models = discover_provider_models(pobj, pname) if pobj else []
        result[pname] = {
            "available": True,  # hard-coded; not derived from any check here
            "total_uses": stats["total_uses"],
            "recent_uses_last_hour": stats["recent_uses"],
            "models_actually_used": list(stats["models_used"]),
            "all_available_models": available_models,
        }

    return {
        "providers": result,
        "timestamp": int(time.time()),
    }
# =====================================================
# Endpoint: test the stream buffer with a complete tool call
# =====================================================
@app.post("/debug/test-stream-with-tool")
async def debug_test_stream_with_tool(request: Request):
    """Feed a canned response containing one tool call through StreamToolBuffer.

    The fixed sample text is split into pieces of ``chunk_size`` characters
    (body field, default 5). Each piece is passed to ``buffer.feed`` and the
    remainder is drained with ``buffer.flush``. Events whose ``type`` is
    ``"text"`` are collected as text; every other event is counted as a
    tool call.

    Returns:
        A dict with chunk and event counts, the reconstructed text, the
        detected tool events, a success flag/verdict and a sample of at most
        20 recorded events.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or ``chunk_size`` is not a positive integer.
    """
    verify_api_key(request)

    # Malformed input must be reported as a client error (400), not surface
    # as an unhandled exception (500).
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError / UnicodeDecodeError
        raise HTTPException(
            status_code=400, detail="request body must be valid JSON"
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=400, detail="request body must be a JSON object"
        )

    chunk_size = body.get("chunk_size", 5)
    # range() raises on a zero step and yields nothing for a negative one;
    # bool is excluded explicitly because it is a subclass of int.
    if (
        isinstance(chunk_size, bool)
        or not isinstance(chunk_size, int)
        or chunk_size < 1
    ):
        raise HTTPException(
            status_code=400, detail="chunk_size must be a positive integer"
        )

    test_tools = [{
        "name": "write_file",
        "description": "Write content to a file",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"}
            },
            "required": ["path", "content"]
        }
    }]

    # Sample text that contains one complete tool call.
    test_text = (
        'I will create the file for you.\n'
        '<tool_call>\n'
        '{\n'
        ' "name": "write_file",\n'
        ' "arguments": {\n'
        ' "path": "hello.py",\n'
        ' "content": "print(\'Hello World\')"\n'
        ' }\n'
        '}\n'
        '</tool_call>\n'
        'The file has been created.'
    )

    buffer = StreamToolBuffer(available_tools=test_tools)
    all_events = []
    text_events = []
    tool_events = []

    def record(event, chunk_index, chunk=None):
        """Log one buffer event and sort it into the text or tool bucket."""
        is_text = event.get("type") == "text"
        entry = {"chunk_index": chunk_index}
        if chunk is not None:
            # Flush events have no originating chunk, hence no preview.
            entry["chunk_preview"] = chunk[:10]
        entry["event_type"] = event.get("type", "tool_use")
        entry["event_preview"] = (
            event.get("text", "")[:30]
            if is_text
            else f"tool: {event.get('name', '')}"
        )
        all_events.append(entry)
        if is_text:
            text_events.append(event["text"])
        else:
            tool_events.append(event)

    # Simulate streaming, chunk by chunk.
    for start in range(0, len(test_text), chunk_size):
        chunk = test_text[start:start + chunk_size]
        for event in buffer.feed(chunk):
            record(event, start // chunk_size, chunk)

    # Drain whatever the buffer is still holding.
    for event in buffer.flush():
        record(event, "flush")

    tool_detected = len(tool_events) > 0
    return {
        "input_text_length": len(test_text),
        "chunk_size": chunk_size,
        "total_chunks": (len(test_text) + chunk_size - 1) // chunk_size,
        "total_events": len(all_events),
        "text_events_count": len(text_events),
        "tool_events_count": len(tool_events),
        "reconstructed_text": "".join(text_events),
        "tool_calls_detected": tool_events,
        "success": tool_detected,
        "verdict": (
            "✅ Stream buffer correctly detected tool call"
            if tool_detected
            else "❌ Stream buffer missed the tool call"
        ),
        "all_events_sample": all_events[:20],
    }
# =====================================================
# MAIN ENTRY POINT
# =====================================================
if __name__ == "__main__":
    # Script entry point: read the server settings from the environment,
    # log the startup configuration, then hand control to uvicorn.
    import uvicorn

    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", 8000))
    workers = int(os.getenv("WORKERS", 1))
    log_level = os.getenv("LOG_LEVEL", "info").lower()

    logger.info("Starting G4F Execution Engine v6.0.0")
    logger.info("Host: %s, Port: %s", host, port)
    logger.info("Workers: %s", workers)
    logger.info("Cookie Status: %s", COOKIE_STATUS)
    logger.info("Providers: %s", list(REAL_PROVIDERS.keys()))

    # The app is passed as an import string ("module:attribute").
    server_options = {
        "host": host,
        "port": port,
        "workers": workers,
        "log_level": log_level,
        "reload": False,
        "access_log": True,
    }
    uvicorn.run("app:app", **server_options)