Coverage for src/qdrant_loader_mcp_server/mcp/formatters.py: 83%
602 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
1"""Response formatters for MCP server."""
3from typing import Any
5from ..search.components.search_result_models import HybridSearchResult
8class MCPFormatters:
9 """Response formatters for MCP server."""
11 @staticmethod
12 def format_search_result(result: HybridSearchResult) -> str:
13 """Format a search result for display."""
14 formatted_result = f"Score: {result.score}\n"
15 formatted_result += f"Text: {result.text}\n"
16 formatted_result += f"Source: {result.source_type}"
18 if result.source_title:
19 formatted_result += f" - {result.source_title}"
21 # Add project information if available
22 project_info = result.get_project_info()
23 if project_info:
24 formatted_result += f"\n🏗️ {project_info}"
26 # Add attachment information if this is a file attachment
27 if result.is_attachment:
28 formatted_result += "\n📎 Attachment"
29 if result.original_filename:
30 formatted_result += f": {result.original_filename}"
31 if result.attachment_context:
32 formatted_result += f"\n📋 {result.attachment_context}"
33 if result.parent_document_title:
34 formatted_result += f"\n📄 Attached to: {result.parent_document_title}"
36 # Add hierarchy context for Confluence documents
37 if result.source_type == "confluence" and result.breadcrumb_text:
38 formatted_result += f"\n📍 Path: {result.breadcrumb_text}"
40 if result.source_url:
41 formatted_result += f" ({result.source_url})"
43 if result.file_path:
44 formatted_result += f"\nFile: {result.file_path}"
46 if result.repo_name:
47 formatted_result += f"\nRepo: {result.repo_name}"
49 # Add hierarchy information for Confluence documents
50 if result.source_type == "confluence" and result.hierarchy_context:
51 formatted_result += f"\n🏗️ {result.hierarchy_context}"
53 # Add parent information if available (for hierarchy, not attachments)
54 if result.parent_title and not result.is_attachment:
55 formatted_result += f"\n⬆️ Parent: {result.parent_title}"
57 # Add children count if available
58 if result.has_children():
59 formatted_result += f"\n⬇️ Children: {result.children_count}"
61 return formatted_result
63 @staticmethod
64 def format_attachment_search_result(result: HybridSearchResult) -> str:
65 """Format an attachment search result for display."""
66 formatted_result = f"Score: {result.score}\n"
67 formatted_result += f"Text: {result.text}\n"
68 formatted_result += f"Source: {result.source_type}"
70 if result.source_title:
71 formatted_result += f" - {result.source_title}"
73 # Add attachment information
74 formatted_result += "\n📎 Attachment"
75 if result.original_filename:
76 formatted_result += f": {result.original_filename}"
77 if result.attachment_context:
78 formatted_result += f"\n📋 {result.attachment_context}"
79 if result.parent_document_title:
80 formatted_result += f"\n📄 Attached to: {result.parent_document_title}"
82 # Add hierarchy context for Confluence documents
83 if result.source_type == "confluence" and result.breadcrumb_text:
84 formatted_result += f"\n📍 Path: {result.breadcrumb_text}"
86 if result.source_url:
87 formatted_result += f" ({result.source_url})"
89 if result.file_path:
90 formatted_result += f"\nFile: {result.file_path}"
92 if result.repo_name:
93 formatted_result += f"\nRepo: {result.repo_name}"
95 # Add hierarchy information for Confluence documents
96 if result.source_type == "confluence" and result.hierarchy_context:
97 formatted_result += f"\n🏗️ {result.hierarchy_context}"
99 # Add parent information if available (for hierarchy, not attachments)
100 if result.parent_title and not result.is_attachment:
101 formatted_result += f"\n⬆️ Parent: {result.parent_title}"
103 # Add children count if available
104 if result.has_children():
105 formatted_result += f"\n⬇️ Children: {result.children_count}"
107 return formatted_result
109 @staticmethod
110 def format_hierarchical_results(
111 organized_results: dict[str, list[HybridSearchResult]],
112 ) -> str:
113 """Format hierarchically organized results for display."""
114 formatted_sections = []
116 for root_title, results in organized_results.items():
117 section = f"📁 **{root_title}** ({len(results)} results)\n"
119 for result in results:
120 indent = " " * (result.depth or 0)
121 section += f"{indent}📄 {result.source_title}"
122 if result.hierarchy_context:
123 section += f" | {result.hierarchy_context}"
124 section += f" (Score: {result.score:.3f})\n"
126 # Add a snippet of the content
127 content_snippet = (
128 result.text[:150] + "..." if len(result.text) > 150 else result.text
129 )
130 section += f"{indent} {content_snippet}\n"
132 if result.source_url:
133 section += f"{indent} 🔗 {result.source_url}\n"
134 section += "\n"
136 formatted_sections.append(section)
138 return (
139 f"Found {sum(len(results) for results in organized_results.values())} results organized by hierarchy:\n\n"
140 + "\n".join(formatted_sections)
141 )
143 @staticmethod
144 def format_relationship_analysis(analysis: dict[str, Any]) -> str:
145 """Format document relationship analysis for display."""
146 if "error" in analysis:
147 return f"❌ Error: {analysis['error']}"
149 summary = analysis.get("summary", {})
150 formatted = f"""🔍 **Document Relationship Analysis**
152📊 **Summary:**
153• Total Documents: {summary.get('total_documents', 0)}
154• Clusters Found: {summary.get('clusters_found', 0)}
155• Citation Relationships: {summary.get('citation_relationships', 0)}
156• Conflicts Detected: {summary.get('conflicts_detected', 0)}
158🏷️ **Query Information:**
159• Original Query: {analysis.get('query_metadata', {}).get('original_query', 'N/A')}
160• Documents Analyzed: {analysis.get('query_metadata', {}).get('document_count', 0)}
161"""
163 clusters = analysis.get("document_clusters", [])
164 if clusters:
165 formatted += "\n🗂️ **Document Clusters:**\n"
166 for i, cluster in enumerate(clusters[:3], 1): # Show first 3 clusters
167 formatted += (
168 f"• Cluster {i}: {len(cluster.get('documents', []))} documents\n"
169 )
171 conflicts = analysis.get("conflict_analysis", {}).get("conflicting_pairs", [])
172 if conflicts:
173 formatted += f"\n⚠️ **Conflicts Detected:** {len(conflicts)} conflicting document pairs\n"
175 return formatted
177 @staticmethod
178 def format_similar_documents(similar_docs: list[dict[str, Any]]) -> str:
179 """Format similar documents results for display."""
180 if not similar_docs:
181 return "🔍 **Similar Documents**\n\nNo similar documents found."
183 formatted = f"🔍 **Similar Documents** ({len(similar_docs)} found)\n\n"
185 for i, doc_info in enumerate(similar_docs[:5], 1): # Show top 5
186 score = doc_info.get("similarity_score", 0)
187 document = doc_info.get("document", {})
188 reasons = doc_info.get("similarity_reasons", [])
190 formatted += f"**{i}. Similarity Score: {score:.3f}**\n"
191 if hasattr(document, "source_title"):
192 formatted += f"• Title: {document.source_title}\n"
193 if reasons:
194 formatted += f"• Reasons: {', '.join(reasons)}\n"
195 formatted += "\n"
197 return formatted
199 @staticmethod
200 def create_lightweight_similar_documents_results(
201 similar_docs: list[dict[str, Any]],
202 target_query: str = "",
203 comparison_query: str = "",
204 ) -> dict[str, Any]:
205 """Return minimal similar documents data for fast navigation."""
207 # Create similarity index with minimal data
208 similarity_index: list[dict[str, Any]] = []
209 for doc_info in similar_docs:
210 document = doc_info.get("document", {})
211 fields = MCPFormatters._extract_minimal_doc_fields(
212 document, context=doc_info
213 )
215 similarity_index.append(
216 {
217 "document_id": fields["document_id"],
218 "title": fields["title"],
219 "similarity_score": doc_info.get("similarity_score", 0.0),
220 "similarity_info": {
221 "metric_scores": doc_info.get("metric_scores", {}),
222 "similarity_reasons": doc_info.get("similarity_reasons", []),
223 "source_type": fields["source_type"],
224 },
225 "navigation_hints": {
226 "can_expand": True,
227 "has_content": fields["text_length"] > 0,
228 "content_length": fields["text_length"],
229 "expand_tool": "expand_document", # Unified expand tool
230 },
231 }
232 )
234 # Extract similarity metrics used
235 metrics_used: list[str] = []
236 if similar_docs:
237 first_doc_metrics = similar_docs[0].get("metric_scores", {})
238 metrics_used = list(first_doc_metrics.keys())
240 return {
241 "similarity_index": similarity_index,
242 "query_info": {
243 "target_query": target_query,
244 "comparison_query": comparison_query,
245 "total_found": len(similarity_index),
246 "metrics_used": metrics_used,
247 },
248 "navigation": {
249 "supports_lazy_loading": True,
250 "expand_document_tool": "expand_document",
251 "sort_order": "similarity_desc",
252 "max_displayed": len(similarity_index),
253 },
254 }
256 @staticmethod
257 def _extract_minimal_doc_fields(
258 document: Any, context: dict[str, Any] | None = None
259 ) -> dict[str, Any]:
260 """Extract minimal document fields from either an object or a dict.
262 Returns a dict with keys: document_id, title, source_type, text_length.
263 """
264 # Defaults
265 doc_id = ""
266 title = "Untitled"
267 source_type = "unknown"
268 text_length = 0
270 # Object-like document (e.g., HybridSearchResult)
271 if document is not None and not isinstance(document, dict):
272 doc_id = getattr(document, "document_id", None) or ""
273 title = getattr(document, "source_title", None) or title
274 source_type = getattr(document, "source_type", None) or source_type
275 text_val = getattr(document, "text", None)
276 else:
277 # Dict-like document
278 doc_id = (document or {}).get("document_id") or (context or {}).get(
279 "document_id", ""
280 )
281 title = (
282 (document or {}).get("source_title")
283 or (document or {}).get("title")
284 or title
285 )
286 source_type = (document or {}).get("source_type") or source_type
287 text_val = (document or {}).get("text", None)
289 # Compute text_length robustly
290 if isinstance(text_val, str):
291 text_length = len(text_val)
292 elif isinstance(text_val, int | float):
293 # Treat numeric and all non-string types as missing content
294 text_length = 0
296 return {
297 "document_id": doc_id,
298 "title": title,
299 "source_type": source_type,
300 "text_length": text_length,
301 }
303 @staticmethod
304 def format_conflict_analysis(conflicts: dict[str, Any]) -> str:
305 """Format conflict analysis results for display."""
306 conflicting_pairs = conflicts.get("conflicting_pairs", [])
308 if not conflicting_pairs:
309 return (
310 "✅ **Conflict Analysis**\n\nNo conflicts detected between documents."
311 )
313 formatted = (
314 f"⚠️ **Conflict Analysis** ({len(conflicting_pairs)} conflicts found)\n\n"
315 )
317 for i, (doc1, doc2, conflict_info) in enumerate(conflicting_pairs[:5], 1):
318 conflict_type = conflict_info.get("type", "unknown")
319 formatted += f"**{i}. Conflict Type: {conflict_type}**\n"
320 formatted += f"• Document 1: {doc1}\n"
321 formatted += f"• Document 2: {doc2}\n\n"
323 suggestions = conflicts.get("resolution_suggestions", {})
324 if suggestions:
325 formatted += "💡 **Resolution Suggestions:**\n"
326 # Convert dict values to list and take first 3
327 suggestion_list = list(suggestions.values())[:3]
328 for suggestion in suggestion_list:
329 formatted += f"• {suggestion}\n"
331 # Append detector/runtime stats if available for transparency
332 qmeta = conflicts.get("query_metadata", {}) or {}
333 stats = qmeta.get("detector_stats", {}) or {}
334 if stats:
335 pairs_considered = stats.get("pairs_considered")
336 pairs_analyzed = stats.get("pairs_analyzed")
337 llm_pairs = stats.get("llm_pairs")
338 elapsed_ms = stats.get("elapsed_ms")
339 partial = qmeta.get("partial_results") or stats.get("partial_results")
341 formatted += "\n\n🧪 **Analysis Stats:**\n"
342 if pairs_considered is not None:
343 formatted += f"• Pairs considered: {pairs_considered}\n"
344 if pairs_analyzed is not None:
345 formatted += f"• Pairs analyzed: {pairs_analyzed}\n"
346 if llm_pairs is not None:
347 formatted += f"• LLM pairs: {llm_pairs}\n"
348 if elapsed_ms is not None:
349 try:
350 formatted += f"• Elapsed: {float(elapsed_ms):.0f} ms\n"
351 except Exception:
352 formatted += f"• Elapsed: {elapsed_ms} ms\n"
353 if partial:
354 formatted += "• Partial results due to time budget\n"
356 return formatted
358 @staticmethod
359 def create_lightweight_conflict_results(
360 conflicts: dict[str, Any], query: str = "", documents: list = None
361 ) -> dict[str, Any]:
362 """Create lightweight conflict results for fast navigation and lazy loading."""
364 conflicting_pairs = conflicts.get("conflicting_pairs", [])
366 # Create conflict index with comprehensive information
367 conflict_index = []
368 involved_document_ids = set()
370 for i, (doc1_id, doc2_id, conflict_info) in enumerate(conflicting_pairs):
371 conflict_id = f"conf_{i+1:03d}"
373 # Extract document titles from IDs or use the IDs themselves
374 title_1 = doc1_id.split(":", 1)[-1] if ":" in doc1_id else doc1_id
375 title_2 = doc2_id.split(":", 1)[-1] if ":" in doc2_id else doc2_id
377 # Extract conflicting statements using centralized helper
378 conflicting_statements = MCPFormatters._extract_conflicting_statements(
379 conflict_info
380 )
382 # Create rich conflict entry with comprehensive information
383 conflict_entry = {
384 "conflict_id": conflict_id,
385 "document_1_id": doc1_id,
386 "document_2_id": doc2_id,
387 "conflict_type": conflict_info.get("type", "unknown"),
388 "confidence_score": round(conflict_info.get("confidence", 0.0), 3),
389 "title_1": title_1[:100] + "..." if len(title_1) > 100 else title_1,
390 "title_2": title_2[:100] + "..." if len(title_2) > 100 else title_2,
391 "summary": conflict_info.get(
392 "description", "Potential conflict detected"
393 ),
394 "detailed_description": conflict_info.get(
395 "description",
396 "Documents contain contradictory or inconsistent information",
397 ),
398 "resolution_suggestion": MCPFormatters._generate_conflict_resolution_suggestion(
399 conflict_info
400 ),
401 "conflict_indicators": conflict_info.get("indicators", []),
402 "conflicting_statements": conflicting_statements,
403 "analysis_tier": conflict_info.get("analysis_tier", "unknown"),
404 "tier_score": round(conflict_info.get("tier_score", 0.0), 3),
405 "affected_sections": MCPFormatters._extract_affected_sections(
406 conflict_info
407 ),
408 }
410 conflict_index.append(conflict_entry)
411 involved_document_ids.add(doc1_id)
412 involved_document_ids.add(doc2_id)
414 # Create document index for involved documents
415 document_index = []
416 if documents:
417 # Create a lookup for document details
418 doc_lookup = {}
419 for doc in documents:
420 # Handle both SearchResult objects and dictionaries
421 if isinstance(doc, dict):
422 doc_id = doc.get("document_id") or f"{doc.get('source_type', 'unknown')}:{doc.get('title', 'Untitled')}"
423 else:
424 doc_id = doc.document_id or f"{doc.source_type}:{doc.source_title}"
425 doc_lookup[doc_id] = doc
427 # Build document index for involved documents
428 for doc_id in involved_document_ids:
429 if doc_id in doc_lookup:
430 doc = doc_lookup[doc_id]
431 # Handle both SearchResult objects and dictionaries
432 if isinstance(doc, dict):
433 title = doc.get("title", "Untitled")
434 source_type = doc.get("source_type", "unknown")
435 text_length = 0 # Not available in dict format
436 last_modified = None # Not available in dict format
437 else:
438 title = doc.source_title or "Untitled"
439 source_type = doc.source_type
440 text_length = len(doc.text) if doc.text else 0
441 last_modified = (
442 doc.last_modified
443 if hasattr(doc, "last_modified")
444 else None
445 )
447 document_index.append(
448 {
449 "document_id": doc_id,
450 "title": title,
451 "source_type": source_type,
452 "text_length": text_length,
453 "conflict_count": sum(
454 1
455 for conflict in conflict_index
456 if conflict["document_1_id"] == doc_id
457 or conflict["document_2_id"] == doc_id
458 ),
459 "last_modified": last_modified,
460 }
461 )
462 else:
463 # Fallback for documents not in the lookup
464 document_index.append(
465 {
466 "document_id": doc_id,
467 "title": (
468 doc_id.split(":", 1)[-1] if ":" in doc_id else doc_id
469 ),
470 "source_type": (
471 doc_id.split(":", 1)[0] if ":" in doc_id else "unknown"
472 ),
473 "text_length": 0,
474 "conflict_count": sum(
475 1
476 for conflict in conflict_index
477 if conflict["document_1_id"] == doc_id
478 or conflict["document_2_id"] == doc_id
479 ),
480 "last_modified": None,
481 }
482 )
484 # Create conflict summary
485 conflict_types = {}
486 for conflict in conflict_index:
487 conflict_type = conflict["conflict_type"]
488 conflict_types[conflict_type] = conflict_types.get(conflict_type, 0) + 1
490 conflict_summary = {
491 "total_documents_analyzed": conflicts.get("query_metadata", {}).get(
492 "document_count", 0
493 ),
494 "documents_with_conflicts": len(involved_document_ids),
495 "total_conflicts_found": len(conflict_index),
496 "conflict_types": conflict_types,
497 "highest_confidence_score": max(
498 [c["confidence_score"] for c in conflict_index], default=0.0
499 ),
500 }
502 # Analysis metadata (copy to avoid mutating input)
503 analysis_metadata = dict(conflicts.get("query_metadata", {}))
504 analysis_metadata.update(
505 {"analysis_strategy": "tiered_analysis", "response_type": "lightweight"}
506 )
508 # Convert conflict_index to the expected schema format
509 conflicts_detected = []
510 for conflict in conflict_index:
511 # Find document details for the conflicting documents
512 doc1_info = next(
513 (
514 doc
515 for doc in document_index
516 if doc["document_id"] == conflict["document_1_id"]
517 ),
518 None,
519 )
520 doc2_info = next(
521 (
522 doc
523 for doc in document_index
524 if doc["document_id"] == conflict["document_2_id"]
525 ),
526 None,
527 )
529 conflicts_detected.append(
530 {
531 "conflict_id": conflict["conflict_id"],
532 "document_1": {
533 "title": (
534 doc1_info["title"] if doc1_info else conflict["title_1"]
535 ),
536 "content_preview": "", # Can be populated if needed
537 "source_type": (
538 doc1_info["source_type"] if doc1_info else "unknown"
539 ),
540 "document_id": conflict[
541 "document_1_id"
542 ], # Add this for expand_document compatibility
543 },
544 "document_2": {
545 "title": (
546 doc2_info["title"] if doc2_info else conflict["title_2"]
547 ),
548 "content_preview": "", # Can be populated if needed
549 "source_type": (
550 doc2_info["source_type"] if doc2_info else "unknown"
551 ),
552 "document_id": conflict[
553 "document_2_id"
554 ], # Add this for expand_document compatibility
555 },
556 "conflict_type": conflict["conflict_type"],
557 "conflict_score": conflict["confidence_score"],
558 "conflict_description": conflict["summary"],
559 "conflicting_statements": conflict.get(
560 "conflicting_statements", []
561 ),
562 "analysis_tier": conflict["analysis_tier"], # Keep our enhancement
563 }
564 )
566 # Update conflict summary to match expected format
567 updated_conflict_summary = {
568 "total_documents_analyzed": conflict_summary["total_documents_analyzed"],
569 "conflicts_found": conflict_summary["total_conflicts_found"],
570 "conflict_types": list(conflict_summary["conflict_types"].keys()),
571 "highest_conflict_score": conflict_summary["highest_confidence_score"],
572 }
574 return {
575 "conflicts_detected": conflicts_detected,
576 "conflict_summary": updated_conflict_summary,
577 "analysis_metadata": analysis_metadata,
578 "document_index": document_index, # Keep for our enhancement
579 "navigation": {
580 "total_conflicts": len(conflict_index),
581 "max_displayed": len(conflict_index),
582 "can_expand_documents": True,
583 "expand_tool": "expand_document",
584 },
585 }
587 @staticmethod
588 def _extract_conflicting_statements(
589 conflict_info: dict[str, Any],
590 ) -> list[dict[str, str]]:
591 """Extract actual conflicting statements from structured conflict data."""
592 statements = []
594 # Check if we have new structured indicators
595 structured_indicators = conflict_info.get("structured_indicators", [])
597 if structured_indicators:
598 # Use the new structured data with actual text snippets
599 for indicator in structured_indicators[:3]: # Limit to 3 for brevity
600 doc1_snippet = indicator.get("doc1_snippet", "[Content not available]")
601 doc2_snippet = indicator.get("doc2_snippet", "[Content not available]")
603 statements.append(
604 {"from_doc1": doc1_snippet, "from_doc2": doc2_snippet}
605 )
606 else:
607 # Fallback to old format (summary only)
608 indicators = conflict_info.get("indicators", [])
609 for indicator in indicators[:3]:
610 statements.append({"from_doc1": indicator, "from_doc2": indicator})
612 return statements
614 @staticmethod
615 def format_complementary_content(complementary: list[dict[str, Any]]) -> str:
616 """Format complementary content results for display."""
617 if not complementary:
618 return "🔍 **Complementary Content**\n\nNo complementary content found."
620 formatted = (
621 f"🔗 **Complementary Content** ({len(complementary)} recommendations)\n\n"
622 )
624 for i, item in enumerate(complementary[:5], 1): # Show top 5
625 document = item.get("document", {})
626 score = item.get("relevance_score", 0) # Fixed: use correct key
627 reason = item.get("recommendation_reason", "") # Fixed: singular form
629 formatted += f"**{i}. Complementary Score: {score:.3f}**\n"
630 if hasattr(document, "source_title"):
631 formatted += f"• Title: {document.source_title}\n"
632 if reason:
633 formatted += f"• Why Complementary: {reason}\n"
634 formatted += "\n"
636 return formatted
638 @staticmethod
639 def format_document_clusters(clusters: dict[str, Any]) -> str:
640 """Format document clustering results for display."""
641 cluster_list = clusters.get("clusters", [])
642 metadata = clusters.get("clustering_metadata", {})
644 if not cluster_list:
645 message = metadata.get("message", "No clusters could be formed.")
646 return f"🗂️ **Document Clustering**\n\n{message}"
648 formatted = f"""🗂️ **Document Clustering Results**
650📊 **Clustering Summary:**
651• Strategy: {metadata.get('strategy', 'unknown')}
652• Total Clusters: {metadata.get('total_clusters', 0)}
653• Total Documents: {metadata.get('total_documents', 0)}
654• Original Query: {metadata.get('original_query', 'N/A')}
656"""
658 for i, cluster in enumerate(cluster_list[:5], 1): # Show first 5 clusters
659 formatted += f"**Cluster {i} (ID: {cluster.get('id', 'unknown')})**\n"
660 formatted += f"• Documents: {len(cluster.get('documents', []))}\n"
661 formatted += f"• Coherence Score: {cluster.get('coherence_score', 0):.3f}\n"
663 topics = cluster.get("centroid_topics", [])
664 if topics:
665 formatted += f"• Key Topics: {', '.join(topics[:3])}\n"
667 entities = cluster.get("shared_entities", [])
668 if entities:
669 formatted += f"• Shared Entities: {', '.join(entities[:3])}\n"
671 summary = cluster.get("cluster_summary", "")
672 if summary:
673 formatted += f"• Summary: {summary}\n"
675 formatted += "\n"
677 return formatted
    @staticmethod
    def create_lightweight_cluster_results(
        clustering_results: dict[str, Any], query: str = ""
    ) -> dict[str, Any]:
        """Create lightweight cluster results for lazy loading following hierarchy_search pattern.

        Args:
            clustering_results: Raw clustering payload with ``clusters`` and
                ``clustering_metadata`` keys.
            query: Search query, recorded in the response metadata only.

        Returns:
            Dict with ``cluster_index``, ``clustering_metadata`` and
            ``expansion_info`` keys, trimmed for fast client-side navigation.
        """

        clusters = clustering_results.get("clusters", [])
        metadata = clustering_results.get("clustering_metadata", {})

        # Create cluster index with minimal data (limit documents per cluster for performance)
        cluster_index = []
        total_documents_shown = 0
        max_docs_per_cluster = 5  # Show only first 5 documents per cluster initially

        for cluster in clusters:
            cluster_documents = cluster.get("documents", [])

            # Create lightweight document entries (only first few per cluster)
            lightweight_docs = []
            for doc in cluster_documents[:max_docs_per_cluster]:
                # Defaults used when the document carries no usable metadata.
                doc_id = None
                title = "Untitled"
                source_type = "unknown"

                # Documents may be rich result objects, plain objects, or dicts;
                # each branch extracts id/title/source_type accordingly.
                if hasattr(doc, "document_id"):
                    doc_id = doc.document_id
                    # Safely resolve title: prefer get_display_title() when available, otherwise fallback to source_title
                    if hasattr(doc, "get_display_title"):
                        title = doc.get_display_title() or getattr(
                            doc, "source_title", "Untitled"
                        )
                    else:
                        title = getattr(doc, "source_title", "Untitled")
                    source_type = doc.source_type
                elif hasattr(doc, "source_title"):
                    # No document_id attribute: synthesize one from type + title.
                    doc_id = f"{doc.source_type}:{doc.source_title}"
                    title = doc.source_title
                    source_type = doc.source_type
                elif isinstance(doc, dict):
                    doc_id = doc.get("document_id", "")
                    title = (
                        doc.get("title")
                        or doc.get("source_title")
                        or doc.get("parent_document_title")
                        or "Untitled"
                    )
                    source_type = doc.get("source_type", "unknown")

                lightweight_docs.append(
                    {
                        "document_id": doc_id,
                        "title": title,
                        "source_type": source_type,
                        "cluster_relevance": 1.0,
                    }
                )
                total_documents_shown += 1

            # Build cluster info
            # NOTE: len(cluster_index) is read before appending, so the
            # fallback ids/names are 0-based ids and 1-based display names.
            cluster_info = {
                "cluster_id": cluster.get("id", f"cluster_{len(cluster_index)}"),
                "cluster_name": cluster.get(
                    "name", f"Cluster {len(cluster_index) + 1}"
                ),
                "cluster_theme": cluster.get("cluster_summary", "Mixed documents"),
                "document_count": len(cluster_documents),
                "documents_shown": len(lightweight_docs),
                "coherence_score": cluster.get("coherence_score", 0.0),
                "representative_doc_id": cluster.get("representative_doc_id"),
                "cluster_strategy": cluster.get(
                    "cluster_strategy", metadata.get("strategy", "mixed_features")
                ),
                "quality_metrics": cluster.get("quality_metrics", {}),
                "documents": lightweight_docs,
                "cluster_metadata": {
                    "shared_entities": cluster.get("shared_entities", [])[
                        :5
                    ],  # Limit to first 5
                    "shared_topics": cluster.get("centroid_topics", [])[
                        :5
                    ],  # Limit to first 5
                    "cluster_keywords": cluster.get("cluster_keywords", [])[:5],
                },
            }
            cluster_index.append(cluster_info)

        # Create enhanced clustering metadata
        enhanced_metadata = {
            "strategy": metadata.get("strategy", "mixed_features"),
            "total_documents": metadata.get("total_documents", 0),
            "clusters_created": metadata.get("clusters_created", len(clusters)),
            "unclustered_documents": metadata.get("unclustered_documents", 0),
            "document_retrieval_rate": metadata.get("document_retrieval_rate", 1.0),
            "clustering_quality": metadata.get("clustering_quality", 0.0),
            "processing_time_ms": metadata.get("processing_time_ms", 0),
            "strategy_performance": metadata.get("strategy_performance", {}),
            "recommendations": metadata.get("recommendations", {}),
            "query_metadata": {
                "search_query": query,
                "documents_shown": total_documents_shown,
                "max_docs_per_cluster": max_docs_per_cluster,
                "lazy_loading_enabled": True,
            },
        }

        return {
            "cluster_index": cluster_index,
            "clustering_metadata": enhanced_metadata,
            "expansion_info": {
                "cluster_expansion_available": True,
                "document_expansion_available": True,
                "expansion_instructions": "Use expand_document tool with document_id or expand_cluster with cluster_id for full content",
            },
        }
794 @staticmethod
795 def create_structured_search_results(
796 results: list[HybridSearchResult],
797 ) -> list[dict[str, Any]]:
798 """Create structured results matching Qdrant document structure."""
799 return [
800 {
801 # 🔥 ROOT LEVEL FIELDS (matching Qdrant structure)
802 "score": getattr(result, "score", 0.0),
803 "document_id": getattr(result, "document_id", "") or "",
804 "title": (
805 result.get_display_title()
806 if hasattr(result, "get_display_title")
807 else (getattr(result, "source_title", None) or "Untitled")
808 ),
809 "content": getattr(result, "text", None) or "",
810 "source_type": getattr(result, "source_type", "unknown"),
811 "source": getattr(result, "repo_name", None) or "",
812 "url": getattr(result, "source_url", None) or "",
813 "created_at": getattr(result, "created_at", None) or "",
814 "updated_at": getattr(result, "last_modified", None) or "",
815 # 🔥 NESTED METADATA (matching Qdrant structure)
816 "metadata": {
817 # Project information
818 "project_id": getattr(result, "project_id", None) or "",
819 "project_name": getattr(result, "project_name", None) or "",
820 "project_description": getattr(result, "project_description", None)
821 or "",
822 "collection_name": getattr(result, "collection_name", None) or "",
823 # File information (from rich Qdrant metadata)
824 "file_path": getattr(result, "file_path", None) or "",
825 "file_name": getattr(result, "original_filename", None) or "",
826 "file_type": getattr(result, "original_file_type", None) or "",
827 "file_size": getattr(result, "file_size", None),
828 # Content analysis (from rich Qdrant metadata)
829 "word_count": getattr(result, "word_count", None),
830 "char_count": getattr(result, "char_count", None),
831 "estimated_read_time": getattr(result, "estimated_read_time", None),
832 # Chunking information (from rich Qdrant metadata)
833 "chunk_index": getattr(result, "chunk_index", None),
834 "total_chunks": getattr(result, "total_chunks", None),
835 "chunk_info": (
836 f"Chunk {getattr(result, 'chunk_index', 0) + 1}/{getattr(result, 'total_chunks', 1)}"
837 if isinstance(getattr(result, "chunk_index", None), int)
838 and isinstance(getattr(result, "total_chunks", None), int)
839 else None
840 ),
841 "chunking_strategy": getattr(result, "chunking_strategy", None)
842 or "",
843 # Enhanced context and analysis
844 "hierarchy_context": (
845 result.get_hierarchy_info()
846 if hasattr(result, "get_hierarchy_info")
847 else {}
848 ),
849 "content_analysis": (
850 result.get_content_info()
851 if hasattr(result, "get_content_info")
852 else {}
853 ),
854 "semantic_analysis": (
855 result.get_semantic_info()
856 if hasattr(result, "get_semantic_info")
857 else {}
858 ),
859 "section_context": (
860 result.get_section_context()
861 if hasattr(result, "get_section_context")
862 else ""
863 ),
864 "attachment_info": (
865 result.get_attachment_info()
866 if hasattr(result, "get_attachment_info")
867 else {}
868 ),
869 },
870 }
871 for result in results
872 ]
874 @staticmethod
875 def create_lightweight_hierarchy_results(
876 filtered_results: list[HybridSearchResult],
877 organized_results: dict[str, list[HybridSearchResult]] = None,
878 query: str = "",
879 ) -> dict[str, Any]:
880 """Return minimal hierarchy data for fast navigation."""
882 # Create hierarchy index with minimal data (up to 20 hierarchy nodes)
883 hierarchy_index = []
884 for result in filtered_results[:20]:
885 hierarchy_index.append(
886 {
887 "document_id": getattr(result, "document_id", ""),
888 "title": getattr(result, "source_title", None) or "Untitled",
889 "score": getattr(result, "score", 0.0),
890 "hierarchy_info": {
891 "depth": MCPFormatters._extract_synthetic_depth(result),
892 "parent_id": MCPFormatters._extract_synthetic_parent_id(result),
893 "parent_title": MCPFormatters._extract_synthetic_parent_title(
894 result
895 ),
896 "breadcrumb": MCPFormatters._extract_synthetic_breadcrumb(
897 result
898 ),
899 "has_children": MCPFormatters._extract_has_children(result),
900 "source_type": getattr(result, "source_type", "unknown"),
901 },
902 "navigation_hints": {
903 "group": MCPFormatters._get_group_key(result),
904 "siblings_count": MCPFormatters._count_siblings(
905 result, filtered_results
906 ),
907 "children_count": MCPFormatters._extract_children_count(
908 result, filtered_results
909 ),
910 },
911 }
912 )
914 # Create clean hierarchy groups
915 hierarchy_groups = []
916 if organized_results:
917 for group_key, results in organized_results.items():
918 hierarchy_groups.append(
919 {
920 "group_key": group_key,
921 "group_name": MCPFormatters._generate_clean_group_name(
922 group_key, results
923 ),
924 "document_ids": [r.document_id for r in results],
925 "depth_range": [
926 min(getattr(r, "depth", 0) or 0 for r in results),
927 max(getattr(r, "depth", 0) or 0 for r in results),
928 ],
929 "total_documents": len(results),
930 }
931 )
933 return {
934 "hierarchy_index": hierarchy_index,
935 "hierarchy_groups": hierarchy_groups,
936 "total_found": len(filtered_results),
937 "query_metadata": {
938 "search_query": query,
939 "source_types_found": list(
940 {getattr(r, "source_type", "unknown") for r in filtered_results}
941 ),
942 },
943 }
945 @staticmethod
946 def create_lightweight_complementary_results(
947 complementary_recommendations: list[dict[str, Any]],
948 target_document: "HybridSearchResult" = None,
949 context_documents_analyzed: int = 0,
950 target_query: str = "",
951 ) -> dict[str, Any]:
952 """Create lightweight complementary content results for lazy loading."""
954 # Create complementary index with minimal data
955 complementary_index = []
956 for result in complementary_recommendations:
957 document = result.get("document")
958 if document:
959 complementary_index.append(
960 {
961 "document_id": document.document_id,
962 "title": document.source_title or "Untitled",
963 "complementary_score": result.get("relevance_score", 0.0),
964 "complementary_reason": result.get("recommendation_reason", ""),
965 "relationship_type": result.get("strategy", "related"),
966 "source_type": document.source_type or "",
967 "basic_metadata": {
968 "project_id": document.project_id or "",
969 "created_at": document.created_at or "",
970 "source_url": document.source_url or "",
971 },
972 # NO content_preview - use expand_document for full content
973 }
974 )
976 # Target document info (accept object or lightweight dict)
977 target_info = {
978 "title": target_query, # Fallback to query
979 "content_preview": "",
980 "source_type": "",
981 }
983 if target_document:
984 if isinstance(target_document, dict):
985 target_info = {
986 "document_id": target_document.get("document_id", ""),
987 "title": target_document.get("title", target_query),
988 "source_type": target_document.get("source_type", ""),
989 }
990 else:
991 target_info = {
992 "document_id": getattr(target_document, "document_id", ""),
993 "title": getattr(target_document, "source_title", None)
994 or target_query,
995 "source_type": getattr(target_document, "source_type", "") or "",
996 }
998 # Calculate summary statistics
999 scores = [item.get("complementary_score", 0.0) for item in complementary_index]
1000 relationship_types = [
1001 item.get("relationship_type", "related") for item in complementary_index
1002 ]
1004 return {
1005 "complementary_index": complementary_index,
1006 "target_document": target_info,
1007 "complementary_summary": {
1008 "total_analyzed": context_documents_analyzed,
1009 "complementary_found": len(complementary_index),
1010 "highest_score": max(scores, default=0.0),
1011 "relationship_types": list(set(relationship_types)),
1012 },
1013 "lazy_loading_enabled": True,
1014 "expand_document_hint": "Use expand_document tool with document_id for full content",
1015 }
1017 @staticmethod
1018 def _generate_clean_group_name(group_key: str, results: list) -> str:
1019 """Generate clear, short group names."""
1020 # Remove chunk/content prefixes from group names
1021 if group_key.startswith("Exists, limited clarity"):
1022 return "Technical Documentation"
1023 if group_key.startswith("Immediately begin compiling"):
1024 return "Product Management"
1025 if group_key.startswith("Purpose and Scope"):
1026 return "Project Overview"
1028 # Use first meaningful part of breadcrumb
1029 if " > " in group_key:
1030 return group_key.split(" > ")[0]
1032 # Truncate long names and add context
1033 if len(group_key) > 50:
1034 source_type = results[0].source_type if results else "unknown"
1035 return f"{group_key[:47]}... ({source_type.title()})"
1037 return group_key
1039 @staticmethod
1040 def _get_group_key(result) -> str:
1041 """Generate a stable group key for hierarchy organization."""
1042 # Try synthetic breadcrumb first
1043 synthetic_breadcrumb = MCPFormatters._extract_synthetic_breadcrumb(result)
1044 if synthetic_breadcrumb:
1045 if result.source_type == "confluence":
1046 return synthetic_breadcrumb
1047 elif result.source_type == "localfile":
1048 # Use root folder from breadcrumb
1049 return (
1050 synthetic_breadcrumb.split(" > ")[0]
1051 if " > " in synthetic_breadcrumb
1052 else synthetic_breadcrumb
1053 )
1055 # Fallback to file path for localfiles
1056 if result.source_type == "localfile" and result.file_path:
1057 path_parts = [p for p in result.file_path.split("/") if p and p != "."]
1058 return path_parts[0] if path_parts else "Root"
1060 # Fallback to title
1061 return result.source_title or "Uncategorized"
1063 @staticmethod
1064 def _count_siblings(result, all_results: list) -> int:
1065 """Count sibling documents at the same hierarchy level."""
1066 target_depth = MCPFormatters._extract_synthetic_depth(result)
1067 target_parent = MCPFormatters._extract_synthetic_parent_title(result)
1068 target_group = MCPFormatters._get_group_key(result)
1070 siblings = 0
1071 for other_result in all_results:
1072 other_depth = MCPFormatters._extract_synthetic_depth(other_result)
1073 other_parent = MCPFormatters._extract_synthetic_parent_title(other_result)
1074 other_group = MCPFormatters._get_group_key(other_result)
1076 # Count as siblings if same depth and same parent/group
1077 if (
1078 other_depth == target_depth
1079 and (other_parent == target_parent or other_group == target_group)
1080 and other_result.document_id != result.document_id
1081 ):
1082 siblings += 1
1084 return siblings
1086 @staticmethod
1087 def _extract_synthetic_depth(result) -> int:
1088 """Extract or synthesize depth information from available data."""
1089 # Try native hierarchy first
1090 if hasattr(result, "depth") and result.depth is not None:
1091 return result.depth
1093 # For localfiles, use folder depth
1094 if result.source_type == "localfile" and result.file_path:
1095 path_parts = [p for p in result.file_path.split("/") if p and p != "."]
1096 return max(0, len(path_parts) - 1) # Exclude filename
1098 # For confluence with section context
1099 if result.source_type == "confluence":
1100 section_context = getattr(result, "section_context", "")
1101 if section_context and "[H" in section_context:
1102 # Extract header level from section context like "[H2]"
1103 try:
1104 header_level = int(section_context.split("[H")[1][0])
1105 return header_level - 1 # H1=0, H2=1, etc.
1106 except (IndexError, ValueError):
1107 pass
1109 return 0
1111 @staticmethod
1112 def _extract_synthetic_parent_id(result) -> str | None:
1113 """Extract or synthesize parent ID from available data."""
1114 # For chunked documents, use base document ID if different chunk
1115 try:
1116 chunk_index = getattr(result, "chunk_index", 0)
1117 if isinstance(chunk_index, int) and chunk_index > 0:
1118 # Generate a parent ID for chunk 0 of the same document
1119 document_id = getattr(result, "document_id", None)
1120 if document_id and isinstance(document_id, str):
1121 base_id = document_id.split("-")[0]
1122 return f"{base_id}-chunk-0" if base_id else None
1123 except (TypeError, AttributeError):
1124 pass
1126 return None
1128 @staticmethod
1129 def _extract_synthetic_parent_title(result) -> str | None:
1130 """Extract or synthesize parent title from available data."""
1131 try:
1132 # For localfiles, use parent folder name
1133 source_type = getattr(result, "source_type", "")
1134 if source_type == "localfile":
1135 file_path = getattr(result, "file_path", "")
1136 if file_path and isinstance(file_path, str):
1137 path_parts = [p for p in file_path.split("/") if p and p != "."]
1138 if len(path_parts) > 1:
1139 return path_parts[-2] # Parent folder
1141 # For chunked documents, use the base document title
1142 chunk_index = getattr(result, "chunk_index", 0)
1143 if isinstance(chunk_index, int) and chunk_index > 0:
1144 title = getattr(result, "source_title", "") or ""
1145 if isinstance(title, str) and "(Chunk " in title:
1146 return title.split("(Chunk ")[0].strip()
1147 except (TypeError, AttributeError):
1148 pass
1150 return None
1152 @staticmethod
1153 def _extract_synthetic_breadcrumb(result) -> str | None:
1154 """Extract or synthesize breadcrumb from available data."""
1155 # Try native breadcrumb first
1156 if hasattr(result, "breadcrumb_text") and result.breadcrumb_text:
1157 return result.breadcrumb_text
1159 # For localfiles, create breadcrumb from file path
1160 if getattr(result, "source_type", None) == "localfile" and getattr(
1161 result, "file_path", None
1162 ):
1163 path_parts = [p for p in result.file_path.split("/") if p and p != "."]
1164 if len(path_parts) > 1:
1165 return " > ".join(path_parts[:-1]) # Exclude filename
1167 # For confluence with section context, create from section info
1168 if getattr(result, "source_type", None) == "confluence":
1169 section_context = getattr(result, "section_context", "")
1170 if isinstance(section_context, str) and section_context:
1171 # Extract section title from context like "[H2] Functions - Beta release"
1172 if "]" in section_context:
1173 section_title = section_context.split("]", 1)[1].strip()
1174 if section_title and "(#" in section_title:
1175 section_title = section_title.split("(#")[0].strip()
1176 return section_title
1178 return None
1180 @staticmethod
1181 def _extract_has_children(result) -> bool:
1182 """Extract or synthesize has_children information."""
1183 try:
1184 # Try native hierarchy first
1185 if hasattr(result, "has_children") and callable(result.has_children):
1186 return result.has_children()
1188 # For chunked documents, check if this is not the last chunk
1189 chunk_index = getattr(result, "chunk_index", 0)
1190 total_chunks = getattr(result, "total_chunks", 1)
1191 if isinstance(chunk_index, int) and isinstance(total_chunks, int):
1192 return chunk_index < (total_chunks - 1)
1193 except (TypeError, AttributeError):
1194 pass
1196 return False
1198 @staticmethod
1199 def _extract_children_count(result, all_results: list) -> int:
1200 """Extract or synthesize children count from available data."""
1201 try:
1202 # Try native children count first
1203 children_count = getattr(result, "children_count", None)
1204 if children_count is not None and isinstance(children_count, int):
1205 return children_count
1207 # For chunked documents, count remaining chunks in same document
1208 chunk_index = getattr(result, "chunk_index", 0)
1209 total_chunks = getattr(result, "total_chunks", 1)
1210 if isinstance(chunk_index, int) and isinstance(total_chunks, int):
1211 return max(0, total_chunks - chunk_index - 1)
1213 # For localfiles, count files in subdirectories (rough estimate)
1214 source_type = getattr(result, "source_type", "")
1215 file_path = getattr(result, "file_path", "")
1216 if (
1217 source_type == "localfile"
1218 and file_path
1219 and isinstance(file_path, str)
1220 and all_results
1221 ):
1222 base_path = "/".join(file_path.split("/")[:-1]) # Remove filename
1223 children = 0
1224 for other in all_results:
1225 other_source_type = getattr(other, "source_type", "")
1226 other_file_path = getattr(other, "file_path", "")
1227 if (
1228 other_source_type == "localfile"
1229 and other_file_path
1230 and isinstance(other_file_path, str)
1231 and other_file_path.startswith(base_path + "/")
1232 and other_file_path != file_path
1233 ):
1234 children += 1
1235 return min(children, 10) # Cap to reasonable number
1236 except (TypeError, AttributeError):
1237 pass
1239 return 0
1241 @staticmethod
1242 def create_structured_hierarchy_results(
1243 filtered_results: list[HybridSearchResult],
1244 organize_by_hierarchy: bool,
1245 organized_results: dict[str, list[HybridSearchResult]] = None,
1246 ) -> dict[str, Any]:
1247 """Legacy method - replaced by create_lightweight_hierarchy_results."""
1248 # For backward compatibility during transition, delegate to lightweight version
1249 return MCPFormatters.create_lightweight_hierarchy_results(
1250 filtered_results, organized_results
1251 )
1253 @staticmethod
1254 def create_structured_attachment_results(
1255 filtered_results: list[HybridSearchResult],
1256 attachment_filter: dict[str, Any],
1257 include_parent_context: bool = True,
1258 ) -> dict[str, Any]:
1259 """Create structured content for attachment search MCP compliance."""
1260 return {
1261 "results": [
1262 {
1263 "score": result.score,
1264 "title": result.source_title or "Untitled",
1265 "content": result.text,
1266 "attachment_info": {
1267 "filename": getattr(
1268 result, "original_filename", result.source_title or "Untitled"
1269 )
1270 or "Untitled",
1271 "file_type": (
1272 getattr(result, "original_file_type", None)
1273 or MCPFormatters._extract_file_type_minimal(result)
1274 or "unknown"
1275 ),
1276 "file_size": getattr(result, "file_size", 0) or 0,
1277 "parent_document": (
1278 (getattr(result, "parent_document_title", "") or "")
1279 if include_parent_context
1280 else ""
1281 ),
1282 },
1283 "metadata": {
1284 "file_path": result.file_path or "",
1285 "project_id": result.project_id or "",
1286 "upload_date": getattr(result, "created_at", "") or "",
1287 "author": getattr(result, "author", "") or "",
1288 },
1289 }
1290 for result in filtered_results
1291 ],
1292 "total_found": len(filtered_results),
1293 "attachment_summary": {
1294 "total_attachments": len(filtered_results),
1295 "file_types": list(
1296 {
1297 (
1298 getattr(result, "original_file_type", None)
1299 or MCPFormatters._extract_file_type_minimal(result)
1300 or "unknown"
1301 )
1302 for result in filtered_results
1303 }
1304 ),
1305 "attachments_only": attachment_filter.get("attachments_only", False),
1306 },
1307 }
1309 @staticmethod
1310 def create_lightweight_attachment_results(
1311 filtered_results: list[HybridSearchResult],
1312 attachment_filter: dict[str, Any],
1313 query: str = "",
1314 ) -> dict[str, Any]:
1315 """Return minimal attachment data for fast navigation and lazy loading."""
1317 # Create attachment index with minimal data (limit to 20 for performance)
1318 attachment_index = []
1319 for result in filtered_results[:20]:
1320 attachment_index.append(
1321 {
1322 "document_id": result.document_id,
1323 "title": result.source_title or "Untitled",
1324 "score": result.score,
1325 "attachment_info": {
1326 "filename": MCPFormatters._extract_safe_filename(result),
1327 "file_type": MCPFormatters._extract_file_type_minimal(result),
1328 "file_size": (
1329 result.file_size
1330 if result.file_size and result.file_size > 0
1331 else None
1332 ),
1333 "source_type": result.source_type,
1334 },
1335 "navigation_hints": {
1336 "parent_document": (
1337 getattr(result, "parent_document_title", None)
1338 or getattr(result, "parent_title", None)
1339 ),
1340 "project_context": result.project_name or result.project_id,
1341 "content_preview": (
1342 result.text[:100] + "..." if result.text else None
1343 ),
1344 },
1345 }
1346 )
1348 # Create attachment groups for better organization
1349 attachment_groups = MCPFormatters._organize_attachments_by_type(
1350 filtered_results
1351 )
1353 return {
1354 "attachment_index": attachment_index,
1355 "attachment_groups": attachment_groups,
1356 "total_found": len(filtered_results),
1357 "query_metadata": {
1358 "search_query": query,
1359 "source_types_found": list({r.source_type for r in filtered_results}),
1360 "filters_applied": attachment_filter,
1361 },
1362 }
1364 @staticmethod
1365 def _extract_safe_filename(result: HybridSearchResult) -> str:
1366 """Fast filename extraction with minimal processing."""
1367 # Quick priority check - avoid expensive validation
1368 original = getattr(result, "original_filename", None)
1369 if isinstance(original, str) and len(original) < 200:
1370 return original
1372 file_path = getattr(result, "file_path", None)
1373 if isinstance(file_path, str) and file_path:
1374 import os
1376 return os.path.basename(file_path)
1378 # Fallback to source title but clean it
1379 title = getattr(result, "source_title", None) or "untitled"
1380 # Quick clean - remove obvious chunk indicators
1381 if "(Chunk " in title:
1382 title = title.split("(Chunk ")[0].strip()
1384 return title[:100] # Truncate for safety
1386 @staticmethod
1387 def _extract_file_type_minimal(result: HybridSearchResult) -> str:
1388 """Fast file type detection - minimal processing."""
1389 # Priority order with early returns for performance
1390 mime_type = getattr(result, "mime_type", None)
1391 if isinstance(mime_type, str) and mime_type:
1392 return mime_type.split("/")[-1] # Get extension from MIME
1394 # Try multiple filename sources for extension extraction
1395 file_path = getattr(result, "file_path", None)
1396 source_title = getattr(result, "source_title", None)
1397 original_filename = getattr(result, "original_filename", None)
1398 filename_candidates = [
1399 original_filename if isinstance(original_filename, str) else None,
1400 source_title if isinstance(source_title, str) else None,
1401 (
1402 file_path.split("/")[-1]
1403 if isinstance(file_path, str) and file_path
1404 else None
1405 ),
1406 ]
1408 for filename in filename_candidates:
1409 if filename and "." in filename:
1410 ext = filename.split(".")[-1].lower().strip()
1411 # Valid file extensions and common document types
1412 if len(ext) <= 5 and ext.isalnum():
1413 return ext
1415 return "unknown"
1417 @staticmethod
1418 def _organize_attachments_by_type(results: list[HybridSearchResult]) -> list[dict]:
1419 """Organize attachments into logical groups for navigation."""
1420 from collections import defaultdict
1422 type_groups = defaultdict(list)
1424 for result in results:
1425 # Group by file type first
1426 file_type = MCPFormatters._extract_file_type_minimal(result)
1427 group_key = MCPFormatters._get_attachment_group_key(
1428 file_type, result.source_type
1429 )
1430 type_groups[group_key].append(result.document_id)
1432 # Convert to structured format
1433 groups = []
1434 for group_key, doc_ids in type_groups.items():
1435 if len(doc_ids) >= 1: # Include all groups, even single files
1436 groups.append(
1437 {
1438 "group_key": group_key,
1439 "group_name": MCPFormatters._generate_friendly_group_name(
1440 group_key
1441 ),
1442 "document_ids": doc_ids,
1443 "file_count": len(doc_ids),
1444 }
1445 )
1447 # Sort by file count (most common types first)
1448 return sorted(groups, key=lambda g: g["file_count"], reverse=True)
1450 @staticmethod
1451 def _get_attachment_group_key(file_type: str, source_type: str) -> str:
1452 """Generate logical grouping keys for attachments."""
1453 # Map to broader categories for better UX
1454 document_types = {"pdf", "doc", "docx", "txt", "md"}
1455 spreadsheet_types = {"xls", "xlsx", "csv"}
1456 image_types = {"png", "jpg", "jpeg", "gif", "svg"}
1458 if file_type in document_types:
1459 return f"documents_{source_type}"
1460 elif file_type in spreadsheet_types:
1461 return f"spreadsheets_{source_type}"
1462 elif file_type in image_types:
1463 return f"images_{source_type}"
1464 else:
1465 return f"other_{source_type}"
1467 @staticmethod
1468 def _generate_friendly_group_name(group_key: str) -> str:
1469 """Generate user-friendly group names."""
1470 # Parse the group key format: "type_source"
1471 if "_" in group_key:
1472 file_category, source_type = group_key.split("_", 1)
1474 # Capitalize and format
1475 category_map = {
1476 "documents": "Documents",
1477 "spreadsheets": "Spreadsheets",
1478 "images": "Images",
1479 "other": "Other Files",
1480 }
1482 source_map = {
1483 "confluence": "Confluence",
1484 "localfile": "Local Files",
1485 "git": "Git Repository",
1486 "jira": "Jira",
1487 }
1489 category = category_map.get(file_category, file_category.title())
1490 source = source_map.get(source_type, source_type.title())
1492 return f"{category} ({source})"
1494 return group_key.title()
1496 @staticmethod
1497 def _generate_conflict_resolution_suggestion(conflict_info: dict) -> str:
1498 """Generate a resolution suggestion based on conflict type and information."""
1499 conflict_type = conflict_info.get("type", "unknown")
1501 if conflict_type == "version_conflict":
1502 return "Review documents for version consistency and update outdated information"
1503 elif conflict_type == "contradictory_guidance":
1504 return "Reconcile contradictory guidance by consulting authoritative sources or stakeholders"
1505 elif conflict_type == "procedural_conflict":
1506 return "Establish a single, authoritative procedure and deprecate conflicting processes"
1507 elif conflict_type == "requirement_conflict":
1508 return "Clarify requirements with stakeholders and update documentation to resolve ambiguity"
1509 elif conflict_type == "implementation_conflict":
1510 return "Review implementation approaches and standardize on the preferred solution"
1511 else:
1512 return (
1513 "Review conflicting information and establish a single source of truth"
1514 )
1516 @staticmethod
1517 def _extract_affected_sections(conflict_info: dict) -> list:
1518 """Extract affected sections from conflict information."""
1519 affected_sections = []
1521 # Try to identify sections from structured indicators
1522 structured_indicators = conflict_info.get("structured_indicators", [])
1523 for indicator in structured_indicators:
1524 if isinstance(indicator, dict):
1525 # Look for section keywords in the snippets
1526 doc1_snippet = indicator.get("doc1_snippet", "")
1527 doc2_snippet = indicator.get("doc2_snippet", "")
1529 sections = set()
1530 for snippet in [doc1_snippet, doc2_snippet]:
1531 # Common section patterns
1532 if "introduction" in snippet.lower():
1533 sections.add("Introduction")
1534 elif "requirement" in snippet.lower():
1535 sections.add("Requirements")
1536 elif "procedure" in snippet.lower() or "process" in snippet.lower():
1537 sections.add("Procedures")
1538 elif "implementation" in snippet.lower():
1539 sections.add("Implementation")
1540 elif (
1541 "configuration" in snippet.lower()
1542 or "config" in snippet.lower()
1543 ):
1544 sections.add("Configuration")
1545 elif "guideline" in snippet.lower() or "guide" in snippet.lower():
1546 sections.add("Guidelines")
1548 affected_sections.extend(list(sections))
1550 # Remove duplicates and return
1551 return list(set(affected_sections)) if affected_sections else ["Content"]