Coverage for src/qdrant_loader_mcp_server/mcp/search_handler.py: 100%
148 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""Search operations handler for MCP server."""
3import inspect
4from typing import Any
6from ..search.engine import SearchEngine
7from ..search.processor import QueryProcessor
8from ..utils import LoggingConfig
9from .formatters import MCPFormatters
10from .handlers.search import (
11 apply_attachment_filters,
12 apply_hierarchy_filters,
13 apply_lightweight_attachment_filters,
14 format_lightweight_attachment_text,
15 format_lightweight_hierarchy_text,
16 organize_by_hierarchy,
17)
18from .protocol import MCPProtocol
# Module-wide logger; every handler in this file reports through it.
logger = LoggingConfig.get_logger("src.mcp.search_handler")
class SearchHandler:
    """Handler for search-related operations.

    Every public ``handle_*`` coroutine validates its parameters, runs a
    search through ``search_engine`` and returns whatever
    ``protocol.create_response`` builds: either a ``result`` carrying a text
    rendering plus ``structuredContent``, or an ``error`` object.
    """

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        protocol: MCPProtocol,
    ):
        """Initialize search handler.

        Args:
            search_engine: Engine whose async ``search`` method is awaited
                by every handler.
            query_processor: Processor whose async ``process_query`` result
                is indexed with ``["query"]`` before searching.
            protocol: Object providing ``create_response``.
        """
        self.search_engine = search_engine
        self.query_processor = query_processor
        self.protocol = protocol
        self.formatters = MCPFormatters()

    # ------------------------------------------------------------------
    # Response helpers shared by all handlers
    # ------------------------------------------------------------------
    def _invalid_params_response(
        self, request_id: str | int | None, missing_param: str
    ) -> dict[str, Any]:
        """Log and build the -32602 response for a missing required parameter."""
        detail = f"Missing required parameter: {missing_param}"
        logger.error(detail)
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32602,
                "message": "Invalid params",
                "data": detail,
            },
        )

    def _internal_error_response(
        self, request_id: str | int | None, log_message: str, exc: Exception
    ) -> dict[str, Any]:
        """Log the active exception and build the -32603 response.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` picks up the exception being handled.
        """
        logger.error(log_message, exc_info=True)
        return self.protocol.create_response(
            request_id,
            error={"code": -32603, "message": "Internal error", "data": str(exc)},
        )

    def _success_response(
        self, request_id: str | int | None, text: str, structured_content: Any
    ) -> dict[str, Any]:
        """Build a successful response with text and structured content."""
        return self.protocol.create_response(
            request_id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": text,
                    }
                ],
                "structuredContent": structured_content,
                "isError": False,
            },
        )

    # ------------------------------------------------------------------
    # Public handlers
    # ------------------------------------------------------------------
    async def handle_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle basic search request.

        Args:
            request_id: Identifier passed through to the response.
            params: Request parameters. ``query`` is required;
                ``source_types`` (default ``[]``), ``project_ids``
                (default ``[]``) and ``limit`` (default ``10``) are optional.

        Returns:
            The response built by ``protocol.create_response``. A missing
            ``query`` yields error -32602; any exception raised while
            processing or searching yields error -32603.
        """
        logger.debug("Handling search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._invalid_params_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        source_types = params.get("source_types", [])
        project_ids = params.get("project_ids", [])
        limit = params.get("limit", 10)

        logger.info(
            "Processing search request",
            query=query,
            source_types=source_types,
            project_ids=project_ids,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=source_types,
                project_ids=project_ids,
                limit=limit,
            )
            logger.info(
                "Search completed successfully",
                result_count=len(results),
                first_result_score=results[0].score if results else None,
            )

            # Create structured results for MCP 2025-06-18 compliance
            structured_results = self.formatters.create_structured_search_results(
                results
            )

            # Keep existing text response for backward compatibility
            text_response = f"Found {len(results)} results:\n\n" + "\n\n".join(
                self.formatters.format_search_result(result) for result in results
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id,
                text_response,
                {
                    "results": structured_results,
                    "total_found": len(results),
                    "query_context": {
                        "original_query": query,
                        "source_types_filtered": source_types,
                        "project_ids_filtered": project_ids,
                    },
                },
            )
            logger.debug("Search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during search", e
            )

    async def handle_hierarchy_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle hierarchical search request for Confluence documents.

        The search is restricted to the ``confluence`` and ``localfile``
        source types and over-fetches so that enough results survive the
        hierarchy filter.

        Args:
            request_id: Identifier passed through to the response.
            params: Request parameters. ``query`` is required;
                ``hierarchy_filter`` (default ``{}``),
                ``organize_by_hierarchy`` (default ``False``) and ``limit``
                (default ``10``) are optional.

        Returns:
            The response built by ``protocol.create_response``. A missing
            ``query`` yields error -32602; any exception raised while
            processing yields error -32603.
        """
        logger.debug("Handling hierarchy search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._invalid_params_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        hierarchy_filter = params.get("hierarchy_filter", {})
        organize_flag = params.get("organize_by_hierarchy", False)
        limit = params.get("limit", 10)

        logger.info(
            "Processing hierarchy search request",
            query=query,
            hierarchy_filter=hierarchy_filter,
            # Log the caller's flag, not the imported organize_by_hierarchy
            # helper function that shares the parameter's name.
            organize_by_hierarchy=organize_flag,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search (localfiles are included because they have
            # a folder structure). Fetch at least 40 candidates so enough
            # remain after hierarchy filtering.
            logger.debug("Executing hierarchy search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=[
                    "confluence",
                    "localfile",
                ],
                limit=max(limit * 2, 40),
            )

            # Apply hierarchy filters (support sync or async patched functions in tests)
            maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter)
            filtered_results = (
                await maybe_filtered
                if inspect.isawaitable(maybe_filtered)
                else maybe_filtered
            )

            # Hierarchy navigation benefits from more documents, so keep up
            # to 20 results even when the caller's limit is smaller (and the
            # caller's limit when it is larger).
            hierarchy_limit = max(limit, 20)
            filtered_results = filtered_results[:hierarchy_limit]

            # Organize results if requested
            organized_results = None
            if organize_flag:
                organized_results = self._organize_by_hierarchy(filtered_results)
                response_text = format_lightweight_hierarchy_text(
                    organized_results, len(filtered_results)
                )
            else:
                response_text = format_lightweight_hierarchy_text(
                    {}, len(filtered_results)
                )

            logger.info(
                "Hierarchy search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_hierarchy_results(
                filtered_results, organized_results or {}, query
            )

            # Format the response with both text and structured content
            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Hierarchy search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during hierarchy search", e
            )

    async def handle_attachment_search(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle attachment search request.

        Args:
            request_id: Identifier passed through to the response.
            params: Request parameters. ``query`` is required;
                ``attachment_filter`` (default ``{}``),
                ``include_parent_context`` (default ``True``; only logged
                here) and ``limit`` (default ``10``) are optional.

        Returns:
            The response built by ``protocol.create_response``. A missing
            ``query`` yields error -32602; any exception raised while
            processing yields error -32603.
        """
        logger.debug("Handling attachment search request with params", params=params)

        # Validate required parameters
        if "query" not in params:
            return self._invalid_params_response(request_id, "query")

        # Extract parameters with defaults
        query = params["query"]
        attachment_filter = params.get("attachment_filter", {})
        include_parent_context = params.get("include_parent_context", True)
        limit = params.get("limit", 10)

        logger.info(
            "Processing attachment search request",
            query=query,
            attachment_filter=attachment_filter,
            include_parent_context=include_parent_context,
            limit=limit,
        )

        try:
            # Process the query
            logger.debug("Processing query with OpenAI")
            processed_query = await self.query_processor.process_query(query)
            logger.debug(
                "Query processed successfully", processed_query=processed_query
            )

            # Perform the search
            logger.debug("Executing attachment search in Qdrant")
            results = await self.search_engine.search(
                query=processed_query["query"],
                source_types=None,  # Search all sources for attachments
                limit=limit * 2,  # Get more results to filter
            )

            # Apply lightweight attachment filters (supports multi-source)
            filtered_results = self._apply_lightweight_attachment_filters(
                results, attachment_filter
            )

            # Keep up to 15 results for navigation even when the caller's
            # limit is smaller (and the caller's limit when it is larger).
            attachment_limit = max(limit, 15)
            filtered_results = filtered_results[:attachment_limit]

            logger.info(
                "Attachment search completed successfully",
                result_count=len(filtered_results),
                first_result_score=(
                    filtered_results[0].score if filtered_results else None
                ),
            )

            # Create attachment groups for organized display
            organized_results = {}
            attachment_groups = []
            if filtered_results:
                # Group attachments by type for better organization
                attachment_groups = self.formatters._organize_attachments_by_type(
                    filtered_results
                )
                for group in attachment_groups:
                    group_results = group.get("results", [])
                    organized_results[group["group_name"]] = group_results

            # Create lightweight text response
            response_text = format_lightweight_attachment_text(
                organized_results, len(filtered_results)
            )

            # Create lightweight structured content for MCP compliance
            structured_content = self.formatters.create_lightweight_attachment_results(
                attachment_groups, query
            )

            response = self._success_response(
                request_id, response_text, structured_content
            )
            logger.debug("Attachment search response formatted successfully")
            return response

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error during attachment search", e
            )

    # ------------------------------------------------------------------
    # Back-compat thin wrappers for tests that patch private methods
    # ------------------------------------------------------------------
    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Delegate to the module-level ``apply_hierarchy_filters``."""
        return apply_hierarchy_filters(results, hierarchy_filter)

    def _organize_by_hierarchy(self, results):
        """Delegate to the module-level ``organize_by_hierarchy``."""
        return organize_by_hierarchy(results)

    def _apply_attachment_filters(self, results, attachment_filter):
        """Delegate to the module-level ``apply_attachment_filters``."""
        return apply_attachment_filters(results, attachment_filter)

    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Delegate to ``apply_lightweight_attachment_filters``.

        The formatter's ``_extract_file_type_minimal`` is supplied as the
        ``file_type_extractor``.
        """
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )

    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_attachment_text``."""
        return format_lightweight_attachment_text(organized_results, total_found)

    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_hierarchy_text``."""
        return format_lightweight_hierarchy_text(organized_results, total_found)

    async def _find_exact_document(self, document_id):
        """Return a list holding the first result whose id equals ``document_id``.

        Tries a ``document_id:<id>`` field query first and, only when that
        yields no exact match, a general search on the raw id. Field search
        does not guarantee exact matches, so each batch of up to 10 results
        is filtered by equality. Returns ``[]`` when neither search matches.
        """
        for search_query in (f"document_id:{document_id}", document_id):
            candidates = await self.search_engine.search(
                query=search_query, limit=10
            )
            exact_matches = [r for r in candidates if r.document_id == document_id]
            if exact_matches:
                return exact_matches[:1]  # Take only the first exact match
        return []

    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle expand document request for lazy loading using standard search format.

        Args:
            request_id: Identifier passed through to the response.
            params: Request parameters. ``document_id`` is required and must
                be neither ``None`` nor the empty string.

        Returns:
            The response built by ``protocol.create_response``: the matching
            document in the same envelope as a standard search, error -32602
            for a missing id, error -32604 when no exact match is found, or
            error -32603 when an exception is raised.
        """
        logger.debug("Handling expand document with params", params=params)

        # Validate required parameter (absent, None and "" are all rejected)
        document_id = params.get("document_id")
        if document_id is None or document_id == "":
            return self._invalid_params_response(request_id, "document_id")

        try:
            logger.info(f"Expanding document with ID: {document_id}")

            results = await self._find_exact_document(document_id)

            if not results:
                logger.warning(f"Document not found with ID: {document_id}")
                # NOTE(review): -32604 is not one of the error codes defined
                # by the JSON-RPC 2.0 specification; kept unchanged because
                # clients may already match on it.
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32604,
                        "message": "Document not found",
                        "data": f"No document found with ID: {document_id}",
                    },
                )

            logger.info(f"Successfully found document: {results[0].source_title}")

            # Use the existing search result formatting - exactly the same as standard search
            formatted_results = (
                "Found 1 document:\n\n"
                + self.formatters.format_search_result(results[0])
            )
            structured_results_list = self.formatters.create_structured_search_results(
                results
            )

            # Create the same structure as standard search
            structured_results = {
                "results": structured_results_list,
                "total_found": len(results),
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "source_types_filtered": [],
                    "project_ids_filtered": [],
                    "is_document_expansion": True,
                },
            }

            return self._success_response(
                request_id, formatted_results, structured_results
            )

        except Exception as e:
            return self._internal_error_response(
                request_id, "Error expanding document", e
            )