Coverage for src / qdrant_loader_mcp_server / mcp / formatters / lightweight.py: 100%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1""" 

2Lightweight Result Formatters - Efficient Result Construction. 

3 

4This module handles the creation of lightweight, efficient result structures 

5for MCP responses, optimizing for minimal data transfer and fast processing. 

6""" 

7 

8from typing import Any 

9 

10from ...search.components.search_result_models import HybridSearchResult 

11from .utils import FormatterUtils 

12 

13 

class LightweightResultFormatters:
    """Build compact, size-capped result payloads for MCP responses.

    Every public method is a static method that takes already-computed
    analysis data and returns a plain ``dict``. Most payloads expose the
    current field names alongside legacy fields that are kept for backward
    compatibility.
    """

    # Caps applied when building payloads. Totals reported in each payload are
    # always computed from the full input, not from the capped slice.
    _MAX_SIMILAR_DOCUMENTS = 10
    _MAX_CONFLICTS = 5
    _MAX_CLUSTERS = 8
    _MAX_DOCUMENTS_PER_CLUSTER = 5
    _MAX_DOCUMENTS_PER_HIERARCHY_GROUP = 10
    _MAX_COMPLEMENTARY_RECOMMENDATIONS = 8
    _MAX_ATTACHMENTS = 20
    _MAX_ATTACHMENTS_PER_GROUP = 15

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _doc_field(document: Any, name: str, default: Any = None) -> Any:
        """Read ``name`` from a document that is either a dict or an object.

        Dicts are read with ``.get``; anything else (including ``None``) is
        read with ``getattr``, so a missing document yields ``default``.
        """
        if isinstance(document, dict):
            return document.get(name, default)
        return getattr(document, name, default)

    @staticmethod
    def _unique_in_order(values: Any) -> list[Any]:
        """Return the distinct items of an iterable in first-seen order.

        A ``set`` would de-duplicate too, but its iteration order for strings
        depends on per-process hash randomization, which makes the emitted
        list differ from run to run. ``dict.fromkeys`` keeps insertion order.
        """
        return list(dict.fromkeys(values))

    @staticmethod
    def _cluster_documents(cluster: dict[str, Any]) -> list[Any]:
        """Return the minimal fields of a cluster's first few documents."""
        limit = LightweightResultFormatters._MAX_DOCUMENTS_PER_CLUSTER
        return [
            FormatterUtils.extract_minimal_doc_fields(doc)
            for doc in cluster.get("documents", [])[:limit]
        ]

    @staticmethod
    def _cluster_themes(cluster: dict[str, Any]) -> Any:
        """Return ``shared_entities``, else ``cluster_themes``, else ``[]``."""
        return cluster.get("shared_entities", cluster.get("cluster_themes", []))

    @staticmethod
    def _complementary_metadata(document: Any, rec: dict[str, Any]) -> dict[str, Any]:
        """Resolve ``source_type`` / ``project_id`` for one recommendation.

        Lookup order for each field: attribute on ``document``, then the
        recommendation dict itself, then the key on a dict-shaped
        ``document``. ``source_type`` finally falls back to ``"unknown"``.
        """
        is_mapping = isinstance(document, dict)
        return {
            "source_type": (
                getattr(document, "source_type", None)
                or rec.get("source_type")
                or (document.get("source_type") if is_mapping else None)
                or "unknown"
            ),
            "project_id": (
                getattr(document, "project_id", None)
                or rec.get("project_id")
                or (document.get("project_id") if is_mapping else None)
            ),
        }

    @staticmethod
    def _summarize_target_document(target_document: Any) -> dict[str, Any]:
        """Return the ``target_document`` section for a dict or result object."""
        if isinstance(target_document, dict):
            return {
                "document_id": target_document.get(
                    "document_id", target_document.get("id", "")
                ),
                "title": target_document.get("title", "Untitled"),
                "content_preview": target_document.get("content_preview", ""),
                "source_type": target_document.get("source_type", "unknown"),
            }

        # Anything else is treated as a HybridSearchResult-like object.
        if hasattr(target_document, "get_display_title"):
            title = target_document.get_display_title()
        else:
            title = getattr(target_document, "source_title", "Untitled")

        text = getattr(target_document, "text", "") or ""
        # Only string text longer than 200 characters is truncated.
        if isinstance(text, str) and len(text) > 200:
            preview = text[:200] + "..."
        else:
            preview = text

        return {
            "document_id": getattr(
                target_document,
                "document_id",
                getattr(target_document, "id", ""),
            ),
            "title": title,
            "content_preview": preview,
            "source_type": getattr(target_document, "source_type", "unknown"),
        }

    # ------------------------------------------------------------------
    # Public formatters
    # ------------------------------------------------------------------

    @staticmethod
    def create_lightweight_similar_documents_results(
        similar_docs: list[dict[str, Any]],
        target_query: str = "",
        comparison_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight similar documents results.

        Args:
            similar_docs: Entries with optional ``document``,
                ``similarity_score``, ``metric_scores`` and
                ``similarity_reasons`` keys. ``document`` may be a dict or an
                object.
            target_query: Echoed back in the payload.
            comparison_query: Echoed back in the payload.

        Returns:
            A dict with ``similarity_index``, ``query_info``, ``navigation``
            and the legacy ``similar_documents`` fields. At most the first 10
            entries are listed; ``total_found`` counts all of them.
        """
        helpers = LightweightResultFormatters
        top_docs = similar_docs[: helpers._MAX_SIMILAR_DOCUMENTS]
        total_found = len(similar_docs)

        similarity_index = []
        for doc_info in top_docs:
            document = doc_info.get("document", {})
            similarity_index.append(
                {
                    "document_id": helpers._doc_field(document, "document_id", ""),
                    "title": helpers._doc_field(document, "source_title", "Untitled"),
                    "navigation_hints": {
                        "can_expand": True,
                        "has_children": helpers._doc_field(
                            document, "has_children", False
                        ),
                    },
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_info": {
                        "metric_scores": doc_info.get("metric_scores", {}),
                        # Normalized like the legacy field below so that an
                        # explicit None never reaches the client.
                        "reasons": doc_info.get("similarity_reasons") or [],
                    },
                }
            )

        similar_documents = []
        for doc_info in top_docs:
            document = doc_info.get("document")
            if document is not None:
                minimal = FormatterUtils.extract_minimal_doc_fields(document)
            else:
                # Placeholder used when the entry carries no document at all.
                minimal = {
                    "document_id": "",
                    "title": "Untitled",
                    "source_type": "unknown",
                    "score": 0.0,
                }
            similar_documents.append(
                {
                    "document": minimal,
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_reasons": doc_info.get("similarity_reasons") or [],
                }
            )

        return {
            "similarity_index": similarity_index,
            "query_info": {
                "target_query": target_query,
                "comparison_query": comparison_query,
                "total_found": total_found,
            },
            "navigation": {
                "total_found": total_found,
                "showing": len(top_docs),
            },
            # Keep legacy fields for backward compatibility
            "target_query": target_query,
            "comparison_query": comparison_query,
            "similar_documents": similar_documents,
            "total_found": total_found,
        }

    @staticmethod
    def create_lightweight_conflict_results(
        conflicts: dict[str, Any],
        query: str = "",
        documents: list[Any] | None = None,
    ) -> dict[str, Any]:
        """Create lightweight conflict analysis results.

        Args:
            conflicts: Analysis output. Both the old ``conflicting_pairs``
                format (sequences of ``(doc1_id, doc2_id, info_dict)``) and
                the new ``conflicts`` format (list of dicts) are processed,
                pairs first.
            query: Echoed back in the payload.
            documents: Only used as a fallback source for ``document_count``.

        Returns:
            A dict with ``conflicts_detected``, ``conflict_summary``,
            ``analysis_metadata``, ``navigation`` and legacy fields. At most 5
            conflicts are listed; summary figures cover all of them.
        """
        helpers = LightweightResultFormatters
        limit = helpers._MAX_CONFLICTS
        processed: list[dict[str, Any]] = []

        # Old format: (doc1_id, doc2_id, info). Shorter entries are skipped.
        for pair in conflicts.get("conflicting_pairs") or []:
            if len(pair) < 3:
                continue
            doc1_id, doc2_id, info = pair[0], pair[1], pair[2]
            processed.append(
                {
                    "conflict_type": info.get("type", "unknown"),
                    "conflict_score": info.get("confidence", 0.0),
                    "conflict_description": info.get("description", ""),
                    "conflicting_statements": info.get("structured_indicators", []),
                    "document_1_id": doc1_id,
                    "document_2_id": doc2_id,
                }
            )

        # New format: one dict per conflict.
        for conflict in conflicts.get("conflicts") or []:
            processed.append(
                {
                    "conflict_type": conflict.get("conflict_type", "unknown"),
                    "conflict_score": conflict.get(
                        "confidence", conflict.get("severity_score", 0.0)
                    ),
                    "conflict_description": conflict.get("description", ""),
                    "conflicting_statements": (
                        FormatterUtils.extract_conflicting_statements(conflict)
                    ),
                }
            )

        total = len(processed)
        # A score explicitly set to None counts as 0 instead of breaking sum().
        avg_confidence = (
            sum((c.get("conflict_score") or 0) for c in processed) / total
            if total
            else 0
        )
        # Tolerate "query_metadata": None as well as a missing key.
        query_metadata = conflicts.get("query_metadata") or {}

        return {
            "conflicts_detected": processed[:limit],
            "conflict_summary": {
                "total_conflicts": total,
                "avg_confidence": avg_confidence,
                "conflict_types": helpers._unique_in_order(
                    c.get("conflict_type", "unknown") for c in processed
                ),
            },
            "analysis_metadata": {
                "query": query,
                "document_count": query_metadata.get(
                    "document_count", len(documents) if documents else 0
                ),
                "analysis_depth": "lightweight",
            },
            "navigation": {
                "total_found": total,
                "showing": min(total, limit),
                "has_more": total > limit,
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "conflicts": processed[:limit],
            "resolution_suggestions": conflicts.get("resolution_suggestions", []),
            "total_conflicts": total,
        }

    @staticmethod
    def create_lightweight_cluster_results(
        clusters: dict[str, Any],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight document clustering results.

        Args:
            clusters: Dict with optional ``clusters`` (list of cluster dicts)
                and ``clustering_metadata`` keys.
            query: Echoed back in the payload.

        Returns:
            A dict with ``cluster_index``, ``clustering_metadata``,
            ``expansion_info`` and legacy fields. At most 8 clusters with at
            most 5 documents each are listed; counts cover the full input.
        """
        helpers = LightweightResultFormatters
        cluster_list = clusters.get("clusters", [])
        metadata = clusters.get("clustering_metadata", {})
        shown_clusters = cluster_list[: helpers._MAX_CLUSTERS]
        document_total = sum(len(c.get("documents", [])) for c in cluster_list)

        # Clusters without an id/name get a 1-based positional fallback.
        cluster_index = [
            {
                "cluster_id": cluster.get("id", f"cluster_{position}"),
                "cluster_name": cluster.get("name", f"Cluster {position}"),
                "coherence_score": cluster.get("coherence_score", 0),
                "document_count": len(cluster.get("documents", [])),
                "documents": helpers._cluster_documents(cluster),
                "cluster_themes": helpers._cluster_themes(cluster),
                "centroid_topics": cluster.get("centroid_topics", []),
            }
            for position, cluster in enumerate(shown_clusters, start=1)
        ]

        legacy_clusters = [
            {
                "cluster_id": cluster.get("id", f"cluster_{position}"),
                "documents": helpers._cluster_documents(cluster),
                "cluster_themes": helpers._cluster_themes(cluster),
                "coherence_score": cluster.get("coherence_score", 0),
                "document_count": len(cluster.get("documents", [])),
            }
            for position, cluster in enumerate(shown_clusters, start=1)
        ]

        return {
            "cluster_index": cluster_index,
            "clustering_metadata": {
                "strategy": metadata.get("strategy", "unknown"),
                "total_documents": metadata.get("total_documents", document_total),
                "clusters_created": metadata.get(
                    "clusters_created", len(cluster_list)
                ),
                "query": query,
                "analysis_depth": "lightweight",
            },
            "expansion_info": {
                "total_clusters": len(cluster_list),
                "showing": len(cluster_index),
                "can_expand": len(cluster_list) > len(cluster_index),
                # Max docs shown per cluster
                "documents_per_cluster": helpers._MAX_DOCUMENTS_PER_CLUSTER,
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "clusters": legacy_clusters,
            "total_clusters": len(cluster_list),
            "total_documents": document_total,
        }

    @staticmethod
    def create_lightweight_hierarchy_results(
        filtered_results: list[HybridSearchResult],
        organized_results: dict[str, list[HybridSearchResult]],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight hierarchical results.

        Args:
            filtered_results: All results; used for totals and the list of
                source types found.
            organized_results: The same results grouped under a group key.
            query: Echoed back in the payload.

        Returns:
            A dict with ``hierarchy_index`` (one entry per grouped result),
            ``hierarchy_groups`` (at most 10 documents listed per group),
            ``query_metadata`` and legacy fields.
        """
        helpers = LightweightResultFormatters
        per_group_limit = helpers._MAX_DOCUMENTS_PER_HIERARCHY_GROUP
        hierarchy_groups_data = []
        hierarchy_index_data = []

        for group_name, results in organized_results.items():
            clean_group_name = FormatterUtils.generate_clean_group_name(
                group_name, results
            )

            # Compute the hierarchy attributes once per result and reuse them
            # for the group listing, the depth range and the index entries.
            # NOTE(review): assumes these FormatterUtils extractors are
            # side-effect free and deterministic - confirm.
            placements = [
                (
                    result,
                    FormatterUtils.extract_synthetic_depth(result),
                    FormatterUtils.extract_has_children(result),
                    FormatterUtils.extract_synthetic_parent_title(result),
                )
                for result in results
            ]

            documents_data = [
                {
                    **FormatterUtils.extract_minimal_doc_fields(result),
                    "depth": depth,
                    "has_children": has_children,
                    "parent_title": parent_title,
                }
                for result, depth, has_children, parent_title in placements[
                    :per_group_limit
                ]
            ]

            # The depth range covers every result in the group, not only the
            # ones listed above.
            depths = [depth for _, depth, _, _ in placements]
            depth_range = [min(depths), max(depths)] if depths else [0, 0]

            hierarchy_groups_data.append(
                {
                    "group_key": group_name,  # Original key
                    "group_name": clean_group_name,  # Clean display name
                    "documents": documents_data,
                    "document_ids": [doc["document_id"] for doc in documents_data],
                    "depth_range": depth_range,
                    "total_documents": len(results),
                }
            )

            # Create index entries as individual documents for compatibility
            for result, depth, has_children, parent_title in placements:
                hierarchy_index_data.append(
                    {
                        "document_id": getattr(
                            result, "document_id", f"doc_{id(result)}"
                        ),
                        # NOTE(review): other formatters in this class read
                        # `source_title`; confirm `title` is intended here.
                        "title": getattr(result, "title", "Untitled"),
                        "score": getattr(result, "score", 0.0),
                        "hierarchy_info": {
                            "depth": depth,
                            "has_children": has_children,
                            "parent_title": parent_title,
                            "group_name": clean_group_name,
                            "source_type": getattr(result, "source_type", "unknown"),
                        },
                        "navigation_hints": {
                            "breadcrumb": parent_title,
                            "level": depth,
                            "group": clean_group_name,
                            # Other docs in same group
                            "siblings_count": len(results) - 1,
                            # Default, could be enhanced with actual child detection
                            "children_count": 0,
                        },
                    }
                )

        return {
            "hierarchy_index": hierarchy_index_data,
            "hierarchy_groups": hierarchy_groups_data,
            "total_found": len(filtered_results),
            "query_metadata": {
                "query": query,
                "search_query": query,  # Alias for compatibility
                "total_documents": len(filtered_results),
                "total_groups": len(organized_results),
                "analysis_type": "hierarchy",
                "source_types_found": helpers._unique_in_order(
                    getattr(result, "source_type", "unknown")
                    for result in filtered_results
                ),
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "total_groups": len(organized_results),
        }

    @staticmethod
    def create_lightweight_complementary_results(
        complementary_recommendations: list[dict[str, Any]],
        target_document: "HybridSearchResult | None" = None,
        context_documents_analyzed: int = 0,
        target_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight complementary content results.

        Args:
            complementary_recommendations: Recommendation dicts. Each may
                carry a nested ``document`` (dict or object) and/or flat
                ``document_id`` / ``title`` / ``source_type`` keys.
            target_document: Optional dict or result-like object; when given,
                a ``target_document`` section is added to the payload.
            context_documents_analyzed: Echoed back in the payload.
            target_query: Echoed back in the payload.

        Returns:
            A dict with ``complementary_index`` (all recommendations), the
            legacy ``complementary_recommendations`` list (first 8 only),
            ``complementary_summary`` and, if available, ``target_document``.
        """
        helpers = LightweightResultFormatters
        total = len(complementary_recommendations)

        complementary_index = []
        for rec in complementary_recommendations:
            document = rec.get("document")
            complementary_index.append(
                {
                    # A nested document (dict or object) wins; the flat keys
                    # on the recommendation are the fallback.
                    "document_id": helpers._doc_field(
                        document, "document_id", rec.get("document_id")
                    ),
                    "title": helpers._doc_field(
                        document, "source_title", rec.get("title", "Untitled")
                    ),
                    "complementary_score": rec.get("relevance_score", 0),
                    "complementary_reason": rec.get(
                        "recommendation_reason", rec.get("reason", "")
                    ),
                    "relationship_type": rec.get("strategy", ""),
                    "basic_metadata": helpers._complementary_metadata(document, rec),
                }
            )

        legacy_recommendations = [
            {
                "document": {
                    "document_id": rec.get("document_id"),
                    "title": rec.get("title"),
                    "source_type": rec.get("source_type", "unknown"),
                    "score": rec.get("relevance_score", 0),
                },
                "relationship_type": "related",
                "relevance_score": rec.get("relevance_score", 0),
                "reasons": [rec.get("reason", "")] if rec.get("reason") else [],
            }
            for rec in complementary_recommendations[
                : helpers._MAX_COMPLEMENTARY_RECOMMENDATIONS
            ]
        ]

        average_score = (
            sum(rec.get("relevance_score", 0) for rec in complementary_recommendations)
            / total
            if complementary_recommendations
            else 0
        )

        result: dict[str, Any] = {
            "target_query": target_query,
            "complementary_index": complementary_index,
            "complementary_recommendations": legacy_recommendations,
            "context_documents_analyzed": context_documents_analyzed,
            "total_recommendations": total,
            "complementary_summary": {
                "total_found": total,
                "complementary_found": total,
                "total_analyzed": context_documents_analyzed,
                "average_score": average_score,
                "strategies_used": helpers._unique_in_order(
                    rec.get("strategy", "unknown")
                    for rec in complementary_recommendations
                ),
            },
            "lazy_loading_enabled": False,
            "expand_document_hint": "Use tools/call with 'expand_document' and document_id to get full document details",
        }

        # Only include target_document if available; shape must match schema
        if target_document is not None:
            result["target_document"] = helpers._summarize_target_document(
                target_document
            )

        return result

    @staticmethod
    def create_lightweight_attachment_results(
        filtered_results: list[HybridSearchResult],
        attachment_filter: dict[str, Any],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight attachment results.

        Args:
            filtered_results: Search results; only those whose
                ``is_attachment`` attribute is truthy are used.
            attachment_filter: Echoed back under ``query_metadata.filter``.
            query: Echoed back in the payload.

        Returns:
            A dict with ``attachment_index`` (first 20 attachments),
            ``attachment_groups`` (grouped by file type, first 15 per group),
            ``query_metadata`` and legacy fields.
        """
        helpers = LightweightResultFormatters

        # Filter only attachment results
        attachment_results = [
            result
            for result in filtered_results
            if getattr(result, "is_attachment", False)
        ]

        # Group by file type for organized display
        organized_attachments: dict[Any, list[Any]] = {}
        for result in attachment_results:
            file_type = FormatterUtils.extract_file_type_minimal(result)
            organized_attachments.setdefault(file_type, []).append(result)

        # Create attachment index
        attachment_index = [
            {
                "document_id": getattr(result, "document_id", ""),
                "title": getattr(result, "source_title", "Untitled"),
                "attachment_info": {
                    "filename": FormatterUtils.extract_safe_filename(result),
                    "file_type": FormatterUtils.extract_file_type_minimal(result),
                    "file_size": getattr(result, "file_size", None),
                },
                "score": getattr(result, "score", 0.0),
                "source_url": getattr(result, "source_url", None),
            }
            for result in attachment_results[: helpers._MAX_ATTACHMENTS]
        ]

        # Create attachment groups
        attachment_groups = [
            {
                "file_type": file_type,
                "attachments": [
                    {
                        **FormatterUtils.extract_minimal_doc_fields(result),
                        "filename": FormatterUtils.extract_safe_filename(result),
                        "file_type": FormatterUtils.extract_file_type_minimal(result),
                    }
                    for result in grouped[: helpers._MAX_ATTACHMENTS_PER_GROUP]
                ],
                "total_attachments": len(grouped),
            }
            for file_type, grouped in organized_attachments.items()
        ]

        return {
            "attachment_index": attachment_index,
            "attachment_groups": attachment_groups,
            "total_found": len(attachment_results),
            "query_metadata": {
                "query": query,
                "filter": attachment_filter,
                "total_attachments": len(attachment_results),
                "file_types": list(organized_attachments.keys()),
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "total_groups": len(organized_attachments),
        }
538 "query": query, 

539 "total_groups": len(organized_attachments), 

540 }