Rohan03 commited on
Commit
046b8d0
·
verified ·
1 Parent(s): be96ac2

V2 merge: purpose_agent/tools.py

Browse files
Files changed (1) hide show
  1. purpose_agent/tools.py +77 -20
purpose_agent/tools.py CHANGED
@@ -17,6 +17,7 @@ from __future__ import annotations
17
  import json
18
  import logging
19
  import math
 
20
  import re
21
  import time
22
  import traceback
@@ -260,7 +261,7 @@ class FunctionTool(Tool):
260
  # ---------------------------------------------------------------------------
261
 
262
  class CalculatorTool(Tool):
263
- """Safe math expression evaluator."""
264
  name = "calculator"
265
  description = "Evaluate a mathematical expression. Supports +, -, *, /, **, sqrt, sin, cos, abs."
266
  parameters = {
@@ -271,24 +272,45 @@ class CalculatorTool(Tool):
271
  "required": ["expression"],
272
  }
273
 
 
 
 
 
 
 
274
  def execute(self, expression: str) -> str:
275
- # Safe eval with math functions only
 
 
 
 
 
 
 
 
 
276
  allowed = {
277
  "abs": abs, "round": round, "min": min, "max": max,
278
  "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos,
279
  "tan": math.tan, "log": math.log, "pi": math.pi, "e": math.e,
280
  }
281
- # Sanitize: only allow digits, operators, parentheses, dots, and allowed function names
282
- clean = re.sub(r'[^0-9+\-*/().,%\s]', '', expression.replace("^", "**"))
283
  try:
284
- result = eval(expression, {"__builtins__": {}}, allowed)
 
 
 
 
 
 
 
 
285
  return str(result)
286
  except Exception as e:
287
  return f"Error evaluating '{expression}': {e}"
288
 
289
 
290
  class PythonExecTool(Tool):
291
- """Execute Python code in a sandboxed environment."""
292
  name = "python_exec"
293
  description = "Execute Python code and return the output. Use print() to output results."
294
  parameters = {
@@ -298,22 +320,40 @@ class PythonExecTool(Tool):
298
  },
299
  "required": ["code"],
300
  }
 
301
 
302
  def execute(self, code: str) -> str:
303
- import io
304
- import contextlib
305
-
306
- output = io.StringIO()
307
- try:
308
- with contextlib.redirect_stdout(output):
309
- exec(code, {"__builtins__": __builtins__}, {})
310
- return output.getvalue() or "(no output)"
311
- except Exception as e:
312
- return f"Error: {e}\n{traceback.format_exc()}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
313
 
314
 
315
  class ReadFileTool(Tool):
316
- """Read a local file."""
317
  name = "read_file"
318
  description = "Read the contents of a file at the given path."
319
  parameters = {
@@ -324,9 +364,17 @@ class ReadFileTool(Tool):
324
  "required": ["path"],
325
  }
326
 
 
 
 
 
327
  def execute(self, path: str) -> str:
 
 
 
 
328
  try:
329
- with open(path, "r") as f:
330
  content = f.read()
331
  if len(content) > 10000:
332
  return content[:10000] + f"\n...[truncated, {len(content)} chars total]"
@@ -336,7 +384,7 @@ class ReadFileTool(Tool):
336
 
337
 
338
  class WriteFileTool(Tool):
339
- """Write content to a local file."""
340
  name = "write_file"
341
  description = "Write content to a file. Creates the file if it doesn't exist."
342
  parameters = {
@@ -348,9 +396,18 @@ class WriteFileTool(Tool):
348
  "required": ["path", "content"],
349
  }
350
 
 
 
 
 
351
  def execute(self, path: str, content: str) -> str:
 
 
 
 
352
  try:
353
- with open(path, "w") as f:
 
354
  f.write(content)
355
  return f"Written {len(content)} chars to {path}"
356
  except Exception as e:
 
17
  import json
18
  import logging
19
  import math
20
+ import os
21
  import re
22
  import time
23
  import traceback
 
261
  # ---------------------------------------------------------------------------
262
 
263
  class CalculatorTool(Tool):
264
+ """Safe math expression evaluator — no eval(), no arbitrary code."""
265
  name = "calculator"
266
  description = "Evaluate a mathematical expression. Supports +, -, *, /, **, sqrt, sin, cos, abs."
267
  parameters = {
 
272
  "required": ["expression"],
273
  }
274
 
275
+ # Whitelist of safe tokens
276
+ _SAFE_PATTERN = re.compile(
277
+ r'^[\d\s+\-*/().,%e]+$|'
278
+ r'(abs|round|min|max|sqrt|sin|cos|tan|log|pi)\b'
279
+ )
280
+
281
  def execute(self, expression: str) -> str:
282
+ import ast
283
+ import operator
284
+
285
+ # Only allow safe characters and function names
286
+ cleaned = expression.replace("^", "**").strip()
287
+ # Validate: reject anything with letters that aren't known functions
288
+ tokens = re.sub(r'(abs|round|min|max|sqrt|sin|cos|tan|log|pi|e)\b', '', cleaned)
289
+ if re.search(r'[a-zA-Z_]', tokens):
290
+ return f"Error: expression contains disallowed characters: '{expression}'"
291
+
292
  allowed = {
293
  "abs": abs, "round": round, "min": min, "max": max,
294
  "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos,
295
  "tan": math.tan, "log": math.log, "pi": math.pi, "e": math.e,
296
  }
 
 
297
  try:
298
+ # Use compile + eval with empty builtins — no code execution
299
+ code = compile(cleaned, "<calc>", "eval")
300
+ # Verify AST contains only safe nodes
301
+ tree = ast.parse(cleaned, mode="eval")
302
+ for node in ast.walk(tree):
303
+ if isinstance(node, (ast.Call,)):
304
+ if isinstance(node.func, ast.Name) and node.func.id not in allowed:
305
+ return f"Error: function '{node.func.id}' not allowed"
306
+ result = eval(code, {"__builtins__": {}}, allowed)
307
  return str(result)
308
  except Exception as e:
309
  return f"Error evaluating '{expression}': {e}"
310
 
311
 
312
  class PythonExecTool(Tool):
313
+ """Execute Python code in a subprocess with timeout and temp directory."""
314
  name = "python_exec"
315
  description = "Execute Python code and return the output. Use print() to output results."
316
  parameters = {
 
320
  },
321
  "required": ["code"],
322
  }
323
+ timeout_seconds: float = 10.0
324
 
325
  def execute(self, code: str) -> str:
326
+ import subprocess
327
+ import tempfile
328
+ import os
329
+
330
+ # Write code to temp file in isolated temp directory
331
+ with tempfile.TemporaryDirectory(prefix="pa_exec_") as tmpdir:
332
+ script = os.path.join(tmpdir, "script.py")
333
+ with open(script, "w") as f:
334
+ f.write(code)
335
+ try:
336
+ result = subprocess.run(
337
+ ["python3", script],
338
+ capture_output=True, text=True,
339
+ timeout=self.timeout_seconds,
340
+ cwd=tmpdir,
341
+ env={**os.environ, "HOME": tmpdir}, # isolate HOME
342
+ )
343
+ output = result.stdout
344
+ if result.stderr:
345
+ output += f"\nSTDERR:\n{result.stderr}"
346
+ if result.returncode != 0:
347
+ output += f"\n(exit code {result.returncode})"
348
+ return output or "(no output)"
349
+ except subprocess.TimeoutExpired:
350
+ return f"Error: execution timed out after {self.timeout_seconds}s"
351
+ except Exception as e:
352
+ return f"Error: {e}"
353
 
354
 
355
  class ReadFileTool(Tool):
356
+ """Read a local file — sandboxed to allowed root directory."""
357
  name = "read_file"
358
  description = "Read the contents of a file at the given path."
359
  parameters = {
 
364
  "required": ["path"],
365
  }
366
 
367
+ def __init__(self, sandbox_root: str = ".", **kwargs):
368
+ self.sandbox_root = os.path.abspath(sandbox_root)
369
+ super().__init__(**kwargs)
370
+
371
  def execute(self, path: str) -> str:
372
+ import os
373
+ resolved = os.path.abspath(path)
374
+ if not resolved.startswith(self.sandbox_root):
375
+ return f"Error: path '{path}' is outside sandbox root '{self.sandbox_root}'"
376
  try:
377
+ with open(resolved, "r") as f:
378
  content = f.read()
379
  if len(content) > 10000:
380
  return content[:10000] + f"\n...[truncated, {len(content)} chars total]"
 
384
 
385
 
386
  class WriteFileTool(Tool):
387
+ """Write content to a local file — sandboxed to allowed root directory."""
388
  name = "write_file"
389
  description = "Write content to a file. Creates the file if it doesn't exist."
390
  parameters = {
 
396
  "required": ["path", "content"],
397
  }
398
 
399
+ def __init__(self, sandbox_root: str = ".", **kwargs):
400
+ self.sandbox_root = os.path.abspath(sandbox_root)
401
+ super().__init__(**kwargs)
402
+
403
  def execute(self, path: str, content: str) -> str:
404
+ import os
405
+ resolved = os.path.abspath(path)
406
+ if not resolved.startswith(self.sandbox_root):
407
+ return f"Error: path '{path}' is outside sandbox root '{self.sandbox_root}'"
408
  try:
409
+ os.makedirs(os.path.dirname(resolved) or ".", exist_ok=True)
410
+ with open(resolved, "w") as f:
411
  f.write(content)
412
  return f"Written {len(content)} chars to {path}"
413
  except Exception as e: