Coverage for src/qdrant_loader_mcp_server/mcp/formatters/lightweight.py: 100%
63 statements
« prev ^ index » next — coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Lightweight Result Formatters - Efficient Result Construction.
4This module handles the creation of lightweight, efficient result structures
5for MCP responses, optimizing for minimal data transfer and fast processing.
6"""
8from typing import Any
10from ...search.components.search_result_models import HybridSearchResult
11from .utils import FormatterUtils
class LightweightResultFormatters:
    """Handles lightweight result construction operations."""

    @staticmethod
    def create_lightweight_similar_documents_results(
        similar_docs: list[dict[str, Any]],
        target_query: str = "",
        comparison_query: str = "",
    ) -> dict[str, Any]:
        """Create lightweight similar documents results."""

        def _summarize(document: Any) -> dict[str, Any]:
            # Documents may arrive as plain dicts or as attribute-style objects;
            # read each field through whichever access style applies.
            if isinstance(document, dict):
                doc_id = document.get("document_id", "")
                title = document.get("source_title", "Untitled")
                has_children = document.get("has_children", False)
            else:
                doc_id = getattr(document, "document_id", "")
                title = getattr(document, "source_title", "Untitled")
                has_children = getattr(document, "has_children", False)
            return {
                "document_id": doc_id,
                "title": title,
                "navigation_hints": {
                    "can_expand": True,
                    "has_children": has_children,
                },
            }

        top_docs = similar_docs[:10]  # Limit to top 10

        similarity_index = []
        for doc_info in top_docs:
            entry = _summarize(doc_info.get("document", {}))
            entry["similarity_score"] = doc_info.get("similarity_score", 0)
            entry["similarity_info"] = {
                "metric_scores": doc_info.get("metric_scores", {}),
                "reasons": doc_info.get("similarity_reasons", []),
            }
            similarity_index.append(entry)

        legacy_documents = [
            {
                "document": FormatterUtils.extract_minimal_doc_fields(
                    doc_info.get("document")
                ),
                "similarity_score": doc_info.get("similarity_score", 0),
                "similarity_reasons": doc_info.get("similarity_reasons", []),
            }
            for doc_info in top_docs
        ]

        total = len(similar_docs)
        return {
            "similarity_index": similarity_index,
            "query_info": {
                "target_query": target_query,
                "comparison_query": comparison_query,
                "total_found": total,
            },
            "navigation": {
                "total_found": total,
                "showing": min(total, 10),
            },
            # Keep legacy fields for backward compatibility
            "target_query": target_query,
            "comparison_query": comparison_query,
            "similar_documents": legacy_documents,
            "total_found": total,
        }
83 @staticmethod
84 def create_lightweight_conflict_results(
85 conflicts: dict[str, Any],
86 query: str = "",
87 documents: list = None,
88 ) -> dict[str, Any]:
89 """Create lightweight conflict analysis results."""
90 # Handle both new format ("conflicts") and old format ("conflicting_pairs")
91 conflict_list = conflicts.get("conflicts", [])
92 conflicting_pairs = conflicts.get("conflicting_pairs", [])
94 processed_conflicts = []
96 # Process conflicting_pairs format (tuples)
97 for pair in conflicting_pairs:
98 if len(pair) >= 3:
99 doc1_id, doc2_id, conflict_info = pair[0], pair[1], pair[2]
100 processed_conflicts.append(
101 {
102 "conflict_type": conflict_info.get("type", "unknown"),
103 "conflict_score": conflict_info.get("confidence", 0.0),
104 "conflict_description": conflict_info.get("description", ""),
105 "conflicting_statements": conflict_info.get(
106 "structured_indicators", []
107 ),
108 "document_1_id": doc1_id,
109 "document_2_id": doc2_id,
110 }
111 )
113 # Process conflicts format (dicts)
114 for conflict in conflict_list:
115 processed_conflicts.append(
116 {
117 "conflict_type": conflict.get("conflict_type", "unknown"),
118 "conflict_score": conflict.get(
119 "confidence", conflict.get("severity_score", 0.0)
120 ),
121 "conflict_description": conflict.get("description", ""),
122 "conflicting_statements": FormatterUtils.extract_conflicting_statements(
123 conflict
124 ),
125 }
126 )
128 return {
129 "conflicts_detected": processed_conflicts[:5], # Limit to top 5
130 "conflict_summary": {
131 "total_conflicts": len(processed_conflicts),
132 "avg_confidence": (
133 sum(c.get("conflict_score", 0) for c in processed_conflicts)
134 / len(processed_conflicts)
135 if processed_conflicts
136 else 0
137 ),
138 "conflict_types": list(
139 {c.get("conflict_type", "unknown") for c in processed_conflicts}
140 ),
141 },
142 "analysis_metadata": {
143 "query": query,
144 "document_count": conflicts.get("query_metadata", {}).get(
145 "document_count", len(documents) if documents else 0
146 ),
147 "analysis_depth": "lightweight",
148 },
149 "navigation": {
150 "total_found": len(processed_conflicts),
151 "showing": min(len(processed_conflicts), 5),
152 "has_more": len(processed_conflicts) > 5,
153 },
154 # Keep legacy fields for backward compatibility
155 "query": query,
156 "conflicts": processed_conflicts[:5],
157 "resolution_suggestions": conflicts.get("resolution_suggestions", []),
158 "total_conflicts": len(processed_conflicts),
159 }
161 @staticmethod
162 def create_lightweight_cluster_results(
163 clusters: dict[str, Any],
164 query: str = "",
165 ) -> dict[str, Any]:
166 """Create lightweight document clustering results."""
167 cluster_list = clusters.get("clusters", [])
168 clustering_metadata = clusters.get("clustering_metadata", {})
170 formatted_clusters = []
171 for cluster in cluster_list[:8]: # Limit to 8 clusters
172 formatted_clusters.append(
173 {
174 "cluster_id": cluster.get(
175 "id", f"cluster_{len(formatted_clusters)+1}"
176 ),
177 "cluster_name": cluster.get(
178 "name", f"Cluster {len(formatted_clusters)+1}"
179 ),
180 "coherence_score": cluster.get("coherence_score", 0),
181 "document_count": len(cluster.get("documents", [])),
182 "documents": [
183 FormatterUtils.extract_minimal_doc_fields(doc)
184 for doc in cluster.get("documents", [])[
185 :5
186 ] # Limit docs per cluster
187 ],
188 "cluster_themes": cluster.get(
189 "shared_entities", cluster.get("cluster_themes", [])
190 ),
191 "centroid_topics": cluster.get("centroid_topics", []),
192 }
193 )
195 return {
196 "cluster_index": formatted_clusters,
197 "clustering_metadata": {
198 "strategy": clustering_metadata.get("strategy", "unknown"),
199 "total_documents": clustering_metadata.get(
200 "total_documents",
201 sum(len(cluster.get("documents", [])) for cluster in cluster_list),
202 ),
203 "clusters_created": clustering_metadata.get(
204 "clusters_created", len(cluster_list)
205 ),
206 "query": query,
207 "analysis_depth": "lightweight",
208 },
209 "expansion_info": {
210 "total_clusters": len(cluster_list),
211 "showing": len(formatted_clusters),
212 "can_expand": len(cluster_list) > len(formatted_clusters),
213 "documents_per_cluster": 5, # Max docs shown per cluster
214 },
215 # Keep legacy fields for backward compatibility
216 "query": query,
217 "clusters": [
218 {
219 "cluster_id": cluster.get("id", f"cluster_{i+1}"),
220 "documents": [
221 FormatterUtils.extract_minimal_doc_fields(doc)
222 for doc in cluster.get("documents", [])[
223 :5
224 ] # Limit docs per cluster
225 ],
226 "cluster_themes": cluster.get(
227 "shared_entities", cluster.get("cluster_themes", [])
228 ),
229 "coherence_score": cluster.get("coherence_score", 0),
230 "document_count": len(cluster.get("documents", [])),
231 }
232 for i, cluster in enumerate(cluster_list[:8]) # Limit to 8 clusters
233 ],
234 "total_clusters": len(cluster_list),
235 "total_documents": sum(
236 len(cluster.get("documents", [])) for cluster in cluster_list
237 ),
238 }
240 @staticmethod
241 def create_lightweight_hierarchy_results(
242 filtered_results: list[HybridSearchResult],
243 organized_results: dict[str, list[HybridSearchResult]],
244 query: str = "",
245 ) -> dict[str, Any]:
246 """Create lightweight hierarchical results."""
247 hierarchy_groups_data = []
248 hierarchy_index_data = []
250 for group_name, results in organized_results.items():
251 clean_group_name = FormatterUtils.generate_clean_group_name(
252 group_name, results
253 )
254 documents_data = [
255 {
256 **FormatterUtils.extract_minimal_doc_fields(result),
257 "depth": FormatterUtils.extract_synthetic_depth(result),
258 "has_children": FormatterUtils.extract_has_children(result),
259 "parent_title": FormatterUtils.extract_synthetic_parent_title(
260 result
261 ),
262 }
263 for result in results[:10] # Limit per group
264 ]
265 # Calculate depth range for the group
266 depths = [
267 FormatterUtils.extract_synthetic_depth(result) for result in results
268 ]
269 depth_range = [min(depths), max(depths)] if depths else [0, 0]
271 group_data = {
272 "group_key": group_name, # Original key
273 "group_name": clean_group_name, # Clean display name
274 "documents": documents_data,
275 "document_ids": [doc["document_id"] for doc in documents_data],
276 "depth_range": depth_range,
277 "total_documents": len(results),
278 }
279 hierarchy_groups_data.append(group_data)
281 # Create index entries as individual documents for compatibility
282 for result in results:
283 hierarchy_index_data.append(
284 {
285 "document_id": getattr(
286 result, "document_id", f"doc_{id(result)}"
287 ),
288 "title": getattr(result, "title", "Untitled"),
289 "score": getattr(result, "score", 0.0),
290 "hierarchy_info": {
291 "depth": FormatterUtils.extract_synthetic_depth(result),
292 "has_children": FormatterUtils.extract_has_children(result),
293 "parent_title": FormatterUtils.extract_synthetic_parent_title(
294 result
295 ),
296 "group_name": clean_group_name,
297 "source_type": getattr(result, "source_type", "unknown"),
298 },
299 "navigation_hints": {
300 "breadcrumb": FormatterUtils.extract_synthetic_parent_title(
301 result
302 ),
303 "level": FormatterUtils.extract_synthetic_depth(result),
304 "group": clean_group_name,
305 "siblings_count": len(results)
306 - 1, # Other docs in same group
307 "children_count": 0, # Default, could be enhanced with actual child detection
308 },
309 }
310 )
312 return {
313 "hierarchy_index": hierarchy_index_data,
314 "hierarchy_groups": hierarchy_groups_data,
315 "total_found": len(filtered_results),
316 "query_metadata": {
317 "query": query,
318 "search_query": query, # Alias for compatibility
319 "total_documents": len(filtered_results),
320 "total_groups": len(organized_results),
321 "analysis_type": "hierarchy",
322 "source_types_found": list(
323 {
324 getattr(result, "source_type", "unknown")
325 for result in filtered_results
326 }
327 ),
328 },
329 # Keep legacy fields for backward compatibility
330 "query": query,
331 "total_groups": len(organized_results),
332 }
334 @staticmethod
335 def create_lightweight_complementary_results(
336 complementary_recommendations: list[dict[str, Any]],
337 target_document: "HybridSearchResult" = None,
338 context_documents_analyzed: int = 0,
339 target_query: str = "",
340 ) -> dict[str, Any]:
341 """Create lightweight complementary content results."""
342 result: dict[str, Any] = {
343 "target_query": target_query,
344 "complementary_index": [
345 {
346 "document_id": getattr(
347 rec.get("document"), "document_id", rec.get("document_id")
348 ),
349 "title": getattr(
350 rec.get("document"),
351 "source_title",
352 rec.get("title", "Untitled"),
353 ),
354 "complementary_score": rec.get("relevance_score", 0),
355 "complementary_reason": rec.get(
356 "recommendation_reason", rec.get("reason", "")
357 ),
358 "relationship_type": rec.get("strategy", ""),
359 "basic_metadata": (
360 lambda doc_obj, rec_dict: {
361 "source_type": (
362 getattr(doc_obj, "source_type", None)
363 if doc_obj is not None
364 else None
365 )
366 or rec_dict.get("source_type")
367 or (
368 doc_obj.get("source_type")
369 if isinstance(doc_obj, dict)
370 else None
371 )
372 or "unknown",
373 "project_id": (
374 getattr(doc_obj, "project_id", None)
375 if doc_obj is not None
376 else None
377 )
378 or rec_dict.get("project_id")
379 or (
380 doc_obj.get("project_id")
381 if isinstance(doc_obj, dict)
382 else None
383 ),
384 }
385 )(rec.get("document"), rec),
386 }
387 for rec in complementary_recommendations
388 ],
389 "complementary_recommendations": [
390 {
391 "document": {
392 "document_id": rec.get("document_id"),
393 "title": rec.get("title"),
394 "source_type": rec.get("source_type", "unknown"),
395 "score": rec.get("relevance_score", 0),
396 },
397 "relationship_type": "related",
398 "relevance_score": rec.get("relevance_score", 0),
399 "reasons": [rec.get("reason", "")] if rec.get("reason") else [],
400 }
401 for rec in complementary_recommendations[:8] # Limit to 8
402 ],
403 "context_documents_analyzed": context_documents_analyzed,
404 "total_recommendations": len(complementary_recommendations),
405 "complementary_summary": {
406 "total_found": len(complementary_recommendations),
407 "complementary_found": len(complementary_recommendations),
408 "total_analyzed": context_documents_analyzed,
409 "average_score": (
410 sum(
411 rec.get("relevance_score", 0)
412 for rec in complementary_recommendations
413 )
414 / len(complementary_recommendations)
415 if complementary_recommendations
416 else 0
417 ),
418 "strategies_used": list(
419 {
420 rec.get("strategy", "unknown")
421 for rec in complementary_recommendations
422 }
423 ),
424 },
425 "lazy_loading_enabled": False,
426 "expand_document_hint": "Use tools/call with 'search' to get full document details",
427 }
429 # Only include target_document if available; shape must match schema
430 if target_document is not None:
431 if isinstance(target_document, dict):
432 result["target_document"] = {
433 "document_id": target_document.get(
434 "document_id", target_document.get("id", "")
435 ),
436 "title": target_document.get("title", "Untitled"),
437 "content_preview": target_document.get("content_preview", ""),
438 "source_type": target_document.get("source_type", "unknown"),
439 }
440 else:
441 # Assume HybridSearchResult-like object
442 title_val = (
443 target_document.get_display_title()
444 if hasattr(target_document, "get_display_title")
445 else getattr(target_document, "source_title", "Untitled")
446 )
447 text_val = getattr(target_document, "text", "") or ""
448 result["target_document"] = {
449 "document_id": getattr(
450 target_document,
451 "document_id",
452 getattr(target_document, "id", ""),
453 ),
454 "title": title_val,
455 "content_preview": (
456 text_val[:200] + "..."
457 if isinstance(text_val, str) and len(text_val) > 200
458 else text_val
459 ),
460 "source_type": getattr(target_document, "source_type", "unknown"),
461 }
463 return result
465 @staticmethod
466 def create_lightweight_attachment_results(
467 filtered_results: list[HybridSearchResult],
468 attachment_filter: dict[str, Any],
469 query: str = "",
470 ) -> dict[str, Any]:
471 """Create lightweight attachment results."""
472 # Filter only attachment results
473 attachment_results = [
474 result
475 for result in filtered_results
476 if getattr(result, "is_attachment", False)
477 ]
479 # Group by file type for organized display
480 organized_attachments = {}
481 for result in attachment_results:
482 file_type = FormatterUtils.extract_file_type_minimal(result)
483 if file_type not in organized_attachments:
484 organized_attachments[file_type] = []
485 organized_attachments[file_type].append(result)
487 # Create attachment index
488 attachment_index = [
489 {
490 "document_id": getattr(result, "document_id", ""),
491 "title": getattr(result, "source_title", "Untitled"),
492 "attachment_info": {
493 "filename": FormatterUtils.extract_safe_filename(result),
494 "file_type": FormatterUtils.extract_file_type_minimal(result),
495 "file_size": getattr(result, "file_size", None),
496 },
497 "score": getattr(result, "score", 0.0),
498 "source_url": getattr(result, "source_url", None),
499 }
500 for result in attachment_results[:20] # Limit to top 20
501 ]
503 # Create attachment groups
504 attachment_groups = [
505 {
506 "file_type": file_type,
507 "attachments": [
508 {
509 **FormatterUtils.extract_minimal_doc_fields(result),
510 "filename": FormatterUtils.extract_safe_filename(result),
511 "file_type": FormatterUtils.extract_file_type_minimal(result),
512 }
513 for result in results[:15] # Limit per group
514 ],
515 "total_attachments": len(results),
516 }
517 for file_type, results in organized_attachments.items()
518 ]
520 return {
521 "attachment_index": attachment_index,
522 "attachment_groups": attachment_groups,
523 "total_found": len(attachment_results),
524 "query_metadata": {
525 "query": query,
526 "filter": attachment_filter,
527 "total_attachments": len(attachment_results),
528 "file_types": list(organized_attachments.keys()),
529 },
530 # Keep legacy fields for backward compatibility
531 "query": query,
532 "total_groups": len(organized_attachments),
533 }