Coverage for src / qdrant_loader_mcp_server / mcp / search_handler.py: 94%
241 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
1"""Search operations handler for MCP server."""
3import asyncio
4import inspect
5from typing import Any
7from qdrant_client import models
9from qdrant_loader_mcp_server.config_reranking import MCPReranking
11from ..search.engine import SearchEngine
12from ..search.hybrid.components.reranking import HybridReranker
13from ..search.processor import QueryProcessor
14from ..utils import LoggingConfig
15from .formatters import MCPFormatters
16from .handlers.search import (
17 apply_attachment_filters,
18 apply_hierarchy_filters,
19 apply_lightweight_attachment_filters,
20 format_lightweight_attachment_text,
21 format_lightweight_hierarchy_text,
22 organize_by_hierarchy,
23)
24from .protocol import MCPProtocol
# Module-level logger shared by all handler methods below; the reranker
# init failure path creates its own logger under __name__ instead.
logger = LoggingConfig.get_logger("src.mcp.search_handler")
30class SearchHandler:
31 """Handler for search-related operations."""
33 def __init__(
34 self,
35 search_engine: SearchEngine,
36 query_processor: QueryProcessor,
37 protocol: MCPProtocol,
38 reranking_config: MCPReranking | None = None,
39 ):
40 """Initialize search handler."""
41 self.search_engine = search_engine
42 self.query_processor = query_processor
43 self.protocol = protocol
44 self.formatters = MCPFormatters()
45 self.reranker = None
47 if reranking_config is None:
48 reranking_config = MCPReranking()
50 # always check after config is finalized
51 if reranking_config.enabled:
52 # If handler-level reranking is active, disable pipeline-level reranking
53 # to avoid running the cross-encoder twice.
54 if (
55 hasattr(search_engine, "hybrid_pipeline")
56 and search_engine.hybrid_pipeline is not None
57 ):
58 if hasattr(search_engine.hybrid_pipeline, "reranker"):
59 search_engine.hybrid_pipeline.reranker = None
61 if (
62 hasattr(search_engine, "pipeline")
63 and search_engine.pipeline is not None
64 ):
65 if hasattr(search_engine.pipeline, "reranker"):
66 search_engine.pipeline.reranker = None
68 if reranking_config.enabled:
69 try:
70 self.reranker = HybridReranker(
71 enabled=reranking_config.enabled,
72 model=reranking_config.model,
73 device=reranking_config.device,
74 batch_size=reranking_config.batch_size,
75 )
76 except Exception as e:
77 logger = LoggingConfig.get_logger(__name__)
78 logger.warning(
79 "Failed to initialize reranker, continuing without reranking",
80 error=str(e),
81 )
82 self.reranker = None
84 async def handle_search(
85 self, request_id: str | int | None, params: dict[str, Any]
86 ) -> dict[str, Any]:
87 """
88 Handle a basic text search and return an MCP-formatted response.
90 Validates that `params` contains a required "query" key, processes the query via the QueryProcessor,
91 executes a search with the SearchEngine using optional filters from `params`, and returns both a
92 backward-compatible text block and a structured search result suitable for the MCP protocol.
94 Parameters:
95 request_id (str | int | None): The incoming request identifier passed to the protocol response.
96 params (dict): Search parameters. Required keys:
97 - "query": the search query string.
98 Optional keys:
99 - "source_types" (list): list of source type filters (default: []).
100 - "project_ids" (list): list of project id filters (default: []).
101 - "limit" (int): maximum number of search results to request (default: 5).
103 Returns:
104 dict: An MCP protocol response dictionary. On success the response contains a `result`
105 with `content` (text block), `structuredContent` (results, total_found, query_context),
106 and `isError: False`. On validation failure returns an error response with code -32602
107 and a descriptive message; on internal failure returns an error response with code -32603
108 and the exception string in `data`.
109 """
110 logger.debug("Handling search request with params", params=params)
112 # Validate required parameters
113 if "query" not in params:
114 logger.error("Missing required parameter: query")
115 return self.protocol.create_response(
116 request_id,
117 error={
118 "code": -32602,
119 "message": "Invalid params",
120 "data": "Missing required parameter: query",
121 },
122 )
124 # Extract parameters with defaults
125 query = params["query"]
126 source_types = params.get("source_types", [])
127 project_ids = params.get("project_ids", [])
128 limit = params.get("limit", 5)
130 logger.info(
131 "Processing search request",
132 query=query,
133 source_types=source_types,
134 project_ids=project_ids,
135 limit=limit,
136 )
138 try:
139 # Process the query
140 logger.debug("Processing query with OpenAI")
141 processed_query = await self.query_processor.process_query(query)
142 logger.debug(
143 "Query processed successfully", processed_query=processed_query
144 )
146 # Perform the search
147 logger.debug("Executing search in Qdrant")
148 results = await self.search_engine.search(
149 query=processed_query["query"],
150 source_types=source_types,
151 project_ids=project_ids,
152 limit=limit,
153 )
155 # Apply reranking if enabled
157 if self.reranker:
158 results = await asyncio.to_thread(
159 self.reranker.rerank,
160 query=query,
161 results=results,
162 top_k=limit,
163 text_key="text",
164 )
166 logger.info(
167 "Search completed successfully",
168 result_count=len(results),
169 first_result_score=results[0].score if results else None,
170 )
172 # Create structured results for MCP 2025-06-18 compliance
173 structured_results = self.formatters.create_structured_search_results(
174 results
175 )
177 # Keep existing text response for backward compatibility
178 text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
179 self.formatters.format_search_result(result) for result in results
180 )
182 # Format the response with both text and structured content
183 response = self.protocol.create_response(
184 request_id,
185 result={
186 "content": [
187 {
188 "type": "text",
189 "text": text_response,
190 }
191 ],
192 "structuredContent": {
193 "results": structured_results,
194 "total_found": len(results),
195 "query_context": {
196 "original_query": query,
197 "source_types_filtered": source_types,
198 "project_ids_filtered": project_ids,
199 },
200 },
201 "isError": False,
202 },
203 )
204 logger.debug("Search response formatted successfully")
205 return response
207 except Exception as e:
208 logger.error("Error during search", exc_info=True)
209 return self.protocol.create_response(
210 request_id,
211 error={"code": -32603, "message": "Internal error", "data": str(e)},
212 )
214 async def handle_hierarchy_search(
215 self, request_id: str | int | None, params: dict[str, Any]
216 ) -> dict[str, Any]:
217 """Handle hierarchical search request for Confluence documents."""
218 logger.debug("Handling hierarchy search request with params", params=params)
220 # Validate required parameters
221 if "query" not in params:
222 logger.error("Missing required parameter: query")
223 return self.protocol.create_response(
224 request_id,
225 error={
226 "code": -32602,
227 "message": "Invalid params",
228 "data": "Missing required parameter: query",
229 },
230 )
232 # Extract parameters with defaults
233 query = params["query"]
234 hierarchy_filter = params.get("hierarchy_filter", {})
235 organize_flag = params.get("organize_by_hierarchy", False)
236 limit = params.get("limit", 10)
238 logger.info(
239 "Processing hierarchy search request",
240 query=query,
241 hierarchy_filter=hierarchy_filter,
242 organize_by_hierarchy=organize_by_hierarchy,
243 limit=limit,
244 )
246 try:
247 # Process the query
248 logger.debug("Processing query with OpenAI")
249 processed_query = await self.query_processor.process_query(query)
250 logger.debug(
251 "Query processed successfully", processed_query=processed_query
252 )
254 # Perform the search (All source types for hierarchy - localfiles have folder structure)
255 logger.debug("Executing hierarchy search in Qdrant")
256 results = await self.search_engine.search(
257 query=processed_query["query"],
258 source_types=[
259 "confluence",
260 "localfile",
261 ], # Include localfiles with folder structure
262 limit=max(
263 limit * 2, 40
264 ), # Get enough results to filter for hierarchy navigation
265 )
267 # Apply hierarchy filters (support sync or async patched functions in tests)
268 maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
269 filtered_results = (
270 await maybe_filtered
271 if inspect.isawaitable(maybe_filtered)
272 else maybe_filtered
273 )
275 # For hierarchy search, prioritize returning more documents for better hierarchy navigation
276 # Limit to maximum of 20 documents for hierarchy index (not just the user's limit)
277 hierarchy_limit = max(limit, 20)
278 filtered_results = filtered_results[:hierarchy_limit]
280 # Organize results if requested
281 organized_results = None
282 if organize_flag:
283 organized_results = self._organize_by_hierarchy(filtered_results)
284 response_text = format_lightweight_hierarchy_text(
285 organized_results, len(filtered_results)
286 )
287 else:
288 response_text = format_lightweight_hierarchy_text(
289 {}, len(filtered_results)
290 )
292 logger.info(
293 "Hierarchy search completed successfully",
294 result_count=len(filtered_results),
295 first_result_score=(
296 filtered_results[0].score if filtered_results else None
297 ),
298 )
300 # Create structured content for MCP compliance
301 structured_content = self.formatters.create_lightweight_hierarchy_results(
302 filtered_results, organized_results or {}, query
303 )
305 # Format the response with both text and structured content
306 response = self.protocol.create_response(
307 request_id,
308 result={
309 "content": [
310 {
311 "type": "text",
312 "text": response_text,
313 }
314 ],
315 "structuredContent": structured_content,
316 "isError": False,
317 },
318 )
319 logger.debug("Hierarchy search response formatted successfully")
320 return response
322 except Exception as e:
323 logger.error("Error during hierarchy search", exc_info=True)
324 return self.protocol.create_response(
325 request_id,
326 error={"code": -32603, "message": "Internal error", "data": str(e)},
327 )
329 async def handle_attachment_search(
330 self, request_id: str | int | None, params: dict[str, Any]
331 ) -> dict[str, Any]:
332 """Handle attachment search request."""
333 logger.debug("Handling attachment search request with params", params=params)
335 # Validate required parameters
336 if "query" not in params:
337 logger.error("Missing required parameter: query")
338 return self.protocol.create_response(
339 request_id,
340 error={
341 "code": -32602,
342 "message": "Invalid params",
343 "data": "Missing required parameter: query",
344 },
345 )
347 # Extract parameters with defaults
348 query = params["query"]
349 attachment_filter = params.get("attachment_filter", {})
350 include_parent_context = params.get("include_parent_context", True)
351 limit = params.get("limit", 10)
353 logger.info(
354 "Processing attachment search request",
355 query=query,
356 attachment_filter=attachment_filter,
357 include_parent_context=include_parent_context,
358 limit=limit,
359 )
361 try:
362 # Process the query
363 logger.debug("Processing query with OpenAI")
364 processed_query = await self.query_processor.process_query(query)
365 logger.debug(
366 "Query processed successfully", processed_query=processed_query
367 )
369 # Perform the search
370 logger.debug("Executing attachment search in Qdrant")
371 results = await self.search_engine.search(
372 query=processed_query["query"],
373 source_types=None, # Search all sources for attachments
374 limit=limit * 2, # Get more results to filter
375 )
377 # Apply lightweight attachment filters (NEW - supports multi-source)
378 filtered_results = self._apply_lightweight_attachment_filters(
379 results, attachment_filter
380 )
382 # Limit to reasonable number for performance (ensure good navigation)
383 attachment_limit = max(limit, 15) # At least 15 for good navigation
384 filtered_results = filtered_results[:attachment_limit]
386 logger.info(
387 "Attachment search completed successfully",
388 result_count=len(filtered_results),
389 first_result_score=(
390 filtered_results[0].score if filtered_results else None
391 ),
392 )
394 # Create attachment groups for organized display
395 organized_results = {}
396 attachment_groups = []
397 if filtered_results:
398 # Group attachments by type for better organization
399 attachment_groups = self.formatters._organize_attachments_by_type(
400 filtered_results
401 )
402 for group in attachment_groups:
403 group_results = group.get("results", [])
404 organized_results[group["group_name"]] = group_results
406 # Create lightweight text response
407 response_text = format_lightweight_attachment_text(
408 organized_results, len(filtered_results)
409 )
411 # Create lightweight structured content for MCP compliance
412 structured_content = self.formatters.create_lightweight_attachment_results(
413 attachment_groups, query
414 )
416 response = self.protocol.create_response(
417 request_id,
418 result={
419 "content": [
420 {
421 "type": "text",
422 "text": response_text,
423 }
424 ],
425 "structuredContent": structured_content,
426 "isError": False,
427 },
428 )
429 logger.debug("Attachment search response formatted successfully")
430 return response
432 except Exception as e:
433 logger.error("Error during attachment search", exc_info=True)
434 return self.protocol.create_response(
435 request_id,
436 error={"code": -32603, "message": "Internal error", "data": str(e)},
437 )
    # Back-compat thin wrappers for tests that patch private methods
    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Delegate to :func:`apply_hierarchy_filters`; kept as an instance method so tests can patch it."""
        return apply_hierarchy_filters(results, hierarchy_filter)
    def _organize_by_hierarchy(self, results):
        """Delegate to :func:`organize_by_hierarchy`; kept as an instance method so tests can patch it."""
        return organize_by_hierarchy(results)
    def _apply_attachment_filters(self, results, attachment_filter):
        """Delegate to :func:`apply_attachment_filters`; kept as an instance method so tests can patch it."""
        return apply_attachment_filters(results, attachment_filter)
    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Delegate to :func:`apply_lightweight_attachment_filters`, wiring in the
        formatter's minimal file-type extractor; kept patchable for tests."""
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )
    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Delegate to :func:`format_lightweight_attachment_text`; kept as an instance method so tests can patch it."""
        return format_lightweight_attachment_text(organized_results, total_found)
    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Delegate to :func:`format_lightweight_hierarchy_text`; kept as an instance method so tests can patch it."""
        return format_lightweight_hierarchy_text(organized_results, total_found)
    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Return all chunks belonging to a document_id.

        Required params:
            document_id (str): Identifier of the document to expand.

        Returns:
            dict: MCP response whose ``structuredContent`` contains the chunk
            payloads sorted by chunk_index (chunks without an integer index
            sort after the rest, by stringified point id), plus a ``truncated``
            flag set when more than MAX_CHUNKS chunks exist. Errors: -32602 for
            a missing document_id, -32001 when no chunks match, -32603 on
            internal failure.
        """
        logger.debug("Handling expand_document", params=params)

        # Validate required parameter (reject missing, None, and empty string)
        if (
            "document_id" not in params
            or params["document_id"] is None
            or params["document_id"] == ""
        ):
            logger.error("Missing required parameter: document_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: document_id",
                },
            )

        document_id = params["document_id"]

        try:
            logger.info(f"Fetching chunks for document_id={document_id}")

            # Create Qdrant filter matching every point of this document
            query_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="document_id",
                        match=models.MatchValue(value=document_id),
                    )
                ]
            )

            all_points = []
            next_offset = None
            truncated = False

            collection_name = self.search_engine.config.collection_name
            MAX_CHUNKS = 500  # Reasonable upper bound

            # Acquire the search semaphore to prevent overwhelming the
            # shared Qdrant client connection pool under concurrent load.
            async with self.search_engine._search_semaphore:
                while True:
                    remaining = MAX_CHUNKS - len(all_points)
                    if remaining <= 0:
                        # Budget exhausted; a pending page means truncation.
                        truncated = next_offset is not None
                        break
                    # Page through results; never request more than the
                    # remaining budget in one scroll call.
                    points, next_offset = await self.search_engine.client.scroll(
                        collection_name=collection_name,
                        scroll_filter=query_filter,
                        limit=min(100, remaining),
                        offset=next_offset,
                        with_payload=True,
                        with_vectors=False,
                    )

                    all_points.extend(points)

                    if next_offset is None:
                        break
                    if len(all_points) >= MAX_CHUNKS:
                        # Cap reached with another page still available.
                        truncated = True
                        break
            if not all_points:
                logger.warning(f"No chunks found for document_id={document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32001,
                        "message": "Document not found",
                        "data": f"No chunks found for document_id: {document_id}",
                    },
                )

            logger.info(f"Retrieved {len(all_points)} chunks")

            # Extract chunk payloads
            def _chunk_sort_key(point):
                # Key is (has-integer-index, index, point id): chunks with an
                # integer chunk_index (preferring metadata.chunk_index over the
                # top-level field) come first in index order; the rest fall
                # back to a stable ordering by stringified point id.
                payload = point.payload or {}
                metadata = payload.get("metadata") or {}
                idx = metadata.get("chunk_index", payload.get("chunk_index"))
                if isinstance(idx, int):
                    return (0, idx, str(point.id))
                return (1, 0, str(point.id))

            all_points.sort(key=_chunk_sort_key)
            chunks = [p.payload for p in all_points]

            structured_results = {
                "document_id": document_id,
                "total_chunks": len(chunks),
                "chunks": chunks,
                "truncated": truncated,
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "is_document_expansion": True,
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": f"Retrieved {len(chunks)} chunks for document {document_id}",
                        }
                    ],
                    "structuredContent": structured_results,
                    "isError": False,
                },
            )

        except Exception as e:
            logger.error("Error expanding document", exc_info=True)

            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
            )
592 async def handle_expand_chunk_context(
593 self, request_id: str | int | None, params: dict[str, Any]
594 ) -> dict[str, Any]:
595 """
596 Return neighboring chunks within the same document based on chunk_index.
598 Required params:
599 document_id (str): The document identifier.
600 chunk_index (int): The target chunk index to expand around.
602 Optional params:
603 window_size (int): Number of chunks before/after target (default=2).
604 """
605 logger.debug("Handling expand_chunk_context", params=params)
606 # =========================
607 # Validate params
608 # =========================
609 if (
610 "document_id" not in params
611 or params["document_id"] is None
612 or params["document_id"] == ""
613 ):
614 logger.error("Missing required parameter: document_id")
615 return self.protocol.create_response(
616 request_id,
617 error={
618 "code": -32602,
619 "message": "Invalid params",
620 "data": "Missing required parameter: document_id",
621 },
622 )
623 if (
624 "chunk_index" not in params
625 or params["chunk_index"] is None
626 or params["chunk_index"] == ""
627 ):
628 logger.error("Missing required parameter: chunk_index")
629 return self.protocol.create_response(
630 request_id,
631 error={
632 "code": -32602,
633 "message": "Invalid params",
634 "data": "Missing required parameter: chunk_index",
635 },
636 )
638 document_id = params["document_id"]
639 try:
640 chunk_index = int(params["chunk_index"])
641 except (ValueError, TypeError):
642 return self.protocol.create_response(
643 request_id,
644 error={
645 "code": -32602,
646 "message": "Invalid params",
647 "data": "chunk_index must be a valid integer",
648 },
649 )
651 if chunk_index < 0:
652 return self.protocol.create_response(
653 request_id,
654 error={
655 "code": -32602,
656 "message": "Invalid params",
657 "data": "chunk_index must be a non-negative integer",
658 },
659 )
661 MAX_WINDOW_SIZE = 25
662 window_size = params.get("window_size", 2)
664 if (
665 not isinstance(window_size, int)
666 or window_size < 0
667 or window_size > MAX_WINDOW_SIZE
668 ):
669 return self.protocol.create_response(
670 request_id,
671 error={
672 "code": -32602,
673 "message": "Invalid params",
674 "data": f"window_size must be a non-negative integer between 0 and {MAX_WINDOW_SIZE}",
675 },
676 )
678 # =========================
679 # Calculate chunk window
680 # =========================
681 start_chunk = max(chunk_index - window_size, 0)
682 end_chunk = chunk_index + window_size
684 logger.info(
685 f"Expanding document {document_id} "
686 f"around chunk {chunk_index} "
687 f"(window={window_size}, range={start_chunk}-{end_chunk})"
688 )
690 # =========================
691 # Metadata filter retrieval
692 # =========================
694 query_filter = models.Filter(
695 must=[
696 models.FieldCondition(
697 key="document_id",
698 match=models.MatchValue(value=document_id),
699 )
700 ],
701 should=[
702 models.FieldCondition(
703 key="metadata.chunk_index",
704 range=models.Range(
705 gte=start_chunk,
706 lte=end_chunk,
707 ),
708 ),
709 models.FieldCondition(
710 key="chunk_index",
711 range=models.Range(
712 gte=start_chunk,
713 lte=end_chunk,
714 ),
715 ),
716 ],
717 must_not=[],
718 )
720 # =========================
721 # Scroll all matching chunks
722 # =========================
723 all_points = []
724 next_offset = None
725 collection_name = self.search_engine.config.collection_name
726 try:
727 async with self.search_engine._search_semaphore:
728 while True:
729 points, next_offset = await self.search_engine.client.scroll(
730 collection_name=collection_name,
731 scroll_filter=query_filter,
732 offset=next_offset,
733 limit=100,
734 with_payload=True,
735 with_vectors=False,
736 )
737 all_points.extend(points)
738 if next_offset is None:
739 break
740 except Exception as e:
741 logger.error("Error expanding chunk context", exc_info=True)
742 return self.protocol.create_response(
743 request_id,
744 error={
745 "code": -32603,
746 "message": "Internal error",
747 "data": str(e),
748 },
749 )
751 # =========================
752 # Sort by chunk_index with fallback
753 # =========================
754 chunks = sorted(
755 [p.payload for p in all_points],
756 key=lambda x: x.get("metadata", {}).get(
757 "chunk_index", x.get("chunk_index", 0)
758 ),
759 )
761 # =========================
762 # Build context sections
763 # =========================
765 pre_chunks = []
766 post_chunks = []
767 target_chunk = None
769 for chunk in chunks:
770 idx = chunk.get("metadata", {}).get("chunk_index")
771 if idx is None:
772 continue # Skip chunks without chunk_index
774 if idx < chunk_index:
775 pre_chunks.append(chunk)
777 elif idx == chunk_index:
778 target_chunk = chunk
780 else:
781 post_chunks.append(chunk)
783 if target_chunk is None:
784 return self.protocol.create_response(
785 request_id,
786 error={
787 "code": -32001,
788 "message": "Chunk not found",
789 "data": (
790 f"No chunk found for document_id: {document_id}, "
791 f"chunk_index: {chunk_index}"
792 ),
793 },
794 )
795 # =========================
796 # Build structured output
797 # =========================
799 structured_results = {
800 "context_chunks": {
801 "pre": pre_chunks,
802 "target": target_chunk,
803 "post": post_chunks,
804 },
805 "metadata": {
806 "document_id": document_id,
807 "chunk_index": chunk_index,
808 "window_size": window_size,
809 "context_range": {
810 "start": start_chunk,
811 "end": end_chunk,
812 },
813 "total_chunks": len(chunks),
814 },
815 }
817 # =========================
818 # Return response
819 # =========================
820 return self.protocol.create_response(
821 request_id,
822 result={
823 "content": [
824 {
825 "type": "text",
826 "text": f"Retrieved context for chunk {chunk_index} in document {document_id} "
827 f"({len(pre_chunks)} pre, {1 if target_chunk else 0} target, {len(post_chunks)} post)",
828 }
829 ],
830 "structuredContent": structured_results,
831 "isError": False,
832 },
833 )