Coverage for src / qdrant_loader_mcp_server / mcp / search_handler.py: 95%
189 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""Search operations handler for MCP server."""
3import asyncio
4import inspect
5from typing import Any
7from qdrant_client import models
9from qdrant_loader_mcp_server.config import QdrantConfig
10from qdrant_loader_mcp_server.config_reranking import MCPReranking
12from ..search.engine import SearchEngine
13from ..search.hybrid.components.reranking import HybridReranker
14from ..search.processor import QueryProcessor
15from ..utils import LoggingConfig
16from .formatters import MCPFormatters
17from .handlers.search import (
18 apply_attachment_filters,
19 apply_hierarchy_filters,
20 apply_lightweight_attachment_filters,
21 format_lightweight_attachment_text,
22 format_lightweight_hierarchy_text,
23 organize_by_hierarchy,
24)
25from .protocol import MCPProtocol
27# Get logger for this module
28logger = LoggingConfig.get_logger("src.mcp.search_handler")
class SearchHandler:
    """Handler for search-related operations."""

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        protocol: MCPProtocol,
        reranking_config: MCPReranking | None = None,
    ):
        """Initialize search handler.

        Args:
            search_engine: Engine used to execute searches against Qdrant.
            query_processor: Processor that prepares raw query strings.
            protocol: MCP protocol helper used to build responses.
            reranking_config: Optional reranking settings; defaults to a
                fresh ``MCPReranking()`` when not provided.
        """
        self.search_engine = search_engine
        self.query_processor = query_processor
        self.protocol = protocol
        self.formatters = MCPFormatters()
        self.qdrant_config = QdrantConfig()
        self.reranker = None

        if reranking_config is None:
            reranking_config = MCPReranking()

        # Always check after the config is finalized. (Previously this flag
        # was tested twice in two separate, duplicated blocks.)
        if reranking_config.enabled:
            # If handler-level reranking is active, disable pipeline-level
            # reranking to avoid running the cross-encoder twice.
            for pipeline_attr in ("hybrid_pipeline", "pipeline"):
                pipeline = getattr(search_engine, pipeline_attr, None)
                if pipeline is not None and hasattr(pipeline, "reranker"):
                    pipeline.reranker = None

            try:
                self.reranker = HybridReranker(
                    enabled=reranking_config.enabled,
                    model=reranking_config.model,
                    device=reranking_config.device,
                    batch_size=reranking_config.batch_size,
                )
            except Exception as e:
                # Reranking is optional: degrade gracefully if the model (or
                # its dependencies) cannot be loaded.
                LoggingConfig.get_logger(__name__).warning(
                    "Failed to initialize reranker, continuing without reranking",
                    error=str(e),
                )
                self.reranker = None
80 async def handle_search(
81 self, request_id: str | int | None, params: dict[str, Any]
82 ) -> dict[str, Any]:
83 """
84 Handle a basic text search and return an MCP-formatted response.
86 Validates that `params` contains a required "query" key, processes the query via the QueryProcessor,
87 executes a search with the SearchEngine using optional filters from `params`, and returns both a
88 backward-compatible text block and a structured search result suitable for the MCP protocol.
90 Parameters:
91 request_id (str | int | None): The incoming request identifier passed to the protocol response.
92 params (dict): Search parameters. Required keys:
93 - "query": the search query string.
94 Optional keys:
95 - "source_types" (list): list of source type filters (default: []).
96 - "project_ids" (list): list of project id filters (default: []).
97 - "limit" (int): maximum number of search results to request (default: 5).
99 Returns:
100 dict: An MCP protocol response dictionary. On success the response contains a `result`
101 with `content` (text block), `structuredContent` (results, total_found, query_context),
102 and `isError: False`. On validation failure returns an error response with code -32602
103 and a descriptive message; on internal failure returns an error response with code -32603
104 and the exception string in `data`.
105 """
106 logger.debug("Handling search request with params", params=params)
108 # Validate required parameters
109 if "query" not in params:
110 logger.error("Missing required parameter: query")
111 return self.protocol.create_response(
112 request_id,
113 error={
114 "code": -32602,
115 "message": "Invalid params",
116 "data": "Missing required parameter: query",
117 },
118 )
120 # Extract parameters with defaults
121 query = params["query"]
122 source_types = params.get("source_types", [])
123 project_ids = params.get("project_ids", [])
124 limit = params.get("limit", 5)
126 logger.info(
127 "Processing search request",
128 query=query,
129 source_types=source_types,
130 project_ids=project_ids,
131 limit=limit,
132 )
134 try:
135 # Process the query
136 logger.debug("Processing query with OpenAI")
137 processed_query = await self.query_processor.process_query(query)
138 logger.debug(
139 "Query processed successfully", processed_query=processed_query
140 )
142 # Perform the search
143 logger.debug("Executing search in Qdrant")
144 results = await self.search_engine.search(
145 query=processed_query["query"],
146 source_types=source_types,
147 project_ids=project_ids,
148 limit=limit,
149 )
151 # Apply reranking if enabled
154 if self.reranker:
155 results = await asyncio.to_thread(
156 self.reranker.rerank,
157 query=query,
158 results=results,
159 top_k=limit,
160 text_key="text",
161 )
163 logger.info(
164 "Search completed successfully",
165 result_count=len(results),
166 first_result_score=results[0].score if results else None,
167 )
169 # Create structured results for MCP 2025-06-18 compliance
170 structured_results = self.formatters.create_structured_search_results(
171 results
172 )
174 # Keep existing text response for backward compatibility
175 text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
176 self.formatters.format_search_result(result) for result in results
177 )
179 # Format the response with both text and structured content
180 response = self.protocol.create_response(
181 request_id,
182 result={
183 "content": [
184 {
185 "type": "text",
186 "text": text_response,
187 }
188 ],
189 "structuredContent": {
190 "results": structured_results,
191 "total_found": len(results),
192 "query_context": {
193 "original_query": query,
194 "source_types_filtered": source_types,
195 "project_ids_filtered": project_ids,
196 },
197 },
198 "isError": False,
199 },
200 )
201 logger.debug("Search response formatted successfully")
202 return response
204 except Exception as e:
205 logger.error("Error during search", exc_info=True)
206 return self.protocol.create_response(
207 request_id,
208 error={"code": -32603, "message": "Internal error", "data": str(e)},
209 )
211 async def handle_hierarchy_search(
212 self, request_id: str | int | None, params: dict[str, Any]
213 ) -> dict[str, Any]:
214 """Handle hierarchical search request for Confluence documents."""
215 logger.debug("Handling hierarchy search request with params", params=params)
217 # Validate required parameters
218 if "query" not in params:
219 logger.error("Missing required parameter: query")
220 return self.protocol.create_response(
221 request_id,
222 error={
223 "code": -32602,
224 "message": "Invalid params",
225 "data": "Missing required parameter: query",
226 },
227 )
229 # Extract parameters with defaults
230 query = params["query"]
231 hierarchy_filter = params.get("hierarchy_filter", {})
232 organize_flag = params.get("organize_by_hierarchy", False)
233 limit = params.get("limit", 10)
235 logger.info(
236 "Processing hierarchy search request",
237 query=query,
238 hierarchy_filter=hierarchy_filter,
239 organize_by_hierarchy=organize_by_hierarchy,
240 limit=limit,
241 )
243 try:
244 # Process the query
245 logger.debug("Processing query with OpenAI")
246 processed_query = await self.query_processor.process_query(query)
247 logger.debug(
248 "Query processed successfully", processed_query=processed_query
249 )
251 # Perform the search (All source types for hierarchy - localfiles have folder structure)
252 logger.debug("Executing hierarchy search in Qdrant")
253 results = await self.search_engine.search(
254 query=processed_query["query"],
255 source_types=[
256 "confluence",
257 "localfile",
258 ], # Include localfiles with folder structure
259 limit=max(
260 limit * 2, 40
261 ), # Get enough results to filter for hierarchy navigation
262 )
264 # Apply hierarchy filters (support sync or async patched functions in tests)
265 maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
266 filtered_results = (
267 await maybe_filtered
268 if inspect.isawaitable(maybe_filtered)
269 else maybe_filtered
270 )
272 # For hierarchy search, prioritize returning more documents for better hierarchy navigation
273 # Limit to maximum of 20 documents for hierarchy index (not just the user's limit)
274 hierarchy_limit = max(limit, 20)
275 filtered_results = filtered_results[:hierarchy_limit]
277 # Organize results if requested
278 organized_results = None
279 if organize_flag:
280 organized_results = self._organize_by_hierarchy(filtered_results)
281 response_text = format_lightweight_hierarchy_text(
282 organized_results, len(filtered_results)
283 )
284 else:
285 response_text = format_lightweight_hierarchy_text(
286 {}, len(filtered_results)
287 )
289 logger.info(
290 "Hierarchy search completed successfully",
291 result_count=len(filtered_results),
292 first_result_score=(
293 filtered_results[0].score if filtered_results else None
294 ),
295 )
297 # Create structured content for MCP compliance
298 structured_content = self.formatters.create_lightweight_hierarchy_results(
299 filtered_results, organized_results or {}, query
300 )
302 # Format the response with both text and structured content
303 response = self.protocol.create_response(
304 request_id,
305 result={
306 "content": [
307 {
308 "type": "text",
309 "text": response_text,
310 }
311 ],
312 "structuredContent": structured_content,
313 "isError": False,
314 },
315 )
316 logger.debug("Hierarchy search response formatted successfully")
317 return response
319 except Exception as e:
320 logger.error("Error during hierarchy search", exc_info=True)
321 return self.protocol.create_response(
322 request_id,
323 error={"code": -32603, "message": "Internal error", "data": str(e)},
324 )
326 async def handle_attachment_search(
327 self, request_id: str | int | None, params: dict[str, Any]
328 ) -> dict[str, Any]:
329 """Handle attachment search request."""
330 logger.debug("Handling attachment search request with params", params=params)
332 # Validate required parameters
333 if "query" not in params:
334 logger.error("Missing required parameter: query")
335 return self.protocol.create_response(
336 request_id,
337 error={
338 "code": -32602,
339 "message": "Invalid params",
340 "data": "Missing required parameter: query",
341 },
342 )
344 # Extract parameters with defaults
345 query = params["query"]
346 attachment_filter = params.get("attachment_filter", {})
347 include_parent_context = params.get("include_parent_context", True)
348 limit = params.get("limit", 10)
350 logger.info(
351 "Processing attachment search request",
352 query=query,
353 attachment_filter=attachment_filter,
354 include_parent_context=include_parent_context,
355 limit=limit,
356 )
358 try:
359 # Process the query
360 logger.debug("Processing query with OpenAI")
361 processed_query = await self.query_processor.process_query(query)
362 logger.debug(
363 "Query processed successfully", processed_query=processed_query
364 )
366 # Perform the search
367 logger.debug("Executing attachment search in Qdrant")
368 results = await self.search_engine.search(
369 query=processed_query["query"],
370 source_types=None, # Search all sources for attachments
371 limit=limit * 2, # Get more results to filter
372 )
374 # Apply lightweight attachment filters (NEW - supports multi-source)
375 filtered_results = self._apply_lightweight_attachment_filters(
376 results, attachment_filter
377 )
379 # Limit to reasonable number for performance (ensure good navigation)
380 attachment_limit = max(limit, 15) # At least 15 for good navigation
381 filtered_results = filtered_results[:attachment_limit]
383 logger.info(
384 "Attachment search completed successfully",
385 result_count=len(filtered_results),
386 first_result_score=(
387 filtered_results[0].score if filtered_results else None
388 ),
389 )
391 # Create attachment groups for organized display
392 organized_results = {}
393 attachment_groups = []
394 if filtered_results:
395 # Group attachments by type for better organization
396 attachment_groups = self.formatters._organize_attachments_by_type(
397 filtered_results
398 )
399 for group in attachment_groups:
400 group_results = group.get("results", [])
401 organized_results[group["group_name"]] = group_results
403 # Create lightweight text response
404 response_text = format_lightweight_attachment_text(
405 organized_results, len(filtered_results)
406 )
408 # Create lightweight structured content for MCP compliance
409 structured_content = self.formatters.create_lightweight_attachment_results(
410 attachment_groups, query
411 )
413 response = self.protocol.create_response(
414 request_id,
415 result={
416 "content": [
417 {
418 "type": "text",
419 "text": response_text,
420 }
421 ],
422 "structuredContent": structured_content,
423 "isError": False,
424 },
425 )
426 logger.debug("Attachment search response formatted successfully")
427 return response
429 except Exception as e:
430 logger.error("Error during attachment search", exc_info=True)
431 return self.protocol.create_response(
432 request_id,
433 error={"code": -32603, "message": "Internal error", "data": str(e)},
434 )
    # Back-compat thin wrappers for tests that patch private methods
    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Apply hierarchy filters; delegates to handlers.search.apply_hierarchy_filters."""
        return apply_hierarchy_filters(results, hierarchy_filter)
    def _organize_by_hierarchy(self, results):
        """Group results by hierarchy; delegates to handlers.search.organize_by_hierarchy."""
        return organize_by_hierarchy(results)
    def _apply_attachment_filters(self, results, attachment_filter):
        """Apply attachment filters; delegates to handlers.search.apply_attachment_filters."""
        return apply_attachment_filters(results, attachment_filter)
    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Apply lightweight attachment filters, wiring in this handler's
        minimal file-type extractor; delegates to
        handlers.search.apply_lightweight_attachment_filters.
        """
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )
    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Render attachment results as text; delegates to handlers.search.format_lightweight_attachment_text."""
        return format_lightweight_attachment_text(organized_results, total_found)
    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Render hierarchy results as text; delegates to handlers.search.format_lightweight_hierarchy_text."""
        return format_lightweight_hierarchy_text(organized_results, total_found)
    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Return all chunks belonging to a document_id.

        Requires a non-empty ``params["document_id"]``. Scrolls the Qdrant
        collection (payloads only, no vectors) for every point whose
        ``document_id`` matches, sorts chunks by ``chunk_index`` where one
        is available, and returns the payloads in ``structuredContent``.
        At most 500 chunks are returned; ``truncated`` is True when more
        existed. Error codes: -32602 invalid params, -32001 document not
        found, -32603 internal error.
        """
        logger.debug("Handling expand_document", params=params)

        # Validate required parameter (None and "" are rejected as well)
        if (
            "document_id" not in params
            or params["document_id"] is None
            or params["document_id"] == ""
        ):
            logger.error("Missing required parameter: document_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: document_id",
                },
            )

        document_id = params["document_id"]

        try:
            logger.info(f"Fetching chunks for document_id={document_id}")

            # Create Qdrant filter matching every chunk of this document
            query_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="document_id",
                        match=models.MatchValue(value=document_id),
                    )
                ]
            )

            all_points = []
            next_offset = None
            truncated = False

            collection_name = self.query_processor.collection_name
            MAX_CHUNKS = 500  # Reasonable upper bound
            # Scroll to retrieve all chunks, page by page
            while True:
                remaining = MAX_CHUNKS - len(all_points)
                if remaining <= 0:
                    # Cap already reached; a pending offset means Qdrant
                    # still had more pages, so the result is partial.
                    truncated = next_offset is not None
                    break
                points, next_offset = await self.search_engine.client.scroll(
                    collection_name=collection_name,
                    scroll_filter=query_filter,
                    limit=min(100, remaining),  # never over-fetch past the cap
                    offset=next_offset,
                    with_payload=True,
                    with_vectors=False,
                )

                all_points.extend(points)

                if next_offset is None:
                    break  # no more pages
                if len(all_points) >= MAX_CHUNKS:
                    truncated = True
                    break
            if not all_points:
                logger.warning(f"No chunks found for document_id={document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32001,
                        "message": "Document not found",
                        "data": f"No chunks found for document_id: {document_id}",
                    },
                )

            logger.info(f"Retrieved {len(all_points)} chunks")

            # Sort key: chunks carrying an integer chunk_index (looked up in
            # nested metadata first, then at the payload top level) come
            # first in index order; the rest are ordered by point id for a
            # deterministic result.
            def _chunk_sort_key(point):
                payload = point.payload or {}
                metadata = payload.get("metadata") or {}
                idx = metadata.get("chunk_index", payload.get("chunk_index"))
                if isinstance(idx, int):
                    return (0, idx, str(point.id))
                return (1, 0, str(point.id))

            all_points.sort(key=_chunk_sort_key)
            chunks = [p.payload for p in all_points]

            structured_results = {
                "document_id": document_id,
                "total_chunks": len(chunks),
                "chunks": chunks,
                "truncated": truncated,
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "is_document_expansion": True,
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": f"Retrieved {len(chunks)} chunks for document {document_id}",
                        }
                    ],
                    "structuredContent": structured_results,
                    "isError": False,
                },
            )

        except Exception as e:
            logger.error("Error expanding document", exc_info=True)

            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
            )