| |
| |
| """ |
| Llama3 Agent — Deluxe Client (better-than-Gemini-CLI style) |
| |
| Features |
| • Pretty planning & execution UX (Rich) |
| • Execute All or Step-by-Step with per-step confirmation |
| • Edit plan JSON in your $EDITOR before you run it |
| • Dry-run mode |
| • Danger detection for risky shell commands (extra confirmation) |
| • Transcripts & plans auto-saved to ~/.llama3_agent/sessions/ |
| • Health check & robust error handling |
| • Server configurable via --server or $LLAMA_SERVER |
| |
| Usage |
| python client.py # uses $LLAMA_SERVER, otherwise the built-in default server |
| python client.py --server http://IP:PORT |
| |
| Hotkeys during prompts: |
| [a] Execute all [s] Step-by-step [e] Edit plan [d] Toggle dry-run [c] Cancel |
| |
| """ |
|
|
| import os |
| import re |
| import json |
| import time |
| import uuid |
| import shlex |
| import tempfile |
| import subprocess |
| from pathlib import Path |
| from datetime import datetime |
|
|
| import click |
| import requests |
| from requests.exceptions import RequestException |
|
|
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.table import Table |
| from rich.prompt import Prompt, Confirm |
| from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn |
| from rich.syntax import Syntax |
| from rich.box import ROUNDED |
| from getpass import getpass |
| import platform |
| import io, contextlib, traceback |
|
|
# Cached API key lives in a ``key.json`` file placed next to this script.
KEY_FILE = Path(__file__).resolve().with_name("key.json")
|
|
def read_local_key() -> str | None:
    """Return the API key cached in ``KEY_FILE``, or None when unavailable.

    Any problem (missing file, unreadable file, malformed JSON, unexpected
    shape) is treated as "no cached key" rather than an error.
    """
    try:
        if not KEY_FILE.exists():
            return None
        stored = json.loads(KEY_FILE.read_text()) or {}
        cleaned = stored.get("api_key", "").strip()
    except Exception:
        return None
    return cleaned if cleaned else None
|
|
def write_local_key(k: str) -> None:
    """Persist the API key *k* to ``KEY_FILE`` as ``{"api_key": ...}``.

    The file holds a secret, so it is restricted to owner read/write before
    the key is written. The permission change is best-effort: platforms or
    filesystems that do not support it still get the key saved.

    Raises:
        OSError: If the key file itself cannot be written.
    """
    payload = json.dumps({"api_key": k}, indent=2)
    try:
        # Create (if needed) and lock down the file *before* the secret lands in it.
        KEY_FILE.touch(mode=0o600, exist_ok=True)
        KEY_FILE.chmod(0o600)
    except OSError:
        # Deliberate best-effort: a failed chmod must not prevent saving the key.
        pass
    KEY_FILE.write_text(payload)
|
|
def get_api_key() -> str:
    """Resolve the API key from, in order: environment, key file, interactive prompt.

    A key typed at the prompt is saved via ``write_local_key`` for next time.

    Raises:
        RuntimeError: If the user submits an empty key at the prompt.
    """
    # 1) Environment variable wins.
    env_key = os.environ.get("LLAMA_API_KEY", "").strip()
    if env_key:
        return env_key

    # 2) Previously cached key.
    cached = read_local_key()
    if cached:
        return cached

    # 3) Ask the user (input is not echoed) and remember the answer.
    typed = getpass("Enter API key: ").strip()
    if typed:
        write_local_key(typed)
        return typed
    raise RuntimeError("No API key provided.")
|
|
def auth_headers() -> dict:
    """Build the ``Authorization`` header dict, or ``{}`` if no key can be obtained."""
    try:
        token = get_api_key()
    except Exception:
        # No usable key: send the request unauthenticated.
        return {}
    return {"Authorization": f"Bearer {token}"}
|
|
| |
| |
| |
# Per-user application directory, plus the session folder and config file inside it.
APP_HOME = Path(os.path.expanduser("~/.llama3_agent"))
SESS_DIR = APP_HOME / "sessions"
CONFIG_FILE = APP_HOME / "config.json"


# Shared Rich console used for all terminal output in this module.
console = Console(highlight=True, soft_wrap=False)


# Default server URL: $LLAMA_SERVER when set, otherwise the hard-coded address below.
DEFAULT_SERVER = os.environ.get("LLAMA_SERVER", "https://tandevllc-axis.hf.space")
|
|
# Regexes (used with ``re.search``) that flag a shell command as risky.
# The patterns that end in "/" intentionally have no trailing ``\b``: a word
# boundary after "/" only matches when a word character follows, which would
# miss the most destructive forms such as ``rm -rf /*`` or ``chmod 777 -R /``.
DANGER_PATTERNS = [
    r"rm\s+-rf\s+/",                            # recursive delete of an absolute path
    r"rm\s+-rf\s+--no-preserve-root",           # explicitly disabling the root guard
    r"mkfs\.",                                  # formatting a filesystem
    r"dd\s+if=",                                # raw block copy
    r":\(\)\s*\{\s*:.*\|\s*:\s*&\s*\};\s*:",    # classic fork bomb
    r"shutdown\b",
    r"reboot\b",
    r"init\s+0\b",
    r"halt\b",
    r"\|\s*(sh|bash)\s*$",                      # piping something into a shell
    r"(curl|wget).*\|\s*(sh|bash)",             # download-and-execute
    # NOTE(review): requires the literal "root:/" with no space — confirm intended form.
    r"chown\s+-R\s+root:/",
    r"chmod\s+777\s+-R\s+/",                    # world-writable recursive chmod from /
]
|
|
| |
| |
| |
def ensure_dirs():
    """Create the application home and sessions directories if they are missing."""
    for folder in (APP_HOME, SESS_DIR):
        folder.mkdir(parents=True, exist_ok=True)
|
|
def load_config():
    """Load the saved config as parsed JSON; return ``{}`` if absent or unreadable."""
    ensure_dirs()
    if not CONFIG_FILE.exists():
        return {}
    try:
        raw = CONFIG_FILE.read_text()
        return json.loads(raw)
    except Exception:
        # A corrupt or unreadable config is treated the same as no config.
        return {}
|
|
def save_config(cfg):
    """Write *cfg* to the config file as indented JSON, creating directories first."""
    ensure_dirs()
    serialized = json.dumps(cfg, indent=2)
    CONFIG_FILE.write_text(serialized)
|
|
def now_stamp():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    return f"{datetime.now():%Y%m%d_%H%M%S}"
|
|
def new_session_path():
    """Create and return a fresh session folder named ``<timestamp>_<8 hex chars>``."""
    stamp = now_stamp()
    suffix = uuid.uuid4().hex[:8]
    session_folder = SESS_DIR / f"{stamp}_{suffix}"
    session_folder.mkdir(parents=True, exist_ok=True)
    return session_folder
|
|
def danger_check_shell(cmd: str) -> bool:
    """Return True when *cmd* looks dangerous.

    A command is flagged if it matches any entry of ``DANGER_PATTERNS``, or if
    it is an ``rm -rf`` aimed at a path starting with ``/`` or ``~`` and the
    text `` --preserve-root`` does not appear in it.
    """
    if any(re.search(rx, cmd) for rx in DANGER_PATTERNS):
        return True

    targets_root_or_home = re.search(r"\brm\s+-rf\s+[/~][\w\-/\.]*", cmd) is not None
    return targets_root_or_home and " --preserve-root" not in cmd
|
|
| def _flatten_cmds_for_check(cmd_val): |
| if isinstance(cmd_val, str): |
| return [cmd_val] |
| if isinstance(cmd_val, dict): |
| return [v for v in cmd_val.values() if isinstance(v, str)] |
| return [] |
|
|
def pretty_json(obj) -> str:
    """Serialise *obj* as 2-space-indented JSON, keeping non-ASCII characters as-is."""
    options = {"indent": 2, "ensure_ascii": False}
    return json.dumps(obj, **options)
|
|
def pager(text: str):
    """Show *text* in the system pager via the shared Rich console.

    ``Console.pager()`` is a context manager: output printed inside the
    ``with`` block is what gets paged. Calling ``console.pager(text)`` (as the
    previous code did) only builds an unused context object and shows nothing.

    Markup, emoji-code and highlight processing are disabled because *text* is
    raw program output / file content, not Rich markup.
    """
    with console.pager():
        console.print(text, markup=False, emoji=False, highlight=False)
|
|
def editor_edit_json(original: dict) -> dict | None:
    """Open $EDITOR to edit a JSON plan. Returns dict or None if cancelled/invalid.

    The object is written to a temporary ``.json`` file (UTF-8), the editor
    named by ``$EDITOR`` (default ``nano``) is run on it, and the result is
    parsed back. The temporary file is always removed afterwards.

    Args:
        original: JSON-serialisable object to edit (a plan or a single step).

    Returns:
        The edited JSON object, or None when the editor could not be started,
        the text is not valid JSON, or the result is not a JSON object.
    """
    editor = os.environ.get("EDITOR", "nano")
    # Write as UTF-8 explicitly: pretty_json keeps non-ASCII characters verbatim.
    with tempfile.NamedTemporaryFile("w+", suffix=".json", delete=False, encoding="utf-8") as tf:
        path = tf.name
        tf.write(pretty_json(original))
        tf.flush()
    try:
        try:
            # $EDITOR may carry arguments (e.g. "code --wait"); split it like a shell.
            # Non-POSIX splitting on Windows keeps backslashes in paths intact.
            editor_argv = shlex.split(editor, posix=(os.name != "nt")) or ["nano"]
            subprocess.call([*editor_argv, path])
        except (OSError, ValueError) as e:
            # Editor missing / not executable, or $EDITOR has unbalanced quotes.
            console.print(f"[red]Could not launch editor '{editor}': {e}[/red]")
            return None

        try:
            data = json.loads(Path(path).read_text(encoding="utf-8"))
        except ValueError as e:  # JSONDecodeError and UnicodeDecodeError are both ValueErrors
            console.print(f"[red]Invalid JSON after editing: {e}[/red]")
            return None

        if not isinstance(data, dict):
            # Callers use dict methods on the result; anything else is unusable.
            console.print("[red]Invalid JSON after editing: top-level value must be an object.[/red]")
            return None
        return data
    finally:
        try:
            os.unlink(path)
        except OSError:
            # Best-effort cleanup of the temp file.
            pass
|
|
def show_plan_table(plan: dict):
    """Print the plan's steps as a Rich table (step number, action, quick details).

    All text taken from the plan is escaped before being placed in a table
    cell, because cells are parsed as Rich markup: unescaped content such as
    ``a[i]`` would be swallowed as a style tag and ``[/x]`` would raise.
    Steps that are not JSON objects are shown as-is instead of crashing.

    Args:
        plan: Plan dict; its ``steps`` entry is expected to be a list of step dicts.
    """
    from rich.markup import escape  # local import keeps this block self-contained

    def esc(value) -> str:
        """Stringify *value* (None -> '') and neutralise Rich markup in it."""
        return escape("" if value is None else str(value))

    def clip(text: str, limit: int) -> str:
        """Return *text* cut to *limit* characters with an ellipsis when shortened."""
        return (text[:limit] + "…") if len(text) > limit else text

    table = Table(title="🧠 Planned Steps", show_lines=True, box=ROUNDED)
    table.add_column("Step#", style="bold cyan", no_wrap=True)
    table.add_column("Action", style="bold green")
    table.add_column("Quick details", style="yellow", overflow="fold")

    steps = plan.get("steps", [])
    if not isinstance(steps, list):
        steps = []

    for i, s in enumerate(steps, 1):
        if not isinstance(s, dict):
            # Malformed step (e.g. after manual editing): show it rather than crash.
            table.add_row(str(i), "?", esc(pretty_json(s)))
            continue

        t = s.get("type", "?")

        if t == "shell":
            action = "Run shell"
            cmd_val = s.get("cmd", "")
            if isinstance(cmd_val, dict):
                # Per-OS command map: show every "os:cmd" pair.
                shown = ", ".join(f"{k}:{v}" for k, v in cmd_val.items() if isinstance(v, str))
            else:
                shown = cmd_val
            details = f"[cyan]{esc(shown)}[/cyan]"
            if s.get("timeout"):
                details += f" (timeout={esc(s['timeout'])}s)"
            if s.get("cwd"):
                details += f" [dim]cwd={esc(s['cwd'])}[/dim]"

        elif t == "read_file":
            action = "Read file"
            details = esc(s.get("path", ""))

        elif t == "rewrite_file":
            action = "rewrite_file"
            details = esc(s.get("path", "?"))

        elif t in {"write_file", "edit_file", "append_file"}:
            action = {"write_file": "Write file", "edit_file": "Edit file", "append_file": "Append file"}[t]
            mode = s.get("mode", "w" if t != "append_file" else "a")
            details = f"{esc(s.get('path', ''))} [dim]mode={esc(mode)}[/dim]"

        elif t == "generate_file":
            action = "Generate file"
            fmt = s.get("format", "text")
            details = esc(f"{s.get('path', '')} ({fmt}, {s.get('length', 'medium')})")

        elif t == "generate_tree":
            action = "Generate project tree"
            base = s.get("base", ".")
            files = s.get("files") or []
            details = f"{esc(base)} → {len(files)} file(s)"

        elif t == "generate_large_file":
            action = "Generate large file"
            details = esc(f"{s.get('path', '?')} [{len(s.get('chunks') or [])} chunks]")

        elif t == "mkdirs":
            action = "Make directories"
            details = esc(", ".join(str(p) for p in (s.get("paths") or [])))

        elif t == "python":
            action = "Run Python"
            # First line of the code, capped at 60 characters.
            code = (str(s.get("code") or "").strip().splitlines() or [""])[0][:60]
            details = esc(code) + ("…" if len(code) == 60 else "")

        elif t == "respond_llm":
            action = "LLM respond"
            inst = str(s.get("instruction") or "").strip()
            details = esc(clip(inst, 80))

        elif t == "respond":
            action = "Respond"
            details = esc(clip(str(s.get("text") or ""), 80))

        elif t == "fs":
            action = "fs"
            op = s.get("op", "?")
            path_or_patt = s.get("path") or s.get("pattern") or ""
            details = esc(f"{op} {path_or_patt}")

        else:
            # Unknown step type: fall back to the raw JSON.
            action = esc(t)
            details = esc(pretty_json(s))

        table.add_row(str(i), action, details)
    console.print(table)
|
|
def render_result(res: dict):
    """Pretty renderer for a single step result.

    Dispatches on ``res["type"]`` and prints Rich panels / syntax blocks to the
    shared console. Short text is printed inline; long text goes to ``pager``.
    Result types without a dedicated renderer are dumped as JSON.

    Every value taken from the result is escaped before being embedded in a
    markup string or panel title, since command lines, paths and LLM text may
    contain ``[...]`` sequences that Rich would otherwise parse as tags.

    Args:
        res: One result dict as produced by a step executor.
    """
    from rich.markup import escape  # local import keeps this block self-contained

    def esc(value) -> str:
        """Stringify *value* (None -> '') and neutralise Rich markup in it."""
        return escape("" if value is None else str(value))

    rtype = res.get("type", "unknown")

    # ---- mkdirs -------------------------------------------------------------
    if rtype == "mkdirs":
        created = res.get("created", [])
        ok = res.get("ok", True)
        if created:
            body = "[bold]Created:[/bold]\n" + "\n".join(esc(c) for c in created)
        else:
            body = "No directories created."
        console.print(Panel(body, title="📁 mkdirs", border_style=("green" if ok else "red")))
        return

    # ---- generate_tree ------------------------------------------------------
    if rtype == "generate_tree":
        base = res.get("base", "?")
        written = res.get("written", [])
        ok = res.get("ok", True)
        n = len(written)
        lines = [esc(w.get("path", "?")) for w in written[:20]]  # cap the listing at 20 files
        more = f"\n… and {n-20} more" if n > 20 else ""
        body = (
            f"[bold]Base:[/bold] {esc(base)}\n[bold]Files written:[/bold] {n}\n"
            + ("\n".join(lines) + more if n else "None")
        )
        console.print(Panel(body, title="🌲 Project tree", border_style=("green" if ok else "red")))
        return

    # ---- generate_large_file ------------------------------------------------
    if rtype == "generate_large_file":
        ok = res.get("ok", True)
        path = res.get("path", "?")
        chunks = res.get("chunks", 0)
        bytes_ = res.get("bytes", 0)
        body = (
            f"[bold]Path:[/bold] {esc(path)}\n"
            f"[bold]Chunks:[/bold] {esc(chunks)}\n"
            f"[bold]Bytes:[/bold] {esc(bytes_)}"
        )
        console.print(Panel(body, title="🧱 Large file", border_style=("green" if ok else "red")))
        return

    # ---- shell --------------------------------------------------------------
    if rtype == "shell":
        cmd = str(res.get("cmd") or "")
        rc = res.get("returncode")
        ok = (rc == 0)
        icon = "✅" if ok else "❌"
        title = f"{icon} Shell — {('Succeeded' if ok else 'Failed')} (rc={rc})"
        subtitle = f"[bold]Command:[/bold] [cyan]{esc(cmd)}[/cyan]"
        if res.get("cwd"):
            subtitle += f"\n[bold]CWD:[/bold] {esc(res['cwd'])}"
        console.print(Panel(subtitle, title=title, border_style=("green" if ok else "red")))

        # Optional log of tools that were auto-installed before the command ran.
        pre = res.get("preinstall")
        if pre:
            pre = str(pre)
            if len(pre) <= 1200 and pre.count("\n") <= 40:
                console.print(Panel(esc(pre), title="🧰 Preinstall (auto-installed tools)", border_style="yellow"))
            else:
                console.print(Panel("Auto-install log is long. Opening pager…", title="🧰 Preinstall", border_style="yellow"))
                pager(pre)
            console.rule(style="dim")

        stdout = res.get("stdout") or ""
        stderr = res.get("stderr") or ""

        if not stdout.strip() and not stderr.strip():
            hint = ""
            # `dig <IP>` without -x prints nothing useful; suggest the reverse-lookup form.
            if re.search(r"\bdig\b", cmd) and re.search(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", cmd) and "-x" not in cmd:
                hint = "\n[dim]Hint: Reverse DNS uses[/dim] [cyan]dig -x <IP> +short[/cyan]"
            console.print(Panel("No output from command." + hint, border_style="yellow"))
            return

        if stdout.strip():
            console.print("[bold]stdout[/bold]")
            if len(stdout) <= 1200 and stdout.count("\n") <= 40:
                console.print(Syntax(stdout.rstrip("\n"), "bash", theme="ansi_dark"))
            else:
                console.print("[cyan]Opening pager for long stdout…[/cyan]")
                pager(stdout)
            console.rule(style="dim")

        if stderr.strip():
            console.print("[bold red]stderr[/bold red]")
            if len(stderr) <= 1200 and stderr.count("\n") <= 40:
                console.print(Syntax(stderr.rstrip("\n"), "bash", theme="ansi_dark"))
            else:
                console.print("[cyan]Opening pager for long stderr…[/cyan]")
                pager(stderr)
            console.rule(style="dim")
        return

    # ---- read_file ----------------------------------------------------------
    if rtype == "read_file":
        path = res.get("path", "?")
        content = res.get("content") or ""
        size = len(content.encode("utf-8"))
        title = f"📄 Read File — {esc(path)}"
        meta = f"[bold]Bytes:[/bold] {size}"
        console.print(Panel(meta, title=title, border_style="cyan"))

        # Pick a syntax lexer from the file extension; plain text otherwise.
        ext = Path(str(path)).suffix.lower()
        lang = {
            ".py": "python",
            ".sh": "bash",
            ".bash": "bash",
            ".html": "html",
            ".htm": "html",
            ".md": "markdown",
            ".json": "json",
            ".yml": "yaml",
            ".yaml": "yaml",
        }.get(ext, "text")

        if len(content) <= 1500 and content.count("\n") <= 60:
            console.print(Syntax(content, lang, theme="ansi_dark"))
        else:
            console.print("[cyan]Opening pager for long content…[/cyan]")
            pager(content)
        return

    # ---- fs -----------------------------------------------------------------
    if rtype == "fs":
        op = res.get("op")
        title = f"ℹ fs — {esc(op)}"
        if op == "list":
            entries = res.get("entries", []) or []
            body = f"[bold]Path:[/bold] {esc(res.get('path', '?'))}\n[bold]Count:[/bold] {len(entries)}"
            if entries:
                body += "\n" + "\n".join(esc(e) for e in entries[:50])
                if len(entries) > 50:
                    body += f"\n… and {len(entries)-50} more"
            console.print(Panel(body, title=title, border_style="cyan"))
            return

        if op == "read":
            content = res.get("content") or ""
            path = res.get("path", "?")
            meta = f"[bold]Path:[/bold] {esc(path)} [bold]Bytes:[/bold] {len(content.encode())}"
            console.print(Panel(meta, title=title, border_style="cyan"))
            if len(content) <= 1500 and content.count("\n") <= 60:
                console.print(Syntax(content, "markdown", theme="ansi_dark"))
            else:
                console.print("[cyan]Opening pager for long content…[/cyan]")
                pager(content)
            return

        if op == "exists":
            body = f"[bold]Path:[/bold] {esc(res.get('path', '?'))}\n[bold]Exists:[/bold] {res.get('exists')}"
            console.print(Panel(body, title=title, border_style="cyan"))
            return

        if op == "glob":
            patt = res.get("pattern", "")
            matches = res.get("matches", []) or []
            body = f"[bold]Pattern:[/bold] {esc(patt)}\n[bold]Count:[/bold] {len(matches)}"
            if matches:
                body += "\n" + "\n".join(esc(m) for m in matches[:50])
                if len(matches) > 50:
                    body += f"\n… and {len(matches)-50} more"
            console.print(Panel(body, title=title, border_style="cyan"))
            return

        # Any other fs op (write, append, mkdir, remove, move, copy, ...): raw JSON.
        console.print(Panel(esc(pretty_json(res)), title=title, border_style="cyan"))
        return

    # ---- generate_file ------------------------------------------------------
    if rtype == "generate_file":
        path = res.get("path", "?")
        n = res.get("bytes", 0)
        ok = (res.get("status") == "ok")
        icon = "✅" if ok else "❌"
        title = f"{icon} Generated File — {esc(path)}"
        meta = f"[bold]Bytes written:[/bold] {esc(n)}"
        tip = f"[dim]Tip: try[/dim] [cyan]head -n 40 {esc(path)}[/cyan]"
        console.print(Panel(f"{meta}\n{tip}", title=title, border_style=("green" if ok else "red")))
        return

    # ---- write_file / edit_file / append_file -------------------------------
    if rtype in {"write_file", "edit_file", "append_file"}:
        path = res.get("path", "?")
        mode = res.get("mode", "w" if rtype != "append_file" else "a")
        ok = (res.get("status") == "ok")
        icon = "✅" if ok else "❌"
        action = {"write_file": "Write", "edit_file": "Edit", "append_file": "Append"}[rtype]
        console.print(Panel(f"[bold]Path:[/bold] {esc(path)}\n[bold]Mode:[/bold] {esc(mode)}",
                            title=f"{icon} {action} File",
                            border_style=("green" if ok else "red")))
        return

    # ---- python -------------------------------------------------------------
    if rtype == "python":
        out = res.get("stdout") or ""
        title = "🐍 Python — Output"
        if out.strip():
            if len(out) <= 1500 and out.count("\n") <= 60:
                console.print(Panel(Syntax(out, "python", theme="ansi_dark"),
                                    title=title, border_style="magenta"))
            else:
                console.print(Panel("Output is long. Opening pager…", title=title, border_style="magenta"))
                pager(out)
        else:
            console.print(Panel("No output.", title=title, border_style="magenta"))
        return

    # ---- respond ------------------------------------------------------------
    if rtype == "respond":
        # Free-form (often LLM-written) text: must never be parsed as markup.
        console.print(Panel(esc(res.get("text", "")), title="💬 Response", border_style="blue"))
        return

    # ---- fallback: anything without a dedicated renderer --------------------
    console.print(Panel(esc(pretty_json(res)), title=f"ℹ {esc(rtype)}", border_style="yellow"))
|
|
def show_result_panels(results: list[dict]):
    """Pretty print execution results with smart renderers per step type."""
    if not results:
        console.print("[yellow]No steps executed.[/yellow]")
        return

    step_no = 0
    for outcome in results:
        step_no += 1
        # Horizontal rule with the step number, then the type-specific renderer.
        console.rule(f"[bold magenta]Step {step_no}[/bold magenta]")
        render_result(outcome)
|
|
def warn_if_danger(plan: dict) -> bool:
    """Return True if anything looks dangerous; also print warnings.

    Only ``shell`` steps are inspected. For a per-OS command map every string
    value is checked and the first dangerous one is reported for that step.
    A missing/null/non-list ``steps`` entry and non-dict steps are tolerated
    (nothing to check) instead of raising.

    Args:
        plan: Plan dict whose ``steps`` entry should be a list of step dicts.

    Returns:
        True when at least one dangerous shell command was found, else False.
    """
    steps = plan.get("steps") or []
    if not isinstance(steps, list):
        steps = []

    dangerous = []
    for idx, step in enumerate(steps, 1):
        if not isinstance(step, dict) or step.get("type") != "shell":
            continue
        for cand in _flatten_cmds_for_check(step.get("cmd", "")):
            if danger_check_shell(cand):
                dangerous.append((idx, cand))
                break  # one hit per step is enough

    if not dangerous:
        return False

    # Imported only when needed; commands are escaped so that bracketed shell
    # syntax is shown verbatim rather than being parsed as Rich markup.
    from rich.markup import escape

    console.print("[bold red]⚠ Potentially dangerous shell steps detected![/bold red]")
    for idx, cmd in dangerous:
        console.print(f"[red] - Step {idx}: {escape(cmd)}[/red]")
    return True
|
|
def confirm_danger():
    """Ask the user to type ``proceed``; return True only on that exact word (case-insensitive)."""
    console.print("[bold red]Type 'proceed' to continue, or anything else to cancel.[/bold red]")
    answer = Prompt.ask("[red]Confirmation[/red]", default="")
    normalized = answer.strip().lower()
    return normalized == "proceed"
|
|
def health_check(server: str) -> tuple[bool, str]:
    """Probe the server and return ``(healthy, message)``.

    First a GET on the base URL is tried (5 s timeout). If that does not
    succeed, a POST to ``/infer`` with a dummy prompt is tried. The message is
    either an ``OK (...)`` marker or a description of the failure.
    """
    root_url = server.rstrip("/")

    # Probe 1: plain GET on the root URL.
    try:
        root_resp = requests.get(root_url, headers=auth_headers(), timeout=5)
    except RequestException as exc:
        fallback_err = str(exc)
    else:
        if root_resp.ok:
            return True, "OK (/)"
        fallback_err = f"HTTP {root_resp.status_code}: {root_resp.text}"

    # Probe 2: the inference endpoint itself (connect 5 s, read up to 90 s).
    try:
        infer_resp = requests.post(
            f"{root_url}/infer",
            json={"prompt": "healthcheck"},
            headers=auth_headers(),
            timeout=(5, 90),
        )
        if infer_resp.ok:
            return True, "OK (/infer)"
        return False, f"HTTP {infer_resp.status_code}: {infer_resp.text}"
    except RequestException as exc:
        message = str(exc)
        # Fall back to the first probe's error if this exception has no text.
        return False, (message if message else fallback_err)
|
|
def api_infer(server: str, prompt: str, context: str | None = None) -> dict:
    """Ask the server to plan for *prompt* and return the ``plan`` from its JSON reply.

    The request context is the optional *context* string followed by a
    ``CLIENT_OS: <name>`` line describing the local platform.

    Args:
        server: Base server URL; a trailing slash is tolerated.
        prompt: The user's request.
        context: Optional extra context forwarded to the server.

    Raises:
        requests.HTTPError: On a non-2xx response.
        KeyError: If the reply has no ``plan`` entry.
    """
    payload = {"prompt": prompt}
    ctx_parts = []
    if context:
        ctx_parts.append(context)
    try:
        ctx_parts.append(f"CLIENT_OS: {platform.system().lower()}")
    except Exception:
        # Best-effort: the OS hint is optional.
        pass
    if ctx_parts:
        payload["context"] = "\n".join(ctx_parts)

    # Strip a trailing "/" so "http://host/" does not become "http://host//infer".
    base = server.rstrip("/")
    r = requests.post(f"{base}/infer", json=payload,
                      headers=auth_headers(), timeout=3000)
    r.raise_for_status()
    return r.json()["plan"]
|
|
def api_execute(server: str, plan: dict) -> list:
    """POST *plan* to the server's ``/execute`` endpoint and return its ``results``.

    Args:
        server: Base server URL; a trailing slash is tolerated.
        plan: Plan dict to execute remotely.

    Raises:
        requests.HTTPError: On a non-2xx response.
        KeyError: If the reply has no ``results`` entry.
    """
    # Strip a trailing "/" so "http://host/" does not become "http://host//execute".
    base = server.rstrip("/")
    r = requests.post(f"{base}/execute", json={"plan": plan},
                      headers=auth_headers(), timeout=1200)
    r.raise_for_status()
    return r.json()["results"]
|
|
def resolve_cmd_by_os(cmd_value):
    """Pick the command to run on this machine from a string or an ``{os: cmd}`` map.

    A plain string is returned unchanged. For a map the lookup order is: the
    lower-cased ``platform.system()`` name, then ``unix`` (on linux/darwin
    only), then ``default``, then the first non-blank string value.

    Raises:
        ValueError: If no command can be resolved.
    """
    current_os = platform.system().lower()

    if isinstance(cmd_value, str):
        return cmd_value

    if isinstance(cmd_value, dict):
        # Keys to try, most specific first.
        lookup_keys = [current_os]
        if current_os in ("linux", "darwin"):
            lookup_keys.append("unix")
        lookup_keys.append("default")

        for key in lookup_keys:
            hit = cmd_value.get(key)
            if hit:
                return hit

        # Last resort: any usable string in the map.
        fallback = next(
            (v for v in cmd_value.values() if isinstance(v, str) and v.strip()),
            None,
        )
        if fallback is not None:
            return fallback

    raise ValueError("Invalid 'cmd' in shell step: expected string or {os: cmd} map.")
|
|
def safe_exec_python(code: str) -> str:
    """Run *code* with ``exec`` and return everything it printed to stdout.

    If the code raises, the traceback is written into the same captured text
    (so callers can detect failure by looking for the traceback header)
    instead of leaking to the real stderr.

    SECURITY NOTE: despite the name, this is not a sandbox. The code runs
    in-process with full privileges; only pass code the user has reviewed.

    Args:
        code: Python source to execute in a fresh ``__main__``-like namespace.

    Returns:
        Captured stdout, followed by a traceback if an exception occurred.
    """
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        try:
            exec(code, {"__name__": "__main__"})
        except Exception:
            # print_exc() defaults to sys.stderr, which is NOT redirected here;
            # send the traceback to the capture buffer explicitly.
            traceback.print_exc(file=buf)
    return buf.getvalue()
|
|
def api_gen(server: str, fmt: str, instruction: str, length: str = "medium") -> str:
    """Ask the server's ``/gen`` endpoint for content and return its ``content`` text.

    Args:
        server: Base server URL; a trailing slash is tolerated.
        fmt: Requested output format (sent as ``format``).
        instruction: What to generate.
        length: Requested length hint (default ``"medium"``).

    Returns:
        The ``content`` entry of the JSON reply, or ``""`` when absent.

    Raises:
        requests.HTTPError: On a non-2xx response.
    """
    # Strip a trailing "/" so "http://host/" does not become "http://host//gen".
    base = server.rstrip("/")
    r = requests.post(f"{base}/gen",
                      json={"format": fmt, "instruction": instruction, "length": length},
                      headers=auth_headers(), timeout=3000)
    r.raise_for_status()
    return (r.json() or {}).get("content", "")
|
|
def local_execute_step(server: str, step: dict) -> dict:
    """Execute one plan step on the local machine and return its result dict.

    File, directory, shell and Python steps run locally. Steps that need
    generated text (``generate_file``, ``generate_tree``,
    ``generate_large_file``, ``rewrite_file``, ``respond_llm``) call the server.

    This function does not raise for step failures: any exception, including
    a malformed (non-dict) step, is converted into
    ``{"type": "error", "error": <message>, "ok": False, "duration_ms": ...}``.

    Args:
        server: Base server URL; a trailing slash is tolerated.
        step: Step dict; ``step["type"]`` selects the action.

    Returns:
        A result dict with at least ``type``, ``ok`` and ``duration_ms``.
    """
    started = time.time()

    def elapsed_ms() -> int:
        """Milliseconds since this step started."""
        return int((time.time() - started) * 1000)

    try:
        if not isinstance(step, dict):
            raise TypeError(f"Step must be a JSON object, got {type(step).__name__}")

        t = str(step.get("type") or "").lower()
        base_url = server.rstrip("/")  # avoid "//path" when server ends with "/"

        if t == "mkdirs":
            made = []
            for d in step.get("paths", []) or []:
                if not d:
                    continue
                os.makedirs(d, exist_ok=True)
                made.append(d)
            return {"type": "mkdirs", "created": made, "ok": True, "duration_ms": elapsed_ms()}

        if t in {"write_file", "edit_file", "append_file"}:
            path = step["path"]
            content = step.get("content", "")
            mode = "w" if t != "append_file" else "a"
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            with open(path, mode, encoding="utf-8") as f:
                f.write(content)
            return {"type": t, "path": path, "mode": mode, "status": "ok",
                    "bytes": len(content.encode("utf-8")), "ok": True,
                    "duration_ms": elapsed_ms()}

        if t == "generate_file":
            path = step["path"]
            fmt = step.get("format", "text")
            instr = step.get("instruction", "")
            length = step.get("length", "medium")
            content = api_gen(server, fmt, instr, length)
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                f.write(content)
            return {"type": "generate_file", "path": path, "status": "ok",
                    "bytes": len(content.encode("utf-8")), "ok": True,
                    "duration_ms": elapsed_ms()}

        if t == "rewrite_file":
            path = step["path"]
            instr = step.get("instruction", "")
            length = step.get("length", "long")
            # A missing file is rewritten from an empty starting point.
            current = Path(path).read_text(errors="ignore") if Path(path).exists() else ""
            r = requests.post(f"{base_url}/assist/rewrite",
                              json={"instruction": instr, "current": current, "length": length},
                              headers=auth_headers(), timeout=3000)
            r.raise_for_status()
            new_content = r.json()["new_content"]
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            Path(path).write_text(new_content, encoding="utf-8")
            return {
                "type": "rewrite_file",
                "path": path,
                "bytes": len(new_content.encode("utf-8")),
                "ok": True,
                "duration_ms": elapsed_ms(),
            }

        if t == "respond_llm":
            # Delegated to the server; always reported back as a "respond" result.
            try:
                r = requests.post(
                    f"{base_url}/execute",
                    json={"plan": {"steps": [step]}},
                    headers=auth_headers(),
                    timeout=600,
                )
                r.raise_for_status()
                results = (r.json() or {}).get("results", [])
                res = results[0] if results else {"type": "error", "error": "Empty result from server", "ok": False}

                if res.get("type") == "error":
                    return {
                        "type": "respond",
                        "text": f"❌ Server error: {res.get('error')}\n\n{res.get('trace', '')}",
                        "ok": False,
                        "duration_ms": elapsed_ms(),
                    }
                if res.get("type") != "respond":
                    res = {"type": "respond", "text": res.get("text") or "", "ok": res.get("ok", True)}
                res["duration_ms"] = elapsed_ms()
                return res
            except Exception as e:
                # Server unreachable / bad reply: echo the step's own text instead.
                text = step.get("text") or step.get("instruction") or "Done."
                return {
                    "type": "respond",
                    "text": text,
                    "ok": False,
                    "error": str(e),
                    "duration_ms": elapsed_ms(),
                }

        if t == "respond":
            text = step.get("text") or "Done."
            return {"type": "respond", "text": text, "ok": True, "duration_ms": elapsed_ms()}

        if t == "fs":
            # Failures inside any fs op fall through to the outer handler, which
            # produces the same error-result shape.
            op = (step.get("op") or "").lower()
            path = step.get("path")
            if path:
                path = os.path.expanduser(path)

            if op == "list":
                entries = sorted(os.listdir(path))
                return {"type": "fs", "op": op, "path": path, "entries": entries,
                        "count": len(entries), "ok": True, "duration_ms": elapsed_ms()}
            if op == "read":
                content = Path(path).read_text(errors="ignore")
                return {"type": "fs", "op": op, "path": path, "content": content,
                        "bytes": len(content.encode()), "ok": True, "duration_ms": elapsed_ms()}
            if op == "write":
                content = step.get("content", "")
                Path(path).parent.mkdir(parents=True, exist_ok=True)
                Path(path).write_text(content, encoding="utf-8")
                return {"type": "fs", "op": op, "path": path,
                        "bytes": len(content.encode()), "ok": True, "duration_ms": elapsed_ms()}
            if op == "append":
                content = step.get("content", "")
                Path(path).parent.mkdir(parents=True, exist_ok=True)
                with open(path, "a", encoding="utf-8") as f:
                    f.write(content)
                return {"type": "fs", "op": op, "path": path,
                        "bytes": len(content.encode()), "ok": True, "duration_ms": elapsed_ms()}
            if op == "mkdir":
                Path(path).mkdir(parents=True, exist_ok=True)
                return {"type": "fs", "op": op, "path": path, "ok": True, "duration_ms": elapsed_ms()}
            if op == "remove":
                p = Path(path)
                if p.is_dir():
                    p.rmdir()   # only removes an empty directory
                else:
                    p.unlink()  # raises if the file does not exist
                return {"type": "fs", "op": op, "path": path, "ok": True, "duration_ms": elapsed_ms()}
            if op == "move":
                to = os.path.expanduser(step["to"])
                Path(to).parent.mkdir(parents=True, exist_ok=True)
                Path(path).replace(to)
                return {"type": "fs", "op": op, "path": path, "to": to, "ok": True,
                        "duration_ms": elapsed_ms()}
            if op == "copy":
                import shutil as _sh
                to = os.path.expanduser(step["to"])
                Path(to).parent.mkdir(parents=True, exist_ok=True)
                _sh.copy2(path, to)
                return {"type": "fs", "op": op, "path": path, "to": to, "ok": True,
                        "duration_ms": elapsed_ms()}
            if op == "exists":
                return {"type": "fs", "op": op, "path": path, "exists": Path(path).exists(),
                        "ok": True, "duration_ms": elapsed_ms()}
            if op == "glob":
                import glob as _glob
                patt = step.get("pattern") or path
                matches = sorted(_glob.glob(os.path.expanduser(patt)))
                return {"type": "fs", "op": op, "pattern": patt, "matches": matches,
                        "count": len(matches), "ok": True, "duration_ms": elapsed_ms()}
            return {"type": "error", "error": f"Unknown fs op '{op}'", "ok": False,
                    "duration_ms": elapsed_ms()}

        if t == "generate_tree":
            base = step.get("base") or "."
            files = step.get("files") or []
            os.makedirs(base, exist_ok=True)
            written = []
            for f in files:
                rel = f.get("path")
                if not rel:
                    continue
                path = os.path.join(base, rel)
                fmt = f.get("format", "text")
                instr = f.get("instruction", "")
                length = f.get("length", "medium")
                content = api_gen(server, fmt, instr, length)
                os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
                with open(path, "w", encoding="utf-8") as fp:
                    fp.write(content)
                written.append({"path": path, "bytes": len(content.encode("utf-8"))})
            return {"type": "generate_tree", "base": base, "written": written, "ok": True,
                    "duration_ms": elapsed_ms()}

        if t == "generate_large_file":
            path = step["path"]
            chunks = step.get("chunks") or []
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            total = 0
            with open(path, "w", encoding="utf-8") as fp:
                for ck in chunks:
                    instr = ck.get("instruction", "")
                    length = ck.get("length", "medium")
                    piece = api_gen(server, "text", instr, length)
                    # Keep chunks newline-separated in the output file.
                    fp.write(piece + ("\n" if not piece.endswith("\n") else ""))
                    total += len(piece.encode("utf-8"))
            return {"type": "generate_large_file", "path": path, "bytes": total,
                    "chunks": len(chunks), "ok": True, "duration_ms": elapsed_ms()}

        if t == "read_file":
            path = step["path"]
            with open(path, "r", errors="ignore") as f:
                content = f.read()
            return {"type": "read_file", "path": path, "content": content,
                    "bytes": len(content.encode("utf-8")),
                    "line_count": content.count("\n") + 1 if content else 0,
                    "ok": True, "duration_ms": elapsed_ms()}

        if t == "list_dir":
            path = step.get("path", ".")
            entries = sorted(os.listdir(path))
            return {"type": "list_dir", "path": path, "entries": entries, "count": len(entries),
                    "ok": True, "duration_ms": elapsed_ms()}

        if t == "python":
            out = safe_exec_python(step.get("code", ""))
            # Failure is inferred from a traceback header in the captured output.
            ok_flag = ("Traceback (most recent call last):" not in out)
            return {"type": "python", "stdout": out, "ok": ok_flag, "duration_ms": elapsed_ms()}

        if t == "shell":
            cmd = resolve_cmd_by_os(step["cmd"])
            cwd = step.get("cwd") or None
            raw_timeout = step.get("timeout")
            timeout = float(raw_timeout) if raw_timeout is not None else 120.0
            env = os.environ.copy()
            # "env": null in the plan must not crash; values are coerced to str
            # because subprocess only accepts string environment entries.
            env.update({str(k): str(v) for k, v in (step.get("env") or {}).items()})
            # shell=True is intentional: plan commands are shell command lines.
            proc = subprocess.run(cmd, shell=True, capture_output=True, text=True,
                                  cwd=cwd, timeout=timeout, env=env)
            return {"type": "shell", "cmd": cmd, "cwd": cwd, "stdout": proc.stdout,
                    "stderr": proc.stderr, "returncode": proc.returncode,
                    "ok": (proc.returncode == 0), "duration_ms": elapsed_ms()}

        return {"type": "error", "error": f"Unknown step type {t}", "ok": False,
                "duration_ms": elapsed_ms()}

    except Exception as e:
        return {"type": "error", "error": str(e), "ok": False, "duration_ms": elapsed_ms()}
|
|
def local_execute(server: str, plan: dict) -> list[dict]:
    """Run every step of *plan* in order and return the list of result dicts."""
    planned = plan.get("steps") or []
    return [local_execute_step(server, one_step) for one_step in planned]
|
|
| |
| |
| |
def interactive_menu(plan: dict, *, dry_run: bool, server: str, session_dir: Path):
    """Show plan and present menu actions.

    Loops until the user executes (all at once or step-by-step) or cancels.
    Toggling dry-run and editing the plan redisplay the menu. Before any
    execution the plan is screened for dangerous shell commands, which need an
    explicit typed confirmation. With dry-run on, nothing is executed.

    Session files are written as UTF-8 explicitly because ``pretty_json``
    keeps non-ASCII characters verbatim.

    Args:
        plan: The plan to show / run (may be replaced by an edited version).
        dry_run: Initial dry-run state.
        server: Base server URL passed on to the executors.
        session_dir: Folder where edited plans and results are saved.
    """
    while True:
        show_plan_table(plan)
        console.print(
            "[bold]Choose:[/bold] "
            "[green][A][/green] Execute all • "
            "[green][S][/green] Step-by-step • "
            "[green][E][/green] Edit plan • "
            f"[green][D][/green] Toggle dry-run (now: {'ON' if dry_run else 'OFF'}) • "
            "[green][C][/green] Cancel"
        )
        choice = Prompt.ask("[yellow]Select[/yellow]", choices=["a", "s", "e", "d", "c"], default="a").lower()

        if choice == "d":
            dry_run = not dry_run
            console.print(f"[cyan]Dry-run is now {'ON' if dry_run else 'OFF'}[/cyan]")
            continue

        if choice == "e":
            edited = editor_edit_json(plan)
            if edited is None:
                console.print("[yellow]Keeping original plan.[/yellow]")
            else:
                plan = edited
                (session_dir / "plan.edited.json").write_text(pretty_json(plan), encoding="utf-8")
            continue

        if choice == "c":
            console.print("[red]Cancelled.[/red]")
            return

        # From here on the choice is "a" or "s": screen the plan first.
        if choice in ("a", "s") and warn_if_danger(plan):
            if not confirm_danger():
                console.print("[red]Aborted due to danger check.[/red]")
                return

        if dry_run:
            console.print(Panel("DRY-RUN: No commands will be executed.", style="bold yellow"))
            return

        if choice == "a":
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TimeElapsedColumn(),
                transient=True,
            ) as progress:
                progress.add_task(description="Executing all steps…", total=None)
                try:
                    results = local_execute(server, plan)
                except Exception as e:
                    console.print(f"[red]Execution failed:[/red] {e}")
                    return

            (session_dir / "results.json").write_text(pretty_json(results), encoding="utf-8")
            show_result_panels(results)
            return

        if choice == "s":
            stepwise_execute(server, plan, session_dir)
            return
|
|
|
|
def stepwise_execute(server: str, plan: dict, session_dir: Path):
    """Walk through the plan one step at a time, asking before each step.

    For every step the user can run it, edit it in $EDITOR, skip it, or quit.
    Dangerous shell steps need an extra typed confirmation first. Each result
    is rendered and saved as ``step_<n>.result.json``; all results are also
    saved together in ``results.stepwise.json``.

    Plan text is escaped before it is placed in Rich markup, and session files
    are written as UTF-8 because ``pretty_json`` keeps non-ASCII characters.

    Args:
        server: Base server URL passed on to the step executor.
        plan: Plan dict; an edited step replaces the original in ``plan["steps"]``.
        session_dir: Folder where edited steps and results are saved.
    """
    from rich.markup import escape  # local import keeps this block self-contained

    def esc(value) -> str:
        """Stringify *value* (None -> '') and neutralise Rich markup in it."""
        return escape("" if value is None else str(value))

    steps = plan.get("steps", [])
    if not isinstance(steps, list) or not steps:
        console.print("[yellow]No steps to execute.[/yellow]")
        return

    accumulated_results = []
    for idx, step in enumerate(steps, 1):
        if not isinstance(step, dict):
            # Malformed entry (e.g. after manual plan editing): nothing runnable here.
            console.print(f"[red]Step {idx} is not a JSON object; skipping.[/red]")
            continue

        # One-line summary shown in the step header.
        stype = step.get("type", "unknown")
        summary = esc(stype)
        if stype == "shell":
            summary = f"shell → [cyan]{esc(step.get('cmd', ''))}[/cyan]"
        elif stype == "read_file":
            summary = f"read_file → {esc(step.get('path', ''))}"
        elif stype in {"write_file", "edit_file", "append_file"}:
            summary = f"{stype} → {esc(step.get('path', ''))}"
        elif stype == "generate_file":
            summary = f"generate_file → {esc(step.get('path', ''))} ({esc(step.get('format', 'text'))})"
        elif stype == "generate_tree":
            summary = f"generate_tree → base={esc(step.get('base', '.'))}, files={len(step.get('files') or [])}"
        elif stype == "generate_large_file":
            summary = f"generate_large_file → {esc(step.get('path', '?'))} ({len(step.get('chunks') or [])} chunks)"
        elif stype == "mkdirs":
            summary = f"mkdirs → {esc(', '.join(str(p) for p in (step.get('paths') or [])))}"
        elif stype == "python":
            code = str(step.get("code") or "").strip().splitlines()
            summary = f"python → {esc(code[0][:60])}…" if code else "python"
        elif stype == "respond_llm":
            inst = str(step.get("instruction") or "").strip()
            summary = f"respond_llm → {esc(inst[:60])}…" if len(inst) > 60 else f"respond_llm → {esc(inst)}"

        console.print(Panel.fit(f"Step {idx}/{len(steps)} — {summary}", style="bold blue"))
        console.print(Syntax(pretty_json(step), "json", theme="ansi_dark"))

        # Extra confirmation for risky shell commands.
        if stype == "shell" and any(
            danger_check_shell(c) for c in _flatten_cmds_for_check(step.get("cmd", ""))
        ):
            console.print("[bold red]Dangerous command detected for this step.[/bold red]")
            if not confirm_danger():
                console.print("[red]Skipping step.[/red]")
                continue

        console.print("[bold]Choose:[/bold] "
                      "[green][Y][/green] run • "
                      "[green][E][/green] edit • "
                      "[green][S][/green] skip • "
                      "[green][Q][/green] quit all")
        choice = Prompt.ask("[yellow]Select[/yellow]", choices=["y", "e", "s", "q"], default="y").lower()

        if choice == "q":
            console.print("[red]Stopped by user.[/red]")
            break

        if choice == "e":
            edited = editor_edit_json(step)
            if edited is None:
                console.print("[yellow]Keeping original step.[/yellow]")
            else:
                steps[idx - 1] = edited
                step = edited
                (session_dir / f"step_{idx}.edited.json").write_text(pretty_json(step), encoding="utf-8")
            # NOTE(review): the edited step is not re-screened by the danger check.
            choice = Prompt.ask("[yellow]Run edited step now?[/yellow]", choices=["y", "n"], default="y")
            if choice.lower() != "y":
                console.print("[yellow]Skipping.[/yellow]")
                continue

        if choice == "s":
            console.print("[yellow]Skipped.[/yellow]")
            continue

        # Run the step locally, record and render its result.
        try:
            res = local_execute_step(server, step)
            accumulated_results.append(res)
            (session_dir / f"step_{idx}.result.json").write_text(pretty_json(res), encoding="utf-8")

            console.rule(f"[bold magenta]Result — Step {idx}[/bold magenta]")
            render_result(res)

        except Exception as e:
            console.print(f"[red]Step failed:[/red] {esc(e)}")
            continue

    if accumulated_results:
        (session_dir / "results.stepwise.json").write_text(pretty_json(accumulated_results), encoding="utf-8")
        console.print(Panel(f"Done. {len(accumulated_results)} step(s) executed.", style="bold green"))
    else:
        console.print("[yellow]No steps executed.[/yellow]")
|
|
| |
| |
| |
@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.option("--server", default=DEFAULT_SERVER, show_default=True,
              help="Server URL, e.g. http://127.0.0.1:5005")
@click.option("--context", "context_str", default="",
              help="Optional context string to pass to /infer")
@click.option("--dry-run", is_flag=True, default=False,
              help="Preview only, do not execute")
def main(server: str, context_str: str, dry_run: bool):
    """Run the interactive client loop.

    Saves the chosen server to the config, makes sure an API key is available,
    health-checks the server, then repeatedly reads a prompt, requests a plan
    from the server and hands it to ``interactive_menu``. Input starting with
    ``:`` is treated as a local meta command. The loop ends on
    ``exit``/``quit``/``:q``, Ctrl-C, or end of input.

    Args:
        server: Server URL (``--server``).
        context_str: Optional context passed to ``api_infer``; empty means none.
        dry_run: Initial dry-run flag passed to ``interactive_menu``.
    """
    ensure_dirs()
    cfg = load_config()
    cfg["server"] = server
    save_config(cfg)

    # Called only to fail early when no key can be obtained; the value is unused here.
    try:
        get_api_key()
    except Exception as e:
        console.print(f"[red]{e}[/red]")
        return

    console.print(Panel.fit(f"Axis Client\n[green]{server}[/green]", style="bold blue"))

    ok, msg = health_check(server)
    if not ok:
        console.print(f"[red]Server health check failed:[/red] {msg}")
        return

    session_dir = new_session_path()
    console.print(f"[dim]Session folder:[/dim] {session_dir}")

    while True:
        try:
            user_in = Prompt.ask("\n[bold yellow]>>>[/bold yellow]").strip()
            if not user_in:
                continue

            if user_in.lower() in ("exit", "quit", ":q"):
                console.print("[red]Goodbye.[/red]")
                break

            if user_in.startswith(":"):
                handled = handle_local_command(user_in, server, session_dir)
                if not handled:
                    console.print("[yellow]Unknown client command.[/yellow]")
                elif user_in.lower().startswith(":server"):
                    # ":server <URL>" saves the new URL to the config; re-read
                    # it so this running session uses it for later requests.
                    server = load_config().get("server") or server
                continue

            with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), transient=True) as prog:
                prog.add_task(description="Planning…", total=None)
                try:
                    plan = api_infer(server, user_in, context_str or None)
                except Exception as e:
                    console.print(f"[red]Plan failed:[/red] {e}")
                    continue

            (session_dir / "prompt.txt").write_text(user_in)
            (session_dir / "plan.json").write_text(pretty_json(plan))

            interactive_menu(plan, dry_run=dry_run, server=server, session_dir=session_dir)

        except (KeyboardInterrupt, EOFError):
            # EOFError (Ctrl-D / exhausted stdin) must end the loop: retrying
            # the prompt would raise it again immediately, forever.
            console.print("\n[red]Interrupted.[/red]")
            break
        except Exception as e:
            # Any other failure is reported and the loop continues.
            console.print(f"[red]Error:[/red] {e}")
|
|
| |
| |
| |
def handle_local_command(cmdline: str, server: str, session_dir: Path) -> bool:
    """Run a client-side meta command (a line starting with ``:``).

    Supported:
        :server          -> show server
        :server <URL>    -> save a new server URL to the config
        :apikey set      -> prompt for an API key and store it in key.json
        :apikey clear    -> remove key.json
        :history         -> list previous sessions
        :open <path>     -> view a file with pager
        :clear           -> clear screen
        :help            -> show help

    Args:
        cmdline: The raw command line as typed by the user.
        server: Current server URL; only displayed, never modified here.
        session_dir: Current session directory (not used by any command yet).

    Returns:
        True when the line was handled (including empty input, unparsable
        input and usage errors, all of which are reported here); False when
        the command is not recognised.
    """
    try:
        parts = shlex.split(cmdline)
    except ValueError as e:
        # shlex raises on unbalanced quotes, e.g. ':open "my file'.
        console.print(f"[red]Could not parse command:[/red] {e}")
        return True
    if not parts:
        return True
    cmd = parts[0].lower()

    if cmd == ":server":
        if len(parts) == 1:
            console.print(f"[cyan]Current server:[/cyan] {server}")
        else:
            new_s = parts[1]
            cfg = load_config()
            cfg["server"] = new_s
            save_config(cfg)
            console.print(f"[green]Server updated:[/green] {new_s}")
        return True

    if cmd == ":apikey":
        sub = parts[1].lower() if len(parts) > 1 else ""
        if sub == "set":
            new_key = getpass("New API key: ").strip()
            if new_key:
                write_local_key(new_key)
                console.print("[green]API key updated in key.json.[/green]")
            else:
                console.print("[yellow]No key entered; API key unchanged.[/yellow]")
        elif sub == "clear":
            try:
                KEY_FILE.unlink(missing_ok=True)
                console.print("[yellow]key.json removed. Next run will prompt.[/yellow]")
            except Exception as e:
                console.print(f"[red]Failed to remove key.json:[/red] {e}")
        else:
            console.print("[cyan]Usage:[/cyan] :apikey set | :apikey clear")
        return True

    if cmd == ":history":
        rows = []
        # Reverse name order, capped at the first 20 entries.
        for p in sorted(SESS_DIR.glob("*"), reverse=True)[:20]:
            prompt = ""
            try:
                pt = (p / "prompt.txt").read_text().strip()
                prompt = (pt[:80] + "…") if len(pt) > 80 else pt
            except (OSError, UnicodeDecodeError):
                # A missing or unreadable prompt.txt is listed with an empty prompt.
                pass
            rows.append((p.name, prompt))
        if not rows:
            console.print("[yellow]No sessions yet.[/yellow]")
            return True
        t = Table(title="Recent Sessions", show_lines=False, box=ROUNDED)
        t.add_column("Session", style="cyan")
        t.add_column("Prompt", style="white")
        for stamp, shown_prompt in rows:
            t.add_row(stamp, shown_prompt)
        console.print(t)
        return True

    if cmd == ":open":
        if len(parts) < 2:
            console.print("[cyan]Usage:[/cyan] :open <path>")
            return True
        path = Path(parts[1]).expanduser()
        if not path.exists():
            console.print(f"[red]No such file:[/red] {path}")
            return True
        try:
            pager(path.read_text())
        except Exception as e:
            console.print(f"[red]Failed to open:[/red] {e}")
        return True

    if cmd == ":clear":
        console.clear()
        return True

    if cmd == ":help":
        console.print(Panel.fit(
            "Meta commands:\n"
            " :server [URL] Show or set server URL\n"
            " :apikey set Save/replace API key to key.json\n"
            " :apikey clear Remove saved API key (prompt next run)\n"
            " :history Show last sessions\n"
            " :open <path> Page a local file\n"
            " :clear Clear the screen\n"
            " :help This help",
            title="Client Help", style="bold cyan"
        ))
        return True

    return False
|
|
| |
# Script entry point: start the CLI only when this file is run directly, not when imported.
if __name__ == "__main__":
    main()
|
|