File size: 7,082 Bytes
5bf0029
 
 
 
 
 
 
 
cbee027
5bf0029
 
c10889a
5bf0029
 
 
c10889a
5bf0029
 
cbee027
 
 
c10889a
cbee027
5bf0029
 
 
 
c10889a
5bf0029
 
 
c10889a
5bf0029
 
 
 
c10889a
5bf0029
 
 
 
 
 
 
 
 
 
 
 
 
c10889a
5bf0029
 
 
 
c10889a
5bf0029
c10889a
 
5bf0029
 
 
 
c10889a
5bf0029
 
 
 
 
c10889a
5bf0029
c10889a
5bf0029
 
 
 
 
 
c10889a
5bf0029
c10889a
5bf0029
 
 
 
 
 
 
d03718f
5bf0029
c10889a
 
5bf0029
 
 
 
c10889a
 
 
5bf0029
c10889a
 
5bf0029
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c10889a
d03718f
c10889a
 
 
 
 
cbee027
c10889a
 
 
 
 
 
d03718f
c10889a
 
 
 
 
 
 
 
 
cbee027
5bf0029
cbee027
5bf0029
d03718f
5bf0029
 
 
 
c10889a
5bf0029
 
 
c10889a
 
 
5bf0029
cbee027
 
 
c10889a
 
 
cbee027
c10889a
 
5bf0029
 
 
c10889a
5bf0029
 
 
c10889a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import onnxruntime as ort
import numpy as np
import chess
import time
import logging
import os
from typing import Optional, Tuple

# Process-wide logging: INFO and above, timestamped "time - LEVEL - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger shared by the engine and the HTTP handlers below.
logger = logging.getLogger(__name__)

class NexusNanoEngine:
    """Chess engine pairing an ONNX evaluation network with negamax alpha-beta search.

    The network is fed a ``(1, 12, 8, 8)`` float32 one-hot tensor of piece
    placement (channels 0-5 white P/N/B/R/Q/K, channels 6-11 black) and its
    first output value is scaled by 400 to obtain a score.

    The instance keeps a mutable ``nodes`` counter, so a single engine must not
    run two searches concurrently.
    """

    # Material values (in pawns) used only for capture ordering in order_moves().
    PIECE_VALUES = {chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9, chess.KING: 0}
    # Channel offset of each piece type inside one colour's block of six planes.
    PIECE_CHANNELS = {chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2, chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5}
    # Base magnitude of the score returned for a checkmated side to move.
    MATE_SCORE = 10000

    def __init__(self, model_path: str):
        """Load the ONNX model at *model_path* into a CPU inference session.

        Raises:
            FileNotFoundError: if *model_path* does not exist.
        """
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found: {model_path}")

        logger.info(f"Loading: {model_path} ({os.path.getsize(model_path)/(1024*1024):.2f} MB)")

        sess_options = ort.SessionOptions()
        sess_options.intra_op_num_threads = 2
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        self.session = ort.InferenceSession(model_path, sess_options=sess_options, providers=['CPUExecutionProvider'])
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name
        # Number of network evaluations performed by the current search.
        self.nodes = 0
        logger.info("βœ… Engine ready")

    @classmethod
    def _board_to_tensor(cls, board: chess.Board) -> np.ndarray:
        """Encode the piece placement of *board* as a (1, 12, 8, 8) one-hot tensor."""
        tensor = np.zeros((1, 12, 8, 8), dtype=np.float32)
        for sq, piece in board.piece_map().items():
            # python-chess numbers squares 0..63 from a1, so sq // 8 is the
            # rank index and sq % 8 the file index.
            rank_idx, file_idx = divmod(sq, 8)
            channel = cls.PIECE_CHANNELS[piece.piece_type] + (6 if piece.color == chess.BLACK else 0)
            tensor[0, channel, rank_idx, file_idx] = 1.0
        return tensor

    def fen_to_tensor(self, fen: str) -> np.ndarray:
        """Return the network input tensor for the position described by *fen*.

        Raises:
            ValueError: if *fen* cannot be parsed by ``chess.Board``.
        """
        return self._board_to_tensor(chess.Board(fen))

    def evaluate(self, board: chess.Board) -> float:
        """Run the network on *board* and return a score for the side to move.

        The first network output is multiplied by 400 and negated when Black is
        to move, which is the sign convention negamax needs (presumably the
        network itself scores from White's point of view -- confirm against the
        training code). Increments ``self.nodes``.
        """
        self.nodes += 1
        # Encode straight from the board; a FEN round-trip per leaf is wasted work.
        tensor = self._board_to_tensor(board)
        output = self.session.run([self.output_name], {self.input_name: tensor})
        score = float(output[0][0][0]) * 400.0
        return -score if board.turn == chess.BLACK else score

    def order_moves(self, board: chess.Board, moves) -> list:
        """Return *moves* sorted so the most promising are searched first.

        Captures score ``10 * victim - attacker`` using PIECE_VALUES (most
        valuable victim, least valuable attacker); queen promotions get +90;
        everything else scores 0. The sort is stable, so ties keep the order
        of *moves*.
        """
        values = self.PIECE_VALUES

        def priority(move: chess.Move) -> int:
            score = 0
            if board.is_capture(move):
                attacker = board.piece_at(move.from_square)
                if board.is_en_passant(move):
                    # The captured pawn is not on the destination square.
                    victim_type = chess.PAWN
                else:
                    victim = board.piece_at(move.to_square)
                    victim_type = victim.piece_type if victim else None
                if attacker and victim_type is not None:
                    score = values.get(victim_type, 0) * 10 - values.get(attacker.piece_type, 0)
            if move.promotion == chess.QUEEN:
                score += 90
            return score

        return sorted(moves, key=priority, reverse=True)

    def alpha_beta(self, board: chess.Board, depth: int, alpha: float, beta: float) -> Tuple[float, Optional[chess.Move]]:
        """Negamax search with alpha-beta pruning.

        Args:
            board: position to search; it is pushed/popped in place and is
                restored before returning, even if an evaluation raises.
            depth: remaining plies to search.
            alpha, beta: search window from the side to move's point of view.

        Returns:
            ``(score, best_move)`` from the side to move's perspective.
            ``best_move`` is ``None`` at leaves and in finished games.
        """
        if board.is_game_over():
            if board.is_checkmate():
                # Adding the remaining depth makes a mate found closer to the
                # root score higher, so the shortest mate is preferred.
                return -(self.MATE_SCORE + depth), None
            return 0, None
        if depth == 0:
            return self.evaluate(board), None

        # A game that is not over always has at least one legal move.
        moves = self.order_moves(board, list(board.legal_moves))
        best_move, best_score = moves[0], float('-inf')
        for move in moves:
            board.push(move)
            try:
                child_score, _ = self.alpha_beta(board, depth - 1, -beta, -alpha)
            finally:
                board.pop()
            score = -child_score
            if score > best_score:
                best_score, best_move = score, move
            alpha = max(alpha, score)
            if alpha >= beta:
                break
        return best_score, best_move

    def search(self, fen: str, depth: int = 3) -> dict:
        """Find the best move for the position *fen* by iterative deepening.

        Args:
            fen: position to search.
            depth: maximum depth in plies; depths 1..depth are searched in turn.

        Returns:
            Dict with keys ``best_move`` (UCI string, ``'0000'`` when there is
            no legal move), ``evaluation`` (score / 100, rounded to 2 places,
            from the side to move's perspective), ``depth`` (deepest completed
            iteration, 0 for shortcuts) and ``nodes`` (network evaluations).

        Raises:
            ValueError: if *fen* is not a valid FEN.
            Exception: whatever the first iteration raised, when no depth
                completed and there is therefore no result to fall back on.
        """
        board = chess.Board(fen)
        self.nodes = 0
        moves = list(board.legal_moves)
        if not moves:
            return {'best_move': '0000', 'evaluation': 0.0, 'nodes': 0, 'depth': 0}
        if len(moves) == 1:
            # Forced move: nothing to search, report a static evaluation only.
            return {'best_move': moves[0].uci(), 'evaluation': round(self.evaluate(board)/100, 2), 'nodes': 1, 'depth': 0}

        best_move, best_score, completed_depth = moves[0], None, 0
        for d in range(1, depth + 1):
            try:
                score, move = self.alpha_beta(board, d, float('-inf'), float('inf'))
            except Exception:
                if best_score is None:
                    # Nothing usable yet; surfacing the error beats returning
                    # an arbitrary move with a -inf evaluation.
                    raise
                logger.exception("Search failed at depth %d; keeping depth-%d result", d, completed_depth)
                break
            if move is None:
                # The root is already a finished game that still has legal
                # moves (e.g. insufficient material); deeper search is moot.
                best_score = score
                break
            best_move, best_score, completed_depth = move, score, d

        if best_score is None:
            # depth < 1: no iteration ran, fall back to a static evaluation.
            best_score = self.evaluate(board)
        return {'best_move': best_move.uci(), 'evaluation': round(best_score/100, 2), 'depth': completed_depth, 'nodes': self.nodes}

# ASGI application object served by uvicorn.
app = FastAPI(title="Nexus-Nano API", version="1.0.0")

# Fully permissive CORS so any browser front-end can call the API.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed -- confirm this combination is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Engine singleton; stays None until the startup hook has loaded the model.
engine = None

class MoveRequest(BaseModel):
    """Request body for ``POST /get-move``."""

    # Position to search, as a FEN string (validated by the handler).
    fen: str
    # Maximum search depth in plies, 1-5; defaults to 3 when omitted.
    # Typed as plain int (not Optional) so an explicit null is rejected at
    # validation time instead of failing inside the search.
    depth: int = Field(3, ge=1, le=5)

class MoveResponse(BaseModel):
    """Response body for ``POST /get-move``."""

    # Chosen move in UCI notation (the engine reports '0000' when no legal move exists).
    best_move: str
    # Engine score divided by 100 and rounded to two decimals.
    evaluation: float
    # Deepest search iteration that produced the returned move.
    depth_searched: int
    # Number of network evaluations performed during the search.
    nodes_evaluated: int
    # Time spent in the search, in whole milliseconds.
    time_taken: int

@app.on_event("startup")
async def startup():
    """Locate the ONNX model on disk and build the global engine.

    Two fixed locations under ``/app`` are probed in order. When neither
    exists, every ``.onnx`` file found under ``/app`` is logged as a
    diagnostic aid and ``FileNotFoundError`` is raised. Any failure while
    constructing the engine is logged with its traceback and re-raised.
    """
    global engine
    logger.info("πŸš€ Starting Nexus-Nano...")

    candidates = [
        "/app/app/models/nexus-nano.onnx",  # model uploaded to app/models/
        "/app/models/nexus-nano.onnx",  # model uploaded to models/
    ]
    # First candidate that exists on disk, or None when there is none.
    model_path = next((p for p in candidates if os.path.exists(p)), None)

    if model_path is None:
        logger.error("❌ Model not found in any expected location")
        logger.error(f"Checked paths: {candidates}")
        # Help the operator by reporting any stray .onnx file under /app.
        for dirpath, _dirnames, filenames in os.walk("/app"):
            for filename in filenames:
                if filename.endswith('.onnx'):
                    logger.error(f"Found .onnx at: {os.path.join(dirpath, filename)}")
        raise FileNotFoundError("Model not found")

    logger.info(f"βœ… Found model at: {model_path}")

    try:
        engine = NexusNanoEngine(model_path)
    except Exception as load_error:
        logger.error(f"❌ Load failed: {load_error}", exc_info=True)
        raise

@app.get("/health")
async def health():
    """Report whether the engine has been loaded."""
    status = "healthy" if engine else "unhealthy"
    return {
        "status": status,
        "model": "nexus-nano",
        "version": "1.0.0",
    }

@app.post("/get-move", response_model=MoveResponse)
async def get_move(req: MoveRequest):
    """Search the requested position and return the engine's chosen move.

    Raises:
        HTTPException(503): the engine has not been loaded.
        HTTPException(400): ``req.fen`` is not a parseable FEN.
        HTTPException(500): the search itself failed.
    """
    if not engine:
        raise HTTPException(503, "Not loaded")

    # python-chess signals a malformed FEN with ValueError; catch only that so
    # unrelated failures are not misreported as bad input.
    try:
        chess.Board(req.fen)
    except ValueError as exc:
        raise HTTPException(400, "Invalid FEN") from exc

    # perf_counter is monotonic, so the measurement is immune to clock changes.
    start = time.perf_counter()
    try:
        if req.depth is None:
            # An explicit null depth falls back to the engine's own default.
            result = engine.search(req.fen)
        else:
            result = engine.search(req.fen, req.depth)
        elapsed = int((time.perf_counter() - start) * 1000)  # milliseconds
        logger.info(f"βœ“ {result['best_move']} | {result['evaluation']:+.2f} | {elapsed}ms")
        return MoveResponse(
            best_move=result['best_move'],
            evaluation=result['evaluation'],
            depth_searched=result['depth'],
            nodes_evaluated=result['nodes'],
            time_taken=elapsed,
        )
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        raise HTTPException(500, str(e)) from e

@app.get("/")
async def root():
    """Return basic service identification and whether the engine is loaded yet."""
    status = "online" if engine else "starting"
    return {
        "name": "Nexus-Nano",
        "version": "1.0.0",
        "status": status,
    }

if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    # uvicorn is imported here so importing this module does not require it.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
    )