SEUyishu commited on
Commit
16e0e02
·
verified ·
1 Parent(s): 50c2a7f

Update start_mcp.py

Browse files
Files changed (1) hide show
  1. start_mcp.py +75 -62
start_mcp.py CHANGED
@@ -64,9 +64,11 @@ def run_sse_server(host: str, port: int):
64
 
65
  while True:
66
  try:
67
- msg = await asyncio.wait_for(queue.get(), timeout=30)
 
68
  yield f"event: message\ndata: {json.dumps(msg)}\n\n"
69
  except asyncio.TimeoutError:
 
70
  yield ": keepalive\n\n"
71
  except asyncio.CancelledError:
72
  pass
@@ -98,71 +100,82 @@ def run_sse_server(host: str, port: int):
98
  if not mcp_available:
99
  return JSONResponse({"error": "MCP service not available"}, status_code=500)
100
 
101
- try:
102
- body = await request.json()
103
- method = body.get("method", "")
104
- params = body.get("params", {})
105
- msg_id = body.get("id")
106
-
107
- logger.info(f"Method: {method}, ID: {msg_id}")
108
-
109
- # Process MCP methods
110
- if method == "initialize":
111
- result = {
112
- "protocolVersion": "2024-11-05",
113
- "serverInfo": {"name": "MaTableGPT-MCP", "version": "1.0.0"},
114
- "capabilities": {"tools": {}}
115
- }
116
- elif method == "notifications/initialized":
117
- return JSONResponse({"ok": True})
118
- elif method == "tools/list":
119
- tools = []
120
- for name, tool in mcp._tool_manager._tools.items():
121
- # Tool object has name, description, parameters attributes
122
- tools.append({
123
- "name": tool.name,
124
- "description": tool.description or name,
125
- "inputSchema": tool.parameters if hasattr(tool, 'parameters') else {"type": "object", "properties": {}}
126
- })
127
- result = {"tools": tools}
128
- logger.info(f"Listed {len(tools)} tools")
129
- elif method == "tools/call":
130
- tool_name = params.get("name")
131
- tool_args = params.get("arguments", {})
132
-
133
- if tool_name not in mcp._tool_manager._tools:
134
- raise Exception(f"Unknown tool: {tool_name}")
135
-
136
- logger.info(f"Calling tool: {tool_name} with args: {list(tool_args.keys())}")
137
-
138
- # Get the Tool object and call its fn (the original function)
139
- tool = mcp._tool_manager._tools[tool_name]
140
-
141
- # Call the underlying function directly
142
- if tool.is_async:
143
- tool_result = await tool.fn(**tool_args)
 
 
 
 
144
  else:
145
- tool_result = tool.fn(**tool_args)
146
 
147
- result = {"content": [{"type": "text", "text": json.dumps(tool_result)}]}
148
- logger.info(f"Tool {tool_name} completed successfully")
149
- else:
150
- raise Exception(f"Unknown method: {method}")
151
-
152
- response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
 
 
 
 
 
153
 
154
- except Exception as e:
155
- logger.error(f"Error processing request: {e}")
156
- import traceback
157
- traceback.print_exc()
158
- response = {
159
- "jsonrpc": "2.0",
160
- "id": body.get("id") if 'body' in dir() else None,
161
- "error": {"code": -32000, "message": str(e)}
162
- }
 
 
 
 
163
 
164
- # Send via SSE
165
- await connections[session_id].put(response)
166
  return JSONResponse({"ok": True})
167
 
168
  async def health(request: Request):
 
64
 
65
  while True:
66
  try:
67
+ # Shorter timeout for more frequent keepalives
68
+ msg = await asyncio.wait_for(queue.get(), timeout=15)
69
  yield f"event: message\ndata: {json.dumps(msg)}\n\n"
70
  except asyncio.TimeoutError:
71
+ # Send keepalive more frequently
72
  yield ": keepalive\n\n"
73
  except asyncio.CancelledError:
74
  pass
 
100
  if not mcp_available:
101
  return JSONResponse({"error": "MCP service not available"}, status_code=500)
102
 
103
+ body = await request.json()
104
+ method = body.get("method", "")
105
+ params = body.get("params", {})
106
+ msg_id = body.get("id")
107
+
108
+ logger.info(f"Method: {method}, ID: {msg_id}")
109
+
110
+ async def process_request():
111
+ """Process the request in background."""
112
+ try:
113
+ # Process MCP methods
114
+ if method == "initialize":
115
+ result = {
116
+ "protocolVersion": "2024-11-05",
117
+ "serverInfo": {"name": "MaTableGPT-MCP", "version": "1.0.0"},
118
+ "capabilities": {"tools": {}}
119
+ }
120
+ elif method == "tools/list":
121
+ tools = []
122
+ for name, tool in mcp._tool_manager._tools.items():
123
+ tools.append({
124
+ "name": tool.name,
125
+ "description": tool.description or name,
126
+ "inputSchema": tool.parameters if hasattr(tool, 'parameters') else {"type": "object", "properties": {}}
127
+ })
128
+ result = {"tools": tools}
129
+ logger.info(f"Listed {len(tools)} tools")
130
+ elif method == "tools/call":
131
+ tool_name = params.get("name")
132
+ tool_args = params.get("arguments", {})
133
+
134
+ if tool_name not in mcp._tool_manager._tools:
135
+ raise Exception(f"Unknown tool: {tool_name}")
136
+
137
+ logger.info(f"Calling tool: {tool_name}")
138
+
139
+ tool = mcp._tool_manager._tools[tool_name]
140
+
141
+ # Run in executor if sync to avoid blocking
142
+ if tool.is_async:
143
+ tool_result = await tool.fn(**tool_args)
144
+ else:
145
+ loop = asyncio.get_event_loop()
146
+ tool_result = await loop.run_in_executor(None, lambda: tool.fn(**tool_args))
147
+
148
+ result = {"content": [{"type": "text", "text": json.dumps(tool_result)}]}
149
+ logger.info(f"Tool {tool_name} completed")
150
  else:
151
+ raise Exception(f"Unknown method: {method}")
152
 
153
+ response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
154
+
155
+ except Exception as e:
156
+ logger.error(f"Error: {e}")
157
+ import traceback
158
+ traceback.print_exc()
159
+ response = {
160
+ "jsonrpc": "2.0",
161
+ "id": msg_id,
162
+ "error": {"code": -32000, "message": str(e)}
163
+ }
164
 
165
+ # Send response via SSE
166
+ if session_id in connections:
167
+ await connections[session_id].put(response)
168
+ logger.info(f"Response sent for {method}")
169
+ else:
170
+ logger.error(f"Session {session_id} disconnected before response")
171
+
172
+ # Handle notifications immediately
173
+ if method == "notifications/initialized":
174
+ return JSONResponse({"ok": True})
175
+
176
+ # Start background task for other requests
177
+ asyncio.create_task(process_request())
178
 
 
 
179
  return JSONResponse({"ok": True})
180
 
181
  async def health(request: Request):