Coverage for src / qdrant_loader_mcp_server / mcp / search_handler.py: 95%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1"""Search operations handler for MCP server.""" 

2 

3import asyncio 

4import inspect 

5from typing import Any 

6 

7from qdrant_client import models 

8 

9from qdrant_loader_mcp_server.config import QdrantConfig 

10from qdrant_loader_mcp_server.config_reranking import MCPReranking 

11 

12from ..search.engine import SearchEngine 

13from ..search.hybrid.components.reranking import HybridReranker 

14from ..search.processor import QueryProcessor 

15from ..utils import LoggingConfig 

16from .formatters import MCPFormatters 

17from .handlers.search import ( 

18 apply_attachment_filters, 

19 apply_hierarchy_filters, 

20 apply_lightweight_attachment_filters, 

21 format_lightweight_attachment_text, 

22 format_lightweight_hierarchy_text, 

23 organize_by_hierarchy, 

24) 

25from .protocol import MCPProtocol 

26 

27# Get logger for this module 

28logger = LoggingConfig.get_logger("src.mcp.search_handler") 

29 

30 

class SearchHandler:
    """Handler for search-related MCP operations.

    Routes MCP JSON-RPC requests to the underlying SearchEngine — basic
    search, hierarchy search, attachment search, and document expansion —
    and formats results as MCP responses carrying both a backward-compatible
    text block and `structuredContent`.
    """

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        protocol: MCPProtocol,
        reranking_config: MCPReranking | None = None,
    ):
        """Initialize search handler.

        Parameters:
            search_engine: Engine used to execute searches against Qdrant.
            query_processor: Pre-processor applied to incoming query strings.
            protocol: MCP protocol helper used to build JSON-RPC responses.
            reranking_config: Optional handler-level reranking configuration;
                defaults to ``MCPReranking()`` when not provided.
        """
        self.search_engine = search_engine
        self.query_processor = query_processor
        self.protocol = protocol
        self.formatters = MCPFormatters()
        self.qdrant_config = QdrantConfig()
        self.reranker = None

        if reranking_config is None:
            reranking_config = MCPReranking()

        # Always check after config is finalized. (Previously this condition
        # was tested twice in a row; the two branches are merged here.)
        if reranking_config.enabled:
            # If handler-level reranking is active, disable pipeline-level
            # reranking to avoid running the cross-encoder twice.
            hybrid_pipeline = getattr(search_engine, "hybrid_pipeline", None)
            if hybrid_pipeline is not None and hasattr(hybrid_pipeline, "reranker"):
                hybrid_pipeline.reranker = None

            pipeline = getattr(search_engine, "pipeline", None)
            if pipeline is not None and hasattr(pipeline, "reranker"):
                pipeline.reranker = None

            try:
                self.reranker = HybridReranker(
                    enabled=reranking_config.enabled,
                    model=reranking_config.model,
                    device=reranking_config.device,
                    batch_size=reranking_config.batch_size,
                )
            except Exception as e:
                # Reranking is best-effort; fall back to plain search order.
                logger.warning(
                    "Failed to initialize reranker, continuing without reranking",
                    error=str(e),
                )
                self.reranker = None

    def _missing_param_error(
        self, request_id: str | int | None, param: str
    ) -> dict[str, Any]:
        """Log and build the standard -32602 response for a missing required parameter."""
        logger.error(f"Missing required parameter: {param}")
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32602,
                "message": "Invalid params",
                "data": f"Missing required parameter: {param}",
            },
        )

    async def handle_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Handle a basic text search and return an MCP-formatted response.

        Validates that `params` contains a required "query" key, processes the
        query via the QueryProcessor, executes a search with the SearchEngine
        using optional filters from `params`, optionally reranks the results,
        and returns both a backward-compatible text block and a structured
        search result suitable for the MCP protocol.

        Parameters:
            request_id (str | int | None): The incoming request identifier passed to the protocol response.
            params (dict): Search parameters. Required keys:
                - "query": the search query string.
                Optional keys:
                - "source_types" (list): list of source type filters (default: []).
                - "project_ids" (list): list of project id filters (default: []).
                - "limit" (int): maximum number of search results to request (default: 5).

        Returns:
            dict: An MCP protocol response dictionary. On success the response contains a `result`
                with `content` (text block), `structuredContent` (results, total_found, query_context),
                and `isError: False`. On validation failure returns an error response with code -32602
                and a descriptive message; on internal failure returns an error response with code -32603
                and the exception string in `data`.
        """
        logger.debug("Handling search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_param_error(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        source_types = params.get("source_types", [])
        project_ids = params.get("project_ids", [])
        limit = params.get("limit", 5)

        logger.info(
            "Processing search request",
            query=query,
            source_types=source_types,
            project_ids=project_ids,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=source_types,
                project_ids=project_ids,
                limit=limit,
            )

            # Apply reranking if enabled; run in a worker thread because the
            # cross-encoder is CPU-bound and must not block the event loop.
            if self.reranker:
                results = await asyncio.to_thread(
                    self.reranker.rerank,
                    query=query,
                    results=results,
                    top_k=limit,
                    text_key="text",
                )

            logger.info(
                "Search completed successfully",
                result_count=len(results),
                first_result_score=results[0].score if results else None,
            )

            # Create structured results for MCP 2025-06-18 compliance
            structured_results = self.formatters.create_structured_search_results(
                results
            )

            # Keep existing text response for backward compatibility
            text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
                self.formatters.format_search_result(result) for result in results
            )

            # Format the response with both text and structured content
            response = self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": text_response,
                        }
                    ],
                    "structuredContent": {
                        "results": structured_results,
                        "total_found": len(results),
                        "query_context": {
                            "original_query": query,
                            "source_types_filtered": source_types,
                            "project_ids_filtered": project_ids,
                        },
                    },
                    "isError": False,
                },
            )
            logger.debug("Search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during search", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal error", "data": str(e)},
            )

    async def handle_hierarchy_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle hierarchical search request for Confluence documents.

        Optional params: "hierarchy_filter" (dict, default {}),
        "organize_by_hierarchy" (bool, default False), "limit" (int, default 10).
        Searches confluence and localfile sources (both have folder structure),
        applies hierarchy filters, and optionally organizes results by hierarchy.
        """
        logger.debug("Handling hierarchy search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_param_error(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        hierarchy_filter = params.get("hierarchy_filter", {})
        organize_flag = params.get("organize_by_hierarchy", False)
        limit = params.get("limit", 10)

        logger.info(
            "Processing hierarchy search request",
            query=query,
            hierarchy_filter=hierarchy_filter,
            # BUG FIX: previously this logged the imported organize_by_hierarchy
            # *function object* instead of the boolean flag from params.
            organize_by_hierarchy=organize_flag,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search (all source types for hierarchy - localfiles have folder structure)
            logger.debug("Executing hierarchy search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=[
                    "confluence",
                    "localfile",
                ],  # Include localfiles with folder structure
                limit=max(
                    limit * 2, 40
                ),  # Get enough results to filter for hierarchy navigation
            )

            # Apply hierarchy filters (support sync or async patched functions in tests)
            maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
            filtered_results = (
                await maybe_filtered
                if inspect.isawaitable(maybe_filtered)
                else maybe_filtered
            )

            # For hierarchy search, prioritize returning more documents for better
            # hierarchy navigation: cap at max(limit, 20) documents.
            hierarchy_limit = max(limit, 20)
            filtered_results = filtered_results[:hierarchy_limit]

            # Organize results if requested
            organized_results = None
            if organize_flag:
                organized_results = self._organize_by_hierarchy(filtered_results)
                response_text = format_lightweight_hierarchy_text(
                    organized_results, len(filtered_results)
                )
            else:
                response_text = format_lightweight_hierarchy_text(
                    {}, len(filtered_results)
                )

            logger.info(
                "Hierarchy search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_hierarchy_results(
                filtered_results, organized_results or {}, query
            )

            # Format the response with both text and structured content
            response = self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": response_text,
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )
            logger.debug("Hierarchy search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during hierarchy search", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal error", "data": str(e)},
            )

    async def handle_attachment_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle attachment search request.

        Optional params: "attachment_filter" (dict, default {}),
        "include_parent_context" (bool, default True; currently only logged),
        "limit" (int, default 10). Searches all sources, applies lightweight
        attachment filters, and groups results by attachment type.
        """
        logger.debug("Handling attachment search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_param_error(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        attachment_filter = params.get("attachment_filter", {})
        # NOTE(review): include_parent_context is accepted and logged but not
        # otherwise used here — confirm whether it should affect the output.
        include_parent_context = params.get("include_parent_context", True)
        limit = params.get("limit", 10)

        logger.info(
            "Processing attachment search request",
            query=query,
            attachment_filter=attachment_filter,
            include_parent_context=include_parent_context,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing attachment search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=None,  # Search all sources for attachments
                limit=limit * 2,  # Get more results to filter
            )

            # Apply lightweight attachment filters (supports multi-source)
            filtered_results = self._apply_lightweight_attachment_filters(
                results, attachment_filter
            )

            # Limit to reasonable number for performance (ensure good navigation)
            attachment_limit = max(limit, 15)  # At least 15 for good navigation
            filtered_results = filtered_results[:attachment_limit]

            logger.info(
                "Attachment search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create attachment groups for organized display
            organized_results = {}
            attachment_groups = []
            if filtered_results:
                # Group attachments by type for better organization
                attachment_groups = self.formatters._organize_attachments_by_type(
                    filtered_results
                )
                for group in attachment_groups:
                    group_results = group.get("results", [])
                    organized_results[group["group_name"]] = group_results

            # Create lightweight text response
            response_text = format_lightweight_attachment_text(
                organized_results, len(filtered_results)
            )

            # Create lightweight structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_attachment_results(
                attachment_groups, query
            )

            response = self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": response_text,
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )
            logger.debug("Attachment search response formatted successfully")
            return response

        except Exception as e:
            logger.error("Error during attachment search", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal error", "data": str(e)},
            )

    # Back-compat thin wrappers for tests that patch private methods
    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Delegate to handlers.search.apply_hierarchy_filters."""
        return apply_hierarchy_filters(results, hierarchy_filter)

    def _organize_by_hierarchy(self, results):
        """Delegate to handlers.search.organize_by_hierarchy."""
        return organize_by_hierarchy(results)

    def _apply_attachment_filters(self, results, attachment_filter):
        """Delegate to handlers.search.apply_attachment_filters."""
        return apply_attachment_filters(results, attachment_filter)

    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Delegate to the lightweight filter with a minimal file-type extractor."""
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )

    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Delegate to handlers.search.format_lightweight_attachment_text."""
        return format_lightweight_attachment_text(organized_results, total_found)

    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Delegate to handlers.search.format_lightweight_hierarchy_text."""
        return format_lightweight_hierarchy_text(organized_results, total_found)

    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Return all chunks belonging to a document_id.

        Scrolls the Qdrant collection for points whose "document_id" payload
        matches, sorts them by chunk_index, and caps the result at MAX_CHUNKS;
        truncation is reported via "truncated" in structuredContent. Returns a
        -32001 error when no chunks exist for the given document_id.
        """
        logger.debug("Handling expand_document", params=params)

        # Validate required parameter (reject missing key, None, and empty string)
        document_id = params.get("document_id")
        if document_id is None or document_id == "":
            return self._missing_param_error(request_id, "document_id")

        try:
            logger.info(f"Fetching chunks for document_id={document_id}")

            # Create Qdrant filter matching all chunks of the document
            query_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="document_id",
                        match=models.MatchValue(value=document_id),
                    )
                ]
            )

            all_points = []
            next_offset = None
            truncated = False

            collection_name = self.query_processor.collection_name
            MAX_CHUNKS = 500  # Reasonable upper bound on response size

            # Scroll to retrieve all chunks (paged, bounded by MAX_CHUNKS)
            while True:
                remaining = MAX_CHUNKS - len(all_points)
                if remaining <= 0:
                    # More pages may remain if Qdrant returned a continuation token.
                    truncated = next_offset is not None
                    break
                points, next_offset = await self.search_engine.client.scroll(
                    collection_name=collection_name,
                    scroll_filter=query_filter,
                    limit=min(100, remaining),
                    offset=next_offset,
                    with_payload=True,
                    with_vectors=False,
                )

                all_points.extend(points)

                if next_offset is None:
                    break
                if len(all_points) >= MAX_CHUNKS:
                    truncated = True
                    break

            if not all_points:
                logger.warning(f"No chunks found for document_id={document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32001,
                        "message": "Document not found",
                        "data": f"No chunks found for document_id: {document_id}",
                    },
                )

            logger.info(f"Retrieved {len(all_points)} chunks")

            # Sort chunks by chunk_index when it is an int (those sort first),
            # with a stable tie-break on point id for deterministic output.
            def _chunk_sort_key(point):
                payload = point.payload or {}
                metadata = payload.get("metadata") or {}
                idx = metadata.get("chunk_index", payload.get("chunk_index"))
                if isinstance(idx, int):
                    return (0, idx, str(point.id))
                return (1, 0, str(point.id))

            all_points.sort(key=_chunk_sort_key)
            chunks = [p.payload for p in all_points]

            structured_results = {
                "document_id": document_id,
                "total_chunks": len(chunks),
                "chunks": chunks,
                "truncated": truncated,
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "is_document_expansion": True,
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": f"Retrieved {len(chunks)} chunks for document {document_id}",
                        }
                    ],
                    "structuredContent": structured_results,
                    "isError": False,
                },
            )

        except Exception as e:
            logger.error("Error expanding document", exc_info=True)

            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
            )