Coverage for src/qdrant_loader_mcp_server/mcp/search_handler.py: 97%

290 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Search operations handler for MCP server.""" 

2 

3import inspect 

4from typing import Any 

5 

6from ..search.components.search_result_models import HybridSearchResult 

7from ..search.engine import SearchEngine 

8from ..search.processor import QueryProcessor 

9from ..utils import LoggingConfig 

10from .formatters import MCPFormatters 

11from .protocol import MCPProtocol 

12 

13# Get logger for this module 

14logger = LoggingConfig.get_logger("src.mcp.search_handler") 

15 

16 

17class SearchHandler: 

18 """Handler for search-related operations.""" 

19 

20 def __init__( 

21 self, 

22 search_engine: SearchEngine, 

23 query_processor: QueryProcessor, 

24 protocol: MCPProtocol, 

25 ): 

26 """Initialize search handler.""" 

27 self.search_engine = search_engine 

28 self.query_processor = query_processor 

29 self.protocol = protocol 

30 self.formatters = MCPFormatters() 

31 

32 async def handle_search( 

33 self, request_id: str | int | None, params: dict[str, Any] 

34 ) -> dict[str, Any]: 

35 """Handle basic search request.""" 

36 logger.debug("Handling search request with params", params=params) 

37 

38 # Validate required parameters 

39 if "query" not in params: 

40 logger.error("Missing required parameter: query") 

41 return self.protocol.create_response( 

42 request_id, 

43 error={ 

44 "code": -32602, 

45 "message": "Invalid params", 

46 "data": "Missing required parameter: query", 

47 }, 

48 ) 

49 

50 # Extract parameters with defaults 

51 query = params["query"] 

52 source_types = params.get("source_types", []) 

53 project_ids = params.get("project_ids", []) 

54 limit = params.get("limit", 10) 

55 

56 logger.info( 

57 "Processing search request", 

58 query=query, 

59 source_types=source_types, 

60 project_ids=project_ids, 

61 limit=limit, 

62 ) 

63 

64 try: 

65 # Process the query 

66 logger.debug("Processing query with OpenAI") 

67 processed_query = await self.query_processor.process_query(query) 

68 logger.debug( 

69 "Query processed successfully", processed_query=processed_query 

70 ) 

71 

72 # Perform the search 

73 logger.debug("Executing search in Qdrant") 

74 results = await self.search_engine.search( 

75 query=processed_query["query"], 

76 source_types=source_types, 

77 project_ids=project_ids, 

78 limit=limit, 

79 ) 

80 logger.info( 

81 "Search completed successfully", 

82 result_count=len(results), 

83 first_result_score=results[0].score if results else None, 

84 ) 

85 

86 # Create structured results for MCP 2025-06-18 compliance 

87 structured_results = self.formatters.create_structured_search_results( 

88 results 

89 ) 

90 

91 # Keep existing text response for backward compatibility 

92 text_response = f"Found {len(results)} results:\n\n" + "\n\n".join( 

93 self.formatters.format_search_result(result) for result in results 

94 ) 

95 

96 # Format the response with both text and structured content 

97 response = self.protocol.create_response( 

98 request_id, 

99 result={ 

100 "content": [ 

101 { 

102 "type": "text", 

103 "text": text_response, 

104 } 

105 ], 

106 "structuredContent": { 

107 "results": structured_results, 

108 "total_found": len(results), 

109 "query_context": { 

110 "original_query": query, 

111 "source_types_filtered": source_types, 

112 "project_ids_filtered": project_ids, 

113 }, 

114 }, 

115 "isError": False, 

116 }, 

117 ) 

118 logger.debug("Search response formatted successfully") 

119 return response 

120 

121 except Exception as e: 

122 logger.error("Error during search", exc_info=True) 

123 return self.protocol.create_response( 

124 request_id, 

125 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

126 ) 

127 

128 async def handle_hierarchy_search( 

129 self, request_id: str | int | None, params: dict[str, Any] 

130 ) -> dict[str, Any]: 

131 """Handle hierarchical search request for Confluence documents.""" 

132 logger.debug("Handling hierarchy search request with params", params=params) 

133 

134 # Validate required parameters 

135 if "query" not in params: 

136 logger.error("Missing required parameter: query") 

137 return self.protocol.create_response( 

138 request_id, 

139 error={ 

140 "code": -32602, 

141 "message": "Invalid params", 

142 "data": "Missing required parameter: query", 

143 }, 

144 ) 

145 

146 # Extract parameters with defaults 

147 query = params["query"] 

148 hierarchy_filter = params.get("hierarchy_filter", {}) 

149 organize_by_hierarchy = params.get("organize_by_hierarchy", False) 

150 limit = params.get("limit", 10) 

151 

152 logger.info( 

153 "Processing hierarchy search request", 

154 query=query, 

155 hierarchy_filter=hierarchy_filter, 

156 organize_by_hierarchy=organize_by_hierarchy, 

157 limit=limit, 

158 ) 

159 

160 try: 

161 # Process the query 

162 logger.debug("Processing query with OpenAI") 

163 processed_query = await self.query_processor.process_query(query) 

164 logger.debug( 

165 "Query processed successfully", processed_query=processed_query 

166 ) 

167 

168 # Perform the search (All source types for hierarchy - localfiles have folder structure) 

169 logger.debug("Executing hierarchy search in Qdrant") 

170 results = await self.search_engine.search( 

171 query=processed_query["query"], 

172 source_types=[ 

173 "confluence", 

174 "localfile", 

175 ], # Include localfiles with folder structure 

176 limit=max( 

177 limit * 2, 40 

178 ), # Get enough results to filter for hierarchy navigation 

179 ) 

180 

181 # Apply hierarchy filters (support sync or async patched functions in tests) 

182 maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter) 

183 filtered_results = ( 

184 await maybe_filtered 

185 if inspect.isawaitable(maybe_filtered) 

186 else maybe_filtered 

187 ) 

188 

189 # For hierarchy search, prioritize returning more documents for better hierarchy navigation 

190 # Limit to maximum of 20 documents for hierarchy index (not just the user's limit) 

191 hierarchy_limit = max(limit, 20) 

192 filtered_results = filtered_results[:hierarchy_limit] 

193 

194 # Organize results if requested 

195 organized_results = None 

196 if organize_by_hierarchy: 

197 organized_results = self._organize_by_hierarchy(filtered_results) 

198 response_text = self._format_lightweight_hierarchy_text( 

199 organized_results, len(filtered_results) 

200 ) 

201 else: 

202 response_text = self._format_lightweight_hierarchy_text( 

203 {}, len(filtered_results) 

204 ) 

205 

206 logger.info( 

207 "Hierarchy search completed successfully", 

208 result_count=len(filtered_results), 

209 first_result_score=( 

210 filtered_results[0].score if filtered_results else None 

211 ), 

212 ) 

213 

214 # Create structured content for MCP compliance 

215 structured_content = self.formatters.create_lightweight_hierarchy_results( 

216 filtered_results, organized_results, query 

217 ) 

218 

219 # Format the response with both text and structured content 

220 response = self.protocol.create_response( 

221 request_id, 

222 result={ 

223 "content": [ 

224 { 

225 "type": "text", 

226 "text": response_text, 

227 } 

228 ], 

229 "structuredContent": structured_content, 

230 "isError": False, 

231 }, 

232 ) 

233 logger.debug("Hierarchy search response formatted successfully") 

234 return response 

235 

236 except Exception as e: 

237 logger.error("Error during hierarchy search", exc_info=True) 

238 return self.protocol.create_response( 

239 request_id, 

240 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

241 ) 

242 

243 async def handle_attachment_search( 

244 self, request_id: str | int | None, params: dict[str, Any] 

245 ) -> dict[str, Any]: 

246 """Handle attachment search request.""" 

247 logger.debug("Handling attachment search request with params", params=params) 

248 

249 # Validate required parameters 

250 if "query" not in params: 

251 logger.error("Missing required parameter: query") 

252 return self.protocol.create_response( 

253 request_id, 

254 error={ 

255 "code": -32602, 

256 "message": "Invalid params", 

257 "data": "Missing required parameter: query", 

258 }, 

259 ) 

260 

261 # Extract parameters with defaults 

262 query = params["query"] 

263 attachment_filter = params.get("attachment_filter", {}) 

264 include_parent_context = params.get("include_parent_context", True) 

265 limit = params.get("limit", 10) 

266 

267 logger.info( 

268 "Processing attachment search request", 

269 query=query, 

270 attachment_filter=attachment_filter, 

271 include_parent_context=include_parent_context, 

272 limit=limit, 

273 ) 

274 

275 try: 

276 # Process the query 

277 logger.debug("Processing query with OpenAI") 

278 processed_query = await self.query_processor.process_query(query) 

279 logger.debug( 

280 "Query processed successfully", processed_query=processed_query 

281 ) 

282 

283 # Perform the search 

284 logger.debug("Executing attachment search in Qdrant") 

285 results = await self.search_engine.search( 

286 query=processed_query["query"], 

287 source_types=None, # Search all sources for attachments 

288 limit=limit * 2, # Get more results to filter 

289 ) 

290 

291 # Apply lightweight attachment filters (NEW - supports multi-source) 

292 filtered_results = self._apply_lightweight_attachment_filters( 

293 results, attachment_filter 

294 ) 

295 

296 # Limit to reasonable number for performance (ensure good navigation) 

297 attachment_limit = max(limit, 15) # At least 15 for good navigation 

298 filtered_results = filtered_results[:attachment_limit] 

299 

300 logger.info( 

301 "Attachment search completed successfully", 

302 result_count=len(filtered_results), 

303 first_result_score=( 

304 filtered_results[0].score if filtered_results else None 

305 ), 

306 ) 

307 

308 # Create attachment groups for organized display 

309 organized_results = {} 

310 if filtered_results: 

311 # Group attachments by type for better organization 

312 attachment_groups = self.formatters._organize_attachments_by_type( 

313 filtered_results 

314 ) 

315 for group in attachment_groups: 

316 group_results = [ 

317 r 

318 for r in filtered_results 

319 if r.document_id in group["document_ids"] 

320 ] 

321 organized_results[group["group_name"]] = group_results 

322 

323 # Create lightweight text response 

324 response_text = self._format_lightweight_attachment_text( 

325 organized_results, len(filtered_results) 

326 ) 

327 

328 # Create lightweight structured content for MCP compliance 

329 structured_content = self.formatters.create_lightweight_attachment_results( 

330 filtered_results, attachment_filter, query 

331 ) 

332 

333 response = self.protocol.create_response( 

334 request_id, 

335 result={ 

336 "content": [ 

337 { 

338 "type": "text", 

339 "text": response_text, 

340 } 

341 ], 

342 "structuredContent": structured_content, 

343 "isError": False, 

344 }, 

345 ) 

346 logger.debug("Attachment search response formatted successfully") 

347 return response 

348 

349 except Exception as e: 

350 logger.error("Error during attachment search", exc_info=True) 

351 return self.protocol.create_response( 

352 request_id, 

353 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

354 ) 

355 

356 def _apply_hierarchy_filters( 

357 self, results: list[HybridSearchResult], hierarchy_filter: dict[str, Any] 

358 ) -> list[HybridSearchResult]: 

359 """Apply hierarchy-based filters to search results.""" 

360 filtered_results = [] 

361 

362 for result in results: 

363 # Only process sources that have hierarchical structure (confluence, localfile) 

364 if result.source_type not in ["confluence", "localfile"]: 

365 continue 

366 

367 # Apply depth filter - use folder depth for localfiles 

368 if "depth" in hierarchy_filter: 

369 # For localfiles, calculate depth from file_path folder structure 

370 file_path_val = getattr(result, "file_path", None) 

371 if result.source_type == "localfile" and file_path_val: 

372 # Depth = number of folders before filename 

373 path_parts = [p for p in file_path_val.split("/") if p] 

374 # Depth definition: number of folders before filename minus 1 

375 folder_depth = max(0, len(path_parts) - 2) 

376 if folder_depth != hierarchy_filter["depth"]: 

377 continue 

378 elif ( 

379 hasattr(result, "depth") 

380 and result.depth != hierarchy_filter["depth"] 

381 ): 

382 continue 

383 

384 # Apply parent title filter - for localfiles use parent folder 

385 if "parent_title" in hierarchy_filter: 

386 expected_parent = hierarchy_filter["parent_title"] 

387 if result.source_type == "localfile": 

388 # For localfiles use folder structure only if path available 

389 file_path_val = getattr(result, "file_path", None) 

390 if file_path_val: 

391 path_parts = [p for p in file_path_val.split("/") if p] 

392 parent_folder = path_parts[-2] if len(path_parts) > 1 else "" 

393 if parent_folder != expected_parent: 

394 continue 

395 else: 

396 # Without a path we cannot assert parent folder match; skip 

397 continue 

398 else: 

399 # For non-localfile, use explicit parent_title attr only 

400 parent_title_val = getattr(result, "parent_title", None) 

401 if parent_title_val != expected_parent: 

402 continue 

403 

404 # Apply root only filter 

405 if hierarchy_filter.get("root_only", False): 

406 # For localfiles, check if it's in the root folder 

407 file_path_val = getattr(result, "file_path", None) 

408 if result.source_type == "localfile" and file_path_val: 

409 path_parts = [p for p in file_path_val.split("/") if p] 

410 is_root = len(path_parts) <= 2 # Root folder + filename 

411 if not is_root: 

412 continue 

413 elif not result.is_root_document(): 

414 continue 

415 

416 # Apply has children filter - skip for localfiles as we don't track child relationships 

417 if "has_children" in hierarchy_filter and result.source_type != "localfile": 

418 if result.has_children() != hierarchy_filter["has_children"]: 

419 continue 

420 

421 filtered_results.append(result) 

422 

423 return filtered_results 

424 

425 def _organize_by_hierarchy( 

426 self, results: list[HybridSearchResult] 

427 ) -> dict[str, list[HybridSearchResult]]: 

428 """Organize search results by hierarchy structure.""" 

429 hierarchy_groups = {} 

430 

431 for result in results: 

432 # Group by root ancestor or use the document title if it's a root 

433 file_path_val = getattr(result, "file_path", None) 

434 if result.source_type == "localfile" and file_path_val: 

435 # For localfiles, use top-level folder as root 

436 path_parts = [p for p in file_path_val.split("/") if p] 

437 root_title = path_parts[0] if path_parts else "Root" 

438 elif result.breadcrumb_text: 

439 # Extract the root from breadcrumb 

440 breadcrumb_parts = result.breadcrumb_text.split(" > ") 

441 root_title = ( 

442 breadcrumb_parts[0] if breadcrumb_parts else result.source_title 

443 ) 

444 else: 

445 root_title = result.source_title 

446 

447 if root_title not in hierarchy_groups: 

448 hierarchy_groups[root_title] = [] 

449 hierarchy_groups[root_title].append(result) 

450 

451 # Sort within each group by depth and title 

452 for group in hierarchy_groups.values(): 

453 

454 def sort_key(x): 

455 # Calculate depth for localfiles from folder structure 

456 x_file_path = getattr(x, "file_path", None) 

457 if x.source_type == "localfile" and x_file_path: 

458 folder_depth = len([p for p in x_file_path.split("/") if p]) - 1 

459 return (folder_depth, x.source_title) 

460 else: 

461 return (x.depth or 0, x.source_title) 

462 

463 group.sort(key=sort_key) 

464 

465 return hierarchy_groups 

466 

467 def _apply_attachment_filters( 

468 self, results: list[HybridSearchResult], attachment_filter: dict[str, Any] 

469 ) -> list[HybridSearchResult]: 

470 """Apply attachment-based filters to search results.""" 

471 filtered_results = [] 

472 

473 for result in results: 

474 # Skip non-Confluence results 

475 if result.source_type != "confluence": 

476 continue 

477 

478 # Apply attachments only filter 

479 if "attachments_only" in attachment_filter and not result.is_attachment: 

480 continue 

481 

482 # Apply parent document title filter 

483 if "parent_document_title" in attachment_filter: 

484 if ( 

485 result.parent_document_title 

486 != attachment_filter["parent_document_title"] 

487 ): 

488 continue 

489 

490 # Apply file type filter 

491 if "file_type" in attachment_filter: 

492 result_file_type = result.get_file_type() 

493 if result_file_type != attachment_filter["file_type"]: 

494 continue 

495 

496 # Apply file size filter 

497 _min_size = attachment_filter.get("file_size_min") 

498 if ( 

499 _min_size is not None 

500 and result.file_size is not None 

501 and result.file_size < _min_size 

502 ): 

503 continue 

504 _max_size = attachment_filter.get("file_size_max") 

505 if ( 

506 _max_size is not None 

507 and result.file_size is not None 

508 and result.file_size > _max_size 

509 ): 

510 continue 

511 

512 # Apply author filter 

513 if "author" in attachment_filter: 

514 if result.attachment_author != attachment_filter["author"]: 

515 continue 

516 

517 filtered_results.append(result) 

518 

519 return filtered_results 

520 

521 def _apply_lightweight_attachment_filters( 

522 self, results: list[HybridSearchResult], attachment_filter: dict[str, Any] 

523 ) -> list[HybridSearchResult]: 

524 """Fast filtering optimized for attachment discovery across all sources.""" 

525 filtered_results = [] 

526 

527 for result in results: 

528 # Quick attachment detection - avoid expensive checks 

529 _is_attachment_flag = bool(getattr(result, "is_attachment", False)) 

530 _original_filename = getattr(result, "original_filename", None) 

531 _file_path = getattr(result, "file_path", None) 

532 _is_path_file = ( 

533 isinstance(_file_path, str) 

534 and "." in _file_path 

535 and not _file_path.endswith("/") 

536 ) 

537 is_attachment = ( 

538 _is_attachment_flag or bool(_original_filename) or _is_path_file 

539 ) 

540 

541 if not is_attachment: 

542 continue 

543 

544 # Apply filters with early exits for performance 

545 if attachment_filter.get("attachments_only") and not bool( 

546 getattr(result, "is_attachment", False) 

547 ): 

548 continue 

549 

550 if attachment_filter.get("file_type"): 

551 file_type = self.formatters._extract_file_type_minimal(result) 

552 if file_type != attachment_filter["file_type"]: 

553 continue 

554 

555 # Size filters with null checks (include zero-byte files) 

556 _file_size = getattr(result, "file_size", None) 

557 if ( 

558 attachment_filter.get("file_size_min") is not None 

559 and _file_size is not None 

560 and _file_size < attachment_filter["file_size_min"] 

561 ): 

562 continue 

563 

564 if ( 

565 attachment_filter.get("file_size_max") is not None 

566 and _file_size is not None 

567 and _file_size > attachment_filter["file_size_max"] 

568 ): 

569 continue 

570 

571 # Parent document filter (works across source types) 

572 if attachment_filter.get("parent_document_title"): 

573 parent_title = getattr( 

574 result, "parent_document_title", None 

575 ) or getattr(result, "parent_title", None) 

576 if parent_title != attachment_filter["parent_document_title"]: 

577 continue 

578 

579 # Author filter 

580 if attachment_filter.get("author"): 

581 author = getattr(result, "attachment_author", None) or getattr( 

582 result, "author", None 

583 ) 

584 if author != attachment_filter["author"]: 

585 continue 

586 

587 filtered_results.append(result) 

588 

589 return filtered_results 

590 

591 def _format_lightweight_attachment_text( 

592 self, organized_results: dict[str, list], total_found: int 

593 ) -> str: 

594 """Format attachment results as lightweight text summary.""" 

595 if not organized_results: 

596 return f"📎 **Attachment Search Results**\n\nFound {total_found} attachments. Use the structured data below to navigate and retrieve specific files." 

597 

598 formatted = f"📎 **Attachment Search Results** ({total_found} attachments)\n\n" 

599 

600 for group_name, results in organized_results.items(): 

601 formatted += f"📁 **{group_name}** ({len(results)} files)\n" 

602 

603 # Show first few attachments as examples 

604 for result in results[:3]: 

605 filename = self.formatters._extract_safe_filename(result) 

606 file_type = self.formatters._extract_file_type_minimal(result) 

607 formatted += ( 

608 f" 📄 {filename} ({file_type}) - Score: {result.score:.3f}\n" 

609 ) 

610 

611 if len(results) > 3: 

612 formatted += f" ... and {len(results) - 3} more files\n" 

613 formatted += "\n" 

614 

615 formatted += "💡 **Usage:** Use the structured attachment data to:\n" 

616 formatted += "• Browse attachments by file type or source\n" 

617 formatted += "• Get document IDs for specific file content retrieval\n" 

618 formatted += "• Filter attachments by metadata (size, type, etc.)\n" 

619 

620 return formatted 

621 

622 def _format_lightweight_hierarchy_text( 

623 self, organized_results: dict[str, list], total_found: int 

624 ) -> str: 

625 """Format hierarchy results as lightweight text summary.""" 

626 if not organized_results: 

627 return f"📋 **Hierarchy Search Results**\n\nFound {total_found} documents. Use the structured data below to navigate the hierarchy and retrieve specific documents." 

628 

629 formatted = f"📋 **Hierarchy Search Results** ({total_found} documents)\n\n" 

630 

631 for group_name, results in organized_results.items(): 

632 clean_name = self.formatters._generate_clean_group_name(group_name, results) 

633 formatted += f"📁 **{clean_name}** ({len(results)} documents)\n" 

634 

635 # Show first few documents as examples 

636 for result in results[:3]: 

637 formatted += f" 📄 {result.source_title} (Score: {result.score:.3f})\n" 

638 

639 if len(results) > 3: 

640 formatted += f" ... and {len(results) - 3} more documents\n" 

641 formatted += "\n" 

642 

643 formatted += "💡 **Usage:** Use the structured hierarchy data to:\n" 

644 formatted += "• Browse document groups and navigate hierarchy levels\n" 

645 formatted += "• Get document IDs for specific content retrieval\n" 

646 formatted += "• Understand document relationships and organization\n" 

647 

648 return formatted 

649 

650 async def handle_expand_document( 

651 self, request_id: str | int | None, params: dict[str, Any] 

652 ) -> dict[str, Any]: 

653 """Handle expand document request for lazy loading using standard search format.""" 

654 logger.debug("Handling expand document with params", params=params) 

655 

656 # Validate required parameter 

657 if ( 

658 "document_id" not in params 

659 or params["document_id"] is None 

660 or params["document_id"] == "" 

661 ): 

662 logger.error("Missing required parameter: document_id") 

663 return self.protocol.create_response( 

664 request_id, 

665 error={ 

666 "code": -32602, 

667 "message": "Invalid params", 

668 "data": "Missing required parameter: document_id", 

669 }, 

670 ) 

671 

672 document_id = params["document_id"] 

673 

674 try: 

675 logger.info(f"Expanding document with ID: {document_id}") 

676 

677 # Search for the document - field search doesn't guarantee exact matches 

678 # Try document_id field search first, but get more results to filter 

679 results = await self.search_engine.search( 

680 query=f"document_id:{document_id}", 

681 limit=10, # Get more results to ensure we find the exact match 

682 ) 

683 

684 # Filter for exact document_id matches 

685 exact_matches = [r for r in results if r.document_id == document_id] 

686 if exact_matches: 

687 results = exact_matches[:1] # Take only the first exact match 

688 else: 

689 # Fallback to general search if no exact match in field search 

690 results = await self.search_engine.search(query=document_id, limit=10) 

691 # Filter again for exact document_id matches 

692 exact_matches = [r for r in results if r.document_id == document_id] 

693 if exact_matches: 

694 results = exact_matches[:1] 

695 else: 

696 results = [] 

697 

698 if not results: 

699 logger.warning(f"Document not found with ID: {document_id}") 

700 return self.protocol.create_response( 

701 request_id, 

702 error={ 

703 "code": -32604, 

704 "message": "Document not found", 

705 "data": f"No document found with ID: {document_id}", 

706 }, 

707 ) 

708 

709 logger.info(f"Successfully found document: {results[0].source_title}") 

710 

711 # Use the existing search result formatting - exactly the same as standard search 

712 formatted_results = ( 

713 "Found 1 document:\n\n" 

714 + self.formatters.format_search_result(results[0]) 

715 ) 

716 structured_results_list = self.formatters.create_structured_search_results( 

717 results 

718 ) 

719 

720 # Create the same structure as standard search 

721 structured_results = { 

722 "results": structured_results_list, 

723 "total_found": len(results), 

724 "query_context": { 

725 "original_query": f"expand_document:{document_id}", 

726 "source_types_filtered": [], 

727 "project_ids_filtered": [], 

728 "is_document_expansion": True, 

729 }, 

730 } 

731 

732 return self.protocol.create_response( 

733 request_id, 

734 result={ 

735 "content": [ 

736 { 

737 "type": "text", 

738 "text": formatted_results, 

739 } 

740 ], 

741 "structuredContent": structured_results, 

742 "isError": False, 

743 }, 

744 ) 

745 

746 except Exception as e: 

747 logger.error("Error expanding document", exc_info=True) 

748 return self.protocol.create_response( 

749 request_id, 

750 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

751 )