"""
Nexus-Nano Search Engine
Fast alpha-beta with minimal overhead

Focus: Speed > Depth
Target: Sub-second responses
"""

import logging
import math
from typing import Optional, Tuple, List, Dict

import chess

from .evaluate import NexusNanoEvaluator
from .transposition import TranspositionTable, NodeType
from .move_ordering import MoveOrderer
from .time_manager import TimeManager
from .endgame import EndgameDetector

# Module-level logger, named after this module (``__name__``) so that records
# emitted by the engine can be filtered independently of other components.
logger: logging.Logger = logging.getLogger(__name__)


class NexusNanoEngine:
    """Ultra-fast 2.8M parameter chess engine.

    Runs an iterative-deepening negamax alpha-beta search backed by a
    transposition table, a move orderer and a capture-only quiescence
    search. Scores are handled internally in hundredths of a pawn and are
    reported to callers in pawns (divided by 100).
    """

    MATE_SCORE = 100000
    MAX_PLY = 100
    # Transposition table size, in megabytes.
    TT_SIZE_MB = 64
    # Maximum number of capture plies explored by the quiescence search.
    MAX_QS_DEPTH = 6

    def __init__(self, model_path: str, num_threads: int = 1):
        """Initialize with single-threaded config.

        Args:
            model_path: Path handed to ``NexusNanoEvaluator``.
            num_threads: Thread count handed to ``NexusNanoEvaluator``.
        """
        self.evaluator = NexusNanoEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=self.TT_SIZE_MB)
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()

        # Per-search statistics; reset at the start of every get_best_move().
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        logger.info("⚡ Nexus-Nano Engine initialized")
        logger.info(" Model: %.2f MB", self.evaluator.get_model_size_mb())
        logger.info(" TT: %d MB", self.TT_SIZE_MB)

    @staticmethod
    def _to_pawns(score: float) -> float:
        """Convert an internal score to pawns, rounded to two decimals.

        Non-finite scores (``inf``, ``-inf``, ``nan``) carry no usable
        information and are reported as ``0.0`` so that the result stays
        serializable as strict JSON.
        """
        if not math.isfinite(score):
            return 0.0
        return round(score / 100.0, 2)

    def get_best_move(
        self,
        fen: str,
        depth: int = 4,
        time_limit: int = 2000
    ) -> Dict:
        """
        Fast move search

        Args:
            fen: Position
            depth: Max depth (1-6 recommended). Values below 1 are treated
                as 1 and values above ``MAX_PLY`` as ``MAX_PLY``.
            time_limit: Time in ms

        Returns:
            Dict with the keys 'best_move' (UCI string), 'evaluation'
            (pawns), 'depth_searched', 'seldepth', 'nodes_evaluated',
            'time_taken' (ms), 'pv' (list of UCI strings), 'nps',
            'tt_stats' and 'move_ordering_stats'.

        Raises:
            ValueError: If ``fen`` is rejected by ``chess.Board``.
        """
        board = chess.Board(fen)

        # Reset per-search statistics.
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        # The time manager works in seconds; the same budget is passed for
        # both of its arguments.
        time_limit_sec = time_limit / 1000.0
        self.time_manager.start_search(time_limit_sec, time_limit_sec)

        self.move_orderer.clear()
        self.tt.increment_age()

        legal_moves = list(board.legal_moves)

        if not legal_moves:
            return self._no_legal_moves()

        if len(legal_moves) == 1:
            return self._single_move(board, legal_moves[0])

        best_move = legal_moves[0]
        best_score = -math.inf

        max_depth = min(max(1, depth), self.MAX_PLY)
        for current_depth in range(1, max_depth + 1):
            # Depth 1 always runs so that the returned move has been searched
            # at least once, even with a tiny time budget.
            if current_depth > 1 and self.time_manager.should_stop(current_depth):
                break

            score, move, pv = self._search_root(
                board, current_depth, -math.inf, math.inf
            )

            if move:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv

        # Read the clock once so 'time_taken' and 'nps' agree with each other.
        elapsed = self.time_manager.elapsed()

        return {
            'best_move': best_move.uci(),
            'evaluation': self._to_pawns(best_score),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(elapsed * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            'nps': int(self.nodes_evaluated / max(elapsed, 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _search_root(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
        """Root search

        Searches every legal move of ``board`` to ``depth`` plies and returns
        ``(best_score, best_move, principal_variation)``. The move loop stops
        early once the time manager asks to stop; in that case the result
        covers only the moves searched so far. ``best_move`` is ``None`` only
        when the position has no legal moves.
        """
        legal_moves = list(board.legal_moves)

        # Terminal position: nothing to search (and nothing to index below).
        if not legal_moves:
            terminal_score = -self.MATE_SCORE if board.is_check() else 0
            return terminal_score, None, []

        # Only the stored move is used at the root, as an ordering hint.
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_move = ordered_moves[0]
        best_score = -math.inf
        best_pv = []
        searched = 0

        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, 1)
            score = -score
            board.pop()
            searched += 1

            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv

            if score > alpha:
                alpha = score

            if self.time_manager.should_stop(depth):
                break

        # A partially searched root is not an exact score for this depth, so
        # only a complete pass over all moves is written to the table.
        if searched == len(ordered_moves):
            self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)

        return best_score, best_move, best_pv

    def _alpha_beta(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        ply: int = 0
    ) -> Tuple[float, List[chess.Move]]:
        """Fast alpha-beta search

        Args:
            board: Position to search; restored to its input state on return.
            depth: Remaining depth in plies; at 0 or below the quiescence
                search takes over.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            ply: Distance from the root in plies; used for ``sel_depth`` and
                for mate-distance scoring.

        Returns:
            ``(score, principal_variation)`` from the side to move's point
            of view. The variation is empty on draw, transposition-table,
            quiescence and terminal returns.
        """
        self.sel_depth = max(self.sel_depth, ply)

        # Draw by repetition or by the fifty-move rule.
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []

        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)

        # A probe that yields a score ends the node immediately.
        if tt_result and tt_result[0] is not None:
            return tt_result[0], []

        tt_move = tt_result[1] if tt_result else None

        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0, ply), []

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # Checkmate: adding the ply makes a nearer mate score worse
                # for the mated side, so the winner prefers the faster mate.
                return -self.MATE_SCORE + ply, []
            return 0, []  # stalemate

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_score = -math.inf
        best_pv = []
        node_type = NodeType.UPPER_BOUND

        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, ply + 1)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_pv = [move] + pv

            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT

                # The board is back at the pre-move position here, so
                # is_capture() classifies the move correctly.
                if not board.is_capture(move):
                    self.move_orderer.update_killer_move(move, depth)

            if score >= beta:
                node_type = NodeType.LOWER_BOUND
                break

        self.tt.store(
            zobrist_key, depth, best_score, node_type,
            best_pv[0] if best_pv else None
        )

        return best_score, best_pv

    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        qs_depth: int,
        ply: int = 0
    ) -> float:
        """Fast quiescence (captures only)

        Args:
            board: Position to evaluate; restored to its input state on
                return.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            qs_depth: Number of capture plies already explored; the search
                stops extending at ``MAX_QS_DEPTH``.
            ply: Distance from the root in plies, used for ``sel_depth``.

        Returns:
            The score of the position within the given window.
        """
        self.nodes_evaluated += 1
        self.sel_depth = max(self.sel_depth, ply)

        # NOTE(review): the negamax sign flips assume evaluate_hybrid() scores
        # from the side to move's point of view - confirm against the evaluator.
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)

        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat

        if qs_depth >= self.MAX_QS_DEPTH:
            return stand_pat

        captures = [m for m in board.legal_moves if board.is_capture(m)]

        if not captures:
            return stand_pat

        captures = self.move_orderer.order_moves(board, captures, 0)

        for move in captures:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1, ply + 1)
            board.pop()

            if score >= beta:
                return beta
            if score > alpha:
                alpha = score

        return alpha

    def _no_legal_moves(self) -> Dict:
        """Build the result for a position with no legal moves.

        Carries the same keys as a regular search result, with neutral
        values and the null move '0000' as 'best_move'.
        """
        return {
            'best_move': '0000',
            'evaluation': 0.0,
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': [],
            'nps': 0,
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _single_move(self, board: chess.Board, move: chess.Move) -> Dict:
        """Build the result for a position with exactly one legal move.

        No search is run; the evaluation is a single static evaluation of
        ``board``. Carries the same keys as a regular search result.
        """
        eval_score = self.evaluator.evaluate_hybrid(board)

        return {
            'best_move': move.uci(),
            'evaluation': self._to_pawns(eval_score),
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()],
            'nps': 0,
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True if ``fen`` is a string that ``chess.Board`` accepts."""
        if not isinstance(fen, str):
            return False
        try:
            chess.Board(fen)
        except ValueError:
            # Only parse failures are swallowed; anything else is a real
            # error and should surface.
            return False
        return True

    def get_model_size(self) -> float:
        """Return the evaluator's model size in megabytes."""
        return self.evaluator.get_model_size_mb()