| import glob as globlib, json, os, re, subprocess, openai, inspect |
| from datetime import datetime |
|
|
| MODEL = "claude-haiku-4.5" |
| RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m" |
| BLUE, CYAN, GREEN, YELLOW, RED = ( |
| "\033[34m", |
| "\033[36m", |
| "\033[32m", |
| "\033[33m", |
| "\033[31m", |
| ) |
|
|
| CORE_TOOLS = {} |
| HIDDEN_TOOLS = {} |
| ACTIVATED_TOOLS = set() |
|
|
| def get_schema(f): |
| """Generate OpenAI tool schema from function signature and docstring""" |
| sig = inspect.signature(f) |
| doc = inspect.getdoc(f) or "" |
| properties = {} |
| required = [] |
| for name, param in sig.parameters.items(): |
| p_type = "string" |
| if param.annotation == int or param.annotation == float: p_type = "number" |
| elif param.annotation == bool: p_type = "boolean" |
| properties[name] = {"type": p_type} |
| if param.default is inspect.Parameter.empty: required.append(name) |
| return { |
| "type": "function", |
| "function": { |
| "name": f.__name__, |
| "description": doc, |
| "parameters": { |
| "type": "object", |
| "properties": properties, |
| "required": required, |
| }, |
| }, |
| } |
|
|
| def tool(f): |
| """Decorator for hidden tools""" |
| HIDDEN_TOOLS[f.__name__] = {"fn": f, "schema": get_schema(f)} |
| return f |
|
|
| def core_tool(f): |
| """Decorator for core tools""" |
| CORE_TOOLS[f.__name__] = {"fn": f, "schema": get_schema(f)} |
| return f |
|
|
| @core_tool |
| def search_tools(query: str): |
| """Search for hidden tools by fuzzy matching name, description, or parameters""" |
| results = [] |
| q = query.lower() |
| for name, data in HIDDEN_TOOLS.items(): |
| schema = data["schema"]["function"] |
| desc = schema.get("description", "").lower() |
| if q in name.lower() or q in desc or q in json.dumps(schema.get("parameters", {})): |
| results.append(data["schema"]) |
| ACTIVATED_TOOLS.add(name) |
| return json.dumps(results) if results else "No tools found" |
|
|
| @core_tool |
| def read(path: str, offset: int = 0, limit: int = None): |
| """Read file with line numbers""" |
| with open(path) as f: |
| lines = f.readlines() |
| l_limit = limit if limit is not None else len(lines) |
| selected = lines[offset : offset + l_limit] if limit else lines[offset:] |
| return "".join(f"{offset + idx + 1:4}| {line}" for idx, line in enumerate(selected)) |
|
|
| @core_tool |
| def write(path: str, content: str): |
| """Write content to file""" |
| with open(path, "w") as f: |
| f.write(content) |
| return "ok" |
|
|
| @core_tool |
| def edit(path: str, old: str, new: str, all: bool = False): |
| """Replace old with new in file""" |
| with open(path) as f: |
| text = f.read() |
| if old not in text: return "error: old_string not found" |
| count = text.count(old) |
| if not all and count > 1: return f"error: old_string appears {count} times, must be unique (use all=true)" |
| replacement = text.replace(old, new) if all else text.replace(old, new, 1) |
| with open(path, "w") as f: |
| f.write(replacement) |
| return "ok" |
|
|
| @core_tool |
| def glob(pat: str, path: str = "."): |
| """Find files by pattern""" |
| pattern = (path + "/" + pat).replace("//", "/") |
| files = globlib.glob(pattern, recursive=True) |
| files = sorted(files, key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True) |
| return "\n".join(files) or "none" |
|
|
| @core_tool |
| def grep(pat: str, path: str = "."): |
| """Search files for regex pattern""" |
| pattern = re.compile(pat) |
| hits = [] |
| for filepath in globlib.glob(path + "/**", recursive=True): |
| try: |
| with open(filepath) as f: |
| for line_num, line in enumerate(f, 1): |
| if pattern.search(line): hits.append(f"{filepath}:{line_num}:{line.rstrip()}") |
| except: pass |
| return "\n".join(hits[:50]) or "none" |
|
|
| @core_tool |
| def bash(cmd: str): |
| """Run shell command""" |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) |
| return (result.stdout + result.stderr).strip() or "(empty)" |
|
|
| @tool |
| def get_time(): |
| """Get current time""" |
| return datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
| @tool |
| def get_weather(): |
| """Get current weather information based on the season""" |
| now = datetime.now() |
| month = now.strftime("%B") |
| m = now.month |
| if m in [12, 1, 2]: season = "winter" |
| elif m in [3, 4, 5]: season = "spring" |
| elif m in [6, 7, 8]: season = "summer" |
| else: season = "autumn" |
| return f"Its {month}, its the {season} season.." |
|
|
| def run_tool(name, args): |
| try: |
| if name in CORE_TOOLS: return CORE_TOOLS[name]["fn"](**args) |
| if name in HIDDEN_TOOLS: return HIDDEN_TOOLS[name]["fn"](**args) |
| return f"error: tool {name} not found" |
| except Exception as err: |
| return f"error: {err}" |
|
|
| client = openai.OpenAI( |
| api_key=os.environ.get("POE_API_KEY", ""), |
| base_url="https://api.poe.com/v1", |
| ) |
|
|
| def separator(): |
| try: cols = os.get_terminal_size().columns |
| except: cols = 80 |
| return f"{DIM}{'─' * min(cols, 80)}{RESET}" |
|
|
| def render_markdown(text): |
| return re.sub(r"\*\*(.+?)\*\*", f"{BOLD}\\1{RESET}", text) |
|
|
| def main(): |
| print(f"{RED}ag-mini-cli{RESET} | {DIM}{MODEL} | {os.getcwd()}{RESET}\n") |
| messages = [] |
| system_prompt = f""" |
| You are a concise coding assistant. cwd: {os.getcwd()} |
| |
| When you are tasked with something that you do not have the tools to do, you should try searching for tools that can help you. |
| |
| Search for tools like weather, extra operations, math, etc. |
| |
| """ |
|
|
| while True: |
| try: |
| print(separator()) |
| user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip() |
| print(separator()) |
| if not user_input: continue |
| if user_input in ("/q", "exit"): break |
| if user_input == "/c": |
| messages = [] |
| ACTIVATED_TOOLS.clear() |
| print(f"{GREEN}⏺ Cleared conversation{RESET}") |
| continue |
|
|
| messages.append({"role": "user", "content": user_input}) |
|
|
| while True: |
| current_tools = [data["schema"] for data in CORE_TOOLS.values()] |
| current_tools += [HIDDEN_TOOLS[name]["schema"] for name in ACTIVATED_TOOLS if name in HIDDEN_TOOLS] |
| |
| response = client.chat.completions.create( |
| model=MODEL, |
| messages=[{"role": "system", "content": system_prompt}] + messages, |
| tools=current_tools if current_tools else None, |
| ) |
| |
| resp_msg = response.choices[0].message |
| msg_dict = {"role": "assistant", "content": resp_msg.content} |
| if resp_msg.tool_calls: msg_dict["tool_calls"] = resp_msg.tool_calls |
| messages.append(msg_dict) |
|
|
| if resp_msg.content: print(f"\n{CYAN}⏺{RESET} {render_markdown(resp_msg.content)}") |
| if not resp_msg.tool_calls: break |
|
|
| for tool_call in resp_msg.tool_calls: |
| name = tool_call.function.name |
| args = json.loads(tool_call.function.arguments) |
| arg_preview = str(list(args.values())[0])[:50] if args else "" |
| print(f"\n{GREEN}⏺ {name.capitalize()}{RESET}({DIM}{arg_preview}{RESET})") |
| result = run_tool(name, args) |
| res_lines = str(result).split("\n") |
| preview = res_lines[0][:60] |
| if len(res_lines) > 1: preview += f" ... +{len(res_lines) - 1} lines" |
| elif len(res_lines[0]) > 60: preview += "..." |
| print(f" {DIM}⎿ {preview}{RESET}") |
| messages.append({"role": "tool", "tool_call_id": tool_call.id, "name": name, "content": str(result)}) |
| print() |
| except (KeyboardInterrupt, EOFError): break |
| except Exception as err: print(f"{RED}⏺ Error: {err}{RESET}") |
|
|
| if __name__ == "__main__": |
| main() |