Coverage for src/qdrant_loader_mcp_server/mcp/formatters.py: 83%

602 statements  

coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Response formatters for MCP server.""" 

2 

3from typing import Any 

4 

5from ..search.components.search_result_models import HybridSearchResult 

6 

7 

8class MCPFormatters: 

9 """Response formatters for MCP server.""" 

10 

11 @staticmethod 

12 def format_search_result(result: HybridSearchResult) -> str: 

13 """Format a search result for display.""" 

14 formatted_result = f"Score: {result.score}\n" 

15 formatted_result += f"Text: {result.text}\n" 

16 formatted_result += f"Source: {result.source_type}" 

17 

18 if result.source_title: 

19 formatted_result += f" - {result.source_title}" 

20 

21 # Add project information if available 

22 project_info = result.get_project_info() 

23 if project_info: 

24 formatted_result += f"\n🏗️ {project_info}" 

25 

26 # Add attachment information if this is a file attachment 

27 if result.is_attachment: 

28 formatted_result += "\n📎 Attachment" 

29 if result.original_filename: 

30 formatted_result += f": {result.original_filename}" 

31 if result.attachment_context: 

32 formatted_result += f"\n📋 {result.attachment_context}" 

33 if result.parent_document_title: 

34 formatted_result += f"\n📄 Attached to: {result.parent_document_title}" 

35 

36 # Add hierarchy context for Confluence documents 

37 if result.source_type == "confluence" and result.breadcrumb_text: 

38 formatted_result += f"\n📍 Path: {result.breadcrumb_text}" 

39 

40 if result.source_url: 

41 formatted_result += f" ({result.source_url})" 

42 

43 if result.file_path: 

44 formatted_result += f"\nFile: {result.file_path}" 

45 

46 if result.repo_name: 

47 formatted_result += f"\nRepo: {result.repo_name}" 

48 

49 # Add hierarchy information for Confluence documents 

50 if result.source_type == "confluence" and result.hierarchy_context: 

51 formatted_result += f"\n🏗️ {result.hierarchy_context}" 

52 

53 # Add parent information if available (for hierarchy, not attachments) 

54 if result.parent_title and not result.is_attachment: 

55 formatted_result += f"\n⬆️ Parent: {result.parent_title}" 

56 

57 # Add children count if available 

58 if result.has_children(): 

59 formatted_result += f"\n⬇️ Children: {result.children_count}" 

60 

61 return formatted_result 

62 

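    # Illustrative output for a Confluence attachment result (field values
    # assumed; which lines appear depends on the optional fields the
    # HybridSearchResult instance actually carries):
    #
    #     Score: 0.87
    #     Text: Quarterly rollout plan...
    #     Source: confluence - Release Notes
    #     📎 Attachment: plan.pdf
    #     📍 Path: Engineering > Releases
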
    @staticmethod
    def format_attachment_search_result(result: HybridSearchResult) -> str:
        """Format an attachment search result for display."""
        formatted_result = f"Score: {result.score}\n"
        formatted_result += f"Text: {result.text}\n"
        formatted_result += f"Source: {result.source_type}"

        if result.source_title:
            formatted_result += f" - {result.source_title}"

        # Add attachment information
        formatted_result += "\n📎 Attachment"
        if result.original_filename:
            formatted_result += f": {result.original_filename}"
        if result.attachment_context:
            formatted_result += f"\n📋 {result.attachment_context}"
        if result.parent_document_title:
            formatted_result += f"\n📄 Attached to: {result.parent_document_title}"

        # Add hierarchy context for Confluence documents
        if result.source_type == "confluence" and result.breadcrumb_text:
            formatted_result += f"\n📍 Path: {result.breadcrumb_text}"

        if result.source_url:
            formatted_result += f" ({result.source_url})"

        if result.file_path:
            formatted_result += f"\nFile: {result.file_path}"

        if result.repo_name:
            formatted_result += f"\nRepo: {result.repo_name}"

        # Add hierarchy information for Confluence documents
        if result.source_type == "confluence" and result.hierarchy_context:
            formatted_result += f"\n🏗️ {result.hierarchy_context}"

        # Add parent information if available (for hierarchy, not attachments)
        if result.parent_title and not result.is_attachment:
            formatted_result += f"\n⬆️ Parent: {result.parent_title}"

        # Add children count if available
        if result.has_children():
            formatted_result += f"\n⬇️ Children: {result.children_count}"

        return formatted_result

    @staticmethod
    def format_hierarchical_results(
        organized_results: dict[str, list[HybridSearchResult]],
    ) -> str:
        """Format hierarchically organized results for display."""
        formatted_sections = []

        for root_title, results in organized_results.items():
            section = f"📁 **{root_title}** ({len(results)} results)\n"

            for result in results:
                indent = " " * (result.depth or 0)
                section += f"{indent}📄 {result.source_title}"
                if result.hierarchy_context:
                    section += f" | {result.hierarchy_context}"
                section += f" (Score: {result.score:.3f})\n"

                # Add a snippet of the content
                content_snippet = (
                    result.text[:150] + "..." if len(result.text) > 150 else result.text
                )
                section += f"{indent} {content_snippet}\n"

                if result.source_url:
                    section += f"{indent} 🔗 {result.source_url}\n"
                section += "\n"

            formatted_sections.append(section)

        return (
            f"Found {sum(len(results) for results in organized_results.values())} results organized by hierarchy:\n\n"
            + "\n".join(formatted_sections)
        )

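    # Sketch of the layout this produces (values assumed):
    #
    #     Found 2 results organized by hierarchy:
    #
    #     📁 **Developer Guide** (2 results)
    #     📄 Setup | Developer Guide > Setup (Score: 0.912)
    #      First 150 characters of the chunk text...
    #      🔗 https://example.com/setup
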
    @staticmethod
    def format_relationship_analysis(analysis: dict[str, Any]) -> str:
        """Format document relationship analysis for display."""
        if "error" in analysis:
            return f"❌ Error: {analysis['error']}"

        summary = analysis.get("summary", {})
        formatted = f"""🔍 **Document Relationship Analysis**

📊 **Summary:**
• Total Documents: {summary.get('total_documents', 0)}
• Clusters Found: {summary.get('clusters_found', 0)}
• Citation Relationships: {summary.get('citation_relationships', 0)}
• Conflicts Detected: {summary.get('conflicts_detected', 0)}

🏷️ **Query Information:**
• Original Query: {analysis.get('query_metadata', {}).get('original_query', 'N/A')}
• Documents Analyzed: {analysis.get('query_metadata', {}).get('document_count', 0)}
"""

        clusters = analysis.get("document_clusters", [])
        if clusters:
            formatted += "\n🗂️ **Document Clusters:**\n"
            for i, cluster in enumerate(clusters[:3], 1):  # Show first 3 clusters
                formatted += (
                    f"• Cluster {i}: {len(cluster.get('documents', []))} documents\n"
                )

        conflicts = analysis.get("conflict_analysis", {}).get("conflicting_pairs", [])
        if conflicts:
            formatted += f"\n⚠️ **Conflicts Detected:** {len(conflicts)} conflicting document pairs\n"

        return formatted

    @staticmethod
    def format_similar_documents(similar_docs: list[dict[str, Any]]) -> str:
        """Format similar documents results for display."""
        if not similar_docs:
            return "🔍 **Similar Documents**\n\nNo similar documents found."

        formatted = f"🔍 **Similar Documents** ({len(similar_docs)} found)\n\n"

        for i, doc_info in enumerate(similar_docs[:5], 1):  # Show top 5
            score = doc_info.get("similarity_score", 0)
            document = doc_info.get("document", {})
            reasons = doc_info.get("similarity_reasons", [])

            formatted += f"**{i}. Similarity Score: {score:.3f}**\n"
            if hasattr(document, "source_title"):
                formatted += f"• Title: {document.source_title}\n"
            if reasons:
                formatted += f"• Reasons: {', '.join(reasons)}\n"
            formatted += "\n"

        return formatted

    @staticmethod
    def create_lightweight_similar_documents_results(
        similar_docs: list[dict[str, Any]],
        target_query: str = "",
        comparison_query: str = "",
    ) -> dict[str, Any]:
        """Return minimal similar documents data for fast navigation."""

        # Create similarity index with minimal data
        similarity_index: list[dict[str, Any]] = []
        for doc_info in similar_docs:
            document = doc_info.get("document", {})
            fields = MCPFormatters._extract_minimal_doc_fields(
                document, context=doc_info
            )

            similarity_index.append(
                {
                    "document_id": fields["document_id"],
                    "title": fields["title"],
                    "similarity_score": doc_info.get("similarity_score", 0.0),
                    "similarity_info": {
                        "metric_scores": doc_info.get("metric_scores", {}),
                        "similarity_reasons": doc_info.get("similarity_reasons", []),
                        "source_type": fields["source_type"],
                    },
                    "navigation_hints": {
                        "can_expand": True,
                        "has_content": fields["text_length"] > 0,
                        "content_length": fields["text_length"],
                        "expand_tool": "expand_document",  # Unified expand tool
                    },
                }
            )

        # Extract similarity metrics used
        metrics_used: list[str] = []
        if similar_docs:
            first_doc_metrics = similar_docs[0].get("metric_scores", {})
            metrics_used = list(first_doc_metrics.keys())

        return {
            "similarity_index": similarity_index,
            "query_info": {
                "target_query": target_query,
                "comparison_query": comparison_query,
                "total_found": len(similarity_index),
                "metrics_used": metrics_used,
            },
            "navigation": {
                "supports_lazy_loading": True,
                "expand_document_tool": "expand_document",
                "sort_order": "similarity_desc",
                "max_displayed": len(similarity_index),
            },
        }

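    # Sketch of one similarity_index entry (keys from the code above, values
    # assumed):
    #
    #     {
    #         "document_id": "confluence:Release Notes",
    #         "title": "Release Notes",
    #         "similarity_score": 0.92,
    #         "similarity_info": {"metric_scores": {...}, ...},
    #         "navigation_hints": {"expand_tool": "expand_document", ...},
    #     }
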
    @staticmethod
    def _extract_minimal_doc_fields(
        document: Any, context: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """Extract minimal document fields from either an object or a dict.

        Returns a dict with keys: document_id, title, source_type, text_length.
        """
        # Defaults
        doc_id = ""
        title = "Untitled"
        source_type = "unknown"
        text_length = 0

        # Object-like document (e.g., HybridSearchResult)
        if document is not None and not isinstance(document, dict):
            doc_id = getattr(document, "document_id", None) or ""
            title = getattr(document, "source_title", None) or title
            source_type = getattr(document, "source_type", None) or source_type
            text_val = getattr(document, "text", None)
        else:
            # Dict-like document
            doc_id = (document or {}).get("document_id") or (context or {}).get(
                "document_id", ""
            )
            title = (
                (document or {}).get("source_title")
                or (document or {}).get("title")
                or title
            )
            source_type = (document or {}).get("source_type") or source_type
            text_val = (document or {}).get("text", None)

        # Compute text_length robustly
        if isinstance(text_val, str):
            text_length = len(text_val)
        else:
            # Treat numeric and all other non-string types as missing content
            text_length = 0

        return {
            "document_id": doc_id,
            "title": title,
            "source_type": source_type,
            "text_length": text_length,
        }

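    # Illustrative call (inputs assumed); dict- and object-shaped documents
    # normalize to the same structure:
    #
    #     MCPFormatters._extract_minimal_doc_fields({"title": "Spec", "text": "abc"})
    #     # -> {"document_id": "", "title": "Spec",
    #     #     "source_type": "unknown", "text_length": 3}
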
    @staticmethod
    def format_conflict_analysis(conflicts: dict[str, Any]) -> str:
        """Format conflict analysis results for display."""
        conflicting_pairs = conflicts.get("conflicting_pairs", [])

        if not conflicting_pairs:
            return (
                "✅ **Conflict Analysis**\n\nNo conflicts detected between documents."
            )

        formatted = (
            f"⚠️ **Conflict Analysis** ({len(conflicting_pairs)} conflicts found)\n\n"
        )

        for i, (doc1, doc2, conflict_info) in enumerate(conflicting_pairs[:5], 1):
            conflict_type = conflict_info.get("type", "unknown")
            formatted += f"**{i}. Conflict Type: {conflict_type}**\n"
            formatted += f"• Document 1: {doc1}\n"
            formatted += f"• Document 2: {doc2}\n\n"

        suggestions = conflicts.get("resolution_suggestions", {})
        if suggestions:
            formatted += "💡 **Resolution Suggestions:**\n"
            # Convert dict values to list and take first 3
            suggestion_list = list(suggestions.values())[:3]
            for suggestion in suggestion_list:
                formatted += f"{suggestion}\n"

        # Append detector/runtime stats if available for transparency
        qmeta = conflicts.get("query_metadata", {}) or {}
        stats = qmeta.get("detector_stats", {}) or {}
        if stats:
            pairs_considered = stats.get("pairs_considered")
            pairs_analyzed = stats.get("pairs_analyzed")
            llm_pairs = stats.get("llm_pairs")
            elapsed_ms = stats.get("elapsed_ms")
            partial = qmeta.get("partial_results") or stats.get("partial_results")

            formatted += "\n\n🧪 **Analysis Stats:**\n"
            if pairs_considered is not None:
                formatted += f"• Pairs considered: {pairs_considered}\n"
            if pairs_analyzed is not None:
                formatted += f"• Pairs analyzed: {pairs_analyzed}\n"
            if llm_pairs is not None:
                formatted += f"• LLM pairs: {llm_pairs}\n"
            if elapsed_ms is not None:
                try:
                    formatted += f"• Elapsed: {float(elapsed_ms):.0f} ms\n"
                except Exception:
                    formatted += f"• Elapsed: {elapsed_ms} ms\n"
            if partial:
                formatted += "• Partial results due to time budget\n"

        return formatted

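    # Note: each entry in "conflicting_pairs" is expected to be a
    # (doc1_id, doc2_id, conflict_info) triple, where conflict_info is a dict
    # carrying keys such as "type" and "confidence" (shape inferred from the
    # usage here and in create_lightweight_conflict_results below).
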
    @staticmethod
    def create_lightweight_conflict_results(
        conflicts: dict[str, Any], query: str = "", documents: list | None = None
    ) -> dict[str, Any]:
        """Create lightweight conflict results for fast navigation and lazy loading."""

        conflicting_pairs = conflicts.get("conflicting_pairs", [])

        # Create conflict index with comprehensive information
        conflict_index = []
        involved_document_ids = set()

        for i, (doc1_id, doc2_id, conflict_info) in enumerate(conflicting_pairs):
            conflict_id = f"conf_{i+1:03d}"

            # Extract document titles from IDs or use the IDs themselves
            title_1 = doc1_id.split(":", 1)[-1] if ":" in doc1_id else doc1_id
            title_2 = doc2_id.split(":", 1)[-1] if ":" in doc2_id else doc2_id

            # Extract conflicting statements using centralized helper
            conflicting_statements = MCPFormatters._extract_conflicting_statements(
                conflict_info
            )

            # Create rich conflict entry with comprehensive information
            conflict_entry = {
                "conflict_id": conflict_id,
                "document_1_id": doc1_id,
                "document_2_id": doc2_id,
                "conflict_type": conflict_info.get("type", "unknown"),
                "confidence_score": round(conflict_info.get("confidence", 0.0), 3),
                "title_1": title_1[:100] + "..." if len(title_1) > 100 else title_1,
                "title_2": title_2[:100] + "..." if len(title_2) > 100 else title_2,
                "summary": conflict_info.get(
                    "description", "Potential conflict detected"
                ),
                "detailed_description": conflict_info.get(
                    "description",
                    "Documents contain contradictory or inconsistent information",
                ),
                "resolution_suggestion": MCPFormatters._generate_conflict_resolution_suggestion(
                    conflict_info
                ),
                "conflict_indicators": conflict_info.get("indicators", []),
                "conflicting_statements": conflicting_statements,
                "analysis_tier": conflict_info.get("analysis_tier", "unknown"),
                "tier_score": round(conflict_info.get("tier_score", 0.0), 3),
                "affected_sections": MCPFormatters._extract_affected_sections(
                    conflict_info
                ),
            }

            conflict_index.append(conflict_entry)
            involved_document_ids.add(doc1_id)
            involved_document_ids.add(doc2_id)

        # Create document index for involved documents
        document_index = []
        if documents:
            # Create a lookup for document details
            doc_lookup = {}
            for doc in documents:
                # Handle both SearchResult objects and dictionaries
                if isinstance(doc, dict):
                    doc_id = doc.get("document_id") or (
                        f"{doc.get('source_type', 'unknown')}:{doc.get('title', 'Untitled')}"
                    )
                else:
                    doc_id = doc.document_id or f"{doc.source_type}:{doc.source_title}"
                doc_lookup[doc_id] = doc

            # Build document index for involved documents
            for doc_id in involved_document_ids:
                if doc_id in doc_lookup:
                    doc = doc_lookup[doc_id]
                    # Handle both SearchResult objects and dictionaries
                    if isinstance(doc, dict):
                        title = doc.get("title", "Untitled")
                        source_type = doc.get("source_type", "unknown")
                        text_length = 0  # Not available in dict format
                        last_modified = None  # Not available in dict format
                    else:
                        title = doc.source_title or "Untitled"
                        source_type = doc.source_type
                        text_length = len(doc.text) if doc.text else 0
                        last_modified = (
                            doc.last_modified
                            if hasattr(doc, "last_modified")
                            else None
                        )

                    document_index.append(
                        {
                            "document_id": doc_id,
                            "title": title,
                            "source_type": source_type,
                            "text_length": text_length,
                            "conflict_count": sum(
                                1
                                for conflict in conflict_index
                                if conflict["document_1_id"] == doc_id
                                or conflict["document_2_id"] == doc_id
                            ),
                            "last_modified": last_modified,
                        }
                    )
                else:
                    # Fallback for documents not in the lookup
                    document_index.append(
                        {
                            "document_id": doc_id,
                            "title": (
                                doc_id.split(":", 1)[-1] if ":" in doc_id else doc_id
                            ),
                            "source_type": (
                                doc_id.split(":", 1)[0] if ":" in doc_id else "unknown"
                            ),
                            "text_length": 0,
                            "conflict_count": sum(
                                1
                                for conflict in conflict_index
                                if conflict["document_1_id"] == doc_id
                                or conflict["document_2_id"] == doc_id
                            ),
                            "last_modified": None,
                        }
                    )

        # Create conflict summary
        conflict_types = {}
        for conflict in conflict_index:
            conflict_type = conflict["conflict_type"]
            conflict_types[conflict_type] = conflict_types.get(conflict_type, 0) + 1

        conflict_summary = {
            "total_documents_analyzed": conflicts.get("query_metadata", {}).get(
                "document_count", 0
            ),
            "documents_with_conflicts": len(involved_document_ids),
            "total_conflicts_found": len(conflict_index),
            "conflict_types": conflict_types,
            "highest_confidence_score": max(
                [c["confidence_score"] for c in conflict_index], default=0.0
            ),
        }

        # Analysis metadata (copy to avoid mutating input)
        analysis_metadata = dict(conflicts.get("query_metadata", {}))
        analysis_metadata.update(
            {"analysis_strategy": "tiered_analysis", "response_type": "lightweight"}
        )

        # Convert conflict_index to the expected schema format
        conflicts_detected = []
        for conflict in conflict_index:
            # Find document details for the conflicting documents
            doc1_info = next(
                (
                    doc
                    for doc in document_index
                    if doc["document_id"] == conflict["document_1_id"]
                ),
                None,
            )
            doc2_info = next(
                (
                    doc
                    for doc in document_index
                    if doc["document_id"] == conflict["document_2_id"]
                ),
                None,
            )

            conflicts_detected.append(
                {
                    "conflict_id": conflict["conflict_id"],
                    "document_1": {
                        "title": (
                            doc1_info["title"] if doc1_info else conflict["title_1"]
                        ),
                        "content_preview": "",  # Can be populated if needed
                        "source_type": (
                            doc1_info["source_type"] if doc1_info else "unknown"
                        ),
                        "document_id": conflict[
                            "document_1_id"
                        ],  # Add this for expand_document compatibility
                    },
                    "document_2": {
                        "title": (
                            doc2_info["title"] if doc2_info else conflict["title_2"]
                        ),
                        "content_preview": "",  # Can be populated if needed
                        "source_type": (
                            doc2_info["source_type"] if doc2_info else "unknown"
                        ),
                        "document_id": conflict[
                            "document_2_id"
                        ],  # Add this for expand_document compatibility
                    },
                    "conflict_type": conflict["conflict_type"],
                    "conflict_score": conflict["confidence_score"],
                    "conflict_description": conflict["summary"],
                    "conflicting_statements": conflict.get(
                        "conflicting_statements", []
                    ),
                    "analysis_tier": conflict["analysis_tier"],  # Keep our enhancement
                }
            )

        # Update conflict summary to match expected format
        updated_conflict_summary = {
            "total_documents_analyzed": conflict_summary["total_documents_analyzed"],
            "conflicts_found": conflict_summary["total_conflicts_found"],
            "conflict_types": list(conflict_summary["conflict_types"].keys()),
            "highest_conflict_score": conflict_summary["highest_confidence_score"],
        }

        return {
            "conflicts_detected": conflicts_detected,
            "conflict_summary": updated_conflict_summary,
            "analysis_metadata": analysis_metadata,
            "document_index": document_index,  # Keep for our enhancement
            "navigation": {
                "total_conflicts": len(conflict_index),
                "max_displayed": len(conflict_index),
                "can_expand_documents": True,
                "expand_tool": "expand_document",
            },
        }

    @staticmethod
    def _extract_conflicting_statements(
        conflict_info: dict[str, Any],
    ) -> list[dict[str, str]]:
        """Extract actual conflicting statements from structured conflict data."""
        statements = []

        # Check if we have new structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])

        if structured_indicators:
            # Use the new structured data with actual text snippets
            for indicator in structured_indicators[:3]:  # Limit to 3 for brevity
                doc1_snippet = indicator.get("doc1_snippet", "[Content not available]")
                doc2_snippet = indicator.get("doc2_snippet", "[Content not available]")

                statements.append(
                    {"from_doc1": doc1_snippet, "from_doc2": doc2_snippet}
                )
        else:
            # Fallback to old format (summary only)
            indicators = conflict_info.get("indicators", [])
            for indicator in indicators[:3]:
                statements.append({"from_doc1": indicator, "from_doc2": indicator})

        return statements

    @staticmethod
    def format_complementary_content(complementary: list[dict[str, Any]]) -> str:
        """Format complementary content results for display."""
        if not complementary:
            return "🔍 **Complementary Content**\n\nNo complementary content found."

        formatted = (
            f"🔗 **Complementary Content** ({len(complementary)} recommendations)\n\n"
        )

        for i, item in enumerate(complementary[:5], 1):  # Show top 5
            document = item.get("document", {})
            score = item.get("relevance_score", 0)  # Fixed: use correct key
            reason = item.get("recommendation_reason", "")  # Fixed: singular form

            formatted += f"**{i}. Complementary Score: {score:.3f}**\n"
            if hasattr(document, "source_title"):
                formatted += f"• Title: {document.source_title}\n"
            if reason:
                formatted += f"• Why Complementary: {reason}\n"
            formatted += "\n"

        return formatted

    @staticmethod
    def format_document_clusters(clusters: dict[str, Any]) -> str:
        """Format document clustering results for display."""
        cluster_list = clusters.get("clusters", [])
        metadata = clusters.get("clustering_metadata", {})

        if not cluster_list:
            message = metadata.get("message", "No clusters could be formed.")
            return f"🗂️ **Document Clustering**\n\n{message}"

        formatted = f"""🗂️ **Document Clustering Results**

📊 **Clustering Summary:**
• Strategy: {metadata.get('strategy', 'unknown')}
• Total Clusters: {metadata.get('total_clusters', 0)}
• Total Documents: {metadata.get('total_documents', 0)}
• Original Query: {metadata.get('original_query', 'N/A')}

"""

        for i, cluster in enumerate(cluster_list[:5], 1):  # Show first 5 clusters
            formatted += f"**Cluster {i} (ID: {cluster.get('id', 'unknown')})**\n"
            formatted += f"• Documents: {len(cluster.get('documents', []))}\n"
            formatted += f"• Coherence Score: {cluster.get('coherence_score', 0):.3f}\n"

            topics = cluster.get("centroid_topics", [])
            if topics:
                formatted += f"• Key Topics: {', '.join(topics[:3])}\n"

            entities = cluster.get("shared_entities", [])
            if entities:
                formatted += f"• Shared Entities: {', '.join(entities[:3])}\n"

            summary = cluster.get("cluster_summary", "")
            if summary:
                formatted += f"• Summary: {summary}\n"

            formatted += "\n"

        return formatted

    @staticmethod
    def create_lightweight_cluster_results(
        clustering_results: dict[str, Any], query: str = ""
    ) -> dict[str, Any]:
        """Create lightweight cluster results for lazy loading, following the hierarchy_search pattern."""

        clusters = clustering_results.get("clusters", [])
        metadata = clustering_results.get("clustering_metadata", {})

        # Create cluster index with minimal data (limit documents per cluster for performance)
        cluster_index = []
        total_documents_shown = 0
        max_docs_per_cluster = 5  # Show only first 5 documents per cluster initially

        for cluster in clusters:
            cluster_documents = cluster.get("documents", [])

            # Create lightweight document entries (only first few per cluster)
            lightweight_docs = []
            for doc in cluster_documents[:max_docs_per_cluster]:
                doc_id = None
                title = "Untitled"
                source_type = "unknown"

                if hasattr(doc, "document_id"):
                    doc_id = doc.document_id
                    # Safely resolve title: prefer get_display_title() when
                    # available, otherwise fall back to source_title
                    if hasattr(doc, "get_display_title"):
                        title = doc.get_display_title() or getattr(
                            doc, "source_title", "Untitled"
                        )
                    else:
                        title = getattr(doc, "source_title", "Untitled")
                    source_type = doc.source_type
                elif hasattr(doc, "source_title"):
                    doc_id = f"{doc.source_type}:{doc.source_title}"
                    title = doc.source_title
                    source_type = doc.source_type
                elif isinstance(doc, dict):
                    doc_id = doc.get("document_id", "")
                    title = (
                        doc.get("title")
                        or doc.get("source_title")
                        or doc.get("parent_document_title")
                        or "Untitled"
                    )
                    source_type = doc.get("source_type", "unknown")

                lightweight_docs.append(
                    {
                        "document_id": doc_id,
                        "title": title,
                        "source_type": source_type,
                        "cluster_relevance": 1.0,
                    }
                )
                total_documents_shown += 1

            # Build cluster info
            cluster_info = {
                "cluster_id": cluster.get("id", f"cluster_{len(cluster_index)}"),
                "cluster_name": cluster.get(
                    "name", f"Cluster {len(cluster_index) + 1}"
                ),
                "cluster_theme": cluster.get("cluster_summary", "Mixed documents"),
                "document_count": len(cluster_documents),
                "documents_shown": len(lightweight_docs),
                "coherence_score": cluster.get("coherence_score", 0.0),
                "representative_doc_id": cluster.get("representative_doc_id"),
                "cluster_strategy": cluster.get(
                    "cluster_strategy", metadata.get("strategy", "mixed_features")
                ),
                "quality_metrics": cluster.get("quality_metrics", {}),
                "documents": lightweight_docs,
                "cluster_metadata": {
                    "shared_entities": cluster.get("shared_entities", [])[:5],  # Limit to first 5
                    "shared_topics": cluster.get("centroid_topics", [])[:5],  # Limit to first 5
                    "cluster_keywords": cluster.get("cluster_keywords", [])[:5],
                },
            }
            cluster_index.append(cluster_info)

        # Create enhanced clustering metadata
        enhanced_metadata = {
            "strategy": metadata.get("strategy", "mixed_features"),
            "total_documents": metadata.get("total_documents", 0),
            "clusters_created": metadata.get("clusters_created", len(clusters)),
            "unclustered_documents": metadata.get("unclustered_documents", 0),
            "document_retrieval_rate": metadata.get("document_retrieval_rate", 1.0),
            "clustering_quality": metadata.get("clustering_quality", 0.0),
            "processing_time_ms": metadata.get("processing_time_ms", 0),
            "strategy_performance": metadata.get("strategy_performance", {}),
            "recommendations": metadata.get("recommendations", {}),
            "query_metadata": {
                "search_query": query,
                "documents_shown": total_documents_shown,
                "max_docs_per_cluster": max_docs_per_cluster,
                "lazy_loading_enabled": True,
            },
        }

        return {
            "cluster_index": cluster_index,
            "clustering_metadata": enhanced_metadata,
            "expansion_info": {
                "cluster_expansion_available": True,
                "document_expansion_available": True,
                "expansion_instructions": "Use expand_document tool with document_id or expand_cluster with cluster_id for full content",
            },
        }

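    # Sketch of the lightweight payload this returns (keys from the code
    # above):
    #
    #     {
    #         "cluster_index": [{"cluster_id": ..., "documents": [...], ...}],
    #         "clustering_metadata": {...},
    #         "expansion_info": {...},
    #     }
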
    @staticmethod
    def create_structured_search_results(
        results: list[HybridSearchResult],
    ) -> list[dict[str, Any]]:
        """Create structured results matching Qdrant document structure."""
        return [
            {
                # 🔥 ROOT LEVEL FIELDS (matching Qdrant structure)
                "score": getattr(result, "score", 0.0),
                "document_id": getattr(result, "document_id", "") or "",
                "title": (
                    result.get_display_title()
                    if hasattr(result, "get_display_title")
                    else (getattr(result, "source_title", None) or "Untitled")
                ),
                "content": getattr(result, "text", None) or "",
                "source_type": getattr(result, "source_type", "unknown"),
                "source": getattr(result, "repo_name", None) or "",
                "url": getattr(result, "source_url", None) or "",
                "created_at": getattr(result, "created_at", None) or "",
                "updated_at": getattr(result, "last_modified", None) or "",
                # 🔥 NESTED METADATA (matching Qdrant structure)
                "metadata": {
                    # Project information
                    "project_id": getattr(result, "project_id", None) or "",
                    "project_name": getattr(result, "project_name", None) or "",
                    "project_description": getattr(result, "project_description", None)
                    or "",
                    "collection_name": getattr(result, "collection_name", None) or "",
                    # File information (from rich Qdrant metadata)
                    "file_path": getattr(result, "file_path", None) or "",
                    "file_name": getattr(result, "original_filename", None) or "",
                    "file_type": getattr(result, "original_file_type", None) or "",
                    "file_size": getattr(result, "file_size", None),
                    # Content analysis (from rich Qdrant metadata)
                    "word_count": getattr(result, "word_count", None),
                    "char_count": getattr(result, "char_count", None),
                    "estimated_read_time": getattr(result, "estimated_read_time", None),
                    # Chunking information (from rich Qdrant metadata)
                    "chunk_index": getattr(result, "chunk_index", None),
                    "total_chunks": getattr(result, "total_chunks", None),
                    "chunk_info": (
                        f"Chunk {getattr(result, 'chunk_index', 0) + 1}/{getattr(result, 'total_chunks', 1)}"
                        if isinstance(getattr(result, "chunk_index", None), int)
                        and isinstance(getattr(result, "total_chunks", None), int)
                        else None
                    ),
                    "chunking_strategy": getattr(result, "chunking_strategy", None)
                    or "",
                    # Enhanced context and analysis
                    "hierarchy_context": (
                        result.get_hierarchy_info()
                        if hasattr(result, "get_hierarchy_info")
                        else {}
                    ),
                    "content_analysis": (
                        result.get_content_info()
                        if hasattr(result, "get_content_info")
                        else {}
                    ),
                    "semantic_analysis": (
                        result.get_semantic_info()
                        if hasattr(result, "get_semantic_info")
                        else {}
                    ),
                    "section_context": (
                        result.get_section_context()
                        if hasattr(result, "get_section_context")
                        else ""
                    ),
                    "attachment_info": (
                        result.get_attachment_info()
                        if hasattr(result, "get_attachment_info")
                        else {}
                    ),
                },
            }
            for result in results
        ]

    @staticmethod
    def create_lightweight_hierarchy_results(
        filtered_results: list[HybridSearchResult],
        organized_results: dict[str, list[HybridSearchResult]] | None = None,
        query: str = "",
    ) -> dict[str, Any]:
        """Return minimal hierarchy data for fast navigation."""

        # Create hierarchy index with minimal data (up to 20 hierarchy nodes)
        hierarchy_index = []
        for result in filtered_results[:20]:
            hierarchy_index.append(
                {
                    "document_id": getattr(result, "document_id", ""),
                    "title": getattr(result, "source_title", None) or "Untitled",
                    "score": getattr(result, "score", 0.0),
                    "hierarchy_info": {
                        "depth": MCPFormatters._extract_synthetic_depth(result),
                        "parent_id": MCPFormatters._extract_synthetic_parent_id(result),
                        "parent_title": MCPFormatters._extract_synthetic_parent_title(
                            result
                        ),
                        "breadcrumb": MCPFormatters._extract_synthetic_breadcrumb(
                            result
                        ),
                        "has_children": MCPFormatters._extract_has_children(result),
                        "source_type": getattr(result, "source_type", "unknown"),
                    },
                    "navigation_hints": {
                        "group": MCPFormatters._get_group_key(result),
                        "siblings_count": MCPFormatters._count_siblings(
                            result, filtered_results
                        ),
                        "children_count": MCPFormatters._extract_children_count(
                            result, filtered_results
                        ),
                    },
                }
            )

        # Create clean hierarchy groups
        hierarchy_groups = []
        if organized_results:
            for group_key, results in organized_results.items():
                hierarchy_groups.append(
                    {
                        "group_key": group_key,
                        "group_name": MCPFormatters._generate_clean_group_name(
                            group_key, results
                        ),
                        "document_ids": [r.document_id for r in results],
                        "depth_range": [
                            min(getattr(r, "depth", 0) or 0 for r in results),
                            max(getattr(r, "depth", 0) or 0 for r in results),
                        ],
                        "total_documents": len(results),
                    }
                )

        return {
            "hierarchy_index": hierarchy_index,
            "hierarchy_groups": hierarchy_groups,
            "total_found": len(filtered_results),
            "query_metadata": {
                "search_query": query,
                "source_types_found": list(
                    {getattr(r, "source_type", "unknown") for r in filtered_results}
                ),
            },
        }

    @staticmethod
    def create_lightweight_complementary_results(
        complementary_recommendations: list[dict[str, Any]],
        target_document: "HybridSearchResult | None" = None,
        context_documents_analyzed: int = 0,
        target_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight complementary content results for lazy loading."""

        # Create complementary index with minimal data
        complementary_index = []
        for result in complementary_recommendations:
            document = result.get("document")
            if document:
                complementary_index.append(
                    {
                        "document_id": document.document_id,
                        "title": document.source_title or "Untitled",
                        "complementary_score": result.get("relevance_score", 0.0),
                        "complementary_reason": result.get("recommendation_reason", ""),
                        "relationship_type": result.get("strategy", "related"),
                        "source_type": document.source_type or "",
                        "basic_metadata": {
                            "project_id": document.project_id or "",
                            "created_at": document.created_at or "",
                            "source_url": document.source_url or "",
                        },
                        # NO content_preview - use expand_document for full content
                    }
                )

        # Target document info (accept object or lightweight dict)
        target_info = {
            "title": target_query,  # Fallback to query
            "content_preview": "",
            "source_type": "",
        }

        if target_document:
            if isinstance(target_document, dict):
                target_info = {
                    "document_id": target_document.get("document_id", ""),
                    "title": target_document.get("title", target_query),
                    "source_type": target_document.get("source_type", ""),
                }
            else:
                target_info = {
                    "document_id": getattr(target_document, "document_id", ""),
                    "title": getattr(target_document, "source_title", None)
                    or target_query,
                    "source_type": getattr(target_document, "source_type", "") or "",
                }

        # Calculate summary statistics
        scores = [item.get("complementary_score", 0.0) for item in complementary_index]
        relationship_types = [
            item.get("relationship_type", "related") for item in complementary_index
        ]

        return {
            "complementary_index": complementary_index,
            "target_document": target_info,
            "complementary_summary": {
                "total_analyzed": context_documents_analyzed,
                "complementary_found": len(complementary_index),
                "highest_score": max(scores, default=0.0),
                "relationship_types": list(set(relationship_types)),
            },
            "lazy_loading_enabled": True,
            "expand_document_hint": "Use expand_document tool with document_id for full content",
        }

    @staticmethod
    def _generate_clean_group_name(group_key: str, results: list) -> str:
        """Generate clear, short group names."""
        # Remove chunk/content prefixes from group names
        if group_key.startswith("Exists, limited clarity"):
            return "Technical Documentation"
        if group_key.startswith("Immediately begin compiling"):
            return "Product Management"
        if group_key.startswith("Purpose and Scope"):
            return "Project Overview"

        # Use first meaningful part of breadcrumb
        if " > " in group_key:
            return group_key.split(" > ")[0]

        # Truncate long names and add context
        if len(group_key) > 50:
            source_type = results[0].source_type if results else "unknown"
            return f"{group_key[:47]}... ({source_type.title()})"

        return group_key

    @staticmethod
    def _get_group_key(result) -> str:
        """Generate a stable group key for hierarchy organization."""
        # Try synthetic breadcrumb first
        synthetic_breadcrumb = MCPFormatters._extract_synthetic_breadcrumb(result)
        if synthetic_breadcrumb:
            if result.source_type == "confluence":
                return synthetic_breadcrumb
            elif result.source_type == "localfile":
                # Use root folder from breadcrumb
                return (
                    synthetic_breadcrumb.split(" > ")[0]
                    if " > " in synthetic_breadcrumb
                    else synthetic_breadcrumb
                )

        # Fallback to file path for localfiles
        if result.source_type == "localfile" and result.file_path:
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            return path_parts[0] if path_parts else "Root"

        # Fallback to title
        return result.source_title or "Uncategorized"

    @staticmethod
    def _count_siblings(result, all_results: list) -> int:
        """Count sibling documents at the same hierarchy level."""
        target_depth = MCPFormatters._extract_synthetic_depth(result)
        target_parent = MCPFormatters._extract_synthetic_parent_title(result)
        target_group = MCPFormatters._get_group_key(result)

        siblings = 0
        for other_result in all_results:
            other_depth = MCPFormatters._extract_synthetic_depth(other_result)
            other_parent = MCPFormatters._extract_synthetic_parent_title(other_result)
            other_group = MCPFormatters._get_group_key(other_result)

            # Count as siblings if same depth and same parent/group
            if (
                other_depth == target_depth
                and (other_parent == target_parent or other_group == target_group)
                and other_result.document_id != result.document_id
            ):
                siblings += 1

        return siblings

    @staticmethod
    def _extract_synthetic_depth(result) -> int:
        """Extract or synthesize depth information from available data."""
        # Try native hierarchy first
        if hasattr(result, "depth") and result.depth is not None:
            return result.depth

        # For localfiles, use folder depth
        if result.source_type == "localfile" and result.file_path:
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            return max(0, len(path_parts) - 1)  # Exclude filename

        # For confluence with section context
        if result.source_type == "confluence":
            section_context = getattr(result, "section_context", "")
            if section_context and "[H" in section_context:
                # Extract header level from section context like "[H2]"
                try:
                    header_level = int(section_context.split("[H")[1][0])
                    return header_level - 1  # H1=0, H2=1, etc.
                except (IndexError, ValueError):
                    pass

        return 0

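    # Illustrative depth synthesis (inputs assumed):
    #   explicit result.depth = 2                    -> 2
    #   localfile file_path "docs/setup.md"          -> 1 (folder count, filename excluded)
    #   confluence section_context "[H3] API Notes"  -> 2 (H1=0, H2=1, ...)
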
    @staticmethod
    def _extract_synthetic_parent_id(result) -> str | None:
        """Extract or synthesize parent ID from available data."""
        # For chunked documents, use base document ID if different chunk
        try:
            chunk_index = getattr(result, "chunk_index", 0)
            if isinstance(chunk_index, int) and chunk_index > 0:
                # Generate a parent ID for chunk 0 of the same document
                document_id = getattr(result, "document_id", None)
                if document_id and isinstance(document_id, str):
                    base_id = document_id.split("-")[0]
                    return f"{base_id}-chunk-0" if base_id else None
        except (TypeError, AttributeError):
            pass

        return None

    @staticmethod
    def _extract_synthetic_parent_title(result) -> str | None:
        """Extract or synthesize parent title from available data."""
        try:
            # For localfiles, use parent folder name
            source_type = getattr(result, "source_type", "")
            if source_type == "localfile":
                file_path = getattr(result, "file_path", "")
                if file_path and isinstance(file_path, str):
                    path_parts = [p for p in file_path.split("/") if p and p != "."]
                    if len(path_parts) > 1:
                        return path_parts[-2]  # Parent folder

            # For chunked documents, use the base document title
            chunk_index = getattr(result, "chunk_index", 0)
            if isinstance(chunk_index, int) and chunk_index > 0:
                title = getattr(result, "source_title", "") or ""
                if isinstance(title, str) and "(Chunk " in title:
                    return title.split("(Chunk ")[0].strip()
        except (TypeError, AttributeError):
            pass

        return None

    @staticmethod
    def _extract_synthetic_breadcrumb(result) -> str | None:
        """Extract or synthesize breadcrumb from available data."""
        # Try native breadcrumb first
        if hasattr(result, "breadcrumb_text") and result.breadcrumb_text:
            return result.breadcrumb_text

        # For localfiles, create breadcrumb from file path
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            if len(path_parts) > 1:
                return " > ".join(path_parts[:-1])  # Exclude filename

        # For confluence with section context, create from section info
        if getattr(result, "source_type", None) == "confluence":
            section_context = getattr(result, "section_context", "")
            if isinstance(section_context, str) and section_context:
                # Extract section title from context like "[H2] Functions - Beta release"
                if "]" in section_context:
                    section_title = section_context.split("]", 1)[1].strip()
                    if section_title and "(#" in section_title:
                        section_title = section_title.split("(#")[0].strip()
                    return section_title

        return None

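    # Illustrative breadcrumb synthesis (inputs assumed):
    #   localfile "docs/guides/setup.md"            -> "docs > guides"
    #   confluence "[H2] Functions - Beta release"  -> "Functions - Beta release"
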
    @staticmethod
    def _extract_has_children(result) -> bool:
        """Extract or synthesize has_children information."""
        try:
            # Try native hierarchy first
            if hasattr(result, "has_children") and callable(result.has_children):
                return result.has_children()

            # For chunked documents, check if this is not the last chunk
            chunk_index = getattr(result, "chunk_index", 0)
            total_chunks = getattr(result, "total_chunks", 1)
            if isinstance(chunk_index, int) and isinstance(total_chunks, int):
                return chunk_index < (total_chunks - 1)
        except (TypeError, AttributeError):
            pass

        return False

    @staticmethod
    def _extract_children_count(result, all_results: list) -> int:
        """Extract or synthesize children count from available data."""
        try:
            # Try native children count first
            children_count = getattr(result, "children_count", None)
            if children_count is not None and isinstance(children_count, int):
                return children_count

            # For chunked documents, count remaining chunks in same document
            chunk_index = getattr(result, "chunk_index", 0)
            total_chunks = getattr(result, "total_chunks", 1)
            if isinstance(chunk_index, int) and isinstance(total_chunks, int):
                return max(0, total_chunks - chunk_index - 1)

            # For localfiles, count files in subdirectories (rough estimate)
            source_type = getattr(result, "source_type", "")
            file_path = getattr(result, "file_path", "")
            if (
                source_type == "localfile"
                and file_path
                and isinstance(file_path, str)
                and all_results
            ):
                base_path = "/".join(file_path.split("/")[:-1])  # Remove filename
                children = 0
                for other in all_results:
                    other_source_type = getattr(other, "source_type", "")
                    other_file_path = getattr(other, "file_path", "")
                    if (
                        other_source_type == "localfile"
                        and other_file_path
                        and isinstance(other_file_path, str)
                        and other_file_path.startswith(base_path + "/")
                        and other_file_path != file_path
                    ):
                        children += 1
                return min(children, 10)  # Cap to reasonable number
        except (TypeError, AttributeError):
            pass

        return 0

    @staticmethod
    def create_structured_hierarchy_results(
        filtered_results: list[HybridSearchResult],
        organize_by_hierarchy: bool,
        organized_results: dict[str, list[HybridSearchResult]] | None = None,
    ) -> dict[str, Any]:
        """Legacy method - replaced by create_lightweight_hierarchy_results."""
        # For backward compatibility during transition, delegate to the lightweight version
        return MCPFormatters.create_lightweight_hierarchy_results(
            filtered_results, organized_results
        )

    @staticmethod
    def create_structured_attachment_results(
        filtered_results: list[HybridSearchResult],
        attachment_filter: dict[str, Any],
        include_parent_context: bool = True,
    ) -> dict[str, Any]:
        """Create structured content for attachment search MCP compliance."""
        return {
            "results": [
                {
                    "score": result.score,
                    "title": result.source_title or "Untitled",
                    "content": result.text,
                    "attachment_info": {
                        "filename": getattr(
                            result, "original_filename", result.source_title or "Untitled"
                        )
                        or "Untitled",
                        "file_type": (
                            getattr(result, "original_file_type", None)
                            or MCPFormatters._extract_file_type_minimal(result)
                            or "unknown"
                        ),
                        "file_size": getattr(result, "file_size", 0) or 0,
                        "parent_document": (
                            (getattr(result, "parent_document_title", "") or "")
                            if include_parent_context
                            else ""
                        ),
                    },
                    "metadata": {
                        "file_path": result.file_path or "",
                        "project_id": result.project_id or "",
                        "upload_date": getattr(result, "created_at", "") or "",
                        "author": getattr(result, "author", "") or "",
                    },
                }
                for result in filtered_results
            ],
            "total_found": len(filtered_results),
            "attachment_summary": {
                "total_attachments": len(filtered_results),
                "file_types": list(
                    {
                        (
                            getattr(result, "original_file_type", None)
                            or MCPFormatters._extract_file_type_minimal(result)
                            or "unknown"
                        )
                        for result in filtered_results
                    }
                ),
                "attachments_only": attachment_filter.get("attachments_only", False),
            },
        }

    @staticmethod
    def create_lightweight_attachment_results(
        filtered_results: list[HybridSearchResult],
        attachment_filter: dict[str, Any],
        query: str = "",
    ) -> dict[str, Any]:
        """Return minimal attachment data for fast navigation and lazy loading."""

        # Create attachment index with minimal data (limit to 20 for performance)
        attachment_index = []
        for result in filtered_results[:20]:
            attachment_index.append(
                {
                    "document_id": result.document_id,
                    "title": result.source_title or "Untitled",
                    "score": result.score,
                    "attachment_info": {
                        "filename": MCPFormatters._extract_safe_filename(result),
                        "file_type": MCPFormatters._extract_file_type_minimal(result),
                        "file_size": (
                            result.file_size
                            if result.file_size and result.file_size > 0
                            else None
                        ),
                        "source_type": result.source_type,
                    },
                    "navigation_hints": {
                        "parent_document": (
                            getattr(result, "parent_document_title", None)
                            or getattr(result, "parent_title", None)
                        ),
                        "project_context": result.project_name or result.project_id,
                        "content_preview": (
                            result.text[:100] + "..." if result.text else None
                        ),
                    },
                }
            )

        # Create attachment groups for better organization
        attachment_groups = MCPFormatters._organize_attachments_by_type(
            filtered_results
        )

        return {
            "attachment_index": attachment_index,
            "attachment_groups": attachment_groups,
            "total_found": len(filtered_results),
            "query_metadata": {
                "search_query": query,
                "source_types_found": list({r.source_type for r in filtered_results}),
                "filters_applied": attachment_filter,
            },
        }

    @staticmethod
    def _extract_safe_filename(result: HybridSearchResult) -> str:
        """Fast filename extraction with minimal processing."""
        # Quick priority check - avoid expensive validation
        original = getattr(result, "original_filename", None)
        if isinstance(original, str) and len(original) < 200:
            return original

        file_path = getattr(result, "file_path", None)
        if isinstance(file_path, str) and file_path:
            import os

            return os.path.basename(file_path)

        # Fallback to source title but clean it
        title = getattr(result, "source_title", None) or "untitled"
        # Quick clean - remove obvious chunk indicators
        if "(Chunk " in title:
            title = title.split("(Chunk ")[0].strip()

        return title[:100]  # Truncate for safety

    @staticmethod
    def _extract_file_type_minimal(result: HybridSearchResult) -> str:
        """Fast file type detection - minimal processing."""
        # Priority order with early returns for performance
        mime_type = getattr(result, "mime_type", None)
        if isinstance(mime_type, str) and mime_type:
            return mime_type.split("/")[-1]  # Get extension from MIME

        # Try multiple filename sources for extension extraction
        file_path = getattr(result, "file_path", None)
        source_title = getattr(result, "source_title", None)
        original_filename = getattr(result, "original_filename", None)
        filename_candidates = [
            original_filename if isinstance(original_filename, str) else None,
            source_title if isinstance(source_title, str) else None,
            (
                file_path.split("/")[-1]
                if isinstance(file_path, str) and file_path
                else None
            ),
        ]

        for filename in filename_candidates:
            if filename and "." in filename:
                ext = filename.split(".")[-1].lower().strip()
                # Valid file extensions and common document types
                if len(ext) <= 5 and ext.isalnum():
                    return ext

        return "unknown"

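    # Illustrative detection order (inputs assumed):
    #   mime_type "application/pdf"      -> "pdf"
    #   original_filename "report.XLSX"  -> "xlsx"
    #   no usable source                 -> "unknown"
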
    @staticmethod
    def _organize_attachments_by_type(results: list[HybridSearchResult]) -> list[dict]:
        """Organize attachments into logical groups for navigation."""
        from collections import defaultdict

        type_groups = defaultdict(list)

        for result in results:
            # Group by file type first
            file_type = MCPFormatters._extract_file_type_minimal(result)
            group_key = MCPFormatters._get_attachment_group_key(
                file_type, result.source_type
            )
            type_groups[group_key].append(result.document_id)

        # Convert to structured format
        groups = []
        for group_key, doc_ids in type_groups.items():
            if len(doc_ids) >= 1:  # Include all groups, even single files
                groups.append(
                    {
                        "group_key": group_key,
                        "group_name": MCPFormatters._generate_friendly_group_name(
                            group_key
                        ),
                        "document_ids": doc_ids,
                        "file_count": len(doc_ids),
                    }
                )

        # Sort by file count (most common types first)
        return sorted(groups, key=lambda g: g["file_count"], reverse=True)

    @staticmethod
    def _get_attachment_group_key(file_type: str, source_type: str) -> str:
        """Generate logical grouping keys for attachments."""
        # Map to broader categories for better UX
        document_types = {"pdf", "doc", "docx", "txt", "md"}
        spreadsheet_types = {"xls", "xlsx", "csv"}
        image_types = {"png", "jpg", "jpeg", "gif", "svg"}

        if file_type in document_types:
            return f"documents_{source_type}"
        elif file_type in spreadsheet_types:
            return f"spreadsheets_{source_type}"
        elif file_type in image_types:
            return f"images_{source_type}"
        else:
            return f"other_{source_type}"

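    # Example mapping (inputs assumed):
    #   ("pdf", "confluence") -> "documents_confluence"
    #   ("csv", "localfile")  -> "spreadsheets_localfile"
    #   ("zip", "git")        -> "other_git"
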
    @staticmethod
    def _generate_friendly_group_name(group_key: str) -> str:
        """Generate user-friendly group names."""
        # Parse the group key format: "type_source"
        if "_" in group_key:
            file_category, source_type = group_key.split("_", 1)

            # Capitalize and format
            category_map = {
                "documents": "Documents",
                "spreadsheets": "Spreadsheets",
                "images": "Images",
                "other": "Other Files",
            }

            source_map = {
                "confluence": "Confluence",
                "localfile": "Local Files",
                "git": "Git Repository",
                "jira": "Jira",
            }

            category = category_map.get(file_category, file_category.title())
            source = source_map.get(source_type, source_type.title())

            return f"{category} ({source})"

        return group_key.title()

    @staticmethod
    def _generate_conflict_resolution_suggestion(conflict_info: dict) -> str:
        """Generate a resolution suggestion based on conflict type and information."""
        conflict_type = conflict_info.get("type", "unknown")

        if conflict_type == "version_conflict":
            return "Review documents for version consistency and update outdated information"
        elif conflict_type == "contradictory_guidance":
            return "Reconcile contradictory guidance by consulting authoritative sources or stakeholders"
        elif conflict_type == "procedural_conflict":
            return "Establish a single, authoritative procedure and deprecate conflicting processes"
        elif conflict_type == "requirement_conflict":
            return "Clarify requirements with stakeholders and update documentation to resolve ambiguity"
        elif conflict_type == "implementation_conflict":
            return "Review implementation approaches and standardize on the preferred solution"
        else:
            return (
                "Review conflicting information and establish a single source of truth"
            )

    @staticmethod
    def _extract_affected_sections(conflict_info: dict) -> list:
        """Extract affected sections from conflict information."""
        affected_sections = []

        # Try to identify sections from structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])
        for indicator in structured_indicators:
            if isinstance(indicator, dict):
                # Look for section keywords in the snippets
                doc1_snippet = indicator.get("doc1_snippet", "")
                doc2_snippet = indicator.get("doc2_snippet", "")

                sections = set()
                for snippet in [doc1_snippet, doc2_snippet]:
                    # Common section patterns
                    if "introduction" in snippet.lower():
                        sections.add("Introduction")
                    elif "requirement" in snippet.lower():
                        sections.add("Requirements")
                    elif "procedure" in snippet.lower() or "process" in snippet.lower():
                        sections.add("Procedures")
                    elif "implementation" in snippet.lower():
                        sections.add("Implementation")
                    elif (
                        "configuration" in snippet.lower()
                        or "config" in snippet.lower()
                    ):
                        sections.add("Configuration")
                    elif "guideline" in snippet.lower() or "guide" in snippet.lower():
                        sections.add("Guidelines")

                affected_sections.extend(list(sections))

        # Remove duplicates and return
        return list(set(affected_sections)) if affected_sections else ["Content"]