import asyncio import argparse import sys import logging from typing import Optional from openspace.tool_layer import OpenSpace, OpenSpaceConfig from openspace.utils.logging import Logger from openspace.utils.ui import create_ui, OpenSpaceUI from openspace.utils.ui_integration import UIIntegration from openspace.utils.cli_display import CLIDisplay from openspace.utils.display import colorize logger = Logger.get_logger(__name__) class UIManager: def __init__(self, ui: Optional[OpenSpaceUI], ui_integration: Optional[UIIntegration]): self.ui = ui self.ui_integration = ui_integration self._original_log_levels = {} async def start_live_display(self): if not self.ui or not self.ui_integration: return print() print(colorize(" ▣ Starting real-time visualization...", 'c')) print() await asyncio.sleep(1) self._suppress_logs() await self.ui.start_live_display() await self.ui_integration.start_monitoring(poll_interval=2.0) async def stop_live_display(self): if not self.ui or not self.ui_integration: return await self.ui_integration.stop_monitoring() await self.ui.stop_live_display() self._restore_logs() def print_summary(self, result: dict): if self.ui: self.ui.print_summary(result) else: CLIDisplay.print_result_summary(result) def _suppress_logs(self): log_names = ["openspace", "openspace.grounding", "openspace.agents"] for name in log_names: log = logging.getLogger(name) self._original_log_levels[name] = log.level log.setLevel(logging.CRITICAL) def _restore_logs(self): for name, level in self._original_log_levels.items(): logging.getLogger(name).setLevel(level) self._original_log_levels.clear() async def _execute_task(openspace: OpenSpace, query: str, ui_manager: UIManager): await ui_manager.start_live_display() result = await openspace.execute(query) await ui_manager.stop_live_display() ui_manager.print_summary(result) return result async def interactive_mode(openspace: OpenSpace, ui_manager: UIManager): CLIDisplay.print_interactive_header() while True: try: prompt = colorize(">>> ", 'c', bold=True) query = input(f"\n{prompt}").strip() if not query: continue if query.lower() in ['exit', 'quit', 'q']: print("\nExiting...") break if query.lower() == 'status': _print_status(openspace) continue if query.lower() == 'help': CLIDisplay.print_help() continue CLIDisplay.print_task_header(query) await _execute_task(openspace, query, ui_manager) except KeyboardInterrupt: print("\n\nInterrupt signal detected, exiting...") break except Exception as e: logger.error(f"Error: {e}", exc_info=True) print(f"\nError: {e}") async def single_query_mode(openspace: OpenSpace, query: str, ui_manager: UIManager): CLIDisplay.print_task_header(query, title="▶ Single Query Execution") await _execute_task(openspace, query, ui_manager) def _print_status(openspace: OpenSpace): """Print system status""" from openspace.utils.display import Box, BoxStyle box = Box(width=70, style=BoxStyle.ROUNDED, color='bl') print() print(box.text_line(colorize("System Status", 'bl', bold=True), align='center', indent=4, text_color='')) print(box.separator_line(indent=4)) status_lines = [ f"Initialized: {colorize('Yes' if openspace.is_initialized() else 'No', 'g' if openspace.is_initialized() else 'rd')}", f"Running: {colorize('Yes' if openspace.is_running() else 'No', 'y' if openspace.is_running() else 'g')}", f"Model: {colorize(openspace.config.llm_model, 'c')}", ] if openspace.is_initialized(): backends = openspace.list_backends() status_lines.append(f"Backends: {colorize(', '.join(backends), 'c')}") sessions = openspace.list_sessions() status_lines.append(f"Active Sessions: {colorize(str(len(sessions)), 'y')}") for line in status_lines: print(box.text_line(f" {line}", indent=4, text_color='')) print(box.bottom_line(indent=4)) print() def _create_argument_parser() -> argparse.ArgumentParser: """Create command-line argument parser""" parser = argparse.ArgumentParser( description='OpenSpace - Self-Evolving Skill Worker & Community', formatter_class=argparse.RawDescriptionHelpFormatter, ) # Subcommands subparsers = parser.add_subparsers(dest='command', help='Available commands') # refresh-cache subcommand cache_parser = subparsers.add_parser( 'refresh-cache', help='Refresh MCP tool cache (starts all servers once)' ) cache_parser.add_argument( '--config', '-c', type=str, help='MCP configuration file path' ) # Basic arguments (for run mode) parser.add_argument('--config', '-c', type=str, help='Configuration file path (JSON format)') parser.add_argument('--query', '-q', type=str, help='Single query mode: execute query directly') # LLM arguments parser.add_argument('--model', '-m', type=str, help='LLM model name') # Logging arguments parser.add_argument('--log-level', type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], help='Log level') # Execution arguments parser.add_argument('--max-iterations', type=int, help='Maximum iteration count') parser.add_argument('--timeout', type=float, help='LLM API call timeout (seconds)') # UI arguments parser.add_argument('--interactive', '-i', action='store_true', help='Force interactive mode') parser.add_argument('--no-ui', action='store_true', help='Disable visualization UI') parser.add_argument('--ui-compact', action='store_true', help='Use compact UI layout') return parser async def refresh_mcp_cache(config_path: Optional[str] = None): """Refresh MCP tool cache by starting servers one by one and saving tool metadata.""" from openspace.grounding.backends.mcp import MCPProvider, get_tool_cache from openspace.grounding.core.types import SessionConfig, BackendType from openspace.config import load_config, get_config print("Refreshing MCP tool cache...") print("Servers will be started one by one (start -> get tools -> close).") print() # Load config if config_path: config = load_config(config_path) else: config = get_config() # Get MCP config mcp_config = getattr(config, 'mcp', None) or {} if hasattr(mcp_config, 'model_dump'): mcp_config = mcp_config.model_dump() # Skip dependency checks for refresh-cache (servers are pre-validated) mcp_config["check_dependencies"] = False # Create provider provider = MCPProvider(config=mcp_config) await provider.initialize() servers = provider.list_servers() total = len(servers) print(f"Found {total} MCP servers configured") print() cache = get_tool_cache() cache.set_server_order(servers) # Preserve config order when saving total_tools = 0 success_count = 0 skipped_count = 0 failed_servers = [] # Load existing cache to skip already processed servers existing_cache = cache.get_all_tools() # Timeout for each server (in seconds) SERVER_TIMEOUT = 60 # Process servers one by one for i, server_name in enumerate(servers, 1): # Skip if already cached (resume support) if server_name in existing_cache: cached_tools = existing_cache[server_name] total_tools += len(cached_tools) skipped_count += 1 print(f"[{i}/{total}] {server_name}... ⏭ cached ({len(cached_tools)} tools)") continue print(f"[{i}/{total}] {server_name}...", end=" ", flush=True) session_id = f"mcp-{server_name}" try: # Create session and get tools with timeout protection async with asyncio.timeout(SERVER_TIMEOUT): # Create session for this server cfg = SessionConfig( session_name=session_id, backend_type=BackendType.MCP, connection_params={"server": server_name}, ) session = await provider.create_session(cfg) # Get tools from this server tools = await session.list_tools() # Convert to metadata format tool_metadata = [] for tool in tools: tool_metadata.append({ "name": tool.schema.name, "description": tool.schema.description or "", "parameters": tool.schema.parameters or {}, }) # Save to cache (incremental) cache.save_server(server_name, tool_metadata) # Close session immediately to free resources await provider.close_session(session_id) total_tools += len(tools) success_count += 1 print(f"✓ {len(tools)} tools") except asyncio.TimeoutError: error_msg = f"Timeout after {SERVER_TIMEOUT}s" failed_servers.append((server_name, error_msg)) print(f"✗ {error_msg}") # Save failed server info to cache cache.save_failed_server(server_name, error_msg) # Try to close session if it was created try: await provider.close_session(session_id) except Exception: pass except Exception as e: error_msg = str(e) failed_servers.append((server_name, error_msg)) print(f"✗ {error_msg[:50]}") # Save failed server info to cache cache.save_failed_server(server_name, error_msg) # Try to close session if it was created try: await provider.close_session(session_id) except Exception: pass print() print(f"{'='*50}") print(f"✓ Collected {total_tools} tools from {success_count + skipped_count}/{total} servers") if skipped_count > 0: print(f" (skipped {skipped_count} cached, processed {success_count} new)") print(f"✓ Cache saved to: {cache.cache_path}") if failed_servers: print(f"✗ Failed servers ({len(failed_servers)}):") for name, err in failed_servers[:10]: print(f" - {name}: {err[:60]}") if len(failed_servers) > 10: print(f" ... and {len(failed_servers) - 10} more (see cache file for details)") print() print("Done! Future list_tools() calls will use cache (no server startup).") def _load_config(args) -> OpenSpaceConfig: """Load configuration""" cli_overrides = {} if args.model: cli_overrides['llm_model'] = args.model if args.max_iterations is not None: cli_overrides['grounding_max_iterations'] = args.max_iterations if args.timeout is not None: cli_overrides['llm_timeout'] = args.timeout if args.log_level: cli_overrides['log_level'] = args.log_level try: # Load from config file if provided if args.config: import json with open(args.config, 'r', encoding='utf-8') as f: config_dict = json.load(f) # Apply CLI overrides config_dict.update(cli_overrides) config = OpenSpaceConfig(**config_dict) print(f"✓ Loaded from config file: {args.config}") else: # Use default config + CLI overrides config = OpenSpaceConfig(**cli_overrides) print("✓ Using default configuration") if cli_overrides: print(f"✓ CLI overrides: {', '.join(cli_overrides.keys())}") if args.log_level: Logger.set_level(args.log_level) return config except Exception as e: logger.error(f"Failed to load configuration: {e}") sys.exit(1) def _setup_ui(args) -> tuple[Optional[OpenSpaceUI], Optional[UIIntegration]]: if args.no_ui: CLIDisplay.print_banner() return None, None ui = create_ui(enable_live=True, compact=args.ui_compact) ui.print_banner() ui_integration = UIIntegration(ui) return ui, ui_integration async def _initialize_openspace(config: OpenSpaceConfig, args) -> OpenSpace: openspace = OpenSpace(config) init_steps = [("Initializing OpenSpace...", "loading")] CLIDisplay.print_initialization_progress(init_steps, show_header=False) if not args.config: original_log_level = Logger.get_logger("openspace").level for log_name in ["openspace", "openspace.grounding", "openspace.agents"]: Logger.get_logger(log_name).setLevel(logging.WARNING) await openspace.initialize() # Restore log level if not args.config: for log_name in ["openspace", "openspace.grounding", "openspace.agents"]: Logger.get_logger(log_name).setLevel(original_log_level) # Print initialization results backends = openspace.list_backends() init_steps = [ ("LLM Client", "ok"), (f"Grounding Backends ({len(backends)} available)", "ok"), ("Grounding Agent", "ok"), ] if config.enable_recording: init_steps.append(("Recording Manager", "ok")) CLIDisplay.print_initialization_progress(init_steps, show_header=True) return openspace async def main(): parser = _create_argument_parser() args = parser.parse_args() # Handle subcommands if args.command == 'refresh-cache': await refresh_mcp_cache(args.config) return 0 # Load configuration config = _load_config(args) # Setup UI ui, ui_integration = _setup_ui(args) # Print configuration CLIDisplay.print_configuration(config) openspace = None try: # Initialize OpenSpace openspace = await _initialize_openspace(config, args) # Connect UI (if enabled) if ui_integration: ui_integration.attach_llm_client(openspace._llm_client) ui_integration.attach_grounding_client(openspace._grounding_client) CLIDisplay.print_system_ready() ui_manager = UIManager(ui, ui_integration) # Run appropriate mode if args.query: await single_query_mode(openspace, args.query, ui_manager) else: await interactive_mode(openspace, ui_manager) except KeyboardInterrupt: print("\n\nInterrupt signal detected") except Exception as e: logger.error(f"Error: {e}", exc_info=True) print(f"\nError: {e}") return 1 finally: if openspace: print("\nCleaning up resources...") await openspace.cleanup() print("\nGoodbye!") return 0 def run_main(): """Run main function""" try: exit_code = asyncio.run(main()) sys.exit(exit_code) except KeyboardInterrupt: print("\n\nProgram interrupted") sys.exit(0) if __name__ == "__main__": run_main()