Coverage for src/qdrant_loader_mcp_server/mcp/handler.py: 90%

286 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-09 09:09 +0000

1"""MCP Handler implementation.""" 

2 

3from typing import Any 

4 

5from ..search.engine import SearchEngine 

6from ..search.models import SearchResult 

7from ..search.processor import QueryProcessor 

8from ..utils import LoggingConfig 

9from .protocol import MCPProtocol 

10 

# Module-level logger obtained from LoggingConfig; the MCPHandler methods in this file log through it.
logger = LoggingConfig.get_logger("src.mcp.handler")

13 

14 

15class MCPHandler: 

16 """MCP Handler for processing RAG requests.""" 

17 

def __init__(self, search_engine: SearchEngine, query_processor: QueryProcessor):
    """Store the collaborators and create the protocol helper.

    Args:
        search_engine: Engine the search handlers call to run queries.
        query_processor: Processor the search handlers call on the raw query
            before searching.
    """
    # Caller-supplied collaborators are kept as-is.
    self.query_processor = query_processor
    self.search_engine = search_engine
    # The protocol helper validates requests and builds responses.
    self.protocol = MCPProtocol()
    logger.info("MCP Handler initialized")

24 

async def handle_request(self, request: dict[str, Any]) -> dict[str, Any]:
    """Validate one JSON-RPC 2.0 request and route it to the matching handler.

    Supported methods are ``initialize``, ``listOfferings`` / ``tools/list``,
    ``search`` and ``tools/call`` (tools ``search``, ``hierarchy_search`` and
    ``attachment_search``).

    Args:
        request: The decoded request object.

    Returns:
        dict[str, Any]: The response. A request that is not a dict or fails
        protocol validation yields a -32600 error, an unknown method or tool
        a -32601 error, and any exception raised while dispatching a -32603
        error. A request without an ``id`` is treated as a notification and
        answered with an empty dict.
    """
    logger.debug("Handling request", request=request)

    # Shared payload for both "invalid request" exits below.
    invalid_request_error = {
        "code": -32600,
        "message": "Invalid Request",
        "data": "The request is not a valid JSON-RPC 2.0 request",
    }

    # Handle non-dict requests
    if not isinstance(request, dict):
        logger.error("Request is not a dictionary")
        return {"jsonrpc": "2.0", "id": None, "error": invalid_request_error}

    # Validate request format
    if not self.protocol.validate_request(request):
        logger.error("Request validation failed")
        # Echo the id back only when it is a usable str/int; otherwise null.
        request_id = request.get("id")
        if not isinstance(request_id, str | int):
            request_id = None
        return {"jsonrpc": "2.0", "id": request_id, "error": invalid_request_error}

    method = request.get("method")
    request_id = request.get("id")
    # An explicit null ``params`` is treated like an omitted one so it cannot
    # surface later as an attribute/type error reported as "Internal error".
    params = request.get("params")
    if params is None:
        params = {}

    logger.debug(
        "Processing request", method=method, params=params, request_id=request_id
    )

    # Handle notifications (requests without id)
    if request_id is None:
        logger.debug("Handling notification", method=method)
        return {}

    try:
        if method == "initialize":
            logger.info("Handling initialize request")
            response = await self._handle_initialize(request_id, params)
            self.protocol.mark_initialized()
            logger.info("Server initialized successfully")
            return response

        # Membership in a tuple of str literals already guarantees that
        # ``method`` is a string here, so no further type check is needed.
        if method in ("listOfferings", "tools/list"):
            logger.info(f"Handling {method} request")
            logger.debug(
                f"{method} request details",
                method=method,
                params=params,
                request_id=request_id,
            )
            response = await self._handle_list_offerings(request_id, params, method)
            logger.debug(f"{method} response", response=response)
            return response

        if method == "search":
            logger.info("Handling search request")
            return await self._handle_search(request_id, params)

        if method == "tools/call":
            logger.info("Handling tools/call request")
            tool_name = params.get("name")
            tool_handlers = {
                "search": self._handle_search,
                "hierarchy_search": self._handle_hierarchy_search,
                "attachment_search": self._handle_attachment_search,
            }
            # Only string names are looked up, so an unhashable or otherwise
            # odd ``name`` still ends in "Method not found" rather than a crash.
            tool_handler = (
                tool_handlers.get(tool_name) if isinstance(tool_name, str) else None
            )
            if tool_handler is None:
                logger.warning("Unknown tool requested", tool_name=tool_name)
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32601,
                        "message": "Method not found",
                        "data": f"Tool '{tool_name}' not found",
                    },
                )
            # A null ``arguments`` member is treated like an omitted one.
            arguments = params.get("arguments")
            if arguments is None:
                arguments = {}
            return await tool_handler(request_id, arguments)

        logger.warning("Unknown method requested", method=method)
        return self.protocol.create_response(
            request_id,
            error={
                "code": -32601,
                "message": "Method not found",
                "data": f"Method '{method}' not found",
            },
        )
    except Exception as e:
        # Top-level boundary: log with traceback and report as Internal error.
        logger.error("Error handling request", exc_info=True)
        return self.protocol.create_response(
            request_id,
            error={"code": -32603, "message": "Internal error", "data": str(e)},
        )

150 

async def _handle_initialize(
    self, request_id: str | int | None, params: dict[str, Any]
) -> dict[str, Any]:
    """Build the response to an ``initialize`` request.

    Args:
        request_id: The ID of the request being answered.
        params: The request parameters; they are only logged here.

    Returns:
        dict[str, Any]: The response created by the protocol helper, carrying
        the protocol version, server info and capabilities.
    """
    logger.debug("Initializing with params", params=params)

    server_info = {"name": "Qdrant Loader MCP Server", "version": "1.0.0"}
    capabilities = {"tools": {"listChanged": False}}
    handshake = {
        "protocolVersion": "2024-11-05",
        "serverInfo": server_info,
        "capabilities": capabilities,
    }
    return self.protocol.create_response(request_id, result=handshake)

172 

async def _handle_list_offerings(
    self, request_id: str | int | None, params: dict[str, Any], method: str
) -> dict[str, Any]:
    """Describe the tools this server exposes.

    Args:
        request_id: The ID of the request being answered.
        params: The request parameters; they are only logged here.
        method: The method name from the request. ``"tools/list"`` selects the
            flat ``{"tools": [...]}`` result; any other value selects the
            ``offerings`` structure.

    Returns:
        dict[str, Any]: The response created by the protocol helper.
    """
    logger.debug("Listing offerings with params", params=params)

    # Define the search tool according to MCP specification
    search_tool = {
        "name": "search",
        "description": "Perform semantic search across multiple data sources",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query in natural language",
                },
                "source_types": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "git",
                            "confluence",
                            "jira",
                            "documentation",
                            "localfile",
                        ],
                    },
                    "description": "Optional list of source types to filter results",
                },
                "project_ids": {
                    "type": "array",
                    "items": {
                        "type": "string",
                    },
                    "description": "Optional list of project IDs to filter results",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    # Matches the fallback the search handler applies when
                    # ``limit`` is omitted (10); the schema previously
                    # advertised 5.
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    }

    # Define the hierarchical search tool for Confluence
    hierarchy_search_tool = {
        "name": "hierarchy_search",
        "description": "Search Confluence documents with hierarchy-aware filtering and organization",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query in natural language",
                },
                "hierarchy_filter": {
                    "type": "object",
                    "properties": {
                        "depth": {
                            "type": "integer",
                            "description": "Filter by specific hierarchy depth (0 = root pages)",
                        },
                        "parent_title": {
                            "type": "string",
                            "description": "Filter by parent page title",
                        },
                        "root_only": {
                            "type": "boolean",
                            "description": "Show only root pages (no parent)",
                        },
                        "has_children": {
                            "type": "boolean",
                            "description": "Filter by whether pages have children",
                        },
                    },
                },
                "organize_by_hierarchy": {
                    "type": "boolean",
                    "description": "Group results by hierarchy structure",
                    "default": False,
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    }

    # Define the attachment search tool
    attachment_search_tool = {
        "name": "attachment_search",
        "description": "Search for file attachments and their parent documents across Confluence, Jira, and other sources",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query in natural language",
                },
                "attachment_filter": {
                    "type": "object",
                    "properties": {
                        "attachments_only": {
                            "type": "boolean",
                            "description": "Show only file attachments",
                        },
                        "parent_document_title": {
                            "type": "string",
                            "description": "Filter by parent document title",
                        },
                        "file_type": {
                            "type": "string",
                            "description": "Filter by file type (e.g., 'pdf', 'xlsx', 'png')",
                        },
                        "file_size_min": {
                            "type": "integer",
                            "description": "Minimum file size in bytes",
                        },
                        "file_size_max": {
                            "type": "integer",
                            "description": "Maximum file size in bytes",
                        },
                        "author": {
                            "type": "string",
                            "description": "Filter by attachment author",
                        },
                    },
                },
                "include_parent_context": {
                    "type": "boolean",
                    "description": "Include parent document information in results",
                    "default": True,
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    }

    # Both response shapes list the same tools in the same order.
    tools = [search_tool, hierarchy_search_tool, attachment_search_tool]

    # If the method is tools/list, return the tools array.
    # nextCursor is omitted because there are no more results.
    if method == "tools/list":
        return self.protocol.create_response(request_id, result={"tools": tools})

    # Otherwise return the full offerings structure
    return self.protocol.create_response(
        request_id,
        result={
            "offerings": [
                {
                    "id": "qdrant-loader",
                    "name": "Qdrant Loader",
                    "description": "Load data into Qdrant vector database",
                    "version": "1.0.0",
                    "tools": tools,
                    "resources": [],
                    "resourceTemplates": [],
                }
            ]
        },
    )

367 

async def _handle_search(
    self, request_id: str | int | None, params: dict[str, Any]
) -> dict[str, Any]:
    """Run a search and return the formatted hits as a text content item.

    Args:
        request_id: The ID of the request being answered.
        params: Search parameters. ``query`` is required; ``source_types``,
            ``project_ids`` and ``limit`` fall back to ``[]``, ``[]`` and 10.

    Returns:
        dict[str, Any]: The response created by the protocol helper: a -32602
        error when ``query`` is missing, a -32603 error when processing or
        searching raises, otherwise the formatted results.
    """
    logger.debug("Handling search request with params", params=params)

    # Guard clause: the query is the only mandatory parameter.
    if "query" not in params:
        logger.error("Missing required parameter: query")
        missing_query = {
            "code": -32602,
            "message": "Invalid params",
            "data": "Missing required parameter: query",
        }
        return self.protocol.create_response(request_id, error=missing_query)

    query = params["query"]
    limit = params.get("limit", 10)
    project_ids = params.get("project_ids", [])
    source_types = params.get("source_types", [])

    logger.info(
        "Processing search request",
        query=query,
        source_types=source_types,
        project_ids=project_ids,
        limit=limit,
    )

    try:
        # Step 1: let the query processor rewrite the raw query.
        logger.debug("Processing query with OpenAI")
        processed_query = await self.query_processor.process_query(query)
        logger.debug("Query processed successfully", processed_query=processed_query)

        # Step 2: run the processed query through the search engine.
        logger.debug("Executing search in Qdrant")
        results = await self.search_engine.search(
            query=processed_query["query"],
            source_types=source_types,
            project_ids=project_ids,
            limit=limit,
        )
        top_score = results[0].score if results else None
        logger.info(
            "Search completed successfully",
            result_count=len(results),
            first_result_score=top_score,
        )

        # Step 3: render every hit and wrap the text in a content item.
        header = f"Found {len(results)} results:\n\n"
        rendered_hits = "\n\n".join(
            self._format_search_result(hit) for hit in results
        )
        payload = {
            "content": [{"type": "text", "text": header + rendered_hits}],
            "isError": False,
        }
        response = self.protocol.create_response(request_id, result=payload)
        logger.debug("Search response formatted successfully")
        return response

    except Exception as e:
        logger.error("Error during search", exc_info=True)
        failure = {"code": -32603, "message": "Internal error", "data": str(e)}
        return self.protocol.create_response(request_id, error=failure)

455 

def _format_search_result(self, result: SearchResult) -> str:
    """Render one search result as a multi-line text block.

    The score, text and source type are always emitted; every other piece is
    appended only when the corresponding field on ``result`` is truthy.
    """
    # NOTE(review): the icon prefixes in these literals look mis-encoded in
    # this listing; they are kept byte-for-byte - confirm against the file.
    pieces = [
        f"Score: {result.score}\n",
        f"Text: {result.text}\n",
        f"Source: {result.source_type}",
    ]

    if result.source_title:
        pieces.append(f" - {result.source_title}")

    # Project line, when the result reports project information.
    project_info = result.get_project_info()
    if project_info:
        pieces.append(f"\n๐Ÿ—๏ธ {project_info}")

    # Attachment details are emitted only for file attachments.
    if result.is_attachment:
        pieces.append("\n๐Ÿ“Ž Attachment")
        if result.original_filename:
            pieces.append(f": {result.original_filename}")
        if result.attachment_context:
            pieces.append(f"\n๐Ÿ“‹ {result.attachment_context}")
        if result.parent_document_title:
            pieces.append(f"\n๐Ÿ“„ Attached to: {result.parent_document_title}")

    is_confluence = result.source_type == "confluence"

    # Breadcrumb path, Confluence only.
    if is_confluence and result.breadcrumb_text:
        pieces.append(f"\n๐Ÿ“ Path: {result.breadcrumb_text}")

    # The URL is appended to whatever line was written last.
    if result.source_url:
        pieces.append(f" ({result.source_url})")

    if result.file_path:
        pieces.append(f"\nFile: {result.file_path}")

    if result.repo_name:
        pieces.append(f"\nRepo: {result.repo_name}")

    # Hierarchy summary, Confluence only.
    if is_confluence and result.hierarchy_context:
        pieces.append(f"\n๐Ÿ—๏ธ {result.hierarchy_context}")

    # The hierarchy parent is shown for documents, not for attachments.
    if result.parent_title and not result.is_attachment:
        pieces.append(f"\nโฌ†๏ธ Parent: {result.parent_title}")

    if result.has_children():
        pieces.append(f"\nโฌ‡๏ธ Children: {result.children_count}")

    return "".join(pieces)

506 

async def _handle_hierarchy_search(
    self, request_id: str | int | None, params: dict[str, Any]
) -> dict[str, Any]:
    """Run a Confluence-only search with hierarchy filtering and grouping.

    Args:
        request_id: The ID of the request being answered.
        params: Search parameters. ``query`` is required; ``hierarchy_filter``,
            ``organize_by_hierarchy`` and ``limit`` fall back to ``{}``,
            ``False`` and 10.

    Returns:
        dict[str, Any]: The response created by the protocol helper: a -32602
        error when ``query`` is missing, a -32603 error when processing or
        searching raises, otherwise the formatted results.
    """
    logger.debug("Handling hierarchy search request with params", params=params)

    # Guard clause: the query is the only mandatory parameter.
    if "query" not in params:
        logger.error("Missing required parameter: query")
        missing_query = {
            "code": -32602,
            "message": "Invalid params",
            "data": "Missing required parameter: query",
        }
        return self.protocol.create_response(request_id, error=missing_query)

    query = params["query"]
    limit = params.get("limit", 10)
    organize_by_hierarchy = params.get("organize_by_hierarchy", False)
    hierarchy_filter = params.get("hierarchy_filter", {})

    logger.info(
        "Processing hierarchy search request",
        query=query,
        hierarchy_filter=hierarchy_filter,
        organize_by_hierarchy=organize_by_hierarchy,
        limit=limit,
    )

    try:
        # Step 1: let the query processor rewrite the raw query.
        logger.debug("Processing query with OpenAI")
        processed_query = await self.query_processor.process_query(query)
        logger.debug("Query processed successfully", processed_query=processed_query)

        # Step 2: search Confluence only, over-fetching (twice the limit) so
        # that filtering still leaves enough candidates.
        logger.debug("Executing hierarchy search in Qdrant")
        candidates = await self.search_engine.search(
            query=processed_query["query"],
            source_types=["confluence"],
            limit=limit * 2,
        )

        # Step 3: filter, then cut back down to the requested size.
        matching = self._apply_hierarchy_filters(candidates, hierarchy_filter)
        filtered_results = matching[:limit]

        # Step 4: render either grouped by hierarchy or as a flat list.
        if organize_by_hierarchy:
            grouped = self._organize_by_hierarchy(filtered_results)
            response_text = self._format_hierarchical_results(grouped)
        else:
            header = f"Found {len(filtered_results)} results:\n\n"
            rendered_hits = "\n\n".join(
                self._format_search_result(hit) for hit in filtered_results
            )
            response_text = header + rendered_hits

        top_score = filtered_results[0].score if filtered_results else None
        logger.info(
            "Hierarchy search completed successfully",
            result_count=len(filtered_results),
            first_result_score=top_score,
        )

        # Step 5: wrap the text in a content item.
        payload = {
            "content": [{"type": "text", "text": response_text}],
            "isError": False,
        }
        response = self.protocol.create_response(request_id, result=payload)
        logger.debug("Hierarchy search response formatted successfully")
        return response

    except Exception as e:
        logger.error("Error during hierarchy search", exc_info=True)
        failure = {"code": -32603, "message": "Internal error", "data": str(e)}
        return self.protocol.create_response(request_id, error=failure)

612 

def _apply_hierarchy_filters(
    self, results: list[SearchResult], hierarchy_filter: dict[str, Any]
) -> list[SearchResult]:
    """Keep the Confluence results that satisfy every supplied criterion.

    Args:
        results: Candidate results, in ranking order.
        hierarchy_filter: Optional criteria. ``depth``, ``parent_title`` and
            ``has_children`` apply whenever the key is present; ``root_only``
            applies only when its value is truthy.

    Returns:
        list[SearchResult]: The surviving results, in their original order.
    """

    def passes(candidate: SearchResult) -> bool:
        # Results from any source other than Confluence are always dropped.
        if candidate.source_type != "confluence":
            return False
        if (
            "depth" in hierarchy_filter
            and candidate.depth != hierarchy_filter["depth"]
        ):
            return False
        if (
            "parent_title" in hierarchy_filter
            and candidate.parent_title != hierarchy_filter["parent_title"]
        ):
            return False
        if hierarchy_filter.get("root_only", False) and not candidate.is_root_document():
            return False
        if (
            "has_children" in hierarchy_filter
            and candidate.has_children() != hierarchy_filter["has_children"]
        ):
            return False
        return True

    return [candidate for candidate in results if passes(candidate)]

647 

def _organize_by_hierarchy(
    self, results: list[SearchResult]
) -> dict[str, list[SearchResult]]:
    """Group results under their root title and order each group.

    The root title is the first ``" > "``-separated segment of
    ``breadcrumb_text`` when that field is truthy, otherwise the result's own
    ``source_title``.

    Args:
        results: The results to group.

    Returns:
        dict[str, list[SearchResult]]: Groups keyed by root title, in order of
        first appearance; each group is sorted by ``(depth, source_title)``
        with a missing depth counted as 0.
    """
    hierarchy_groups: dict[str, list[SearchResult]] = {}

    for result in results:
        if result.breadcrumb_text:
            # str.split always yields at least one element, so the first
            # segment is always available.
            root_title = result.breadcrumb_text.split(" > ", 1)[0]
        else:
            root_title = result.source_title
        hierarchy_groups.setdefault(root_title, []).append(result)

    # A missing title sorts as "" so None never gets compared with a str,
    # which would raise TypeError for two results at the same depth.
    for group in hierarchy_groups.values():
        group.sort(key=lambda item: (item.depth or 0, item.source_title or ""))

    return hierarchy_groups

674 

def _format_hierarchical_results(
    self, organized_results: dict[str, list[SearchResult]]
) -> str:
    """Render grouped results as one text block with a section per root title.

    Args:
        organized_results: Results keyed by root title.

    Returns:
        str: A header with the total count followed by the sections. Each
        result shows its title, optional hierarchy context, score to three
        decimals, a text snippet capped at 150 characters and, when present,
        its URL.
    """
    # NOTE(review): the icon prefixes and the spacing inside these literals
    # are reproduced exactly as they appear in this listing - confirm against
    # the original file.
    total = sum(len(group) for group in organized_results.values())
    sections = []

    for root_title, group in organized_results.items():
        parts = [f"๐Ÿ“ **{root_title}** ({len(group)} results)\n"]

        for entry in group:
            # Indentation grows with the depth; a missing depth counts as 0.
            pad = " " * (entry.depth or 0)
            context = f" | {entry.hierarchy_context}" if entry.hierarchy_context else ""
            parts.append(
                f"{pad}๐Ÿ“„ {entry.source_title}{context} (Score: {entry.score:.3f})\n"
            )

            # Snippet: first 150 characters plus an ellipsis when truncated.
            body = entry.text
            snippet = body[:150] + "..." if len(body) > 150 else body
            parts.append(f"{pad} {snippet}\n")

            if entry.source_url:
                parts.append(f"{pad} ๐Ÿ”— {entry.source_url}\n")
            parts.append("\n")

        sections.append("".join(parts))

    header = f"Found {total} results organized by hierarchy:\n\n"
    return header + "\n".join(sections)

707 

async def _handle_attachment_search(
    self, request_id: str | int | None, params: dict[str, Any]
) -> dict[str, Any]:
    """Run a search and return the hits that pass the attachment filters.

    Args:
        request_id: The ID of the request being answered.
        params: Search parameters. ``query`` is required; ``attachment_filter``,
            ``include_parent_context`` and ``limit`` fall back to ``{}``,
            ``True`` and 10. ``include_parent_context`` is read and logged but
            does not otherwise influence the response built here.

    Returns:
        dict[str, Any]: The response created by the protocol helper: a -32602
        error when ``query`` is missing, a -32603 error when processing or
        searching raises, otherwise the formatted results.
    """
    logger.debug("Handling attachment search request with params", params=params)

    # Guard clause: the query is the only mandatory parameter.
    if "query" not in params:
        logger.error("Missing required parameter: query")
        missing_query = {
            "code": -32602,
            "message": "Invalid params",
            "data": "Missing required parameter: query",
        }
        return self.protocol.create_response(request_id, error=missing_query)

    query = params["query"]
    limit = params.get("limit", 10)
    include_parent_context = params.get("include_parent_context", True)
    attachment_filter = params.get("attachment_filter", {})

    logger.info(
        "Processing attachment search request",
        query=query,
        attachment_filter=attachment_filter,
        include_parent_context=include_parent_context,
        limit=limit,
    )

    try:
        # Step 1: let the query processor rewrite the raw query.
        logger.debug("Processing query with OpenAI")
        processed_query = await self.query_processor.process_query(query)
        logger.debug("Query processed successfully", processed_query=processed_query)

        # Step 2: search without a source restriction, over-fetching (twice
        # the limit) so that filtering still leaves enough candidates.
        logger.debug("Executing attachment search in Qdrant")
        candidates = await self.search_engine.search(
            query=processed_query["query"],
            source_types=None,
            limit=limit * 2,
        )

        # Step 3: filter, then cut back down to the requested size.
        matching = self._apply_attachment_filters(candidates, attachment_filter)
        filtered_results = matching[:limit]

        top_score = filtered_results[0].score if filtered_results else None
        logger.info(
            "Attachment search completed successfully",
            result_count=len(filtered_results),
            first_result_score=top_score,
        )

        # Step 4: render every hit and wrap the text in a content item.
        header = f"Found {len(filtered_results)} results:\n\n"
        rendered_hits = "\n\n".join(
            self._format_attachment_search_result(hit) for hit in filtered_results
        )
        payload = {
            "content": [{"type": "text", "text": header + rendered_hits}],
            "isError": False,
        }
        response = self.protocol.create_response(request_id, result=payload)
        logger.debug("Attachment search response formatted successfully")
        return response

    except Exception as e:
        logger.error("Error during attachment search", exc_info=True)
        failure = {"code": -32603, "message": "Internal error", "data": str(e)}
        return self.protocol.create_response(request_id, error=failure)

807 

def _apply_attachment_filters(
    self, results: list[SearchResult], attachment_filter: dict[str, Any]
) -> list[SearchResult]:
    """Keep the results that satisfy every supplied attachment criterion.

    Args:
        results: Candidate results, in ranking order.
        attachment_filter: Optional criteria. ``attachments_only`` applies
            only when its value is truthy; ``parent_document_title``,
            ``file_type`` and ``author`` require an exact match whenever the
            key is present; ``file_size_min`` / ``file_size_max`` bound the
            size of results that report a truthy ``file_size``.

    Returns:
        list[SearchResult]: The surviving results, in their original order.
    """
    # Honour the flag's value, not just the key's presence: a client sending
    # ``"attachments_only": false`` must still get regular documents back.
    attachments_only = bool(attachment_filter.get("attachments_only", False))

    filtered_results = []

    for result in results:
        # Skip non-Confluence results
        # NOTE(review): the attachment tool is described as covering Jira and
        # other sources too, yet everything but Confluence is dropped here -
        # confirm this restriction is intended.
        if result.source_type != "confluence":
            continue

        # Apply attachments only filter
        if attachments_only and not result.is_attachment:
            continue

        # Apply parent document title filter
        if "parent_document_title" in attachment_filter:
            if (
                result.parent_document_title
                != attachment_filter["parent_document_title"]
            ):
                continue

        # Apply file type filter
        if "file_type" in attachment_filter:
            result_file_type = result.get_file_type()
            if result_file_type != attachment_filter["file_type"]:
                continue

        # Apply file size filter; results without a size are not rejected.
        if (
            "file_size_min" in attachment_filter
            and result.file_size
            and result.file_size < attachment_filter["file_size_min"]
        ):
            continue
        if (
            "file_size_max" in attachment_filter
            and result.file_size
            and result.file_size > attachment_filter["file_size_max"]
        ):
            continue

        # Apply author filter
        if "author" in attachment_filter:
            if result.attachment_author != attachment_filter["author"]:
                continue

        filtered_results.append(result)

    return filtered_results

859 

def _format_attachment_search_result(self, result: SearchResult) -> str:
    """Render one attachment search result as a multi-line text block.

    The score, text, source type and the attachment marker are always
    emitted; every other piece is appended only when the corresponding field
    on ``result`` is truthy.
    """
    # NOTE(review): the icon prefixes in these literals look mis-encoded in
    # this listing; they are kept byte-for-byte - confirm against the file.
    pieces = [
        f"Score: {result.score}\n",
        f"Text: {result.text}\n",
        f"Source: {result.source_type}",
    ]

    if result.source_title:
        pieces.append(f" - {result.source_title}")

    # The attachment marker is unconditional in this formatter.
    pieces.append("\n๐Ÿ“Ž Attachment")
    if result.original_filename:
        pieces.append(f": {result.original_filename}")
    if result.attachment_context:
        pieces.append(f"\n๐Ÿ“‹ {result.attachment_context}")
    if result.parent_document_title:
        pieces.append(f"\n๐Ÿ“„ Attached to: {result.parent_document_title}")

    is_confluence = result.source_type == "confluence"

    # Breadcrumb path, Confluence only.
    if is_confluence and result.breadcrumb_text:
        pieces.append(f"\n๐Ÿ“ Path: {result.breadcrumb_text}")

    # The URL is appended to whatever line was written last.
    if result.source_url:
        pieces.append(f" ({result.source_url})")

    if result.file_path:
        pieces.append(f"\nFile: {result.file_path}")

    if result.repo_name:
        pieces.append(f"\nRepo: {result.repo_name}")

    # Hierarchy summary, Confluence only.
    if is_confluence and result.hierarchy_context:
        pieces.append(f"\n๐Ÿ—๏ธ {result.hierarchy_context}")

    # The hierarchy parent is shown for documents, not for attachments.
    if result.parent_title and not result.is_attachment:
        pieces.append(f"\nโฌ†๏ธ Parent: {result.parent_title}")

    if result.has_children():
        pieces.append(f"\nโฌ‡๏ธ Children: {result.children_count}")

    return "".join(pieces)