|
|
| import os
|
| import json
|
| import time
|
| import re
|
|
|
def create_response(success, message="", data=None, status_code=200):
    """Build a standardized API response.

    Args:
        success: Value stored under the ``"success"`` key.
        message: Optional text; stored under ``"message"`` only when truthy.
        data: Optional payload; stored under ``"data"`` whenever it is not
            ``None`` (so falsy payloads such as ``0`` or ``[]`` are kept).
        status_code: Status code returned alongside the payload.

    Returns:
        A ``(payload, status_code)`` tuple, where ``payload`` is a dict.
    """
    message_part = {"message": message} if message else {}
    data_part = {} if data is None else {"data": data}

    # Unpacking order keeps the keys in success / message / data order.
    payload = {"success": success, **message_part, **data_part}
    return payload, status_code
|
|
|
def validate_agent_access(agent_id, token=None):
    """Check that an agent exists and, optionally, that ``token`` grants access.

    The agent configuration is read from ``<DATA_DIR>/agents/<agent_id>.json``,
    where ``DATA_DIR`` is taken from the environment (default: ``"."``).

    Args:
        agent_id: Identifier of the agent. It must be a plain file-name
            component; ids containing path separators are treated as unknown.
        token: Optional distribution token. When falsy, only the existence
            and readability of the agent configuration is checked.

    Returns:
        A ``(ok, payload)`` tuple. When ``ok`` is true, ``payload`` is the
        loaded agent configuration; otherwise it is an error message string.

    Side effects:
        When ``token`` matches a non-expired distribution, the distribution's
        ``usage_count`` and the agent-level ``stats`` (``usage_count``,
        ``last_used``) are updated and the configuration file is rewritten.
    """
    data_dir = os.getenv("DATA_DIR", ".")
    file_name = f"{agent_id}.json"

    # Reject ids such as "../x" or "/abs/x": they would make the path escape
    # the agents directory, and this function both reads and rewrites the file.
    if os.path.basename(file_name) != file_name:
        return False, "Agent不存在"

    agent_path = os.path.join(data_dir, "agents", file_name)
    if not os.path.exists(agent_path):
        return False, "Agent不存在"

    try:
        with open(agent_path, 'r', encoding='utf-8') as f:
            agent_config = json.load(f)

        # Without a token only existence is verified.
        if not token:
            return True, agent_config

        if "distributions" in agent_config:
            for dist in agent_config["distributions"]:
                if dist.get("token") != token:
                    continue

                # A missing, null or zero "expires_at" means the token never
                # expires; a JSON null must not be compared against a number.
                expires_at = dist.get("expires_at") or 0
                if 0 < expires_at < time.time():
                    return False, "访问令牌已过期"

                # Per-distribution usage counter.
                dist["usage_count"] = dist.get("usage_count", 0) + 1

                # Agent-level usage statistics.
                stats = agent_config.setdefault("stats", {})
                stats["usage_count"] = stats.get("usage_count", 0) + 1
                stats["last_used"] = int(time.time())

                # Persist the updated counters.
                with open(agent_path, 'w', encoding='utf-8') as f:
                    json.dump(agent_config, f, ensure_ascii=False, indent=2)

                return True, agent_config

        return False, "访问令牌无效"

    except Exception as e:
        # Deliberately broad: any failure is reported through the return
        # tuple rather than raised to the caller.
        return False, f"验证过程出错: {str(e)}"
|
|
|
def extract_code_from_text(text):
    """Extract the first fenced code block from ``text``.

    A block is recognised when its opening fence is either untagged or
    tagged ``python`` and both fences sit on their own line. Both LF and
    CRLF line endings around the fences are accepted.

    Args:
        text: The string to search.

    Returns:
        The content between the fences of the first matching block, or
        ``text`` unchanged when no such block is found.
    """
    # The optional "python" tag already covers untagged fences, so a single
    # search is enough; "\r?" lets CRLF-terminated fences match as well.
    fenced = re.search(r'```(?:python)?\r?\n([\s\S]*?)\r?\n```', text)
    return fenced.group(1) if fenced else text