Coverage for src/qdrant_loader_mcp_server/search/hybrid/components/cluster_quality.py: 96% (81 statements)
from __future__ import annotations

from typing import Any
def calculate_std(values: list[float]) -> float:
    """Return the population standard deviation of values (0.0 for fewer than two values)."""
    if len(values) < 2:
        return 0.0
    mean = sum(values) / len(values)
    variance = sum((x - mean) ** 2 for x in values) / len(values)
    return variance**0.5
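
# Illustrative usage sketch (not part of the module): calculate_std divides by
# len(values), i.e. the population variance, so for [1.0, 2.0, 3.0] the mean is
# 2.0, the variance is 2/3, and the result is (2/3) ** 0.5 ≈ 0.8165.
#
#     calculate_std([1.0, 2.0, 3.0])  # -> ~0.8165
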
def assess_overall_quality(clusters, matched_docs: int, requested_docs: int) -> float:
    """Blend retrieval, coherence, and size-distribution signals into a 0.0-1.0 quality score."""
    if not clusters:
        return 0.0

    # Fraction of requested documents that were actually matched to clusters.
    retrieval_score = matched_docs / requested_docs if requested_docs > 0 else 0
    coherence_scores = [c.coherence_score for c in clusters if c.coherence_score > 0]
    coherence_score = (
        sum(coherence_scores) / len(coherence_scores) if coherence_scores else 0
    )

    # Penalize degenerate size distributions: one oversized cluster, or mostly tiny clusters.
    cluster_sizes = [len(c.documents) for c in clusters]
    size_distribution_score = 1.0
    if len(clusters) == 1 and len(cluster_sizes) > 0 and cluster_sizes[0] > 10:
        size_distribution_score = 0.7
    elif len([s for s in cluster_sizes if s < 3]) > len(clusters) * 0.7:
        size_distribution_score = 0.8

    overall_quality = (
        retrieval_score * 0.4 + coherence_score * 0.4 + size_distribution_score * 0.2
    )
    return min(1.0, max(0.0, overall_quality))
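
# Illustrative sketch (hypothetical numbers, not part of the module): with 9 of 10
# requested documents matched (retrieval 0.9), an average cluster coherence of 0.6,
# and a healthy size distribution (1.0), the score is
# 0.4 * 0.9 + 0.4 * 0.6 + 0.2 * 1.0 = 0.80.
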
def generate_clustering_recommendations(
    clusters, strategy, matched_docs: int, requested_docs: int
) -> dict[str, Any]:
    """Produce human-readable suggestions for improving the clustering result."""
    recommendations = {
        "quality_threshold_met": (
            matched_docs / requested_docs >= 0.9 if requested_docs > 0 else False
        ),
        "suggestions": [],
    }

    retrieval_rate = matched_docs / requested_docs if requested_docs > 0 else 0
    if retrieval_rate < 0.9:
        recommendations["suggestions"].append(
            f"Low document retrieval rate ({retrieval_rate:.1%}). Check document ID consistency."
        )

    if len(clusters) == 1 and requested_docs > 10:
        recommendations["suggestions"].append(
            "Single large cluster detected. Consider trying entity_based or topic_based strategy."
        )
        recommendations["alternative_strategies"] = ["entity_based", "topic_based"]

    if len(clusters) > requested_docs * 0.5:
        recommendations["suggestions"].append(
            "Many small clusters. Consider increasing min_cluster_size or trying mixed_features strategy."
        )

    coherence_scores = [c.coherence_score for c in clusters if c.coherence_score > 0]
    if coherence_scores and sum(coherence_scores) / len(coherence_scores) < 0.5:
        recommendations["suggestions"].append(
            "Low cluster coherence. Documents may be too diverse for meaningful clustering."
        )

    return recommendations
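
# Illustrative sketch (hypothetical numbers, not part of the module): with 7 of 10
# requested documents matched, the returned dict looks roughly like
#
#     {"quality_threshold_met": False,
#      "suggestions": ["Low document retrieval rate (70.0%). Check document ID consistency."]}
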
def build_enhanced_metadata(
    clusters, documents, strategy, processing_time, matched_docs, requested_docs
) -> dict[str, Any]:
    """Assemble summary metadata for a clustering run, including quality and recommendations."""
    cluster_sizes = [len(cluster.documents) for cluster in clusters]
    coherence_scores = [
        cluster.coherence_score for cluster in clusters if cluster.coherence_score > 0
    ]

    metadata = {
        "strategy": strategy.value,
        "total_documents": len(documents),
        "clusters_created": len(clusters),
        "unclustered_documents": len(documents) - sum(cluster_sizes),
        "document_retrieval_rate": (
            matched_docs / requested_docs if requested_docs > 0 else 0
        ),
        "processing_time_ms": round(processing_time, 2),
        "strategy_performance": {
            "coherence_avg": (
                sum(coherence_scores) / len(coherence_scores) if coherence_scores else 0
            ),
            "coherence_std": (
                calculate_std(coherence_scores) if len(coherence_scores) > 1 else 0
            ),
            "size_distribution": cluster_sizes,
            "size_avg": (
                sum(cluster_sizes) / len(cluster_sizes) if cluster_sizes else 0
            ),
        },
        "clustering_quality": assess_overall_quality(
            clusters, matched_docs, requested_docs
        ),
        "recommendations": generate_clustering_recommendations(
            clusters, strategy, matched_docs, requested_docs
        ),
    }
    return metadata
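
# Illustrative sketch (hypothetical objects, not part of the module): `strategy` is
# expected to expose a `.value` (e.g. an Enum member) and each cluster to expose
# `.documents` and `.coherence_score`; the returned dict nests per-strategy stats
# under "strategy_performance" and the suggestion dict under "recommendations".
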
def categorize_cluster_size(size: int) -> str:
    """Bucket a cluster size into small (<=2), medium (<=5), large (<=10), or very_large."""
    if size <= 2:
        return "small"
    elif size <= 5:
        return "medium"
    elif size <= 10:
        return "large"
    else:
        return "very_large"
def estimate_content_similarity(documents: list[Any]) -> float:
    """Estimate intra-cluster similarity as the average pairwise Jaccard overlap of word sets."""
    if len(documents) < 2:
        return 1.0
    doc_word_sets: list[set[str]] = []
    # Sample at most 5 documents, building a word set from each title and text snippet.
    for doc in documents[:5]:
        words: set[str] = set()
        title = getattr(doc, "source_title", None)
        if title:
            words.update(str(title).lower().split())
        text = getattr(doc, "text", None)
        if text:
            words.update(str(text)[:200].lower().split())
        doc_word_sets.append(words)
    if not doc_word_sets:
        return 0.0
    total_overlap = 0.0
    comparisons = 0
    for i in range(len(doc_word_sets)):
        for j in range(i + 1, len(doc_word_sets)):
            overlap = len(doc_word_sets[i] & doc_word_sets[j])
            union = len(doc_word_sets[i] | doc_word_sets[j])
            if union > 0:
                total_overlap += overlap / union
                comparisons += 1
    return total_overlap / comparisons if comparisons > 0 else 0.0
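
# Illustrative sketch (hypothetical documents, not part of the module): two documents
# titled "vector search guide" and "vector search faq" share 2 of 4 distinct words,
# so the average pairwise Jaccard similarity is 0.5.
#
#     from types import SimpleNamespace
#     docs = [SimpleNamespace(source_title="vector search guide", text=None),
#             SimpleNamespace(source_title="vector search faq", text=None)]
#     estimate_content_similarity(docs)  # -> 0.5
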
def calculate_cluster_quality(
    cluster: Any, cluster_documents: list[Any]
) -> dict[str, Any]:
    """Compute per-cluster quality metrics from a cluster and its retrieved documents."""
    quality_metrics = {
        "document_retrieval_rate": (
            len(cluster_documents) / len(cluster.documents) if cluster.documents else 0
        ),
        "coherence_score": cluster.coherence_score,
        "entity_diversity": len(cluster.shared_entities),
        "topic_diversity": len(cluster.shared_topics),
        "has_representative": bool(cluster.representative_doc_id),
        "cluster_size_category": categorize_cluster_size(len(cluster_documents)),
    }
    if len(cluster_documents) > 1:
        quality_metrics["content_similarity"] = estimate_content_similarity(
            cluster_documents
        )
    return quality_metrics
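
# Illustrative sketch (hypothetical numbers, not part of the module): for a cluster
# whose `documents` field lists 4 document IDs but where only 3 documents were
# retrieved, "document_retrieval_rate" is 0.75 and "cluster_size_category" is
# "medium"; "content_similarity" is added because more than one document was retrieved.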