Coverage for src/qdrant_loader_mcp_server/mcp/search_handler.py: 97%
290 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
1"""Search operations handler for MCP server."""
3import inspect
4from typing import Any
6from ..search.components.search_result_models import HybridSearchResult
7from ..search.engine import SearchEngine
8from ..search.processor import QueryProcessor
9from ..utils import LoggingConfig
10from .formatters import MCPFormatters
11from .protocol import MCPProtocol
# Module-level logger used by SearchHandler for all request logging.
# It is obtained from the project's LoggingConfig helper under a fixed name.
logger = LoggingConfig.get_logger("src.mcp.search_handler")
class SearchHandler:
    """Handler for search-related operations.

    Every public ``handle_*`` coroutine validates its parameters, queries the
    search engine, and returns whatever ``protocol.create_response`` builds.
    Exceptions raised while searching are caught and reported as an
    "Internal error" (code -32603) response instead of propagating.
    """

    # Source types that carry hierarchy information (Confluence pages and
    # local files with a folder structure).
    _HIERARCHICAL_SOURCE_TYPES = ("confluence", "localfile")

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        protocol: MCPProtocol,
    ):
        """Initialize search handler.

        Args:
            search_engine: Engine whose ``search`` coroutine runs the queries.
            query_processor: Processor whose ``process_query`` coroutine
                prepares the raw query before searching.
            protocol: Object whose ``create_response`` builds the responses.
        """
        self.search_engine = search_engine
        self.query_processor = query_processor
        self.protocol = protocol
        self.formatters = MCPFormatters()

    # ------------------------------------------------------------------
    # Shared response / query helpers
    # ------------------------------------------------------------------

    def _missing_parameter_response(
        self, request_id: str | int | None, parameter: str
    ) -> dict[str, Any]:
        """Log and build the "Invalid params" error for a missing parameter."""
        detail = f"Missing required parameter: {parameter}"
        logger.error(detail)
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32602,
                "message": "Invalid params",
                "data": detail,
            },
        )

    def _internal_error_response(
        self, request_id: str | int | None, log_message: str, error: Exception
    ) -> dict[str, Any]:
        """Log and build the "Internal error" response for ``error``.

        Must be called from inside the ``except`` block handling ``error`` so
        that ``exc_info=True`` still sees the active exception.
        """
        logger.error(log_message, exc_info=True)
        return self.protocol.create_response(
            request_id,
            error={"code": -32603, "message": "Internal error", "data": str(error)},
        )

    def _success_response(
        self, request_id: str | int | None, text: str, structured_content: Any
    ) -> dict[str, Any]:
        """Build a successful response carrying text and structured content."""
        return self.protocol.create_response(
            request_id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": text,
                    }
                ],
                "structuredContent": structured_content,
                "isError": False,
            },
        )

    async def _process_query(self, query: Any) -> Any:
        """Run ``query`` through the query processor, logging before and after."""
        logger.debug("Processing query with OpenAI")
        processed_query = await self.query_processor.process_query(query)
        logger.debug("Query processed successfully", processed_query=processed_query)
        return processed_query

    @staticmethod
    def _split_path(file_path: str) -> list[str]:
        """Return the non-empty "/"-separated components of ``file_path``."""
        return [part for part in file_path.split("/") if part]

    # ------------------------------------------------------------------
    # Request handlers
    # ------------------------------------------------------------------

    async def handle_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle basic search request.

        Args:
            request_id: Identifier echoed back in the response.
            params: Request parameters. ``query`` is required; ``source_types``
                and ``project_ids`` default to ``[]`` and ``limit`` to 10.

        Returns:
            The response built by the protocol: search results as text plus
            structured content, or an error payload.
        """
        logger.debug("Handling search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        source_types = params.get("source_types", [])
        project_ids = params.get("project_ids", [])
        limit = params.get("limit", 10)

        logger.info(
            "Processing search request",
            query=query,
            source_types=source_types,
            project_ids=project_ids,
            limit=limit,
        )

        try:
            processed_query = await self._process_query(query)

            # Perform the search
            logger.debug("Executing search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=source_types,
                project_ids=project_ids,
                limit=limit,
            )
            logger.info(
                "Search completed successfully",
                result_count=len(results),
                first_result_score=results[0].score if results else None,
            )

            # Create structured results for MCP 2025-06-18 compliance
            structured_results = self.formatters.create_structured_search_results(
                results
            )

            # Keep existing text response for backward compatibility
            text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
                self.formatters.format_search_result(result) for result in results
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id,
                text_response,
                {
                    "results": structured_results,
                    "total_found": len(results),
                    "query_context": {
                        "original_query": query,
                        "source_types_filtered": source_types,
                        "project_ids_filtered": project_ids,
                    },
                },
            )
            logger.debug("Search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during search", e
            )

    async def handle_hierarchy_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle hierarchical search request for Confluence and local-file documents.

        Args:
            request_id: Identifier echoed back in the response.
            params: Request parameters. ``query`` is required.
                ``hierarchy_filter`` (default ``{}``) may contain ``depth``,
                ``parent_title``, ``root_only`` and ``has_children``;
                ``organize_by_hierarchy`` defaults to ``False`` and ``limit``
                to 10.

        Returns:
            The response built by the protocol: a lightweight text summary
            plus structured hierarchy content, or an error payload.
        """
        logger.debug("Handling hierarchy search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        hierarchy_filter = params.get("hierarchy_filter", {})
        organize_by_hierarchy = params.get("organize_by_hierarchy", False)
        limit = params.get("limit", 10)

        logger.info(
            "Processing hierarchy search request",
            query=query,
            hierarchy_filter=hierarchy_filter,
            organize_by_hierarchy=organize_by_hierarchy,
            limit=limit,
        )

        try:
            processed_query = await self._process_query(query)

            # Search only the hierarchical source types, and over-fetch (at
            # least 40 hits) so enough results survive the hierarchy filters.
            logger.debug("Executing hierarchy search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=list(self._HIERARCHICAL_SOURCE_TYPES),
                limit=max(limit * 2, 40),
            )

            # Apply hierarchy filters (support sync or async patched functions in tests)
            maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
            filtered_results = (
                await maybe_filtered
                if inspect.isawaitable(maybe_filtered)
                else maybe_filtered
            )

            # Hierarchy navigation benefits from more documents, so the cap is
            # the user's limit but never fewer than 20.
            hierarchy_limit = max(limit, 20)
            filtered_results = filtered_results[:hierarchy_limit]

            # Organize results if requested
            organized_results = None
            if organize_by_hierarchy:
                organized_results = self._organize_by_hierarchy(filtered_results)
                response_text = self._format_lightweight_hierarchy_text(
                    organized_results, len(filtered_results)
                )
            else:
                response_text = self._format_lightweight_hierarchy_text(
                    {}, len(filtered_results)
                )

            logger.info(
                "Hierarchy search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_hierarchy_results(
                filtered_results, organized_results, query
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Hierarchy search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during hierarchy search", e
            )

    async def handle_attachment_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle attachment search request.

        Args:
            request_id: Identifier echoed back in the response.
            params: Request parameters. ``query`` is required;
                ``attachment_filter`` defaults to ``{}``,
                ``include_parent_context`` to ``True`` (only logged here) and
                ``limit`` to 10.

        Returns:
            The response built by the protocol: a lightweight text summary
            plus structured attachment content, or an error payload.
        """
        logger.debug("Handling attachment search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._missing_parameter_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        attachment_filter = params.get("attachment_filter", {})
        include_parent_context = params.get("include_parent_context", True)
        limit = params.get("limit", 10)

        logger.info(
            "Processing attachment search request",
            query=query,
            attachment_filter=attachment_filter,
            include_parent_context=include_parent_context,
            limit=limit,
        )

        try:
            processed_query = await self._process_query(query)

            # Perform the search
            logger.debug("Executing attachment search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=None,  # Search all sources for attachments
                limit=limit * 2,  # Get more results to filter
            )

            # Apply lightweight attachment filters (supports multi-source)
            filtered_results = self._apply_lightweight_attachment_filters(
                results, attachment_filter
            )

            # Cap at the user's limit, but never fewer than 15 for navigation
            attachment_limit = max(limit, 15)
            filtered_results = filtered_results[:attachment_limit]

            logger.info(
                "Attachment search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Group attachments by type for organized display
            organized_results = {}
            if filtered_results:
                attachment_groups = self.formatters._organize_attachments_by_type(
                    filtered_results
                )
                for group in attachment_groups:
                    organized_results[group["group_name"]] = [
                        r
                        for r in filtered_results
                        if r.document_id in group["document_ids"]
                    ]

            # Create lightweight text response
            response_text = self._format_lightweight_attachment_text(
                organized_results, len(filtered_results)
            )

            # Create lightweight structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_attachment_results(
                filtered_results, attachment_filter, query
            )

            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Attachment search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during attachment search", e
            )

    # ------------------------------------------------------------------
    # Hierarchy filtering / grouping
    # ------------------------------------------------------------------

    def _apply_hierarchy_filters(
        self, results: list[HybridSearchResult], hierarchy_filter: dict[str, Any]
    ) -> list[HybridSearchResult]:
        """Apply hierarchy-based filters to search results.

        Only ``confluence`` and ``localfile`` results are kept. Supported
        filter keys are ``depth``, ``parent_title``, ``root_only`` and
        ``has_children``; a ``None`` filter is treated as empty.

        Returns:
            The results that satisfy every active filter, in input order.
        """
        # A client may send an explicit null; treat it like "no filters".
        hierarchy_filter = hierarchy_filter or {}
        return [
            result
            for result in results
            if self._passes_hierarchy_filter(result, hierarchy_filter)
        ]

    def _passes_hierarchy_filter(
        self, result: HybridSearchResult, hierarchy_filter: dict[str, Any]
    ) -> bool:
        """Return True when ``result`` satisfies every active hierarchy filter."""
        source_type = result.source_type
        if source_type not in self._HIERARCHICAL_SOURCE_TYPES:
            return False
        is_localfile = source_type == "localfile"

        # Depth filter - local files derive depth from the folder structure.
        if "depth" in hierarchy_filter:
            wanted_depth = hierarchy_filter["depth"]
            file_path = getattr(result, "file_path", None)
            if is_localfile and file_path:
                # Number of path components minus two, floored at zero.
                folder_depth = max(0, len(self._split_path(file_path)) - 2)
                if folder_depth != wanted_depth:
                    return False
            elif hasattr(result, "depth") and result.depth != wanted_depth:
                return False

        # Parent title filter - local files compare against the parent folder.
        if "parent_title" in hierarchy_filter:
            expected_parent = hierarchy_filter["parent_title"]
            if is_localfile:
                file_path = getattr(result, "file_path", None)
                if not file_path:
                    # Without a path the parent folder cannot be checked.
                    return False
                path_parts = self._split_path(file_path)
                parent_folder = path_parts[-2] if len(path_parts) > 1 else ""
                if parent_folder != expected_parent:
                    return False
            elif getattr(result, "parent_title", None) != expected_parent:
                return False

        # Root-only filter.
        if hierarchy_filter.get("root_only", False):
            file_path = getattr(result, "file_path", None)
            if is_localfile and file_path:
                # Root folder + filename means at most two path components.
                if len(self._split_path(file_path)) > 2:
                    return False
            elif not result.is_root_document():
                return False

        # Has-children filter - skipped for local files, which do not track
        # child relationships.
        if "has_children" in hierarchy_filter and not is_localfile:
            if result.has_children() != hierarchy_filter["has_children"]:
                return False

        return True

    def _hierarchy_sort_key(self, result: HybridSearchResult) -> tuple[Any, Any]:
        """Return the ``(depth, title)`` key used to order results in a group."""
        file_path = getattr(result, "file_path", None)
        if result.source_type == "localfile" and file_path:
            depth = len(self._split_path(file_path)) - 1
        else:
            depth = result.depth or 0
        # A missing title sorts as "" instead of raising on None < str.
        return (depth, result.source_title or "")

    def _organize_by_hierarchy(
        self, results: list[HybridSearchResult]
    ) -> dict[str, list[HybridSearchResult]]:
        """Organize search results by hierarchy structure.

        Local files are grouped by their top-level folder, other results by
        the first breadcrumb segment, falling back to the result's own title.
        Each group is sorted by depth, then title.
        """
        hierarchy_groups = {}

        for result in results:
            file_path = getattr(result, "file_path", None)
            if result.source_type == "localfile" and file_path:
                path_parts = self._split_path(file_path)
                root_title = path_parts[0] if path_parts else "Root"
            elif result.breadcrumb_text:
                # str.split always yields at least one element.
                root_title = result.breadcrumb_text.split(" > ")[0]
            else:
                root_title = result.source_title

            hierarchy_groups.setdefault(root_title, []).append(result)

        for group in hierarchy_groups.values():
            group.sort(key=self._hierarchy_sort_key)

        return hierarchy_groups

    # ------------------------------------------------------------------
    # Attachment filtering
    # ------------------------------------------------------------------

    def _apply_attachment_filters(
        self, results: list[HybridSearchResult], attachment_filter: dict[str, Any]
    ) -> list[HybridSearchResult]:
        """Apply attachment-based filters to search results (Confluence only).

        Supported filter keys: ``attachments_only``, ``parent_document_title``,
        ``file_type``, ``file_size_min``, ``file_size_max`` and ``author``.
        A ``None`` filter is treated as empty.
        """
        attachment_filter = attachment_filter or {}
        min_size = attachment_filter.get("file_size_min")
        max_size = attachment_filter.get("file_size_max")
        filtered_results = []

        for result in results:
            # Skip non-Confluence results
            if result.source_type != "confluence":
                continue

            # Test the flag's value, not just its presence, so that an
            # explicit attachments_only=False keeps regular documents.
            if attachment_filter.get("attachments_only") and not result.is_attachment:
                continue

            if "parent_document_title" in attachment_filter:
                if (
                    result.parent_document_title
                    != attachment_filter["parent_document_title"]
                ):
                    continue

            if "file_type" in attachment_filter:
                if result.get_file_type() != attachment_filter["file_type"]:
                    continue

            # Size bounds apply only when the result reports a size.
            if (
                min_size is not None
                and result.file_size is not None
                and result.file_size < min_size
            ):
                continue
            if (
                max_size is not None
                and result.file_size is not None
                and result.file_size > max_size
            ):
                continue

            if "author" in attachment_filter:
                if result.attachment_author != attachment_filter["author"]:
                    continue

            filtered_results.append(result)

        return filtered_results

    def _apply_lightweight_attachment_filters(
        self, results: list[HybridSearchResult], attachment_filter: dict[str, Any]
    ) -> list[HybridSearchResult]:
        """Fast filtering optimized for attachment discovery across all sources.

        A result counts as an attachment when its ``is_attachment`` flag is
        set, it has an ``original_filename``, or its ``file_path`` is a string
        containing "." that does not end with "/". Filter keys match
        ``_apply_attachment_filters``, but are active only when their value is
        truthy (size bounds: when not ``None``). A ``None`` filter is treated
        as empty.
        """
        attachment_filter = attachment_filter or {}
        filtered_results = []

        for result in results:
            # Quick attachment detection - avoid expensive checks
            is_flagged = bool(getattr(result, "is_attachment", False))
            original_filename = getattr(result, "original_filename", None)
            file_path = getattr(result, "file_path", None)
            looks_like_file = (
                isinstance(file_path, str)
                and "." in file_path
                and not file_path.endswith("/")
            )
            if not (is_flagged or bool(original_filename) or looks_like_file):
                continue

            # Apply filters with early exits
            if attachment_filter.get("attachments_only") and not is_flagged:
                continue

            if attachment_filter.get("file_type"):
                file_type = self.formatters._extract_file_type_minimal(result)
                if file_type != attachment_filter["file_type"]:
                    continue

            # Size filters with null checks (include zero-byte files)
            file_size = getattr(result, "file_size", None)
            if (
                attachment_filter.get("file_size_min") is not None
                and file_size is not None
                and file_size < attachment_filter["file_size_min"]
            ):
                continue

            if (
                attachment_filter.get("file_size_max") is not None
                and file_size is not None
                and file_size > attachment_filter["file_size_max"]
            ):
                continue

            # Parent document filter (works across source types)
            if attachment_filter.get("parent_document_title"):
                parent_title = getattr(
                    result, "parent_document_title", None
                ) or getattr(result, "parent_title", None)
                if parent_title != attachment_filter["parent_document_title"]:
                    continue

            # Author filter
            if attachment_filter.get("author"):
                author = getattr(result, "attachment_author", None) or getattr(
                    result, "author", None
                )
                if author != attachment_filter["author"]:
                    continue

            filtered_results.append(result)

        return filtered_results

    # ------------------------------------------------------------------
    # Text formatting
    # ------------------------------------------------------------------

    def _format_lightweight_attachment_text(
        self, organized_results: dict[str, list], total_found: int
    ) -> str:
        """Format attachment results as lightweight text summary.

        Lists each group with up to three example files. With no groups, a
        short message carrying only ``total_found`` is returned.
        """
        if not organized_results:
            return f"📎 **Attachment Search Results**\n\nFound {total_found} attachments. Use the structured data below to navigate and retrieve specific files."

        # Collect the pieces and join once instead of repeated string +=.
        pieces = [f"📎 **Attachment Search Results** ({total_found} attachments)\n\n"]

        for group_name, results in organized_results.items():
            pieces.append(f"📁 **{group_name}** ({len(results)} files)\n")

            # Show first few attachments as examples
            for result in results[:3]:
                filename = self.formatters._extract_safe_filename(result)
                file_type = self.formatters._extract_file_type_minimal(result)
                pieces.append(
                    f" 📄 {filename} ({file_type}) - Score: {result.score:.3f}\n"
                )

            if len(results) > 3:
                pieces.append(f" ... and {len(results) - 3} more files\n")
            pieces.append("\n")

        pieces.append("💡 **Usage:** Use the structured attachment data to:\n")
        pieces.append("• Browse attachments by file type or source\n")
        pieces.append("• Get document IDs for specific file content retrieval\n")
        pieces.append("• Filter attachments by metadata (size, type, etc.)\n")

        return "".join(pieces)

    def _format_lightweight_hierarchy_text(
        self, organized_results: dict[str, list], total_found: int
    ) -> str:
        """Format hierarchy results as lightweight text summary.

        Lists each group with up to three example documents. With no groups,
        a short message carrying only ``total_found`` is returned.
        """
        if not organized_results:
            return f"📋 **Hierarchy Search Results**\n\nFound {total_found} documents. Use the structured data below to navigate the hierarchy and retrieve specific documents."

        # Collect the pieces and join once instead of repeated string +=.
        pieces = [f"📋 **Hierarchy Search Results** ({total_found} documents)\n\n"]

        for group_name, results in organized_results.items():
            clean_name = self.formatters._generate_clean_group_name(group_name, results)
            pieces.append(f"📁 **{clean_name}** ({len(results)} documents)\n")

            # Show first few documents as examples
            for result in results[:3]:
                pieces.append(
                    f" 📄 {result.source_title} (Score: {result.score:.3f})\n"
                )

            if len(results) > 3:
                pieces.append(f" ... and {len(results) - 3} more documents\n")
            pieces.append("\n")

        pieces.append("💡 **Usage:** Use the structured hierarchy data to:\n")
        pieces.append("• Browse document groups and navigate hierarchy levels\n")
        pieces.append("• Get document IDs for specific content retrieval\n")
        pieces.append("• Understand document relationships and organization\n")

        return "".join(pieces)

    # ------------------------------------------------------------------
    # Document expansion
    # ------------------------------------------------------------------

    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle expand document request for lazy loading using standard search format.

        Args:
            request_id: Identifier echoed back in the response.
            params: Request parameters. ``document_id`` is required and must
                be neither ``None`` nor an empty string.

        Returns:
            The response built by the protocol: the single matching document
            in the standard search layout, a "Document not found" error
            (code -32604), or another error payload.
        """
        logger.debug("Handling expand document with params", params=params)

        # Validate required parameter
        if (
            "document_id" not in params
            or params["document_id"] is None
            or params["document_id"] == ""
        ):
            return self._missing_parameter_response(request_id, "document_id")

        document_id = params["document_id"]

        try:
            logger.info(f"Expanding document with ID: {document_id}")

            # Field search does not guarantee exact matches, so fetch several
            # hits and keep only an exact document_id match. If the field
            # query finds none, fall back to a general search on the raw id.
            results = []
            for search_query in (f"document_id:{document_id}", document_id):
                candidates = await self.search_engine.search(
                    query=search_query, limit=10
                )
                exact_matches = [
                    r for r in candidates if r.document_id == document_id
                ]
                if exact_matches:
                    results = exact_matches[:1]  # Take only the first exact match
                    break

            if not results:
                logger.warning(f"Document not found with ID: {document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32604,
                        "message": "Document not found",
                        "data": f"No document found with ID: {document_id}",
                    },
                )

            logger.info(f"Successfully found document: {results[0].source_title}")

            # Reuse the standard search result formatting
            formatted_results = (
                "Found 1 document:\n\n"
                + self.formatters.format_search_result(results[0])
            )
            structured_results_list = self.formatters.create_structured_search_results(
                results
            )

            # Same structure as a standard search response
            structured_results = {
                "results": structured_results_list,
                "total_found": len(results),
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "source_types_filtered": [],
                    "project_ids_filtered": [],
                    "is_document_expansion": True,
                },
            }

            return self._success_response(
                request_id, formatted_results, structured_results
            )

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error expanding document", e
            )