Coverage for src/qdrant_loader_mcp_server/mcp/formatters/lightweight.py: 100%

63 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Lightweight Result Formatters - Efficient Result Construction. 

3 

4This module handles the creation of lightweight, efficient result structures 

5for MCP responses, optimizing for minimal data transfer and fast processing. 

6""" 

7 

8from typing import Any 

9 

10from ...search.components.search_result_models import HybridSearchResult 

11from .utils import FormatterUtils 

12 

13 

class LightweightResultFormatters:
    """Handles lightweight result construction operations.

    Each static method condenses a rich analysis payload (similarity,
    conflict, clustering, hierarchy, complementary-content, or attachment
    data) into a compact ``dict`` for MCP responses.  Every payload carries
    a primary "index"-style view plus duplicated top-level legacy fields
    kept for backward compatibility with older response consumers.
    """

    @staticmethod
    def create_lightweight_similar_documents_results(
        similar_docs: list[dict[str, Any]],
        target_query: str = "",
        comparison_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight similar documents results.

        Args:
            similar_docs: Entries that may contain ``document`` (a dict or
                an object — both shapes are supported), ``similarity_score``,
                ``metric_scores`` and ``similarity_reasons`` keys.
            target_query: Query that produced the target document.
            comparison_query: Query that produced the comparison set.

        Returns:
            Dict with ``similarity_index`` (top 10 entries), ``query_info``
            and ``navigation`` metadata, plus legacy ``similar_documents`` /
            ``total_found`` / query fields mirroring the same data.
        """
        return {
            "similarity_index": [
                {
                    # Support dict or object for document field
                    # (the immediately-invoked lambda binds the document once
                    # so each field can branch on its concrete type)
                    **(
                        lambda document: {
                            "document_id": (
                                document.get("document_id", "")
                                if isinstance(document, dict)
                                else getattr(document, "document_id", "")
                            ),
                            "title": (
                                document.get("source_title", "Untitled")
                                if isinstance(document, dict)
                                else getattr(document, "source_title", "Untitled")
                            ),
                            "navigation_hints": {
                                "can_expand": True,
                                "has_children": (
                                    document.get("has_children", False)
                                    if isinstance(document, dict)
                                    else getattr(document, "has_children", False)
                                ),
                            },
                        }
                    )(doc_info.get("document", {})),
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_info": {
                        "metric_scores": doc_info.get("metric_scores", {}),
                        "reasons": doc_info.get("similarity_reasons", []),
                    },
                }
                for doc_info in similar_docs[:10]  # Limit to top 10
            ],
            "query_info": {
                "target_query": target_query,
                "comparison_query": comparison_query,
                "total_found": len(similar_docs),
            },
            "navigation": {
                "total_found": len(similar_docs),
                "showing": min(len(similar_docs), 10),
            },
            # Keep legacy fields for backward compatibility
            "target_query": target_query,
            "comparison_query": comparison_query,
            "similar_documents": [
                {
                    "document": FormatterUtils.extract_minimal_doc_fields(
                        doc_info.get("document")
                    ),
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_reasons": doc_info.get("similarity_reasons", []),
                }
                for doc_info in similar_docs[:10]  # Limit to top 10
            ],
            "total_found": len(similar_docs),
        }

    @staticmethod
    def create_lightweight_conflict_results(
        conflicts: dict[str, Any],
        query: str = "",
        documents: list | None = None,
    ) -> dict[str, Any]:
        """Create lightweight conflict analysis results.

        Args:
            conflicts: Analysis output in either the new format (a
                ``"conflicts"`` list of dicts) or the old format (a
                ``"conflicting_pairs"`` list of ``(doc1_id, doc2_id, info)``
                tuples); both may be present and are merged.
            query: Original search query, echoed into the metadata.
            documents: Documents that were analyzed; only used as a
                fallback for the ``document_count`` metadata field.

        Returns:
            Dict with the top-5 ``conflicts_detected``, aggregate
            ``conflict_summary`` stats, ``analysis_metadata`` /
            ``navigation`` sections, and legacy ``conflicts`` /
            ``total_conflicts`` fields.
        """
        # Handle both new format ("conflicts") and old format ("conflicting_pairs")
        conflict_list = conflicts.get("conflicts", [])
        conflicting_pairs = conflicts.get("conflicting_pairs", [])

        processed_conflicts: list[dict[str, Any]] = []

        # Process conflicting_pairs format (tuples)
        for pair in conflicting_pairs:
            # Tolerate malformed/short tuples by skipping them
            if len(pair) >= 3:
                doc1_id, doc2_id, conflict_info = pair[0], pair[1], pair[2]
                processed_conflicts.append(
                    {
                        "conflict_type": conflict_info.get("type", "unknown"),
                        "conflict_score": conflict_info.get("confidence", 0.0),
                        "conflict_description": conflict_info.get("description", ""),
                        "conflicting_statements": conflict_info.get(
                            "structured_indicators", []
                        ),
                        "document_1_id": doc1_id,
                        "document_2_id": doc2_id,
                    }
                )

        # Process conflicts format (dicts)
        for conflict in conflict_list:
            processed_conflicts.append(
                {
                    "conflict_type": conflict.get("conflict_type", "unknown"),
                    # Prefer "confidence", fall back to "severity_score"
                    "conflict_score": conflict.get(
                        "confidence", conflict.get("severity_score", 0.0)
                    ),
                    "conflict_description": conflict.get("description", ""),
                    "conflicting_statements": FormatterUtils.extract_conflicting_statements(
                        conflict
                    ),
                }
            )

        return {
            "conflicts_detected": processed_conflicts[:5],  # Limit to top 5
            "conflict_summary": {
                "total_conflicts": len(processed_conflicts),
                "avg_confidence": (
                    sum(c.get("conflict_score", 0) for c in processed_conflicts)
                    / len(processed_conflicts)
                    if processed_conflicts
                    else 0
                ),
                # De-duplicated set of conflict types, materialized as a list
                "conflict_types": list(
                    {c.get("conflict_type", "unknown") for c in processed_conflicts}
                ),
            },
            "analysis_metadata": {
                "query": query,
                "document_count": conflicts.get("query_metadata", {}).get(
                    "document_count", len(documents) if documents else 0
                ),
                "analysis_depth": "lightweight",
            },
            "navigation": {
                "total_found": len(processed_conflicts),
                "showing": min(len(processed_conflicts), 5),
                "has_more": len(processed_conflicts) > 5,
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "conflicts": processed_conflicts[:5],
            "resolution_suggestions": conflicts.get("resolution_suggestions", []),
            "total_conflicts": len(processed_conflicts),
        }

    @staticmethod
    def create_lightweight_cluster_results(
        clusters: dict[str, Any],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight document clustering results.

        Args:
            clusters: Clustering output with a ``"clusters"`` list and an
                optional ``"clustering_metadata"`` dict.
            query: Original search query, echoed into the metadata.

        Returns:
            Dict with ``cluster_index`` (up to 8 clusters, up to 5 docs
            each), ``clustering_metadata`` / ``expansion_info`` sections,
            and legacy ``clusters`` / ``total_clusters`` /
            ``total_documents`` fields.
        """
        cluster_list = clusters.get("clusters", [])
        clustering_metadata = clusters.get("clustering_metadata", {})

        formatted_clusters: list[dict[str, Any]] = []
        for cluster in cluster_list[:8]:  # Limit to 8 clusters
            formatted_clusters.append(
                {
                    # Fallback ids/names are 1-based positional labels
                    "cluster_id": cluster.get(
                        "id", f"cluster_{len(formatted_clusters)+1}"
                    ),
                    "cluster_name": cluster.get(
                        "name", f"Cluster {len(formatted_clusters)+1}"
                    ),
                    "coherence_score": cluster.get("coherence_score", 0),
                    "document_count": len(cluster.get("documents", [])),
                    "documents": [
                        FormatterUtils.extract_minimal_doc_fields(doc)
                        for doc in cluster.get("documents", [])[
                            :5
                        ]  # Limit docs per cluster
                    ],
                    # Prefer shared entities; fall back to explicit themes
                    "cluster_themes": cluster.get(
                        "shared_entities", cluster.get("cluster_themes", [])
                    ),
                    "centroid_topics": cluster.get("centroid_topics", []),
                }
            )

        return {
            "cluster_index": formatted_clusters,
            "clustering_metadata": {
                "strategy": clustering_metadata.get("strategy", "unknown"),
                "total_documents": clustering_metadata.get(
                    "total_documents",
                    sum(len(cluster.get("documents", [])) for cluster in cluster_list),
                ),
                "clusters_created": clustering_metadata.get(
                    "clusters_created", len(cluster_list)
                ),
                "query": query,
                "analysis_depth": "lightweight",
            },
            "expansion_info": {
                "total_clusters": len(cluster_list),
                "showing": len(formatted_clusters),
                "can_expand": len(cluster_list) > len(formatted_clusters),
                "documents_per_cluster": 5,  # Max docs shown per cluster
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "clusters": [
                {
                    "cluster_id": cluster.get("id", f"cluster_{i+1}"),
                    "documents": [
                        FormatterUtils.extract_minimal_doc_fields(doc)
                        for doc in cluster.get("documents", [])[
                            :5
                        ]  # Limit docs per cluster
                    ],
                    "cluster_themes": cluster.get(
                        "shared_entities", cluster.get("cluster_themes", [])
                    ),
                    "coherence_score": cluster.get("coherence_score", 0),
                    "document_count": len(cluster.get("documents", [])),
                }
                for i, cluster in enumerate(cluster_list[:8])  # Limit to 8 clusters
            ],
            "total_clusters": len(cluster_list),
            "total_documents": sum(
                len(cluster.get("documents", [])) for cluster in cluster_list
            ),
        }

    @staticmethod
    def create_lightweight_hierarchy_results(
        filtered_results: list[HybridSearchResult],
        organized_results: dict[str, list[HybridSearchResult]],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight hierarchical results.

        Args:
            filtered_results: Flat list of all results after filtering;
                only used for totals and source-type aggregation.
            organized_results: Results grouped by raw group key.
            query: Original search query, echoed into the metadata.

        Returns:
            Dict with a flat per-document ``hierarchy_index``, a
            ``hierarchy_groups`` list (up to 10 docs per group),
            ``query_metadata``, and legacy ``query`` / ``total_groups``
            fields.
        """
        hierarchy_groups_data: list[dict[str, Any]] = []
        hierarchy_index_data: list[dict[str, Any]] = []

        for group_name, results in organized_results.items():
            clean_group_name = FormatterUtils.generate_clean_group_name(
                group_name, results
            )
            documents_data = [
                {
                    **FormatterUtils.extract_minimal_doc_fields(result),
                    "depth": FormatterUtils.extract_synthetic_depth(result),
                    "has_children": FormatterUtils.extract_has_children(result),
                    "parent_title": FormatterUtils.extract_synthetic_parent_title(
                        result
                    ),
                }
                for result in results[:10]  # Limit per group
            ]
            # Calculate depth range for the group
            depths = [
                FormatterUtils.extract_synthetic_depth(result) for result in results
            ]
            depth_range = [min(depths), max(depths)] if depths else [0, 0]

            group_data = {
                "group_key": group_name,  # Original key
                "group_name": clean_group_name,  # Clean display name
                "documents": documents_data,
                # NOTE(review): assumes extract_minimal_doc_fields always
                # emits a "document_id" key — confirm in FormatterUtils
                "document_ids": [doc["document_id"] for doc in documents_data],
                "depth_range": depth_range,
                "total_documents": len(results),
            }
            hierarchy_groups_data.append(group_data)

            # Create index entries as individual documents for compatibility
            # (index covers ALL results in the group, not just the first 10)
            for result in results:
                hierarchy_index_data.append(
                    {
                        # id(result) is a session-local fallback id only
                        "document_id": getattr(
                            result, "document_id", f"doc_{id(result)}"
                        ),
                        "title": getattr(result, "title", "Untitled"),
                        "score": getattr(result, "score", 0.0),
                        "hierarchy_info": {
                            "depth": FormatterUtils.extract_synthetic_depth(result),
                            "has_children": FormatterUtils.extract_has_children(result),
                            "parent_title": FormatterUtils.extract_synthetic_parent_title(
                                result
                            ),
                            "group_name": clean_group_name,
                            "source_type": getattr(result, "source_type", "unknown"),
                        },
                        "navigation_hints": {
                            "breadcrumb": FormatterUtils.extract_synthetic_parent_title(
                                result
                            ),
                            "level": FormatterUtils.extract_synthetic_depth(result),
                            "group": clean_group_name,
                            "siblings_count": len(results)
                            - 1,  # Other docs in same group
                            "children_count": 0,  # Default, could be enhanced with actual child detection
                        },
                    }
                )

        return {
            "hierarchy_index": hierarchy_index_data,
            "hierarchy_groups": hierarchy_groups_data,
            "total_found": len(filtered_results),
            "query_metadata": {
                "query": query,
                "search_query": query,  # Alias for compatibility
                "total_documents": len(filtered_results),
                "total_groups": len(organized_results),
                "analysis_type": "hierarchy",
                "source_types_found": list(
                    {
                        getattr(result, "source_type", "unknown")
                        for result in filtered_results
                    }
                ),
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "total_groups": len(organized_results),
        }

    @staticmethod
    def create_lightweight_complementary_results(
        complementary_recommendations: list[dict[str, Any]],
        target_document: "HybridSearchResult | None" = None,
        context_documents_analyzed: int = 0,
        target_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight complementary content results.

        Args:
            complementary_recommendations: Recommendation dicts; the
                ``"document"`` value may be an object, a dict, or absent,
                with flat fallback keys (``document_id``, ``title``,
                ``source_type``, ...) on the recommendation itself.
            target_document: The document recommendations were generated
                for; accepted as a dict or a HybridSearchResult-like
                object, and omitted from the payload when ``None``.
            context_documents_analyzed: How many documents were examined.
            target_query: Query used to locate the target document.

        Returns:
            Dict with a full ``complementary_index``, a trimmed (max 8)
            ``complementary_recommendations`` list, an aggregate
            ``complementary_summary``, and an optional ``target_document``
            section.
        """
        result: dict[str, Any] = {
            "target_query": target_query,
            "complementary_index": [
                {
                    # Object attr first, flat rec key as getattr default
                    "document_id": getattr(
                        rec.get("document"), "document_id", rec.get("document_id")
                    ),
                    "title": getattr(
                        rec.get("document"),
                        "source_title",
                        rec.get("title", "Untitled"),
                    ),
                    "complementary_score": rec.get("relevance_score", 0),
                    "complementary_reason": rec.get(
                        "recommendation_reason", rec.get("reason", "")
                    ),
                    "relationship_type": rec.get("strategy", ""),
                    # Immediately-invoked lambda resolves each field through
                    # an or-chain: object attr -> flat rec key -> dict key
                    "basic_metadata": (
                        lambda doc_obj, rec_dict: {
                            "source_type": (
                                getattr(doc_obj, "source_type", None)
                                if doc_obj is not None
                                else None
                            )
                            or rec_dict.get("source_type")
                            or (
                                doc_obj.get("source_type")
                                if isinstance(doc_obj, dict)
                                else None
                            )
                            or "unknown",
                            "project_id": (
                                getattr(doc_obj, "project_id", None)
                                if doc_obj is not None
                                else None
                            )
                            or rec_dict.get("project_id")
                            or (
                                doc_obj.get("project_id")
                                if isinstance(doc_obj, dict)
                                else None
                            ),
                        }
                    )(rec.get("document"), rec),
                }
                for rec in complementary_recommendations
            ],
            "complementary_recommendations": [
                {
                    "document": {
                        "document_id": rec.get("document_id"),
                        "title": rec.get("title"),
                        "source_type": rec.get("source_type", "unknown"),
                        "score": rec.get("relevance_score", 0),
                    },
                    "relationship_type": "related",
                    "relevance_score": rec.get("relevance_score", 0),
                    "reasons": [rec.get("reason", "")] if rec.get("reason") else [],
                }
                for rec in complementary_recommendations[:8]  # Limit to 8
            ],
            "context_documents_analyzed": context_documents_analyzed,
            "total_recommendations": len(complementary_recommendations),
            "complementary_summary": {
                "total_found": len(complementary_recommendations),
                "complementary_found": len(complementary_recommendations),
                "total_analyzed": context_documents_analyzed,
                "average_score": (
                    sum(
                        rec.get("relevance_score", 0)
                        for rec in complementary_recommendations
                    )
                    / len(complementary_recommendations)
                    if complementary_recommendations
                    else 0
                ),
                "strategies_used": list(
                    {
                        rec.get("strategy", "unknown")
                        for rec in complementary_recommendations
                    }
                ),
            },
            "lazy_loading_enabled": False,
            "expand_document_hint": "Use tools/call with 'search' to get full document details",
        }

        # Only include target_document if available; shape must match schema
        if target_document is not None:
            if isinstance(target_document, dict):
                result["target_document"] = {
                    "document_id": target_document.get(
                        "document_id", target_document.get("id", "")
                    ),
                    "title": target_document.get("title", "Untitled"),
                    "content_preview": target_document.get("content_preview", ""),
                    "source_type": target_document.get("source_type", "unknown"),
                }
            else:
                # Assume HybridSearchResult-like object
                title_val = (
                    target_document.get_display_title()
                    if hasattr(target_document, "get_display_title")
                    else getattr(target_document, "source_title", "Untitled")
                )
                # Coerce a None/missing text attribute to an empty string
                text_val = getattr(target_document, "text", "") or ""
                result["target_document"] = {
                    "document_id": getattr(
                        target_document,
                        "document_id",
                        getattr(target_document, "id", ""),
                    ),
                    "title": title_val,
                    # Truncate long bodies to a 200-char preview
                    "content_preview": (
                        text_val[:200] + "..."
                        if isinstance(text_val, str) and len(text_val) > 200
                        else text_val
                    ),
                    "source_type": getattr(target_document, "source_type", "unknown"),
                }

        return result

    @staticmethod
    def create_lightweight_attachment_results(
        filtered_results: list[HybridSearchResult],
        attachment_filter: dict[str, Any],
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight attachment results.

        Args:
            filtered_results: Search results; only those whose
                ``is_attachment`` attribute is truthy are included.
            attachment_filter: Filter criteria, echoed into the metadata.
            query: Original search query, echoed into the metadata.

        Returns:
            Dict with a flat ``attachment_index`` (top 20), per-file-type
            ``attachment_groups`` (up to 15 each), ``query_metadata``, and
            legacy ``query`` / ``total_groups`` fields.
        """
        # Filter only attachment results
        attachment_results = [
            result
            for result in filtered_results
            if getattr(result, "is_attachment", False)
        ]

        # Group by file type for organized display
        organized_attachments: dict[str, list[HybridSearchResult]] = {}
        for result in attachment_results:
            file_type = FormatterUtils.extract_file_type_minimal(result)
            if file_type not in organized_attachments:
                organized_attachments[file_type] = []
            organized_attachments[file_type].append(result)

        # Create attachment index
        attachment_index = [
            {
                "document_id": getattr(result, "document_id", ""),
                "title": getattr(result, "source_title", "Untitled"),
                "attachment_info": {
                    "filename": FormatterUtils.extract_safe_filename(result),
                    "file_type": FormatterUtils.extract_file_type_minimal(result),
                    "file_size": getattr(result, "file_size", None),
                },
                "score": getattr(result, "score", 0.0),
                "source_url": getattr(result, "source_url", None),
            }
            for result in attachment_results[:20]  # Limit to top 20
        ]

        # Create attachment groups
        attachment_groups = [
            {
                "file_type": file_type,
                "attachments": [
                    {
                        **FormatterUtils.extract_minimal_doc_fields(result),
                        "filename": FormatterUtils.extract_safe_filename(result),
                        "file_type": FormatterUtils.extract_file_type_minimal(result),
                    }
                    for result in results[:15]  # Limit per group
                ],
                "total_attachments": len(results),
            }
            for file_type, results in organized_attachments.items()
        ]

        return {
            "attachment_index": attachment_index,
            "attachment_groups": attachment_groups,
            "total_found": len(attachment_results),
            "query_metadata": {
                "query": query,
                "filter": attachment_filter,
                "total_attachments": len(attachment_results),
                "file_types": list(organized_attachments.keys()),
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "total_groups": len(organized_attachments),
        }