Coverage for src/qdrant_loader_mcp_server/mcp/handlers/intelligence/relationships.py: 80%
103 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3import math
4from typing import Any
6from .utils import get_or_create_document_id
9def _safe_score(d: Any) -> float:
10 try:
11 if isinstance(d, dict):
12 return float(
13 d.get("score") or d.get("similarity") or d.get("relevance") or 0.0
14 )
15 return float(
16 getattr(d, "score", None)
17 or getattr(d, "similarity", None)
18 or getattr(d, "relevance", None)
19 or 0.0
20 )
21 except Exception:
22 return 0.0
25def _display_title(d: Any) -> str:
26 if isinstance(d, dict):
27 return (d.get("title") or d.get("source_title") or str(d))[:100]
28 return (getattr(d, "source_title", None) or getattr(d, "title", None) or str(d))[
29 :100
30 ]
def process_analysis_results(
    analysis_results: dict[str, Any], params: dict[str, Any]
) -> dict[str, Any]:
    """Flatten a raw relationship-analysis payload into relationship records.

    Args:
        analysis_results: Analysis pipeline output. Recognized keys:
            ``document_clusters``, ``conflict_analysis``,
            ``complementary_content``, ``citation_network``,
            ``similarity_insights`` and ``query_metadata``.
        params: Tuning options; ``max_similarity_pairs_per_cluster``
            (default 50) caps how many similarity pairs each cluster emits.

    Returns:
        Dict with ``relationships`` (list of relationship records),
        ``total_analyzed`` (int) and ``summary`` (str).
    """
    relationships: list[dict[str, Any]] = []
    summary_parts: list[str] = []
    total_analyzed = analysis_results.get("query_metadata", {}).get("document_count", 0)

    # Document clusters → similarity relationships
    if "document_clusters" in analysis_results:
        clusters = analysis_results["document_clusters"]
        summary_parts.append(f"{len(clusters)} document clusters found")

        # Sanitize the per-cluster pair cap once (it does not vary by
        # cluster): accept anything int()-coercible, treat None, garbage
        # and negatives as 0.
        max_pairs = params.get("max_similarity_pairs_per_cluster", 50)
        try:
            max_pairs_val = int(max_pairs) if max_pairs is not None else 0
        except Exception:
            max_pairs_val = 0
        if max_pairs_val < 0:
            max_pairs_val = 0

        for cluster in clusters:
            cluster_docs = cluster.get("documents", [])
            # Best-effort ordering by score; fall back to input order.
            try:
                sorted_docs = sorted(cluster_docs, key=_safe_score, reverse=True)
            except Exception:
                sorted_docs = list(cluster_docs)

            if not sorted_docs:
                docs_for_pairs = []
            else:
                # n docs yield n*(n-1)/2 pairs; invert via the quadratic
                # formula to find how many of the top-scored docs can
                # participate without exceeding the pair cap.
                # Guard isqrt by ensuring non-negative discriminant.
                discriminant = 1 + 8 * max_pairs_val
                if discriminant < 0:
                    discriminant = 0
                root = math.isqrt(discriminant)
                max_docs = int((1 + root) // 2)
                # Clamp to [2, len(sorted_docs)] where possible
                max_docs = max(2, max_docs)
                max_docs = min(len(sorted_docs), max_docs)
                docs_for_pairs = sorted_docs[:max_docs]

            emitted_pairs = 0
            for i, doc1 in enumerate(docs_for_pairs):
                # BUGFIX: compare against the sanitized integer cap
                # (max_pairs_val) rather than the raw parameter, which may
                # be None or non-numeric and would raise TypeError here.
                # Also stop the outer loop once the cap is hit instead of
                # re-entering inner loops that break immediately.
                if emitted_pairs >= max_pairs_val:
                    break
                for doc2 in docs_for_pairs[i + 1 :]:
                    if emitted_pairs >= max_pairs_val:
                        break
                    relationships.append(
                        {
                            "document_1_id": get_or_create_document_id(doc1),
                            "document_2_id": get_or_create_document_id(doc2),
                            "document_1_title": _display_title(doc1),
                            "document_2_title": _display_title(doc2),
                            "relationship_type": "similarity",
                            "confidence_score": cluster.get("cohesion_score", 0.8),
                            "relationship_summary": f"Both documents belong to cluster: {cluster.get('theme', 'unnamed cluster')}",
                        }
                    )
                    emitted_pairs += 1

    # Conflicts → conflict relationships
    if "conflict_analysis" in analysis_results:
        conflicts = analysis_results["conflict_analysis"].get("conflicting_pairs", [])
        if conflicts:
            summary_parts.append(f"{len(conflicts)} conflicts detected")
        for conflict in conflicts:
            # Expected shape: (doc1, doc2[, info-dict]); ignore malformed rows.
            if isinstance(conflict, list | tuple) and len(conflict) >= 2:
                doc1, doc2 = conflict[0], conflict[1]
                conflict_info = conflict[2] if len(conflict) > 2 else {}
                relationships.append(
                    {
                        "document_1_id": get_or_create_document_id(doc1),
                        "document_2_id": get_or_create_document_id(doc2),
                        "document_1_title": _display_title(doc1),
                        "document_2_title": _display_title(doc2),
                        "relationship_type": "conflict",
                        "confidence_score": conflict_info.get("severity", 0.5),
                        "relationship_summary": f"Conflict detected: {conflict_info.get('type', 'unknown conflict')}",
                    }
                )

    # Complementary content → complementary relationships
    if "complementary_content" in analysis_results:
        complementary = analysis_results["complementary_content"]
        comp_count = 0
        # Build a lightweight documents lookup for title resolution
        docs_lookup: dict[str, Any] = {}
        try:
            # From clusters if available
            for cluster in analysis_results.get("document_clusters", []) or []:
                for d in cluster.get("documents", []) or []:
                    if isinstance(d, dict):
                        doc_key = d.get("document_id") or get_or_create_document_id(d)
                        if doc_key:
                            docs_lookup[str(doc_key)] = d
                        # Also try a composite key commonly used elsewhere
                        st = d.get("source_type", "unknown")
                        tt = d.get("source_title", d.get("title", "unknown"))
                        docs_lookup.setdefault(f"{st}:{tt}", d)
        except Exception:
            # The lookup is an optional nicety; fall back to raw ids/titles.
            pass
        for doc_id, complementary_content in complementary.items():
            if hasattr(complementary_content, "get_top_recommendations"):
                recommendations = complementary_content.get_top_recommendations()
            else:
                recommendations = (
                    complementary_content
                    if isinstance(complementary_content, list)
                    else []
                )
            for rec in recommendations:
                if isinstance(rec, dict):
                    target_doc_id = rec.get("document_id", "Unknown")
                    score = rec.get("relevance_score", 0.5)
                    reason = rec.get("recommendation_reason", "complementary content")
                    # Resolve titles consistently using _display_title with lookups
                    source_doc_obj = docs_lookup.get(str(doc_id))
                    document_1_title = (
                        _display_title(source_doc_obj)
                        if source_doc_obj is not None
                        else str(doc_id)
                    )[:100]
                    target_doc_obj = rec.get("document") or docs_lookup.get(
                        str(target_doc_id)
                    )
                    fallback_title = rec.get("title", str(target_doc_id))
                    document_2_title = (
                        _display_title(target_doc_obj)
                        if target_doc_obj is not None
                        else fallback_title
                    )[:100]
                    relationships.append(
                        {
                            "document_1_id": doc_id,
                            "document_2_id": target_doc_id,
                            "document_1_title": document_1_title,
                            "document_2_title": document_2_title,
                            "relationship_type": "complementary",
                            "confidence_score": score,
                            "relationship_summary": f"Complementary content: {reason}",
                        }
                    )
                    comp_count += 1
        if comp_count > 0:
            summary_parts.append(f"{comp_count} complementary relationships")

    # Citations and insights → summary only
    if "citation_network" in analysis_results:
        citation_net = analysis_results["citation_network"]
        if citation_net.get("edges", 0) > 0:
            summary_parts.append(f"{citation_net['edges']} citation relationships")

    if analysis_results.get("similarity_insights"):
        summary_parts.append("similarity patterns identified")

    summary_text = (
        f"Analyzed {total_analyzed} documents: {', '.join(summary_parts)}"
        if summary_parts
        else f"Analyzed {total_analyzed} documents with no significant relationships found"
    )

    return {
        "relationships": relationships,
        "total_analyzed": total_analyzed,
        "summary": summary_text,
    }