Coverage for src/qdrant_loader_mcp_server/mcp/formatters/lightweight.py: 100%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""
2Lightweight Result Formatters - Efficient Result Construction.
4This module handles the creation of lightweight, efficient result structures
5for MCP responses, optimizing for minimal data transfer and fast processing.
6"""
8from typing import Any
10from ...search.components.search_result_models import HybridSearchResult
11from .utils import FormatterUtils
class LightweightResultFormatters:
    """Handles lightweight result construction operations."""

    @staticmethod
    def create_lightweight_similar_documents_results(
        similar_docs: list[dict[str, Any]],
        target_query: str = "",
        comparison_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight similar documents results.

        Args:
            similar_docs: Entries shaped like ``{"document": <dict | object>,
                "similarity_score": float, "metric_scores": dict,
                "similarity_reasons": list}``. The ``document`` value may be
                a plain dict or an attribute-style object.
            target_query: Query used to locate the target document.
            comparison_query: Query used to locate comparison documents.

        Returns:
            Dict with a ``similarity_index`` (top 10 entries), query and
            navigation metadata, plus legacy ``similar_documents`` /
            ``target_query`` / ``comparison_query`` / ``total_found`` fields
            kept for backward compatibility.
        """

        def _field(document: Any, name: str, default: Any) -> Any:
            # Documents arrive either as dicts or as objects; read the named
            # field whichever way works.
            if isinstance(document, dict):
                return document.get(name, default)
            return getattr(document, name, default)

        def _index_entry(document: Any) -> dict[str, Any]:
            # Minimal per-document summary used in the similarity index.
            return {
                "document_id": _field(document, "document_id", ""),
                "title": _field(document, "source_title", "Untitled"),
                "navigation_hints": {
                    "can_expand": True,
                    "has_children": _field(document, "has_children", False),
                },
            }

        return {
            "similarity_index": [
                {
                    **_index_entry(doc_info.get("document", {})),
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_info": {
                        "metric_scores": doc_info.get("metric_scores", {}),
                        "reasons": doc_info.get("similarity_reasons", []),
                    },
                }
                for doc_info in similar_docs[:10]  # Limit to top 10
            ],
            "query_info": {
                "target_query": target_query,
                "comparison_query": comparison_query,
                "total_found": len(similar_docs),
            },
            "navigation": {
                "total_found": len(similar_docs),
                "showing": min(len(similar_docs), 10),
            },
            # Keep legacy fields for backward compatibility
            "target_query": target_query,
            "comparison_query": comparison_query,
            "similar_documents": [
                {
                    "document": (
                        FormatterUtils.extract_minimal_doc_fields(doc)
                        if (doc := doc_info.get("document")) is not None
                        else {
                            "document_id": "",
                            "title": "Untitled",
                            "source_type": "unknown",
                            "score": 0.0,
                        }
                    ),
                    "similarity_score": doc_info.get("similarity_score", 0),
                    "similarity_reasons": doc_info.get("similarity_reasons") or [],
                }
                for doc_info in similar_docs[:10]  # Limit to top 10
            ],
            "total_found": len(similar_docs),
        }
90 @staticmethod
91 def create_lightweight_conflict_results(
92 conflicts: dict[str, Any],
93 query: str = "",
94 documents: list[Any] | None = None,
95 ) -> dict[str, Any]:
96 """Create lightweight conflict analysis results."""
97 # Handle both new format ("conflicts") and old format ("conflicting_pairs")
98 conflict_list = conflicts.get("conflicts", [])
99 conflicting_pairs = conflicts.get("conflicting_pairs", [])
101 processed_conflicts = []
103 # Process conflicting_pairs format (tuples)
104 for pair in conflicting_pairs:
105 if len(pair) >= 3:
106 doc1_id, doc2_id, conflict_info = pair[0], pair[1], pair[2]
107 processed_conflicts.append(
108 {
109 "conflict_type": conflict_info.get("type", "unknown"),
110 "conflict_score": conflict_info.get("confidence", 0.0),
111 "conflict_description": conflict_info.get("description", ""),
112 "conflicting_statements": conflict_info.get(
113 "structured_indicators", []
114 ),
115 "document_1_id": doc1_id,
116 "document_2_id": doc2_id,
117 }
118 )
120 # Process conflicts format (dicts)
121 for conflict in conflict_list:
122 processed_conflicts.append(
123 {
124 "conflict_type": conflict.get("conflict_type", "unknown"),
125 "conflict_score": conflict.get(
126 "confidence", conflict.get("severity_score", 0.0)
127 ),
128 "conflict_description": conflict.get("description", ""),
129 "conflicting_statements": FormatterUtils.extract_conflicting_statements(
130 conflict
131 ),
132 }
133 )
135 return {
136 "conflicts_detected": processed_conflicts[:5], # Limit to top 5
137 "conflict_summary": {
138 "total_conflicts": len(processed_conflicts),
139 "avg_confidence": (
140 sum(c.get("conflict_score", 0) for c in processed_conflicts)
141 / len(processed_conflicts)
142 if processed_conflicts
143 else 0
144 ),
145 "conflict_types": list(
146 {c.get("conflict_type", "unknown") for c in processed_conflicts}
147 ),
148 },
149 "analysis_metadata": {
150 "query": query,
151 "document_count": conflicts.get("query_metadata", {}).get(
152 "document_count", len(documents) if documents else 0
153 ),
154 "analysis_depth": "lightweight",
155 },
156 "navigation": {
157 "total_found": len(processed_conflicts),
158 "showing": min(len(processed_conflicts), 5),
159 "has_more": len(processed_conflicts) > 5,
160 },
161 # Keep legacy fields for backward compatibility
162 "query": query,
163 "conflicts": processed_conflicts[:5],
164 "resolution_suggestions": conflicts.get("resolution_suggestions", []),
165 "total_conflicts": len(processed_conflicts),
166 }
168 @staticmethod
169 def create_lightweight_cluster_results(
170 clusters: dict[str, Any],
171 query: str = "",
172 ) -> dict[str, Any]:
173 """Create lightweight document clustering results."""
174 cluster_list = clusters.get("clusters", [])
175 clustering_metadata = clusters.get("clustering_metadata", {})
177 formatted_clusters = []
178 for cluster in cluster_list[:8]: # Limit to 8 clusters
179 formatted_clusters.append(
180 {
181 "cluster_id": cluster.get(
182 "id", f"cluster_{len(formatted_clusters) + 1}"
183 ),
184 "cluster_name": cluster.get(
185 "name", f"Cluster {len(formatted_clusters) + 1}"
186 ),
187 "coherence_score": cluster.get("coherence_score", 0),
188 "document_count": len(cluster.get("documents", [])),
189 "documents": [
190 FormatterUtils.extract_minimal_doc_fields(doc)
191 for doc in cluster.get("documents", [])[
192 :5
193 ] # Limit docs per cluster
194 ],
195 "cluster_themes": cluster.get(
196 "shared_entities", cluster.get("cluster_themes", [])
197 ),
198 "centroid_topics": cluster.get("centroid_topics", []),
199 }
200 )
202 return {
203 "cluster_index": formatted_clusters,
204 "clustering_metadata": {
205 "strategy": clustering_metadata.get("strategy", "unknown"),
206 "total_documents": clustering_metadata.get(
207 "total_documents",
208 sum(len(cluster.get("documents", [])) for cluster in cluster_list),
209 ),
210 "clusters_created": clustering_metadata.get(
211 "clusters_created", len(cluster_list)
212 ),
213 "query": query,
214 "analysis_depth": "lightweight",
215 },
216 "expansion_info": {
217 "total_clusters": len(cluster_list),
218 "showing": len(formatted_clusters),
219 "can_expand": len(cluster_list) > len(formatted_clusters),
220 "documents_per_cluster": 5, # Max docs shown per cluster
221 },
222 # Keep legacy fields for backward compatibility
223 "query": query,
224 "clusters": [
225 {
226 "cluster_id": cluster.get("id", f"cluster_{i + 1}"),
227 "documents": [
228 FormatterUtils.extract_minimal_doc_fields(doc)
229 for doc in cluster.get("documents", [])[
230 :5
231 ] # Limit docs per cluster
232 ],
233 "cluster_themes": cluster.get(
234 "shared_entities", cluster.get("cluster_themes", [])
235 ),
236 "coherence_score": cluster.get("coherence_score", 0),
237 "document_count": len(cluster.get("documents", [])),
238 }
239 for i, cluster in enumerate(cluster_list[:8]) # Limit to 8 clusters
240 ],
241 "total_clusters": len(cluster_list),
242 "total_documents": sum(
243 len(cluster.get("documents", [])) for cluster in cluster_list
244 ),
245 }
    @staticmethod
    def create_lightweight_hierarchy_results(
        filtered_results: "list[HybridSearchResult]",
        organized_results: "dict[str, list[HybridSearchResult]]",
        query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight hierarchical results.

        Args:
            filtered_results: Flat list of all search results after filtering.
            organized_results: The same results grouped by hierarchy group key.
            query: The search query, echoed into the metadata.

        Returns:
            Dict with a flat ``hierarchy_index`` (one entry per document,
            across all groups), per-group ``hierarchy_groups``, query
            metadata, and legacy ``query`` / ``total_groups`` fields for
            backward compatibility.
        """
        hierarchy_groups_data = []
        hierarchy_index_data = []

        for group_name, results in organized_results.items():
            # Human-readable label derived from the raw group key.
            clean_group_name = FormatterUtils.generate_clean_group_name(
                group_name, results
            )
            documents_data = [
                {
                    **FormatterUtils.extract_minimal_doc_fields(result),
                    "depth": FormatterUtils.extract_synthetic_depth(result),
                    "has_children": FormatterUtils.extract_has_children(result),
                    "parent_title": FormatterUtils.extract_synthetic_parent_title(
                        result
                    ),
                }
                for result in results[:10]  # Limit per group
            ]
            # Calculate depth range for the group — over ALL results in the
            # group, not just the 10 shown above.
            depths = [
                FormatterUtils.extract_synthetic_depth(result) for result in results
            ]
            depth_range = [min(depths), max(depths)] if depths else [0, 0]

            group_data = {
                "group_key": group_name,  # Original key
                "group_name": clean_group_name,  # Clean display name
                "documents": documents_data,
                "document_ids": [doc["document_id"] for doc in documents_data],
                "depth_range": depth_range,
                "total_documents": len(results),
            }
            hierarchy_groups_data.append(group_data)

            # Create index entries as individual documents for compatibility.
            # Unlike documents_data above, this iterates ALL group results.
            for result in results:
                hierarchy_index_data.append(
                    {
                        "document_id": getattr(
                            result, "document_id", f"doc_{id(result)}"
                        ),
                        # NOTE(review): reads "title" here while the other
                        # formatters in this module read "source_title" —
                        # confirm which attribute HybridSearchResult exposes.
                        "title": getattr(result, "title", "Untitled"),
                        "score": getattr(result, "score", 0.0),
                        "hierarchy_info": {
                            "depth": FormatterUtils.extract_synthetic_depth(result),
                            "has_children": FormatterUtils.extract_has_children(result),
                            "parent_title": FormatterUtils.extract_synthetic_parent_title(
                                result
                            ),
                            "group_name": clean_group_name,
                            "source_type": getattr(result, "source_type", "unknown"),
                        },
                        "navigation_hints": {
                            "breadcrumb": FormatterUtils.extract_synthetic_parent_title(
                                result
                            ),
                            "level": FormatterUtils.extract_synthetic_depth(result),
                            "group": clean_group_name,
                            "siblings_count": len(results)
                            - 1,  # Other docs in same group
                            "children_count": 0,  # Default, could be enhanced with actual child detection
                        },
                    }
                )

        return {
            "hierarchy_index": hierarchy_index_data,
            "hierarchy_groups": hierarchy_groups_data,
            "total_found": len(filtered_results),
            "query_metadata": {
                "query": query,
                "search_query": query,  # Alias for compatibility
                "total_documents": len(filtered_results),
                "total_groups": len(organized_results),
                "analysis_type": "hierarchy",
                "source_types_found": list(
                    {
                        getattr(result, "source_type", "unknown")
                        for result in filtered_results
                    }
                ),
            },
            # Keep legacy fields for backward compatibility
            "query": query,
            "total_groups": len(organized_results),
        }
341 @staticmethod
342 def create_lightweight_complementary_results(
343 complementary_recommendations: list[dict[str, Any]],
344 target_document: "HybridSearchResult | None" = None,
345 context_documents_analyzed: int = 0,
346 target_query: str = "",
347 ) -> dict[str, Any]:
348 """Create lightweight complementary content results."""
349 result: dict[str, Any] = {
350 "target_query": target_query,
351 "complementary_index": [
352 {
353 "document_id": getattr(
354 rec.get("document"), "document_id", rec.get("document_id")
355 ),
356 "title": getattr(
357 rec.get("document"),
358 "source_title",
359 rec.get("title", "Untitled"),
360 ),
361 "complementary_score": rec.get("relevance_score", 0),
362 "complementary_reason": rec.get(
363 "recommendation_reason", rec.get("reason", "")
364 ),
365 "relationship_type": rec.get("strategy", ""),
366 "basic_metadata": (
367 lambda doc_obj, rec_dict: {
368 "source_type": (
369 getattr(doc_obj, "source_type", None)
370 if doc_obj is not None
371 else None
372 )
373 or rec_dict.get("source_type")
374 or (
375 doc_obj.get("source_type")
376 if isinstance(doc_obj, dict)
377 else None
378 )
379 or "unknown",
380 "project_id": (
381 getattr(doc_obj, "project_id", None)
382 if doc_obj is not None
383 else None
384 )
385 or rec_dict.get("project_id")
386 or (
387 doc_obj.get("project_id")
388 if isinstance(doc_obj, dict)
389 else None
390 ),
391 }
392 )(rec.get("document"), rec),
393 }
394 for rec in complementary_recommendations
395 ],
396 "complementary_recommendations": [
397 {
398 "document": {
399 "document_id": rec.get("document_id"),
400 "title": rec.get("title"),
401 "source_type": rec.get("source_type", "unknown"),
402 "score": rec.get("relevance_score", 0),
403 },
404 "relationship_type": "related",
405 "relevance_score": rec.get("relevance_score", 0),
406 "reasons": [rec.get("reason", "")] if rec.get("reason") else [],
407 }
408 for rec in complementary_recommendations[:8] # Limit to 8
409 ],
410 "context_documents_analyzed": context_documents_analyzed,
411 "total_recommendations": len(complementary_recommendations),
412 "complementary_summary": {
413 "total_found": len(complementary_recommendations),
414 "complementary_found": len(complementary_recommendations),
415 "total_analyzed": context_documents_analyzed,
416 "average_score": (
417 sum(
418 rec.get("relevance_score", 0)
419 for rec in complementary_recommendations
420 )
421 / len(complementary_recommendations)
422 if complementary_recommendations
423 else 0
424 ),
425 "strategies_used": list(
426 {
427 rec.get("strategy", "unknown")
428 for rec in complementary_recommendations
429 }
430 ),
431 },
432 "lazy_loading_enabled": False,
433 "expand_document_hint": "Use tools/call with 'expand_document' and document_id to get full document details",
434 }
436 # Only include target_document if available; shape must match schema
437 if target_document is not None:
438 if isinstance(target_document, dict):
439 result["target_document"] = {
440 "document_id": target_document.get(
441 "document_id", target_document.get("id", "")
442 ),
443 "title": target_document.get("title", "Untitled"),
444 "content_preview": target_document.get("content_preview", ""),
445 "source_type": target_document.get("source_type", "unknown"),
446 }
447 else:
448 # Assume HybridSearchResult-like object
449 title_val = (
450 target_document.get_display_title()
451 if hasattr(target_document, "get_display_title")
452 else getattr(target_document, "source_title", "Untitled")
453 )
454 text_val = getattr(target_document, "text", "") or ""
455 result["target_document"] = {
456 "document_id": getattr(
457 target_document,
458 "document_id",
459 getattr(target_document, "id", ""),
460 ),
461 "title": title_val,
462 "content_preview": (
463 text_val[:200] + "..."
464 if isinstance(text_val, str) and len(text_val) > 200
465 else text_val
466 ),
467 "source_type": getattr(target_document, "source_type", "unknown"),
468 }
470 return result
472 @staticmethod
473 def create_lightweight_attachment_results(
474 filtered_results: list[HybridSearchResult],
475 attachment_filter: dict[str, Any],
476 query: str = "",
477 ) -> dict[str, Any]:
478 """Create lightweight attachment results."""
479 # Filter only attachment results
480 attachment_results = [
481 result
482 for result in filtered_results
483 if getattr(result, "is_attachment", False)
484 ]
486 # Group by file type for organized display
487 organized_attachments = {}
488 for result in attachment_results:
489 file_type = FormatterUtils.extract_file_type_minimal(result)
490 if file_type not in organized_attachments:
491 organized_attachments[file_type] = []
492 organized_attachments[file_type].append(result)
494 # Create attachment index
495 attachment_index = [
496 {
497 "document_id": getattr(result, "document_id", ""),
498 "title": getattr(result, "source_title", "Untitled"),
499 "attachment_info": {
500 "filename": FormatterUtils.extract_safe_filename(result),
501 "file_type": FormatterUtils.extract_file_type_minimal(result),
502 "file_size": getattr(result, "file_size", None),
503 },
504 "score": getattr(result, "score", 0.0),
505 "source_url": getattr(result, "source_url", None),
506 }
507 for result in attachment_results[:20] # Limit to top 20
508 ]
510 # Create attachment groups
511 attachment_groups = [
512 {
513 "file_type": file_type,
514 "attachments": [
515 {
516 **FormatterUtils.extract_minimal_doc_fields(result),
517 "filename": FormatterUtils.extract_safe_filename(result),
518 "file_type": FormatterUtils.extract_file_type_minimal(result),
519 }
520 for result in results[:15] # Limit per group
521 ],
522 "total_attachments": len(results),
523 }
524 for file_type, results in organized_attachments.items()
525 ]
527 return {
528 "attachment_index": attachment_index,
529 "attachment_groups": attachment_groups,
530 "total_found": len(attachment_results),
531 "query_metadata": {
532 "query": query,
533 "filter": attachment_filter,
534 "total_attachments": len(attachment_results),
535 "file_types": list(organized_attachments.keys()),
536 },
537 # Keep legacy fields for backward compatibility
538 "query": query,
539 "total_groups": len(organized_attachments),
540 }