Coverage for src/qdrant_loader_mcp_server/mcp/search_handler.py: 100%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""Search operations handler for MCP server.""" 

2 

3import inspect 

4from typing import Any 

5 

6from ..search.engine import SearchEngine 

7from ..search.processor import QueryProcessor 

8from ..utils import LoggingConfig 

9from .formatters import MCPFormatters 

10from .handlers.search import ( 

11 apply_attachment_filters, 

12 apply_hierarchy_filters, 

13 apply_lightweight_attachment_filters, 

14 format_lightweight_attachment_text, 

15 format_lightweight_hierarchy_text, 

16 organize_by_hierarchy, 

17) 

18from .protocol import MCPProtocol 

19 

# Module-level logger used by every handler in this file. It is obtained
# through the project's LoggingConfig; calls below pass extra context as
# keyword arguments (e.g. ``params=...``, ``limit=...``).
logger = LoggingConfig.get_logger("src.mcp.search_handler")

22 

23 

class SearchHandler:
    """Handler for search-related MCP operations.

    Every public ``handle_*`` coroutine validates its parameters, queries the
    search engine and wraps the outcome in a response built by
    ``protocol.create_response``. Successful responses carry both a text
    rendering (``content``) and a ``structuredContent`` payload. Failures are
    reported as error responses (codes ``-32602``, ``-32603``, ``-32604``)
    instead of being raised to the caller.
    """

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        protocol: MCPProtocol,
    ):
        """Initialize search handler.

        Args:
            search_engine: Object whose awaitable ``search(...)`` runs queries.
            query_processor: Object whose awaitable ``process_query(query)``
                returns a mapping with a ``"query"`` entry.
            protocol: Object whose ``create_response(...)`` builds responses.
        """
        self.search_engine = search_engine
        self.query_processor = query_processor
        self.protocol = protocol
        self.formatters = MCPFormatters()

    # ------------------------------------------------------------------
    # Shared response builders
    # ------------------------------------------------------------------

    def _missing_parameter_response(
        self, request_id: str | int | None, name: str
    ) -> dict[str, Any]:
        """Log and build the "Invalid params" response for a missing parameter."""
        detail = f"Missing required parameter: {name}"
        logger.error(detail)
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32602,
                "message": "Invalid params",
                "data": detail,
            },
        )

    def _internal_error_response(
        self, request_id: str | int | None, exc: Exception
    ) -> dict[str, Any]:
        """Build the "Internal error" response carrying ``str(exc)`` as data."""
        return self.protocol.create_response(
            request_id,
            error={"code": -32603, "message": "Internal error", "data": str(exc)},
        )

    def _success_response(
        self,
        request_id: str | int | None,
        text: str,
        structured_content: Any,
    ) -> dict[str, Any]:
        """Build a success response with text and structured content."""
        return self.protocol.create_response(
            request_id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": text,
                    }
                ],
                "structuredContent": structured_content,
                "isError": False,
            },
        )

    # ------------------------------------------------------------------
    # Request handlers
    # ------------------------------------------------------------------

    async def handle_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle basic search request.

        Args:
            request_id: Request identifier, passed through to the protocol.
            params: Request parameters. ``query`` is required;
                ``source_types`` and ``project_ids`` default to ``[]`` and
                ``limit`` defaults to ``10``.

        Returns:
            The protocol response: the search results on success, an
            "Invalid params" error when ``query`` is absent, or an
            "Internal error" response when processing raises.
        """
        logger.debug("Handling search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        source_types = params.get("source_types", [])
        project_ids = params.get("project_ids", [])
        limit = params.get("limit", 10)

        logger.info(
            "Processing search request",
            query=query,
            source_types=source_types,
            project_ids=project_ids,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=source_types,
                project_ids=project_ids,
                limit=limit,
            )
            logger.info(
                "Search completed successfully",
                result_count=len(results),
                first_result_score=results[0].score if results else None,
            )

            # Create structured results for MCP 2025-06-18 compliance
            structured_results = self.formatters.create_structured_search_results(
                results
            )

            # Keep existing text response for backward compatibility
            text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
                self.formatters.format_search_result(result) for result in results
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id,
                text_response,
                {
                    "results": structured_results,
                    "total_found": len(results),
                    "query_context": {
                        "original_query": query,
                        "source_types_filtered": source_types,
                        "project_ids_filtered": project_ids,
                    },
                },
            )
            logger.debug("Search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during search", exc_info=True)
            return self._internal_error_response(request_id, e)

    async def handle_hierarchy_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle hierarchical search request.

        The search is restricted to the ``confluence`` and ``localfile``
        source types, then narrowed with the hierarchy filters.

        Args:
            request_id: Request identifier, passed through to the protocol.
            params: Request parameters. ``query`` is required;
                ``hierarchy_filter`` defaults to ``{}``,
                ``organize_by_hierarchy`` to ``False`` and ``limit`` to ``10``.

        Returns:
            The protocol response: the hierarchy results on success, an
            "Invalid params" error when ``query`` is absent, or an
            "Internal error" response when processing raises.
        """
        logger.debug("Handling hierarchy search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults. The flag is kept in ``organize_flag``
        # because ``organize_by_hierarchy`` is also the name of an imported
        # module-level function.
        query = params["query"]
        hierarchy_filter = params.get("hierarchy_filter", {})
        organize_flag = params.get("organize_by_hierarchy", False)
        limit = params.get("limit", 10)

        logger.info(
            "Processing hierarchy search request",
            query=query,
            hierarchy_filter=hierarchy_filter,
            # Log the requested flag, not the imported function of the same name.
            organize_by_hierarchy=organize_flag,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search over the source types that carry a hierarchy
            # (localfiles have a folder structure). Over-fetch so that enough
            # results remain after filtering.
            logger.debug("Executing hierarchy search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=[
                    "confluence",
                    "localfile",
                ],
                limit=max(limit * 2, 40),
            )

            # Apply hierarchy filters (support sync or async patched functions in tests)
            maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
            filtered_results = (
                await maybe_filtered
                if inspect.isawaitable(maybe_filtered)
                else maybe_filtered
            )

            # Keep up to max(limit, 20) documents: limits below 20 are raised
            # to 20 so the hierarchy index stays useful for navigation, while
            # larger limits are honoured as given.
            hierarchy_limit = max(limit, 20)
            filtered_results = filtered_results[:hierarchy_limit]

            # Organize results if requested
            organized_results = None
            if organize_flag:
                organized_results = self._organize_by_hierarchy(filtered_results)
                response_text = format_lightweight_hierarchy_text(
                    organized_results, len(filtered_results)
                )
            else:
                response_text = format_lightweight_hierarchy_text(
                    {}, len(filtered_results)
                )

            logger.info(
                "Hierarchy search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_hierarchy_results(
                filtered_results, organized_results or {}, query
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Hierarchy search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during hierarchy search", exc_info=True)
            return self._internal_error_response(request_id, e)

    async def handle_attachment_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle attachment search request.

        Args:
            request_id: Request identifier, passed through to the protocol.
            params: Request parameters. ``query`` is required;
                ``attachment_filter`` defaults to ``{}``,
                ``include_parent_context`` to ``True`` and ``limit`` to ``10``.
                ``include_parent_context`` is only logged by this method.

        Returns:
            The protocol response: the grouped attachment results on success,
            an "Invalid params" error when ``query`` is absent, or an
            "Internal error" response when processing raises.
        """
        logger.debug("Handling attachment search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        attachment_filter = params.get("attachment_filter", {})
        include_parent_context = params.get("include_parent_context", True)
        limit = params.get("limit", 10)

        logger.info(
            "Processing attachment search request",
            query=query,
            attachment_filter=attachment_filter,
            include_parent_context=include_parent_context,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing attachment search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=None,  # Search all sources for attachments
                limit=limit * 2,  # Get more results to filter
            )

            # Apply lightweight attachment filters (supports multi-source)
            filtered_results = self._apply_lightweight_attachment_filters(
                results, attachment_filter
            )

            # Keep up to max(limit, 15) results (at least 15 for good navigation)
            attachment_limit = max(limit, 15)
            filtered_results = filtered_results[:attachment_limit]

            logger.info(
                "Attachment search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create attachment groups for organized display
            organized_results = {}
            attachment_groups = []
            if filtered_results:
                # Group attachments by type for better organization
                attachment_groups = self.formatters._organize_attachments_by_type(
                    filtered_results
                )
                for group in attachment_groups:
                    organized_results[group["group_name"]] = group.get("results", [])

            # Create lightweight text response
            response_text = format_lightweight_attachment_text(
                organized_results, len(filtered_results)
            )

            # Create lightweight structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_attachment_results(
                attachment_groups, query
            )

            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Attachment search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during attachment search", exc_info=True)
            return self._internal_error_response(request_id, e)

    # ------------------------------------------------------------------
    # Back-compat thin wrappers for tests that patch private methods
    # ------------------------------------------------------------------

    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Delegate to the module-level ``apply_hierarchy_filters``."""
        return apply_hierarchy_filters(results, hierarchy_filter)

    def _organize_by_hierarchy(self, results):
        """Delegate to the module-level ``organize_by_hierarchy``."""
        return organize_by_hierarchy(results)

    def _apply_attachment_filters(self, results, attachment_filter):
        """Delegate to the module-level ``apply_attachment_filters``."""
        return apply_attachment_filters(results, attachment_filter)

    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Delegate to ``apply_lightweight_attachment_filters``.

        The formatter's ``_extract_file_type_minimal`` is supplied as the
        ``file_type_extractor``.
        """
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )

    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_attachment_text``."""
        return format_lightweight_attachment_text(organized_results, total_found)

    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_hierarchy_text``."""
        return format_lightweight_hierarchy_text(organized_results, total_found)

    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle expand document request for lazy loading using standard search format.

        Args:
            request_id: Request identifier, passed through to the protocol.
            params: Request parameters. ``document_id`` is required and must be
                neither ``None`` nor the empty string.

        Returns:
            The protocol response: a single-document result in the same shape
            as a standard search on success, an "Invalid params" error when
            ``document_id`` is missing, a "Document not found" error
            (code ``-32604``) when no result has exactly that ``document_id``,
            or an "Internal error" response when processing raises.
        """
        logger.debug("Handling expand document with params", params=params)

        # Validate required parameter
        if (
            "document_id" not in params
            or params["document_id"] is None
            or params["document_id"] == ""
        ):
            return self._missing_parameter_response(request_id, "document_id")

        document_id = params["document_id"]

        try:
            logger.info(f"Expanding document with ID: {document_id}")

            # Field search doesn't guarantee exact matches, so fetch several
            # candidates and keep only exact ``document_id`` matches. Try the
            # field-qualified query first, then fall back to a general search.
            results = []
            for search_query in (f"document_id:{document_id}", document_id):
                candidates = await self.search_engine.search(
                    query=search_query,
                    limit=10,
                )
                exact_matches = [
                    r for r in candidates if r.document_id == document_id
                ]
                if exact_matches:
                    results = exact_matches[:1]  # Take only the first exact match
                    break

            if not results:
                logger.warning(f"Document not found with ID: {document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32604,
                        "message": "Document not found",
                        "data": f"No document found with ID: {document_id}",
                    },
                )

            logger.info(f"Successfully found document: {results[0].source_title}")

            # Use the existing search result formatting - exactly the same as standard search
            formatted_results = (
                "Found 1 document:\n\n"
                + self.formatters.format_search_result(results[0])
            )
            structured_results_list = self.formatters.create_structured_search_results(
                results
            )

            # Create the same structure as standard search
            structured_results = {
                "results": structured_results_list,
                "total_found": len(results),
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "source_types_filtered": [],
                    "project_ids_filtered": [],
                    "is_document_expansion": True,
                },
            }

            return self._success_response(
                request_id, formatted_results, structured_results
            )

        except Exception as e:
            logger.error("Error expanding document", exc_info=True)
            return self._internal_error_response(request_id, e)