SEUyishu commited on
Commit
50c2a7f
·
verified ·
1 Parent(s): 268cef6

Update start_mcp.py

Browse files
Files changed (1) hide show
  1. start_mcp.py +42 -15
start_mcp.py CHANGED
@@ -36,8 +36,15 @@ def run_sse_server(host: str, port: int):
36
  from starlette.responses import JSONResponse, HTMLResponse, StreamingResponse
37
  from starlette.requests import Request
38
 
39
- # Import MCP service
40
- from mcp_service import mcp
 
 
 
 
 
 
 
41
 
42
  # Store SSE connections
43
  connections = {}
@@ -52,8 +59,7 @@ def run_sse_server(host: str, port: int):
52
 
53
  async def generate():
54
  try:
55
- # Send the message endpoint URL
56
- # MCP expects: event: endpoint, data: /messages?sessionId=xxx
57
  yield f"event: endpoint\ndata: /messages?sessionId={conn_id}\n\n"
58
 
59
  while True:
@@ -64,6 +70,8 @@ def run_sse_server(host: str, port: int):
64
  yield ": keepalive\n\n"
65
  except asyncio.CancelledError:
66
  pass
 
 
67
  finally:
68
  connections.pop(conn_id, None)
69
  logger.info(f"SSE closed: {conn_id}")
@@ -72,9 +80,10 @@ def run_sse_server(host: str, port: int):
72
  generate(),
73
  media_type="text/event-stream",
74
  headers={
75
- "Cache-Control": "no-cache",
76
  "Connection": "keep-alive",
77
  "X-Accel-Buffering": "no",
 
78
  }
79
  )
80
 
@@ -83,16 +92,20 @@ def run_sse_server(host: str, port: int):
83
  session_id = request.query_params.get("sessionId")
84
 
85
  if not session_id or session_id not in connections:
 
86
  return JSONResponse({"error": "Invalid session"}, status_code=400)
87
 
 
 
 
88
  try:
89
  body = await request.json()
90
- logger.info(f"Request: {body.get('method')}")
91
-
92
  method = body.get("method", "")
93
  params = body.get("params", {})
94
  msg_id = body.get("id")
95
 
 
 
96
  # Process MCP methods
97
  if method == "initialize":
98
  result = {
@@ -101,17 +114,18 @@ def run_sse_server(host: str, port: int):
101
  "capabilities": {"tools": {}}
102
  }
103
  elif method == "notifications/initialized":
104
- # Just acknowledge, no response needed
105
  return JSONResponse({"ok": True})
106
  elif method == "tools/list":
107
  tools = []
108
- for name, func in mcp._tool_manager._tools.items():
 
109
  tools.append({
110
- "name": name,
111
- "description": (func.__doc__ or "").split("\n")[0].strip(),
112
- "inputSchema": {"type": "object", "properties": {}}
113
  })
114
  result = {"tools": tools}
 
115
  elif method == "tools/call":
116
  tool_name = params.get("name")
117
  tool_args = params.get("arguments", {})
@@ -119,18 +133,31 @@ def run_sse_server(host: str, port: int):
119
  if tool_name not in mcp._tool_manager._tools:
120
  raise Exception(f"Unknown tool: {tool_name}")
121
 
122
- tool_result = mcp._tool_manager._tools[tool_name](**tool_args)
 
 
 
 
 
 
 
 
 
 
123
  result = {"content": [{"type": "text", "text": json.dumps(tool_result)}]}
 
124
  else:
125
  raise Exception(f"Unknown method: {method}")
126
 
127
  response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
128
 
129
  except Exception as e:
130
- logger.error(f"Error: {e}")
 
 
131
  response = {
132
  "jsonrpc": "2.0",
133
- "id": body.get("id"),
134
  "error": {"code": -32000, "message": str(e)}
135
  }
136
 
 
36
  from starlette.responses import JSONResponse, HTMLResponse, StreamingResponse
37
  from starlette.requests import Request
38
 
39
+ # Try import MCP service
40
+ try:
41
+ from mcp_service import mcp
42
+ mcp_available = True
43
+ logger.info("MCP service loaded successfully")
44
+ except Exception as e:
45
+ mcp_available = False
46
+ mcp = None
47
+ logger.error(f"Failed to load MCP service: {e}")
48
 
49
  # Store SSE connections
50
  connections = {}
 
59
 
60
  async def generate():
61
  try:
62
+ # Send the message endpoint URL (just the path, not JSON)
 
63
  yield f"event: endpoint\ndata: /messages?sessionId={conn_id}\n\n"
64
 
65
  while True:
 
70
  yield ": keepalive\n\n"
71
  except asyncio.CancelledError:
72
  pass
73
+ except Exception as e:
74
+ logger.error(f"SSE generate error: {e}")
75
  finally:
76
  connections.pop(conn_id, None)
77
  logger.info(f"SSE closed: {conn_id}")
 
80
  generate(),
81
  media_type="text/event-stream",
82
  headers={
83
+ "Cache-Control": "no-cache, no-store, must-revalidate",
84
  "Connection": "keep-alive",
85
  "X-Accel-Buffering": "no",
86
+ "Access-Control-Allow-Origin": "*",
87
  }
88
  )
89
 
 
92
  session_id = request.query_params.get("sessionId")
93
 
94
  if not session_id or session_id not in connections:
95
+ logger.error(f"Invalid session: {session_id}, active: {list(connections.keys())}")
96
  return JSONResponse({"error": "Invalid session"}, status_code=400)
97
 
98
+ if not mcp_available:
99
+ return JSONResponse({"error": "MCP service not available"}, status_code=500)
100
+
101
  try:
102
  body = await request.json()
 
 
103
  method = body.get("method", "")
104
  params = body.get("params", {})
105
  msg_id = body.get("id")
106
 
107
+ logger.info(f"Method: {method}, ID: {msg_id}")
108
+
109
  # Process MCP methods
110
  if method == "initialize":
111
  result = {
 
114
  "capabilities": {"tools": {}}
115
  }
116
  elif method == "notifications/initialized":
 
117
  return JSONResponse({"ok": True})
118
  elif method == "tools/list":
119
  tools = []
120
+ for name, tool in mcp._tool_manager._tools.items():
121
+ # Tool object has name, description, parameters attributes
122
  tools.append({
123
+ "name": tool.name,
124
+ "description": tool.description or name,
125
+ "inputSchema": tool.parameters if hasattr(tool, 'parameters') else {"type": "object", "properties": {}}
126
  })
127
  result = {"tools": tools}
128
+ logger.info(f"Listed {len(tools)} tools")
129
  elif method == "tools/call":
130
  tool_name = params.get("name")
131
  tool_args = params.get("arguments", {})
 
133
  if tool_name not in mcp._tool_manager._tools:
134
  raise Exception(f"Unknown tool: {tool_name}")
135
 
136
+ logger.info(f"Calling tool: {tool_name} with args: {list(tool_args.keys())}")
137
+
138
+ # Get the Tool object and call its fn (the original function)
139
+ tool = mcp._tool_manager._tools[tool_name]
140
+
141
+ # Call the underlying function directly
142
+ if tool.is_async:
143
+ tool_result = await tool.fn(**tool_args)
144
+ else:
145
+ tool_result = tool.fn(**tool_args)
146
+
147
  result = {"content": [{"type": "text", "text": json.dumps(tool_result)}]}
148
+ logger.info(f"Tool {tool_name} completed successfully")
149
  else:
150
  raise Exception(f"Unknown method: {method}")
151
 
152
  response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
153
 
154
  except Exception as e:
155
+ logger.error(f"Error processing request: {e}")
156
+ import traceback
157
+ traceback.print_exc()
158
  response = {
159
  "jsonrpc": "2.0",
160
+ "id": body.get("id") if 'body' in dir() else None,
161
  "error": {"code": -32000, "message": str(e)}
162
  }
163