Coverage for src/qdrant_loader_mcp_server/search/hybrid/components/cluster_quality.py: 96%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3from typing import Any 

4 

5 

def calculate_std(values: list[float]) -> float:
    """Return the population standard deviation of *values* (0.0 if fewer than 2)."""
    count = len(values)
    if count < 2:
        return 0.0
    avg = sum(values) / count
    squared_deviation = sum((v - avg) ** 2 for v in values)
    # Population (not sample) variance: divide by N, then take the square root.
    return (squared_deviation / count) ** 0.5

12 

13 

def assess_overall_quality(clusters, matched_docs: int, requested_docs: int) -> float:
    """Score overall clustering quality in [0, 1].

    Combines three signals: document retrieval rate (weight 0.4), mean positive
    cluster coherence (weight 0.4), and a size-distribution score (weight 0.2).
    Returns 0.0 when there are no clusters.
    """
    if not clusters:
        return 0.0

    retrieval = matched_docs / requested_docs if requested_docs > 0 else 0

    positive_coherence = [c.coherence_score for c in clusters if c.coherence_score > 0]
    coherence = (
        sum(positive_coherence) / len(positive_coherence) if positive_coherence else 0
    )

    sizes = [len(c.documents) for c in clusters]
    # Penalize degenerate shapes: a single oversized cluster, or mostly tiny ones.
    size_score = 1.0
    if len(clusters) == 1 and sizes and sizes[0] > 10:
        size_score = 0.7
    elif sum(1 for s in sizes if s < 3) > len(clusters) * 0.7:
        size_score = 0.8

    combined = retrieval * 0.4 + coherence * 0.4 + size_score * 0.2
    return min(1.0, max(0.0, combined))

35 

36 

def generate_clustering_recommendations(
    clusters, strategy, matched_docs: int, requested_docs: int
) -> dict[str, Any]:
    """Produce actionable hints about clustering quality and alternative strategies.

    Returns a dict with ``quality_threshold_met`` (retrieval rate >= 90%), a list
    of human-readable ``suggestions``, and — when a single large cluster was
    produced — an ``alternative_strategies`` list.
    """
    retrieval_rate = matched_docs / requested_docs if requested_docs > 0 else 0

    recommendations: dict[str, Any] = {
        "quality_threshold_met": (
            retrieval_rate >= 0.9 if requested_docs > 0 else False
        ),
        "suggestions": [],
    }

    if retrieval_rate < 0.9:
        recommendations["suggestions"].append(
            f"Low document retrieval rate ({retrieval_rate:.1%}). Check document ID consistency."
        )

    # One big cluster on a sizable request: suggest strategies that split better.
    if len(clusters) == 1 and requested_docs > 10:
        recommendations["suggestions"].append(
            "Single large cluster detected. Consider trying entity_based or topic_based strategy."
        )
        recommendations["alternative_strategies"] = ["entity_based", "topic_based"]

    # Fragmentation: more clusters than half the requested document count.
    if len(clusters) > requested_docs * 0.5:
        recommendations["suggestions"].append(
            "Many small clusters. Consider increasing min_cluster_size or trying mixed_features strategy."
        )

    positive_coherence = [c.coherence_score for c in clusters if c.coherence_score > 0]
    if positive_coherence and sum(positive_coherence) / len(positive_coherence) < 0.5:
        recommendations["suggestions"].append(
            "Low cluster coherence. Documents may be too diverse for meaningful clustering."
        )

    return recommendations

71 

72 

def build_enhanced_metadata(
    clusters, documents, strategy, processing_time, matched_docs, requested_docs
) -> dict[str, Any]:
    """Assemble a metadata summary for a clustering run.

    Includes document/cluster counts, retrieval rate, timing, per-strategy
    coherence and size statistics, an overall quality score, and
    recommendations. *processing_time* is expected in milliseconds.
    """
    sizes = [len(cluster.documents) for cluster in clusters]
    positive_coherence = [
        cluster.coherence_score for cluster in clusters if cluster.coherence_score > 0
    ]

    coherence_avg = (
        sum(positive_coherence) / len(positive_coherence) if positive_coherence else 0
    )
    # Std needs at least two samples to be meaningful.
    coherence_std = (
        calculate_std(positive_coherence) if len(positive_coherence) > 1 else 0
    )
    size_avg = sum(sizes) / len(sizes) if sizes else 0

    return {
        "strategy": strategy.value,
        "total_documents": len(documents),
        "clusters_created": len(clusters),
        "unclustered_documents": len(documents) - sum(sizes),
        "document_retrieval_rate": (
            matched_docs / requested_docs if requested_docs > 0 else 0
        ),
        "processing_time_ms": round(processing_time, 2),
        "strategy_performance": {
            "coherence_avg": coherence_avg,
            "coherence_std": coherence_std,
            "size_distribution": sizes,
            "size_avg": size_avg,
        },
        "clustering_quality": assess_overall_quality(
            clusters, matched_docs, requested_docs
        ),
        "recommendations": generate_clustering_recommendations(
            clusters, strategy, matched_docs, requested_docs
        ),
    }

110 

111 

def categorize_cluster_size(size: int) -> str:
    """Map a document count onto a coarse size label.

    Buckets: <=2 "small", 3-5 "medium", 6-10 "large", >10 "very_large".
    """
    if size > 10:
        return "very_large"
    if size > 5:
        return "large"
    if size > 2:
        return "medium"
    return "small"

121 

122 

def estimate_content_similarity(documents: list[Any]) -> float:
    """Estimate pairwise content similarity of *documents* via Jaccard word overlap.

    Samples at most the first 5 documents, building one lowercase word set per
    document from its ``source_title`` plus the first 200 characters of its
    ``text``. Returns the mean Jaccard similarity over all sampled pairs with a
    non-empty union; 1.0 for fewer than 2 documents, 0.0 when no pair can be
    compared (e.g. all word sets empty).
    """
    if len(documents) < 2:
        return 1.0

    doc_word_sets: list[set[str]] = []
    # Cap at 5 documents to keep the O(k^2) pairwise comparison cheap.
    for doc in documents[:5]:
        words: set[str] = set()
        title = getattr(doc, "source_title", None)
        if title:
            words.update(str(title).lower().split())
        text = getattr(doc, "text", None)
        if text:
            # Only a 200-char prefix: a cheap proxy for the full body.
            words.update(str(text)[:200].lower().split())
        doc_word_sets.append(words)
    # NOTE: the previous version also accumulated every word into an unused
    # `all_words` list; that dead accumulation has been removed.

    if not doc_word_sets:
        return 0.0

    total_overlap = 0.0
    comparisons = 0
    for i in range(len(doc_word_sets)):
        for j in range(i + 1, len(doc_word_sets)):
            union = len(doc_word_sets[i] | doc_word_sets[j])
            if union > 0:
                total_overlap += len(doc_word_sets[i] & doc_word_sets[j]) / union
                comparisons += 1

    return total_overlap / comparisons if comparisons > 0 else 0.0

150 

151 

def calculate_cluster_quality(
    cluster: Any, cluster_documents: list[Any]
) -> dict[str, Any]:
    """Compute per-cluster quality metrics from a cluster and its resolved documents.

    Returns retrieval rate, coherence, entity/topic diversity counts, whether a
    representative document exists, a size category, and — for clusters with
    more than one resolved document — an estimated content similarity.
    """
    doc_count = len(cluster_documents)
    retrieval_rate = (
        doc_count / len(cluster.documents) if cluster.documents else 0
    )

    metrics: dict[str, Any] = {
        "document_retrieval_rate": retrieval_rate,
        "coherence_score": cluster.coherence_score,
        "entity_diversity": len(cluster.shared_entities),
        "topic_diversity": len(cluster.shared_topics),
        "has_representative": bool(cluster.representative_doc_id),
        "cluster_size_category": categorize_cluster_size(doc_count),
    }

    # Content similarity is only meaningful with at least two documents.
    if doc_count > 1:
        metrics["content_similarity"] = estimate_content_similarity(cluster_documents)
    return metrics