Coverage for src/qdrant_loader_mcp_server/mcp/handlers/intelligence/relationships.py: 80%

103 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import math 

4from typing import Any 

5 

6from .utils import get_or_create_document_id 

7 

8 

9def _safe_score(d: Any) -> float: 

10 try: 

11 if isinstance(d, dict): 

12 return float( 

13 d.get("score") or d.get("similarity") or d.get("relevance") or 0.0 

14 ) 

15 return float( 

16 getattr(d, "score", None) 

17 or getattr(d, "similarity", None) 

18 or getattr(d, "relevance", None) 

19 or 0.0 

20 ) 

21 except Exception: 

22 return 0.0 

23 

24 

25def _display_title(d: Any) -> str: 

26 if isinstance(d, dict): 

27 return (d.get("title") or d.get("source_title") or str(d))[:100] 

28 return (getattr(d, "source_title", None) or getattr(d, "title", None) or str(d))[ 

29 :100 

30 ] 

31 

32 

def _normalized_max_pairs(params: dict[str, Any]) -> int:
    """Read and sanitize the per-cluster pair cap from *params*.

    Defaults to 50 when absent; non-numeric or None values become 0;
    negative values are floored at 0.
    """
    raw = params.get("max_similarity_pairs_per_cluster", 50)
    try:
        value = int(raw) if raw is not None else 0
    except (TypeError, ValueError):
        value = 0
    return max(0, value)


def _max_docs_for_pairs(doc_count: int, max_pairs: int) -> int:
    """Largest prefix length n (clamped to [2, doc_count]) whose pair count
    n*(n-1)/2 approximately respects *max_pairs*.

    Solves n*(n-1)/2 <= max_pairs for n via isqrt. The discriminant
    1 + 8*max_pairs is always >= 1 for the sanitized non-negative cap,
    so no extra guard is needed.
    """
    root = math.isqrt(1 + 8 * max_pairs)
    return min(doc_count, max(2, (1 + root) // 2))


def _similarity_relationships(
    clusters: list[Any], params: dict[str, Any]
) -> list[dict[str, Any]]:
    """Emit pairwise "similarity" relationships within each cluster.

    Documents are ranked by score so that truncation (needed to bound the
    quadratic pair expansion) keeps the strongest documents; at most
    ``max_similarity_pairs_per_cluster`` pairs are emitted per cluster.
    """
    results: list[dict[str, Any]] = []
    # BUG FIX: the cap is sanitized once and used for BOTH the doc-count
    # clamp and the emission check (the original compared against the raw,
    # unvalidated param, which raised TypeError for None/non-numeric input).
    max_pairs = _normalized_max_pairs(params)
    for cluster in clusters:
        cluster_docs = cluster.get("documents", [])
        try:
            ranked = sorted(cluster_docs, key=_safe_score, reverse=True)
        except Exception:
            # Unorderable/malformed docs: keep original order, best effort.
            ranked = list(cluster_docs)
        if not ranked:
            continue
        docs_for_pairs = ranked[: _max_docs_for_pairs(len(ranked), max_pairs)]
        emitted = 0
        for i, doc1 in enumerate(docs_for_pairs):
            if emitted >= max_pairs:
                break  # cap reached — also stop the outer loop, not just inner
            for doc2 in docs_for_pairs[i + 1 :]:
                if emitted >= max_pairs:
                    break
                results.append(
                    {
                        "document_1_id": get_or_create_document_id(doc1),
                        "document_2_id": get_or_create_document_id(doc2),
                        "document_1_title": _display_title(doc1),
                        "document_2_title": _display_title(doc2),
                        "relationship_type": "similarity",
                        "confidence_score": cluster.get("cohesion_score", 0.8),
                        "relationship_summary": f"Both documents belong to cluster: {cluster.get('theme', 'unnamed cluster')}",
                    }
                )
                emitted += 1
    return results


def _conflict_relationships(conflicts: list[Any]) -> list[dict[str, Any]]:
    """Turn (doc1, doc2[, info]) conflict sequences into relationships.

    Entries that are not at least 2-element lists/tuples are skipped.
    """
    results: list[dict[str, Any]] = []
    for conflict in conflicts:
        if not (isinstance(conflict, (list, tuple)) and len(conflict) >= 2):
            continue
        doc1, doc2 = conflict[0], conflict[1]
        # Robustness fix: a malformed non-dict metadata slot is treated as
        # missing instead of crashing on .get().
        info = (
            conflict[2]
            if len(conflict) > 2 and isinstance(conflict[2], dict)
            else {}
        )
        results.append(
            {
                "document_1_id": get_or_create_document_id(doc1),
                "document_2_id": get_or_create_document_id(doc2),
                "document_1_title": _display_title(doc1),
                "document_2_title": _display_title(doc2),
                "relationship_type": "conflict",
                "confidence_score": info.get("severity", 0.5),
                "relationship_summary": f"Conflict detected: {info.get('type', 'unknown conflict')}",
            }
        )
    return results


def _build_docs_lookup(analysis_results: dict[str, Any]) -> dict[str, Any]:
    """Index cluster documents by id — and by the "source_type:title"
    composite key used elsewhere — so that complementary relationships can
    resolve display titles.

    Best-effort: any failure yields a (possibly partial) lookup, since
    title resolution is cosmetic.
    """
    lookup: dict[str, Any] = {}
    try:
        for cluster in analysis_results.get("document_clusters", []) or []:
            for doc in cluster.get("documents", []) or []:
                if not isinstance(doc, dict):
                    continue
                doc_key = doc.get("document_id") or get_or_create_document_id(doc)
                if doc_key:
                    lookup[str(doc_key)] = doc
                    # Also register the composite key commonly used elsewhere.
                    source_type = doc.get("source_type", "unknown")
                    title = doc.get("source_title", doc.get("title", "unknown"))
                    lookup.setdefault(f"{source_type}:{title}", doc)
    except Exception:
        pass
    return lookup


def _complementary_relationships(
    analysis_results: dict[str, Any],
) -> list[dict[str, Any]]:
    """Flatten per-document complementary recommendations into relationships.

    Each value in "complementary_content" is either an object exposing
    ``get_top_recommendations()`` or a plain list of recommendation dicts;
    anything else contributes no relationships.
    """
    complementary = analysis_results["complementary_content"]
    docs_lookup = _build_docs_lookup(analysis_results)
    results: list[dict[str, Any]] = []
    for doc_id, content in complementary.items():
        if hasattr(content, "get_top_recommendations"):
            recommendations = content.get_top_recommendations()
        else:
            recommendations = content if isinstance(content, list) else []
        for rec in recommendations:
            if not isinstance(rec, dict):
                continue
            target_doc_id = rec.get("document_id", "Unknown")
            # Resolve display titles through the lookup when possible,
            # falling back to the raw ids / recommendation title.
            source_doc = docs_lookup.get(str(doc_id))
            doc1_title = (
                _display_title(source_doc) if source_doc is not None else str(doc_id)
            )[:100]
            target_doc = rec.get("document") or docs_lookup.get(str(target_doc_id))
            fallback_title = rec.get("title", str(target_doc_id))
            doc2_title = (
                _display_title(target_doc)
                if target_doc is not None
                else fallback_title
            )[:100]
            reason = rec.get("recommendation_reason", "complementary content")
            results.append(
                {
                    "document_1_id": doc_id,
                    "document_2_id": target_doc_id,
                    "document_1_title": doc1_title,
                    "document_2_title": doc2_title,
                    "relationship_type": "complementary",
                    "confidence_score": rec.get("relevance_score", 0.5),
                    "relationship_summary": f"Complementary content: {reason}",
                }
            )
    return results


def process_analysis_results(
    analysis_results: dict[str, Any], params: dict[str, Any]
) -> dict[str, Any]:
    """Convert a cross-document analysis payload into a flat relationship list.

    Args:
        analysis_results: Output of the analysis stage; recognized keys are
            "document_clusters", "conflict_analysis", "complementary_content",
            "citation_network", "similarity_insights" and "query_metadata".
        params: Tuning knobs; only "max_similarity_pairs_per_cluster"
            (default 50) is currently read.

    Returns:
        Dict with "relationships" (list of relationship dicts),
        "total_analyzed" (document count from query metadata, 0 if absent)
        and a human-readable "summary" string.
    """
    relationships: list[dict[str, Any]] = []
    summary_parts: list[str] = []
    total_analyzed = analysis_results.get("query_metadata", {}).get("document_count", 0)

    # Document clusters → pairwise similarity relationships.
    if "document_clusters" in analysis_results:
        clusters = analysis_results["document_clusters"]
        summary_parts.append(f"{len(clusters)} document clusters found")
        relationships.extend(_similarity_relationships(clusters, params))

    # Conflicting pairs → conflict relationships.
    if "conflict_analysis" in analysis_results:
        conflicts = analysis_results["conflict_analysis"].get("conflicting_pairs", [])
        if conflicts:
            summary_parts.append(f"{len(conflicts)} conflicts detected")
            relationships.extend(_conflict_relationships(conflicts))

    # Complementary recommendations → complementary relationships.
    if "complementary_content" in analysis_results:
        comp = _complementary_relationships(analysis_results)
        relationships.extend(comp)
        if comp:
            summary_parts.append(f"{len(comp)} complementary relationships")

    # Citations and similarity insights contribute to the summary only.
    if "citation_network" in analysis_results:
        citation_net = analysis_results["citation_network"]
        if citation_net.get("edges", 0) > 0:
            summary_parts.append(f"{citation_net['edges']} citation relationships")

    if analysis_results.get("similarity_insights"):
        summary_parts.append("similarity patterns identified")

    summary_text = (
        f"Analyzed {total_analyzed} documents: {', '.join(summary_parts)}"
        if summary_parts
        else f"Analyzed {total_analyzed} documents with no significant relationships found"
    )

    return {
        "relationships": relationships,
        "total_analyzed": total_analyzed,
        "summary": summary_text,
    }