Coverage for src/qdrant_loader_mcp_server/mcp/formatters/intelligence.py: 84% (235 statements) — coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
"""
Intelligence Result Formatters - Analysis and Insights Formatting.

This module handles formatting of intelligence analysis results including
relationship analysis, similarity detection, conflict analysis, and
complementary content discovery.
"""
9from typing import Any
class IntelligenceResultFormatters:
    """Handles intelligence analysis result formatting operations.

    All formatters are defensive: upstream producers emit several result
    shapes (dicts, lists, tuples, flattened vs. nested fields), so each
    method normalizes its input before rendering a human-readable string.
    """

    @staticmethod
    def format_relationship_analysis(analysis: dict[str, Any]) -> str:
        """Format document relationship analysis for display.

        Args:
            analysis: Analysis payload. May contain an ``error`` key, a
                ``summary`` dict, clusters under several possible keys, and
                conflict lists in multiple locations.

        Returns:
            A markdown-style summary string.
        """
        if "error" in analysis:
            return f"❌ Error: {analysis['error']}"

        summary = analysis.get("summary", {})
        formatted = f"""🔍 **Document Relationship Analysis**

📊 **Summary:**
• Total Documents: {summary.get('total_documents', 0)}
• Clusters Found: {summary.get('clusters_found', 0)}
• Citation Relationships: {summary.get('citation_relationships', 0)}
• Conflicts Detected: {summary.get('conflicts_detected', 0)}

🏷️ **Query Information:**
• Original Query: {analysis.get('query_metadata', {}).get('original_query', 'N/A')}
• Documents Analyzed: {analysis.get('query_metadata', {}).get('document_count', 0)}
"""

        # Accept multiple shapes for clusters: different producers use
        # different top-level keys; the first non-empty one wins.
        clusters_candidate = None
        for key in (
            "document_clusters",
            "topic_clusters",
            "entity_clusters",
            "clusters",
        ):
            value = analysis.get(key)
            if value:
                clusters_candidate = value
                break

        cluster_list: list[Any] = []
        if isinstance(clusters_candidate, list):
            cluster_list = clusters_candidate
        elif isinstance(clusters_candidate, dict):
            # Some producers return a dict of clusters keyed by id.
            cluster_list = list(clusters_candidate.values())

        if cluster_list:
            formatted += "\n🗂️ **Document Clusters:**\n"
            for i, cluster in enumerate(cluster_list[:3], 1):  # Show first 3 clusters
                count = 0
                if isinstance(cluster, dict):
                    items = (
                        cluster.get("documents")
                        or cluster.get("items")
                        or cluster.get("members")
                    )
                    if isinstance(items, list):
                        count = len(items)
                    else:
                        # Fall back to the dict's own length (e.g. a
                        # mapping of doc-id -> doc).
                        try:
                            count = len(cluster)
                        except Exception:
                            count = 0
                else:
                    try:
                        count = len(cluster)  # type: ignore[arg-type]
                    except Exception:
                        count = 0

                formatted += f"• Cluster {i}: {count} documents\n"

        # Aggregate conflicts across possible locations/shapes.
        conflict_lists: list[list[Any]] = []
        conflict_analysis = analysis.get("conflict_analysis", {}) or {}
        for key in ("conflicting_pairs", "conflicts"):
            lst = conflict_analysis.get(key)
            if isinstance(lst, list):
                conflict_lists.append(lst)

        for key in ("conflicts", "conflicting_pairs"):
            lst = analysis.get(key)
            if isinstance(lst, list):
                conflict_lists.append(lst)

        entity_relationships = analysis.get("entity_relationships", {}) or {}
        for key in ("conflicting_pairs", "conflicts"):
            lst = entity_relationships.get(key)
            if isinstance(lst, list):
                conflict_lists.append(lst)

        total_conflicts = sum(len(lst) for lst in conflict_lists)
        if total_conflicts:
            formatted += f"\n⚠️ **Conflicts Detected:** {total_conflicts} conflicting document pairs\n"

        return formatted

    @staticmethod
    def format_similar_documents(similar_docs: list[dict[str, Any]]) -> str:
        """Format similar documents results for display.

        Args:
            similar_docs: Entries that may carry the score under
                ``overall_similarity``, ``similarity_score``, or a nested
                ``similarity_scores`` dict/list, with title and reasons in
                nested or flattened positions.

        Returns:
            A markdown-style listing of the top 5 similar documents.
        """
        if not similar_docs:
            return "🔍 **Similar Documents**\n\nNo similar documents found."

        formatted = f"🔍 **Similar Documents** ({len(similar_docs)} found)\n\n"

        for i, doc_info in enumerate(similar_docs[:5], 1):  # Show top 5
            # Robust similarity score extraction across known key variants.
            score_value: Any = None
            for key in ("overall_similarity", "similarity_score"):
                if key in doc_info:
                    score_value = doc_info.get(key)
                    break
            if score_value is None:
                similarity_scores = doc_info.get("similarity_scores")
                if isinstance(similarity_scores, dict):
                    if "overall" in similarity_scores:
                        score_value = similarity_scores.get("overall")
                    else:
                        # No "overall" key: take the first numeric value.
                        for v in similarity_scores.values():
                            if isinstance(v, int | float):
                                score_value = v
                                break
                elif isinstance(similarity_scores, list):
                    for v in similarity_scores:
                        if isinstance(v, int | float):
                            score_value = v
                            break
            try:
                score = float(score_value) if score_value is not None else 0.0
            except (TypeError, ValueError):
                score = 0.0

            document = doc_info.get("document", {})

            # Title extraction: document.source_title -> document.title -> top-level
            title_value = None
            if isinstance(document, dict):
                title_value = document.get("source_title") or document.get("title")
            else:
                title_value = getattr(document, "source_title", None) or getattr(
                    document, "title", None
                )
            if not title_value:
                title_value = doc_info.get("source_title") or doc_info.get("title")

            # Reasons extraction: prefer a list but normalize bare strings.
            reasons_value = (
                doc_info.get("similarity_reasons")
                or doc_info.get("reason")
                or doc_info.get("explanations")
                or doc_info.get("reasons")
            )
            reasons_list: list[str] = []
            if isinstance(reasons_value, list):
                reasons_list = [str(r) for r in reasons_value]
            elif isinstance(reasons_value, str):
                reasons_list = [reasons_value]

            formatted += f"**{i}. Similarity Score: {score:.3f}**\n"
            if title_value:
                formatted += f"• Title: {title_value}\n"
            if reasons_list:
                formatted += f"• Reasons: {', '.join(reasons_list)}\n"
            formatted += "\n"

        return formatted

    @staticmethod
    def format_conflict_analysis(conflicts: dict[str, Any]) -> str:
        """Format conflict analysis results for display.

        Args:
            conflicts: Payload with conflicts under ``conflicts`` (new) or
                ``conflicting_pairs`` (old); each entry is either a
                ``(doc1, doc2, metadata)`` tuple or a dict, plus optional
                ``resolution_suggestions``.

        Returns:
            A markdown-style conflict report (top 3 conflicts).
        """
        # Handle both new format ("conflicts") and old format ("conflicting_pairs")
        conflict_list = conflicts.get("conflicts", [])
        conflicting_pairs = conflicts.get("conflicting_pairs", [])

        # Use whichever format is provided.
        if conflicting_pairs:
            conflict_list = conflicting_pairs

        if not conflicts or not conflict_list:
            return (
                "✅ **Conflict Analysis**\n\nNo conflicts detected between documents."
            )

        formatted = (
            f"⚠️ **Conflict Analysis** ({len(conflict_list)} conflicts found)\n\n"
        )

        for i, conflict in enumerate(conflict_list[:3], 1):  # Show top 3 conflicts
            # Handle tuple format (doc1, doc2, metadata) or dict format.
            if isinstance(conflict, tuple) and len(conflict) == 3:
                doc1_title, doc2_title, metadata = conflict
                # Guard: metadata is expected to be a dict but may not be.
                conflict_type = (
                    metadata.get("type", "unknown")
                    if isinstance(metadata, dict)
                    else "unknown"
                )
            else:
                # Dict format (guard against unexpected entry types).
                doc1 = (
                    conflict.get("document_1", {})
                    if isinstance(conflict, dict)
                    else {}
                )
                doc2 = (
                    conflict.get("document_2", {})
                    if isinstance(conflict, dict)
                    else {}
                )
                doc1_title = (
                    doc1.get("title", "Unknown")
                    if isinstance(doc1, dict)
                    else str(doc1)
                )
                doc2_title = (
                    doc2.get("title", "Unknown")
                    if isinstance(doc2, dict)
                    else str(doc2)
                )
                # severity currently unused in formatted output
                conflict_type = (
                    conflict.get("conflict_type", "unknown")
                    if isinstance(conflict, dict)
                    else "unknown"
                )

            formatted += f"**{i}. Conflict Type: {conflict_type}**\n"
            formatted += f"• Document 1: {doc1_title}\n"
            formatted += f"• Document 2: {doc2_title}\n"

            # Only check for conflicting_statements in dict format.
            if isinstance(conflict, dict) and "conflicting_statements" in conflict:
                statements = conflict["conflicting_statements"]
                if statements:
                    formatted += f"• Conflicting statements found: {len(statements)}\n"

            formatted += "\n"

        # Add resolution suggestions if available (dict or list shape).
        suggestions = conflicts.get("resolution_suggestions", {})
        if suggestions:
            formatted += "💡 **Resolution Suggestions:**\n"
            if isinstance(suggestions, dict):
                # Dict format: show the top 2 suggestion values.
                for _key, suggestion in list(suggestions.items())[:2]:
                    formatted += f"• {suggestion}\n"
            else:
                # List format: show the top 2 suggestions.
                for suggestion in suggestions[:2]:
                    formatted += f"• {suggestion}\n"

        return formatted

    @staticmethod
    def format_complementary_content(complementary: list[dict[str, Any]]) -> str:
        """Format complementary content results for display.

        Args:
            complementary: Recommendation entries with nested or flattened
                title/reason fields and a ``relevance_score``.

        Returns:
            A markdown-style listing of the top 5 recommendations.
        """
        if not complementary:
            return "🔍 **Complementary Content**\n\nNo complementary content found."

        formatted = (
            f"🔗 **Complementary Content** ({len(complementary)} recommendations)\n\n"
        )

        for i, content in enumerate(complementary[:5], 1):  # Show top 5
            document = content.get("document", {})
            # Coerce the score safely (mirrors format_similar_documents);
            # a None/non-numeric score must not crash the :.3f formatting.
            try:
                relevance = float(content.get("relevance_score", 0) or 0)
            except (TypeError, ValueError):
                relevance = 0.0

            # Flattened or nested title.
            title_value = content.get("title") or content.get("source_title")
            if not title_value:
                if isinstance(document, dict):
                    title_value = document.get("source_title") or "Unknown"
                else:
                    title_value = getattr(document, "source_title", "Unknown")
            title_value = title_value or "Unknown"

            # Reasons and strategy, from the entry or its document.
            reason = (
                content.get("reason") or content.get("recommendation_reason")
            ) or ""
            if not reason and isinstance(document, dict):
                reason = document.get("recommendation_reason", "") or document.get(
                    "reason", ""
                )
            elif not reason and document is not None:
                reason = getattr(document, "recommendation_reason", "") or getattr(
                    document, "reason", ""
                )
            strategy = content.get("strategy")

            formatted += f"**{i}. Complementary Score: {relevance:.3f}**\n"
            formatted += f"• Title: {title_value}\n"
            if reason:
                formatted += f"• Why Complementary: {reason}\n"
            if strategy:
                formatted += f"• Strategy: {strategy}\n"

            formatted += "\n"

        return formatted

    @staticmethod
    def format_document_clusters(clusters: dict[str, Any]) -> str:
        """Format document clustering results for display.

        Args:
            clusters: Payload with a ``clusters`` list (entries may nest
                fields under ``cluster_metadata``) and optional
                ``clustering_metadata``.

        Returns:
            A markdown-style report of the first 5 clusters plus summary
            statistics.
        """
        if not clusters or "clusters" not in clusters:
            return "🗂️ **Document Clustering**\n\nNo clusters found."

        cluster_list = clusters["clusters"]
        if not cluster_list:
            metadata = clusters.get("clustering_metadata", {})
            message = metadata.get("message", "No clusters found.")
            return f"🗂️ **Document Clustering**\n\n{message}"

        formatted = "🗂️ **Document Clustering Results**\n\n"

        for i, cluster in enumerate(cluster_list[:5], 1):  # Show first 5 clusters
            is_dict = isinstance(cluster, dict)
            # Guard every field access: non-dict cluster entries must not
            # crash (previously `documents`/`cluster_summary` were unguarded).
            documents = cluster.get("documents", []) if is_dict else []
            cluster_metadata = cluster.get("cluster_metadata", {}) if is_dict else {}
            coherence = (
                cluster_metadata.get(
                    "coherence_score", cluster.get("coherence_score", 0)
                )
                if is_dict
                else 0
            )
            # Coerce safely so a non-numeric score cannot break :.3f.
            try:
                coherence = float(coherence)
            except (TypeError, ValueError):
                coherence = 0.0
            centroid_topics = (
                cluster_metadata.get(
                    "centroid_topics", cluster.get("centroid_topics", [])
                )
                if is_dict
                else []
            )
            shared_entities = (
                cluster_metadata.get(
                    "shared_entities", cluster.get("shared_entities", [])
                )
                if is_dict
                else []
            )
            cluster_summary = cluster.get("cluster_summary", "") if is_dict else ""

            cluster_id = (
                cluster_metadata.get("id", cluster.get("id", f"cluster_{i}"))
                if is_dict
                else f"cluster_{i}"
            )
            formatted += f"**Cluster {i} (ID: {cluster_id})**\n"
            formatted += f"• Documents: {len(documents)}\n"
            formatted += f"• Coherence Score: {coherence:.3f}\n"

            if centroid_topics:
                formatted += f"• Key Topics: {', '.join(centroid_topics[:3])}\n"  # Show top 3 topics

            if shared_entities:
                formatted += f"• Shared Entities: {', '.join(shared_entities[:3])}\n"  # Show top 3 entities

            if cluster_summary:
                formatted += f"• Summary: {cluster_summary}\n"

            formatted += "\n"

        # Summary statistics (skip non-dict entries in the document count).
        total_docs = sum(
            len(cluster.get("documents", []))
            for cluster in cluster_list
            if isinstance(cluster, dict)
        )
        cluster_count = len(cluster_list)

        # Compute average coherence using nested cluster_metadata when present;
        # if no per-cluster coherence is provided at all, fall back to overall_coherence.
        per_cluster_coherences: list[float] = []
        any_coherence_present = False
        for cluster in cluster_list:
            if not isinstance(cluster, dict):
                per_cluster_coherences.append(0.0)
                continue
            cluster_metadata = cluster.get("cluster_metadata", {}) or {}
            if "coherence_score" in cluster_metadata:
                any_coherence_present = True
                value = cluster_metadata.get("coherence_score")
            elif "coherence_score" in cluster:
                any_coherence_present = True
                value = cluster.get("coherence_score")
            else:
                value = 0.0
            try:
                per_cluster_coherences.append(float(value))
            except (TypeError, ValueError):
                per_cluster_coherences.append(0.0)

        if cluster_count > 0 and any_coherence_present:
            avg_coherence = sum(per_cluster_coherences) / cluster_count
        else:
            metadata = clusters.get("clustering_metadata", {})
            try:
                avg_coherence = float(metadata.get("overall_coherence", 0.0))
            except (TypeError, ValueError):
                avg_coherence = 0.0

        formatted += "📊 **Summary:**\n"
        formatted += f"• Total Clusters: {len(cluster_list)}\n"
        formatted += f"• Total Documents: {total_docs}\n"
        formatted += f"• Average Coherence Score: {avg_coherence:.3f}\n"

        metadata = clusters.get("clustering_metadata", {})
        strategy = metadata.get("strategy", "unknown")
        formatted += f"• Strategy: {strategy}\n"

        original_query = metadata.get("original_query")
        if original_query:
            formatted += f"• Original Query: {original_query}\n"

        return formatted