Coverage for src/qdrant_loader_mcp_server/mcp/handler.py: 90%

112 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""MCP Handler implementation.""" 

2 

3from typing import Any 

4 

5from ..search.engine import SearchEngine 

6from ..search.processor import QueryProcessor 

7from ..utils import LoggingConfig, get_version 

8from .intelligence_handler import IntelligenceHandler 

9from .protocol import MCPProtocol 

10from .schemas import MCPSchemas 

11from .search_handler import SearchHandler 

12 

13# Get logger for this module 

14logger = LoggingConfig.get_logger("src.mcp.handler") 

15 

16 

class MCPHandler:
    """Route JSON-RPC 2.0 requests from MCP clients to the right handler.

    The class owns an ``MCPProtocol`` instance, used to validate requests and
    to build responses, and delegates the actual work to a ``SearchHandler``
    and an ``IntelligenceHandler`` created in ``__init__``.
    """

    # Protocol revisions accepted in the ``mcp-protocol-version`` header.
    # The first entry is the revision advertised by ``initialize``.
    SUPPORTED_PROTOCOL_VERSIONS: tuple[str, ...] = (
        "2025-06-18",
        "2025-03-26",
        "2024-11-05",
    )

    # JSON-RPC method -> (handler attribute, coroutine name, info log message).
    # Targets are looked up by name at call time, so replacing a handler (or
    # one of its methods) on an instance is honoured by the dispatcher.
    _METHOD_ROUTES: dict[str, tuple[str, str, str]] = {
        "search": (
            "search_handler",
            "handle_search",
            "Handling search request",
        ),
        # Cross-Document Intelligence Methods
        "analyze_document_relationships": (
            "intelligence_handler",
            "handle_analyze_document_relationships",
            "Handling document relationship analysis request",
        ),
        "find_similar_documents": (
            "intelligence_handler",
            "handle_find_similar_documents",
            "Handling find similar documents request",
        ),
        "detect_document_conflicts": (
            "intelligence_handler",
            "handle_detect_document_conflicts",
            "Handling conflict detection request",
        ),
        "find_complementary_content": (
            "intelligence_handler",
            "handle_find_complementary_content",
            "Handling complementary content request",
        ),
        "cluster_documents": (
            "intelligence_handler",
            "handle_cluster_documents",
            "Handling document clustering request",
        ),
    }

    # ``tools/call`` tool name -> (handler attribute, coroutine name).
    _TOOL_ROUTES: dict[str, tuple[str, str]] = {
        "search": ("search_handler", "handle_search"),
        "hierarchy_search": ("search_handler", "handle_hierarchy_search"),
        "attachment_search": ("search_handler", "handle_attachment_search"),
        # Cross-Document Intelligence Tools
        "analyze_relationships": (
            "intelligence_handler",
            "handle_analyze_document_relationships",
        ),
        "find_similar_documents": (
            "intelligence_handler",
            "handle_find_similar_documents",
        ),
        "detect_document_conflicts": (
            "intelligence_handler",
            "handle_detect_document_conflicts",
        ),
        "find_complementary_content": (
            "intelligence_handler",
            "handle_find_complementary_content",
        ),
        "cluster_documents": ("intelligence_handler", "handle_cluster_documents"),
        "expand_document": ("search_handler", "handle_expand_document"),
        "expand_cluster": ("intelligence_handler", "handle_expand_cluster"),
    }

    def __init__(self, search_engine: SearchEngine, query_processor: QueryProcessor):
        """Store the collaborators and build the specialized handlers.

        Args:
            search_engine: Engine handed to both specialized handlers.
            query_processor: Processor handed to the search handler.
        """
        self.protocol = MCPProtocol()
        self.search_engine = search_engine
        self.query_processor = query_processor

        # Initialize specialized handlers
        self.search_handler = SearchHandler(
            search_engine, query_processor, self.protocol
        )
        self.intelligence_handler = IntelligenceHandler(search_engine, self.protocol)

        # Reduce noise on startup: use DEBUG level instead of INFO
        logger.debug("MCP Handler initialized")

    async def handle_request(
        self, request: dict[str, Any], headers: dict[str, str] | None = None
    ) -> dict[str, Any]:
        """Validate an incoming request and dispatch it by method name.

        Args:
            request: The decoded JSON-RPC request.
            headers: Optional HTTP headers; only ``mcp-protocol-version`` is
                inspected, and an unsupported value is logged, not rejected.

        Returns:
            The JSON-RPC response dict. An empty dict is returned for
            requests without an ``id`` (notifications), which are not
            dispatched. Exceptions raised while dispatching are converted
            into a ``-32603`` error response.
        """
        logger.debug("Handling request", request=request)

        self._warn_on_unsupported_protocol_version(headers)

        # Validate request format
        if not self.protocol.validate_request(request):
            logger.error("Request validation failed")
            return self._invalid_request_response(request)

        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        logger.debug(
            "Processing request", method=method, params=params, request_id=request_id
        )

        # Handle notifications (requests without id)
        if request_id is None:
            logger.debug("Handling notification", method=method)
            return {}

        try:
            return await self._dispatch(request_id, method, params)
        except Exception as e:
            # Top-level boundary: never let a handler failure escape as an
            # exception; report it to the client as an internal error.
            logger.error("Error handling request", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal error", "data": str(e)},
            )

    def _warn_on_unsupported_protocol_version(
        self, headers: dict[str, str] | None
    ) -> None:
        """Log a warning when the headers name an unsupported protocol version."""
        if not headers:
            return
        protocol_version = headers.get("mcp-protocol-version")
        if protocol_version and protocol_version not in self.SUPPORTED_PROTOCOL_VERSIONS:
            logger.warning(
                f"Unsupported protocol version in headers: {protocol_version}"
            )

    @staticmethod
    def _invalid_request_response(request: Any) -> dict[str, Any]:
        """Build the ``-32600`` response for a request that failed validation.

        The request id is echoed back only when the request is a dict and the
        id is a string or an integer; otherwise the id is ``None``.
        """
        request_id = request.get("id") if isinstance(request, dict) else None
        if not isinstance(request_id, str | int):
            request_id = None
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": -32600,
                "message": "Invalid Request",
                "data": "The request is not a valid JSON-RPC 2.0 request",
            },
        }

    async def _dispatch(
        self, request_id: str | int, method: Any, params: Any
    ) -> dict[str, Any]:
        """Invoke the handler registered for ``method`` and return its response."""
        # Guard first: a non-string method can match no route, and must not
        # be used as a dict key (it may be unhashable).
        if not isinstance(method, str):
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32600,
                    "message": "Invalid Request",
                    "data": "Method must be a string",
                },
            )

        if method == "initialize":
            logger.info("Handling initialize request")
            response = await self._handle_initialize(request_id, params)
            self.protocol.mark_initialized()
            logger.info("Server initialized successfully")
            return response

        if method in ("listOfferings", "tools/list"):
            logger.info(f"Handling {method} request")
            logger.debug(
                f"{method} request details",
                method=method,
                params=params,
                request_id=request_id,
            )
            response = await self._handle_list_offerings(request_id, params, method)
            logger.debug(f"{method} response", response=response)
            return response

        if method == "tools/call":
            logger.info("Handling tools/call request")
            return await self._handle_tool_call(request_id, params)

        route = self._METHOD_ROUTES.get(method)
        if route is not None:
            owner_attr, coroutine_name, log_message = route
            logger.info(log_message)
            target = getattr(getattr(self, owner_attr), coroutine_name)
            return await target(request_id, params)

        logger.warning("Unknown method requested", method=method)
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32601,
                "message": "Method not found",
                "data": f"Method '{method}' not found",
            },
        )

    async def _handle_tool_call(
        self, request_id: str | int, params: Any
    ) -> dict[str, Any]:
        """Dispatch a ``tools/call`` request to the tool named in ``params``.

        The tool receives ``params["arguments"]`` (``{}`` when the key is
        absent). Non-dict ``params`` yield ``-32602``; an unknown tool name
        yields ``-32601``.
        """
        if not isinstance(params, dict):
            # Without this check ``params.get`` would raise and the client
            # would see a misleading "Internal error".
            logger.warning("tools/call params must be an object")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "tools/call params must be an object",
                },
            )

        tool_name = params.get("name")
        # Only strings are looked up: other values cannot name a tool and
        # may be unhashable.
        route = self._TOOL_ROUTES.get(tool_name) if isinstance(tool_name, str) else None
        if route is None:
            logger.warning("Unknown tool requested", tool_name=tool_name)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32601,
                    "message": "Method not found",
                    "data": f"Tool '{tool_name}' not found",
                },
            )

        owner_attr, coroutine_name = route
        logger.debug("Dispatching tool call", tool_name=tool_name)
        target = getattr(getattr(self, owner_attr), coroutine_name)
        return await target(request_id, params.get("arguments", {}))

    async def _handle_initialize(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Build the response to an ``initialize`` request.

        Args:
            request_id: The ID of the request.
            params: The parameters of the request; they are only logged.

        Returns:
            A response whose result carries the protocol version, the server
            name and version, and the advertised capabilities.
        """
        logger.debug("Initializing with params", params=params)
        # NOTE(review): the client's requested protocolVersion is not
        # consulted; the newest supported revision is always advertised.
        # Confirm this matches the MCP version-negotiation rules.
        return self.protocol.create_response(
            request_id,
            result={
                "protocolVersion": self.SUPPORTED_PROTOCOL_VERSIONS[0],
                "serverInfo": {
                    "name": "Qdrant Loader MCP Server",
                    "version": get_version(),
                },
                "capabilities": {"tools": {"listChanged": False}},
            },
        )

    async def _handle_list_offerings(
        self, request_id: str | int | None, params: dict[str, Any], method: str
    ) -> dict[str, Any]:
        """Build the response to a ``tools/list`` or ``listOfferings`` request.

        Args:
            request_id: The ID of the request.
            params: The parameters of the request; they are only logged.
            method: The method name from the request; selects the result shape.

        Returns:
            For ``tools/list`` a result holding only the ``tools`` array; for
            any other method a single-entry ``offerings`` list wrapping the
            same tools.
        """
        logger.debug("Listing offerings with params", params=params)

        # Get all tool schemas from the schemas module
        all_tools = MCPSchemas.get_all_tool_schemas()

        if method == "tools/list":
            # All tools are returned in one response, so no nextCursor key
            # is included.
            return self.protocol.create_response(
                request_id,
                result={"tools": all_tools},
            )

        # Otherwise return the full offerings structure
        return self.protocol.create_response(
            request_id,
            result={
                "offerings": [
                    {
                        "id": "qdrant-loader",
                        "name": "Qdrant Loader",
                        "description": "Load data into Qdrant vector database",
                        # NOTE(review): hard-coded, unlike serverInfo.version
                        # which comes from get_version() - confirm intended.
                        "version": "1.0.0",
                        "tools": all_tools,
                        "resources": [],
                        "resourceTemplates": [],
                    }
                ]
            },
        )

297 )