Coverage for src/qdrant_loader_mcp_server/mcp/formatters/intelligence.py: 84%

235 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Intelligence Result Formatters - Analysis and Insights Formatting. 

3 

4This module handles formatting of intelligence analysis results including 

5relationship analysis, similarity detection, conflict analysis, and 

6complementary content discovery. 

7""" 

8 

9from typing import Any 

10 

11 

class IntelligenceResultFormatters:
    """Render intelligence analysis results as Markdown-style display text.

    Every public method is a ``staticmethod`` that takes the plain
    dict/list payload produced by an analysis step and returns a string.
    The formatters accept several payload shapes and degrade to neutral
    defaults (``0``, ``"Unknown"``, ``"unknown"``) instead of raising when
    an optional field is missing, ``None`` or of an unexpected type.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Coerce *value* to ``float``; return *default* if it cannot be."""
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _as_dict(value: Any) -> dict[str, Any]:
        """Return *value* unchanged when it is a dict, else an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _cluster_size(cluster: Any) -> int:
        """Count the documents of one cluster in a relationship analysis.

        For a dict, the first non-empty list found under ``documents``,
        ``items`` or ``members`` is counted. If one of those keys holds an
        empty list the size is ``0``. Only when none of the keys holds a
        list does the dict's own length serve as the count. Any other
        object is measured with ``len`` when it supports it.
        """
        if isinstance(cluster, dict):
            saw_member_list = False
            for key in ("documents", "items", "members"):
                members = cluster.get(key)
                if isinstance(members, (list, tuple)):
                    if members:
                        return len(members)
                    saw_member_list = True
            # An explicit empty member list means "no documents", not
            # "count the keys of the cluster dict".
            return 0 if saw_member_list else len(cluster)
        try:
            return len(cluster)
        except TypeError:
            return 0

    @staticmethod
    def _document_count(cluster: Any) -> int:
        """Count the documents of one cluster in a clustering result."""
        if isinstance(cluster, dict):
            documents = cluster.get("documents") or []
        elif isinstance(cluster, (list, tuple)):
            # A bare sequence is treated as the cluster's document list.
            documents = cluster
        else:
            return 0
        try:
            return len(documents)
        except TypeError:
            return 0

    @staticmethod
    def _top_labels(values: Any, limit: int = 3) -> str:
        """Join the first *limit* entries of *values* with ``", "``."""
        if isinstance(values, str):
            # Avoid splitting a lone string into characters.
            values = [values]
        return ", ".join(str(value) for value in list(values)[:limit])

    @staticmethod
    def _similarity_score(doc_info: dict[str, Any]) -> float:
        """Extract a similarity score from one similar-document entry.

        Lookup order: ``overall_similarity``, ``similarity_score``,
        ``similarity_scores["overall"]``, then the first numeric value in
        ``similarity_scores`` (dict values or list items). ``None`` values
        are skipped so that they cannot hide a later valid score.
        """
        raw: Any = None
        for key in ("overall_similarity", "similarity_score"):
            raw = doc_info.get(key)
            if raw is not None:
                break

        if raw is None:
            nested = doc_info.get("similarity_scores")
            candidates: Any = ()
            if isinstance(nested, dict):
                raw = nested.get("overall")
                candidates = nested.values()
            elif isinstance(nested, list):
                candidates = nested
            if raw is None:
                raw = next(
                    (v for v in candidates if isinstance(v, (int, float))),
                    None,
                )

        return IntelligenceResultFormatters._to_float(raw)

    # ------------------------------------------------------------------
    # Public formatters
    # ------------------------------------------------------------------

    @staticmethod
    def format_relationship_analysis(analysis: dict[str, Any]) -> str:
        """Format document relationship analysis for display.

        Args:
            analysis: Analysis payload. An ``error`` key short-circuits to
                an error line. Otherwise ``summary`` and ``query_metadata``
                feed the header, clusters are read from the first
                non-empty of ``document_clusters``, ``topic_clusters``,
                ``entity_clusters`` or ``clusters`` (list or dict), and
                conflicts are summed over ``conflicts`` /
                ``conflicting_pairs`` lists found at the top level, under
                ``conflict_analysis`` and under ``entity_relationships``.

        Returns:
            The formatted report. At most three clusters are listed.
        """
        if "error" in analysis:
            return f"❌ Error: {analysis['error']}"

        helpers = IntelligenceResultFormatters
        summary = helpers._as_dict(analysis.get("summary"))
        query_metadata = helpers._as_dict(analysis.get("query_metadata"))

        header_lines = [
            "🔍 **Document Relationship Analysis**",
            "",
            "📊 **Summary:**",
            f"• Total Documents: {summary.get('total_documents', 0)}",
            f"• Clusters Found: {summary.get('clusters_found', 0)}",
            "• Citation Relationships: "
            f"{summary.get('citation_relationships', 0)}",
            f"• Conflicts Detected: {summary.get('conflicts_detected', 0)}",
            "",
            "🏷️ **Query Information:**",
            f"• Original Query: {query_metadata.get('original_query', 'N/A')}",
            f"• Documents Analyzed: {query_metadata.get('document_count', 0)}",
        ]
        parts = ["\n".join(header_lines) + "\n"]

        # Accept multiple shapes for clusters: first non-empty key wins.
        clusters_candidate = None
        for key in (
            "document_clusters",
            "topic_clusters",
            "entity_clusters",
            "clusters",
        ):
            value = analysis.get(key)
            if value:
                clusters_candidate = value
                break

        cluster_list: list[Any] = []
        if isinstance(clusters_candidate, list):
            cluster_list = clusters_candidate
        elif isinstance(clusters_candidate, dict):
            # Some producers return a dict of clusters.
            cluster_list = list(clusters_candidate.values())

        if cluster_list:
            parts.append("\n🗂️ **Document Clusters:**\n")
            for i, cluster in enumerate(cluster_list[:3], 1):
                count = helpers._cluster_size(cluster)
                parts.append(f"• Cluster {i}: {count} documents\n")

        # Aggregate conflicts across every location/shape a producer may use.
        conflict_sources = (
            (analysis.get("conflict_analysis"), ("conflicting_pairs", "conflicts")),
            (analysis, ("conflicts", "conflicting_pairs")),
            (
                analysis.get("entity_relationships"),
                ("conflicting_pairs", "conflicts"),
            ),
        )
        total_conflicts = 0
        for container, keys in conflict_sources:
            if not isinstance(container, dict):
                continue
            for key in keys:
                pairs = container.get(key)
                if isinstance(pairs, list):
                    total_conflicts += len(pairs)

        if total_conflicts:
            parts.append(
                f"\n⚠️ **Conflicts Detected:** {total_conflicts} "
                "conflicting document pairs\n"
            )

        return "".join(parts)

    @staticmethod
    def format_similar_documents(similar_docs: list[dict[str, Any]]) -> str:
        """Format similar documents results for display.

        Args:
            similar_docs: One dict per similar document. The score is read
                as described in ``_similarity_score``. The title comes from
                ``document.source_title``, ``document.title`` or the same
                keys at the top level. Reasons come from the first truthy
                of ``similarity_reasons``, ``reason``, ``explanations`` or
                ``reasons`` (a list or a single string).

        Returns:
            The formatted list showing at most the first five entries, or
            a "no similar documents" message for empty input.
        """
        if not similar_docs:
            return "🔍 **Similar Documents**\n\nNo similar documents found."

        helpers = IntelligenceResultFormatters
        parts = [f"🔍 **Similar Documents** ({len(similar_docs)} found)\n\n"]

        for i, doc_info in enumerate(similar_docs[:5], 1):
            score = helpers._similarity_score(doc_info)

            # Title: document.source_title -> document.title -> top-level.
            document = doc_info.get("document", {})
            if isinstance(document, dict):
                title_value = document.get("source_title") or document.get("title")
            else:
                title_value = getattr(document, "source_title", None) or getattr(
                    document, "title", None
                )
            if not title_value:
                title_value = doc_info.get("source_title") or doc_info.get("title")

            # Reasons: prefer a list but normalise a lone string.
            reasons_value = (
                doc_info.get("similarity_reasons")
                or doc_info.get("reason")
                or doc_info.get("explanations")
                or doc_info.get("reasons")
            )
            reasons_list: list[str] = []
            if isinstance(reasons_value, list):
                reasons_list = [str(reason) for reason in reasons_value]
            elif isinstance(reasons_value, str):
                reasons_list = [reasons_value]

            parts.append(f"**{i}. Similarity Score: {score:.3f}**\n")
            if title_value:
                parts.append(f"• Title: {title_value}\n")
            if reasons_list:
                parts.append(f"• Reasons: {', '.join(reasons_list)}\n")
            parts.append("\n")

        return "".join(parts)

    @staticmethod
    def format_conflict_analysis(conflicts: dict[str, Any]) -> str:
        """Format conflict analysis results for display.

        Args:
            conflicts: Payload holding the conflicts under
                ``conflicting_pairs`` (old format, preferred when
                non-empty) or ``conflicts`` (new format). Each conflict is
                either a dict with ``document_1``, ``document_2``,
                ``conflict_type`` and optional ``conflicting_statements``,
                or a ``(doc1, doc2, metadata)`` tuple/list whose metadata
                dict carries ``type``. ``resolution_suggestions`` may be a
                dict, a list/tuple or a single value.

        Returns:
            The formatted report showing at most three conflicts and two
            suggestions, or a "no conflicts" message when the payload is
            empty, ``None`` or has no conflicts.
        """
        no_conflicts = (
            "✅ **Conflict Analysis**\n\nNo conflicts detected between documents."
        )
        # Guard first: the payload itself may be None or empty.
        if not conflicts:
            return no_conflicts

        # Handle both old ("conflicting_pairs") and new ("conflicts") formats.
        conflict_list = conflicts.get("conflicting_pairs") or conflicts.get(
            "conflicts"
        )
        if not conflict_list:
            return no_conflicts

        parts = [f"⚠️ **Conflict Analysis** ({len(conflict_list)} conflicts found)\n\n"]

        for i, conflict in enumerate(conflict_list[:3], 1):
            statements: Any = None
            if isinstance(conflict, dict):
                doc1 = conflict.get("document_1", {})
                doc2 = conflict.get("document_2", {})
                doc1_title = (
                    doc1.get("title", "Unknown")
                    if isinstance(doc1, dict)
                    else str(doc1)
                )
                doc2_title = (
                    doc2.get("title", "Unknown")
                    if isinstance(doc2, dict)
                    else str(doc2)
                )
                conflict_type = conflict.get("conflict_type", "unknown")
                statements = conflict.get("conflicting_statements")
            elif isinstance(conflict, (tuple, list)) and len(conflict) >= 2:
                # (doc1, doc2[, metadata]); JSON round-trips turn tuples
                # into lists, so both sequence types are accepted.
                doc1_title, doc2_title = conflict[0], conflict[1]
                metadata = conflict[2] if len(conflict) > 2 else None
                conflict_type = (
                    metadata.get("type", "unknown")
                    if isinstance(metadata, dict)
                    else "unknown"
                )
            else:
                # Unrecognised shape: keep the numbering, show placeholders.
                doc1_title = doc2_title = "Unknown"
                conflict_type = "unknown"

            parts.append(f"**{i}. Conflict Type: {conflict_type}**\n")
            parts.append(f"• Document 1: {doc1_title}\n")
            parts.append(f"• Document 2: {doc2_title}\n")
            if statements:
                parts.append(f"• Conflicting statements found: {len(statements)}\n")
            parts.append("\n")

        # Add resolution suggestions if available (top 2 only).
        suggestions = conflicts.get("resolution_suggestions", {})
        if suggestions:
            parts.append("💡 **Resolution Suggestions:**\n")
            if isinstance(suggestions, dict):
                top_suggestions = list(suggestions.values())[:2]
            elif isinstance(suggestions, (list, tuple)):
                top_suggestions = list(suggestions[:2])
            else:
                # A single suggestion (e.g. a plain string) is shown whole.
                top_suggestions = [suggestions]
            parts.extend(f"{suggestion}\n" for suggestion in top_suggestions)

        return "".join(parts)

    @staticmethod
    def format_complementary_content(complementary: list[dict[str, Any]]) -> str:
        """Format complementary content results for display.

        Args:
            complementary: One dict per recommendation. ``relevance_score``
                is coerced to float (``0.0`` when missing or invalid). The
                title is the first truthy of ``title``, ``source_title`` or
                ``document.source_title``, else ``"Unknown"``. The reason
                is read from ``reason`` / ``recommendation_reason`` at the
                top level, then from ``document``. ``strategy`` is
                optional.

        Returns:
            The formatted list showing at most the first five
            recommendations, or a "no complementary content" message for
            empty input.
        """
        if not complementary:
            return "🔍 **Complementary Content**\n\nNo complementary content found."

        helpers = IntelligenceResultFormatters
        parts = [
            f"🔗 **Complementary Content** ({len(complementary)} recommendations)\n\n"
        ]

        for i, content in enumerate(complementary[:5], 1):
            document = content.get("document", {})
            relevance = helpers._to_float(content.get("relevance_score", 0))

            # Flattened title first, then the nested document's title.
            title_value = content.get("title") or content.get("source_title")
            if not title_value:
                if isinstance(document, dict):
                    title_value = document.get("source_title")
                else:
                    title_value = getattr(document, "source_title", None)
            title_value = title_value or "Unknown"

            # Reason: top-level keys first, then the nested document.
            reason = (
                content.get("reason") or content.get("recommendation_reason")
            ) or ""
            if not reason and isinstance(document, dict):
                reason = document.get("recommendation_reason", "") or document.get(
                    "reason", ""
                )
            elif not reason and document is not None:
                reason = getattr(document, "recommendation_reason", "") or getattr(
                    document, "reason", ""
                )
            strategy = content.get("strategy")

            parts.append(f"**{i}. Complementary Score: {relevance:.3f}**\n")
            parts.append(f"• Title: {title_value}\n")
            if reason:
                parts.append(f"• Why Complementary: {reason}\n")
            if strategy:
                parts.append(f"• Strategy: {strategy}\n")
            parts.append("\n")

        return "".join(parts)

    @staticmethod
    def format_document_clusters(clusters: dict[str, Any]) -> str:
        """Format document clustering results for display.

        Args:
            clusters: Payload with a ``clusters`` list (or dict of
                clusters) and optional ``clustering_metadata`` holding
                ``message``, ``overall_coherence``, ``strategy`` and
                ``original_query``. Per cluster, ``coherence_score``,
                ``centroid_topics``, ``shared_entities`` and ``id`` are
                read from the nested ``cluster_metadata`` first and from
                the cluster itself second.

        Returns:
            The formatted report: details for at most the first five
            clusters followed by summary statistics over all clusters. The
            average coherence is the mean of the per-cluster scores when
            any cluster carries one, otherwise ``overall_coherence``.
        """
        if not clusters or "clusters" not in clusters:
            return "🗂️ **Document Clustering**\n\nNo clusters found."

        helpers = IntelligenceResultFormatters
        metadata = helpers._as_dict(clusters.get("clustering_metadata"))

        cluster_list = clusters["clusters"]
        if isinstance(cluster_list, dict):
            # Some producers return a dict of clusters.
            cluster_list = list(cluster_list.values())
        if not cluster_list:
            message = metadata.get("message", "No clusters found.")
            return f"🗂️ **Document Clustering**\n\n{message}"

        parts = ["🗂️ **Document Clustering Results**\n\n"]
        total_docs = 0
        coherence_sum = 0.0
        any_coherence_present = False

        for i, cluster in enumerate(cluster_list, 1):
            # Non-dict clusters contribute neutral defaults, not a crash.
            details = helpers._as_dict(cluster)
            cluster_metadata = helpers._as_dict(details.get("cluster_metadata"))

            doc_count = helpers._document_count(cluster)
            total_docs += doc_count

            # Nested metadata takes precedence over the flattened field.
            if "coherence_score" in cluster_metadata:
                any_coherence_present = True
                coherence = helpers._to_float(cluster_metadata["coherence_score"])
            elif "coherence_score" in details:
                any_coherence_present = True
                coherence = helpers._to_float(details["coherence_score"])
            else:
                coherence = 0.0
            coherence_sum += coherence

            if i > 5:
                # Only the first 5 clusters are listed; the rest still
                # count towards the summary statistics.
                continue

            centroid_topics = cluster_metadata.get(
                "centroid_topics", details.get("centroid_topics", [])
            )
            shared_entities = cluster_metadata.get(
                "shared_entities", details.get("shared_entities", [])
            )
            cluster_summary = details.get("cluster_summary", "")
            cluster_id = cluster_metadata.get(
                "id", details.get("id", f"cluster_{i}")
            )

            parts.append(f"**Cluster {i} (ID: {cluster_id})**\n")
            parts.append(f"• Documents: {doc_count}\n")
            parts.append(f"• Coherence Score: {coherence:.3f}\n")
            if centroid_topics:
                topics = helpers._top_labels(centroid_topics)
                parts.append(f"• Key Topics: {topics}\n")
            if shared_entities:
                entities = helpers._top_labels(shared_entities)
                parts.append(f"• Shared Entities: {entities}\n")
            if cluster_summary:
                parts.append(f"• Summary: {cluster_summary}\n")
            parts.append("\n")

        cluster_count = len(cluster_list)
        if cluster_count > 0 and any_coherence_present:
            avg_coherence = coherence_sum / cluster_count
        else:
            # No per-cluster score at all: fall back to the overall value.
            avg_coherence = helpers._to_float(metadata.get("overall_coherence", 0.0))

        parts.append("📊 **Summary:**\n")
        parts.append(f"• Total Clusters: {cluster_count}\n")
        parts.append(f"• Total Documents: {total_docs}\n")
        parts.append(f"• Average Coherence Score: {avg_coherence:.3f}\n")
        parts.append(f"• Strategy: {metadata.get('strategy', 'unknown')}\n")

        original_query = metadata.get("original_query")
        if original_query:
            parts.append(f"• Original Query: {original_query}\n")

        return "".join(parts)