"""
parse.py
Translate .p/ commands into function calls in transformerOS

This module handles the parsing and translation of the .p/ command language,
transforming structured .p/ commands into executable function calls in the
transformerOS framework.
"""
|
|
import importlib
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any

import yaml
|
|
| |
# Module-level logger shared by everything in this file.
# NOTE: the level is pinned to INFO at import time, so DEBUG records from this
# logger are dropped unless a caller lowers the level on this logger explicitly.
log = logging.getLogger("transformerOS.pareto_lang.parse")
log.setLevel(logging.INFO)
|
|
class CommandParser:
    """
    Parser for the .p/ command language

    Turns command strings of the form ``.p/domain.operation{name=value, ...}``
    into structured dictionaries, validates them against the definitions
    loaded from a YAML file, and maps them onto function-call descriptions.

    Attributes:
    -----------
    commands : Dict
        Loaded definitions; always contains a "commands" dictionary
    command_pattern : re.Pattern
        Matches one complete command (domain, operation, optional {...} block)
    param_pattern : re.Pattern
        Matches a single ``name=value`` segment of the parameter block
    """

    # Locates commands embedded in free text (used by extract_commands).
    _EXTRACT_PATTERN = re.compile(r'(\.p/[a-zA-Z_]+\.[a-zA-Z_]+(?:\{[^}]*\})?)')

    def __init__(self, commands_file: Optional[Union[str, Path]] = None):
        """
        Initialize the command parser

        Parameters:
        -----------
        commands_file : Optional[str]
            Path to the YAML file containing command definitions
            If None, uses the default commands.yaml in the same directory
        """
        if commands_file is None:
            commands_file = Path(__file__).parent / "commands.yaml"

        self.commands = self._load_commands(commands_file)

        # DOTALL lets the parameter block span several lines, which is what
        # extract_commands can hand over (its pattern accepts newlines).
        self.command_pattern = re.compile(
            r'\.p/([a-zA-Z_]+)\.([a-zA-Z_]+)(\{.*\})?', re.DOTALL
        )
        # Applied to ONE comma-separated segment, so the value may itself
        # contain commas (quoted strings, bracketed lists).
        self.param_pattern = re.compile(r'([a-zA-Z_]+)\s*=\s*(.+)', re.DOTALL)

        # Count the entries under "commands", not the top-level YAML keys.
        definitions = self.commands.get("commands") or {}
        log.info(f"CommandParser initialized with {len(definitions)} command definitions")

    def _load_commands(self, commands_file: Union[str, Path]) -> Dict:
        """
        Load command definitions from YAML file

        Loading is best-effort: on any failure the error is logged and an
        empty definition set is returned so the parser stays usable.

        Parameters:
        -----------
        commands_file : str or Path
            Location of the YAML definitions file

        Returns:
        --------
        Dict with at least a "commands" dictionary
        """
        try:
            # Explicit encoding: do not depend on the platform default.
            with open(commands_file, 'r', encoding='utf-8') as f:
                commands = yaml.safe_load(f)

            if not isinstance(commands, dict) or "commands" not in commands:
                raise ValueError(f"Invalid command file format: {commands_file}")

            # An empty "commands:" section loads as None; treat it as "no
            # definitions" instead of discarding the rest of the file.
            if commands["commands"] is None:
                commands["commands"] = {}
            elif not isinstance(commands["commands"], dict):
                raise ValueError(f"Invalid command file format: {commands_file}")

            log.info(f"Loaded {len(commands['commands'])} command definitions from {commands_file}")
            return commands

        except Exception as e:  # deliberate best-effort boundary
            log.error(f"Failed to load commands from {commands_file}: {e}")
            return {"commands": {}, "version": "unknown"}

    @staticmethod
    def _split_top_level(text: str) -> List[str]:
        """Split text on commas that are outside quotes and square brackets"""
        segments: List[str] = []
        current: List[str] = []
        depth = 0      # nesting level of [...] brackets
        quote = None   # the quote character currently open, if any
        prev = ""      # last non-space character of the current segment

        for ch in text:
            if quote is not None:
                current.append(ch)
                if ch == quote:
                    quote = None
                    prev = ch
                continue

            if ch == "," and depth == 0:
                segments.append("".join(current))
                current = []
                prev = ""
                continue

            # A quote only opens a string at the start of a value, so an
            # apostrophe inside a bare word (don't) is left alone.
            if ch in "\"'" and prev in ("", "=", "[", ","):
                quote = ch
            elif ch == "[":
                depth += 1
            elif ch == "]" and depth:
                depth -= 1

            current.append(ch)
            if not ch.isspace():
                prev = ch

        segments.append("".join(current))
        return segments

    def parse_command(self, command_str: str) -> Dict:
        """
        Parse a .p/ command string into a structured command object

        Parameters:
        -----------
        command_str : str
            The .p/ command string to parse

        Returns:
        --------
        Dict with keys "domain", "operation", "parameters" and "original"

        Raises:
        -------
        ValueError
            If the string does not start with ".p/" or is not exactly one
            well-formed command (e.g. an unterminated parameter block)
        """
        command_str = command_str.strip()

        if not command_str.startswith(".p/"):
            raise ValueError(f"Not a valid .p/ command: {command_str}")

        # fullmatch: a dangling "{" or trailing junk is an error instead of
        # being parsed as a command with silently dropped parameters.
        match = self.command_pattern.fullmatch(command_str)
        if not match:
            raise ValueError(f"Invalid command format: {command_str}")

        domain, operation, params_str = match.groups()

        params = {}
        if params_str:
            # Drop exactly the outer brace pair, then split on top-level commas.
            for segment in self._split_top_level(params_str[1:-1]):
                segment = segment.strip()
                if not segment:
                    continue
                param_match = self.param_pattern.fullmatch(segment)
                if param_match is None:
                    # Malformed segments are skipped, but no longer silently.
                    log.warning(f"Ignoring malformed parameter '{segment}' in command: {command_str}")
                    continue
                param_name, param_value = param_match.groups()
                params[param_name] = self._parse_parameter_value(param_value)

        parsed_command = {
            "domain": domain,
            "operation": operation,
            "parameters": params,
            "original": command_str,
        }

        log.debug(f"Parsed command: {parsed_command}")
        return parsed_command

    def _parse_parameter_value(self, value_str: str) -> Any:
        """
        Parse a parameter value string into the appropriate type

        Recognised forms, in order: quoted string, integer, float, boolean,
        null/none, bracketed list (parsed recursively), the keyword
        "complete" (normalised to lower case); anything else is returned as
        the stripped string.
        """
        value_str = value_str.strip()

        # Quoted string: needs two matching quote characters (a lone quote
        # character is not an empty string).
        if len(value_str) >= 2 and value_str[0] == value_str[-1] and value_str[0] in "\"'":
            return value_str[1:-1]

        # Integers first so signed values such as "-5" stay ints.
        try:
            return int(value_str)
        except ValueError:
            pass

        try:
            return float(value_str)
        except ValueError:
            pass

        lowered = value_str.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False

        if lowered in ("null", "none"):
            return None

        if value_str.startswith('[') and value_str.endswith(']'):
            inner = value_str[1:-1].strip()
            if not inner:
                return []  # "[]" is an empty list, not ['']
            return [
                self._parse_parameter_value(item)
                for item in self._split_top_level(inner)
            ]

        # Special keyword, returned in canonical lower case.
        if lowered == "complete":
            return "complete"

        return value_str

    def validate_command(self, parsed_command: Dict) -> Tuple[bool, Optional[str]]:
        """
        Validate a parsed command against command definitions

        Parameters:
        -----------
        parsed_command : Dict
            The parsed command structure to validate

        Returns:
        --------
        Tuple of (is_valid, error_message); error_message is None when valid
        """
        domain = parsed_command["domain"]
        operation = parsed_command["operation"]
        parameters = parsed_command["parameters"]

        if domain not in self.commands["commands"]:
            return False, f"Unknown command domain: {domain}"

        # A domain or operation declared without a body loads as None.
        domain_commands = self.commands["commands"][domain] or {}
        if operation not in domain_commands:
            return False, f"Unknown operation '{operation}' in domain '{domain}'"

        command_def = domain_commands[operation] or {}

        for required_param in command_def.get("required_parameters") or []:
            if required_param not in parameters:
                return False, f"Missing required parameter: {required_param}"

        param_defs = command_def.get("parameters") or {}
        for param_name, param_value in parameters.items():
            if param_name not in param_defs:
                continue
            spec = param_defs[param_name]
            # A spec without a "type" entry places no constraint on the value.
            expected_type = spec.get("type", "any") if isinstance(spec, dict) else "any"
            if not self._validate_parameter_type(param_value, expected_type):
                return False, f"Parameter '{param_name}' has invalid type. Expected {expected_type}."

        return True, None

    def _validate_parameter_type(self, value: Any, expected_type: str) -> bool:
        """
        Validate a parameter value against its expected type

        Unknown type names are accepted. Booleans are not accepted where a
        number is expected, even though bool is a subclass of int.
        """
        is_bool = isinstance(value, bool)

        if expected_type == "string":
            return isinstance(value, str)
        if expected_type == "int":
            return isinstance(value, int) and not is_bool
        if expected_type == "float":
            return isinstance(value, (int, float)) and not is_bool
        if expected_type == "bool":
            return is_bool
        if expected_type == "list":
            return isinstance(value, list)
        if expected_type == "string_or_int":
            return isinstance(value, (str, int)) and not is_bool
        if expected_type == "null":
            return value is None

        # "any" and unrecognised type names impose no constraint.
        return True

    def get_function_mapping(self, parsed_command: Dict) -> Dict:
        """
        Get the function mapping for a parsed command

        Parameters:
        -----------
        parsed_command : Dict
            The parsed command structure

        Returns:
        --------
        Dict with keys "function", "module", "parameters" and
        "original_command"; "function" and "module" are None when the
        command has no definition
        """
        domain = parsed_command["domain"]
        operation = parsed_command["operation"]

        try:
            command_def = self.commands["commands"][domain][operation]
        except (KeyError, TypeError):
            # TypeError: the domain exists but was declared without a body.
            log.warning(f"No function mapping found for {domain}.{operation}")
            return {
                "function": None,
                "module": None,
                "parameters": {},
                "original_command": parsed_command.get("original"),
            }

        function_mapping = (command_def or {}).get("function_mapping") or {}

        return {
            "function": function_mapping.get("function", None),
            "module": function_mapping.get("module", None),
            "parameters": self._map_parameters(parsed_command["parameters"], function_mapping),
            "original_command": parsed_command["original"],
        }

    def _map_parameters(self, cmd_params: Dict, function_mapping: Dict) -> Dict:
        """
        Map command parameters to function parameters based on mapping rules

        Parameters listed in "parameter_mapping" are renamed; all others are
        passed through under their own name. A renamed parameter is never
        also passed under its original name.
        """
        param_mapping = function_mapping.get("parameter_mapping") or {}

        result_params = {
            func_param: cmd_params[cmd_param]
            for cmd_param, func_param in param_mapping.items()
            if cmd_param in cmd_params
        }

        for param_name, param_value in cmd_params.items():
            if param_name in param_mapping:
                continue  # already forwarded under its mapped name
            # An explicitly mapped value wins over a same-named pass-through.
            result_params.setdefault(param_name, param_value)

        return result_params

    def extract_commands(self, text: str) -> List[Dict]:
        """
        Extract all .p/ commands from a text

        Commands that fail to parse are logged and skipped.

        Parameters:
        -----------
        text : str
            The text to extract commands from

        Returns:
        --------
        List of parsed command dictionaries
        """
        parsed_commands = []
        for cmd_str in self._EXTRACT_PATTERN.findall(text):
            try:
                parsed_commands.append(self.parse_command(cmd_str))
            except ValueError as e:
                log.warning(f"Failed to parse command '{cmd_str}': {e}")

        log.info(f"Extracted {len(parsed_commands)} commands from text")
        return parsed_commands
|
|
|
|
| |
def execute_parsed_command(parsed_mapping: Dict) -> Dict:
    """
    Execute a parsed command mapping using dynamic imports

    The module named in the mapping is imported, the named attribute is
    looked up on it and called with the mapped parameters as keyword
    arguments. Import, lookup and call failures are reported separately, so
    an ImportError or AttributeError raised *inside* the called function is
    reported as an execution error rather than as a missing module/function.

    NOTE: this imports and calls whatever the mapping names; the mapping must
    come from a trusted command definition file.

    Parameters:
    -----------
    parsed_mapping : Dict
        The parsed function mapping to execute; must contain "function",
        "module" and "parameters", and may contain "original_command"

    Returns:
    --------
    Dict with "status" ("success" or "error"), "command", and either
    "result" or "error"

    Raises:
    -------
    ValueError
        If the mapping names no function or no module
    """
    function_name = parsed_mapping["function"]
    module_name = parsed_mapping["module"]
    parameters = parsed_mapping["parameters"]
    # Optional: a mapping built by hand may not carry the original command.
    command = parsed_mapping.get("original_command")

    if not function_name or not module_name:
        raise ValueError("Invalid function mapping: missing function or module name")

    # Each step has its own try block so failures are attributed correctly.
    try:
        module = importlib.import_module(module_name)
    except ImportError as e:
        log.error(f"Failed to import module {module_name}: {e}")
        return {
            "status": "error",
            "error": f"Module not found: {module_name}",
            "command": command,
        }

    try:
        function = getattr(module, function_name)
    except AttributeError as e:
        log.error(f"Function {function_name} not found in module {module_name}: {e}")
        return {
            "status": "error",
            "error": f"Function not found: {function_name}",
            "command": command,
        }

    try:
        result = function(**parameters)
    except Exception as e:  # boundary: report any failure of the target function
        log.error(f"Error executing function {function_name}: {e}")
        return {
            "status": "error",
            "error": str(e),
            "command": command,
        }

    return {
        "status": "success",
        "result": result,
        "command": command,
    }
|
|
|
|
| |
def main(argv: Optional[List[str]] = None) -> int:
    """
    Command-line entry point: parse, validate, map and optionally execute
    a single .p/ command

    Parameters:
    -----------
    argv : Optional[List[str]]
        Argument list to parse; None means use sys.argv[1:]

    Returns:
    --------
    Process exit code: 0 on success, 1 if an error was raised
    """
    import argparse

    parser = argparse.ArgumentParser(description="Parse .p/ commands")
    parser.add_argument("command", help=".p/ command to parse")
    parser.add_argument("--commands-file", help="Path to commands YAML file")
    parser.add_argument("--execute", action="store_true", help="Execute the parsed command")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")

    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    if args.verbose:
        # The module logger is pinned to INFO at import time; without this
        # its DEBUG records are dropped regardless of the root configuration.
        log.setLevel(logging.DEBUG)

    cmd_parser = CommandParser(args.commands_file)

    try:
        parsed_cmd = cmd_parser.parse_command(args.command)
        print("Parsed Command:")
        print(json.dumps(parsed_cmd, indent=2))

        valid, error = cmd_parser.validate_command(parsed_cmd)
        if valid:
            print("Command validation: PASSED")
        else:
            print(f"Command validation: FAILED - {error}")

        func_mapping = cmd_parser.get_function_mapping(parsed_cmd)
        print("\nFunction Mapping:")
        print(json.dumps(func_mapping, indent=2))

        # Only execute commands that passed validation.
        if args.execute and valid:
            print("\nExecuting command...")
            result = execute_parsed_command(func_mapping)
            print("\nExecution Result:")
            # default=str is deliberate: the called function may return any
            # object, and a display-only dump must not fail after the
            # command has already run.
            print(json.dumps(result, indent=2, default=str))

    except Exception as e:  # top-level boundary of the CLI
        print(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
|