Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/clustering.py: 82%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import time 

4from typing import Any 

5 

6from ...components.search_result_models import HybridSearchResult 

7from ...enhanced.cdi.models import ClusteringStrategy 

8 

9 

10async def cluster_documents( 

11 engine: Any, 

12 documents: list[HybridSearchResult], 

13 strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES, 

14 max_clusters: int = 10, 

15 min_cluster_size: int = 2, 

16) -> dict[str, Any]: 

17 """Cluster documents using engine's CDI with delegated helpers. 

18 

19 Args: 

20 engine: The hybrid search engine instance providing CDI and helper APIs. 

21 documents: Non-empty list of `HybridSearchResult` to cluster. 

22 strategy: Clustering strategy (instance of `ClusteringStrategy`). 

23 max_clusters: Maximum number of clusters to produce. Must be int > 0. 

24 min_cluster_size: Minimum documents per cluster. Must be int >= 1 and <= total documents. 

25 

26 Returns: 

27 Dict with the following structure: 

28 - "clusters": list[dict] 

29 Each cluster dict contains: 

30 - "id": str | int 

31 - "name": str | None 

32 - "documents": list[HybridSearchResult] 

33 - "centroid_topics": list[str] | None 

34 - "shared_entities": list[str] | None 

35 - "coherence_score": float | None 

36 - "cluster_summary": str | None 

37 - "representative_doc_id": str | None 

38 - "cluster_strategy": str (the strategy value) 

39 - "quality_metrics": dict[str, Any] 

40 - "document_count": int 

41 - "expected_document_count": int 

42 - "clustering_metadata": dict[str, Any] 

43 Aggregate stats, counts, parameters, and quality assessments for the run. 

44 - "cluster_relationships": list[dict[str, Any]] 

45 Relationship analysis between clusters. 

46 

47 Raises: 

48 ValueError: If inputs are invalid, e.g. documents is not a non-empty list, 

49 strategy is not a `ClusteringStrategy`, or constraints on 

50 max_clusters/min_cluster_size are violated. 

51 """ 

52 # Input validation 

53 if not isinstance(documents, list): 

54 raise ValueError("'documents' must be a list of HybridSearchResult") 

55 if len(documents) == 0: 

56 raise ValueError("'documents' must be a non-empty list") 

57 if not isinstance(max_clusters, int) or max_clusters <= 0: 

58 raise ValueError("'max_clusters' must be an int greater than 0") 

59 if not isinstance(min_cluster_size, int) or min_cluster_size < 1: 

60 raise ValueError("'min_cluster_size' must be an int greater than or equal to 1") 

61 total_documents = len(documents) 

62 # Allow max_clusters greater than total documents; downstream analyzer may cap it. 

63 if min_cluster_size > total_documents: 

64 raise ValueError( 

65 "'min_cluster_size' cannot exceed the total number of documents" 

66 ) 

67 if not isinstance(strategy, ClusteringStrategy): 

68 raise ValueError("'strategy' must be an instance of ClusteringStrategy") 

69 start_time = time.time() 

70 

71 engine.logger.info( 

72 f"Starting clustering with {len(documents)} documents using {strategy.value}" 

73 ) 

74 

75 clusters = engine.cross_document_engine.cluster_analyzer.create_clusters( 

76 documents, strategy, max_clusters, min_cluster_size 

77 ) 

78 

79 # Build robust document lookup with multiple strategies via public API 

80 doc_lookup = engine.build_document_lookup(documents, robust=True) 

81 

82 cluster_data = [] 

83 total_matched_docs = 0 

84 total_requested_docs = 0 

85 

86 for i, cluster in enumerate(clusters): 

87 cluster_documents = [] 

88 doc_ids_found = [] 

89 doc_ids_missing = [] 

90 

91 total_requested_docs += len(cluster.documents) 

92 

93 for doc_id in cluster.documents: 

94 matched_doc = engine.find_document_by_id(doc_id, doc_lookup) 

95 if matched_doc: 

96 cluster_documents.append(matched_doc) 

97 doc_ids_found.append(doc_id) 

98 total_matched_docs += 1 

99 else: 

100 doc_ids_missing.append(doc_id) 

101 engine.logger.warning(f"Document not found in lookup: {doc_id}") 

102 

103 engine.logger.info( 

104 f"Cluster {i}: Found {len(doc_ids_found)}/{len(cluster.documents)} documents" 

105 ) 

106 if doc_ids_missing: 

107 engine.logger.warning( 

108 f"Missing documents in cluster {i}: {doc_ids_missing[:3]}" 

109 ) 

110 

111 # Calculate cluster quality metrics via public API 

112 cluster_quality = engine.calculate_cluster_quality(cluster, cluster_documents) 

113 

114 cluster_data.append( 

115 { 

116 "id": cluster.cluster_id, 

117 "name": cluster.name, 

118 "documents": cluster_documents, 

119 "centroid_topics": ( 

120 getattr(cluster, "centroid_topics", None) 

121 or getattr(cluster, "shared_topics", []) 

122 ), 

123 "shared_entities": cluster.shared_entities, 

124 "coherence_score": cluster.coherence_score, 

125 "cluster_summary": cluster.cluster_description, 

126 "representative_doc_id": cluster.representative_doc_id, 

127 "cluster_strategy": strategy.value, 

128 "quality_metrics": cluster_quality, 

129 "document_count": len(cluster_documents), 

130 "expected_document_count": len(cluster.documents), 

131 } 

132 ) 

133 

134 processing_time = (time.time() - start_time) * 1000 

135 

136 clustering_metadata = engine.build_enhanced_metadata( 

137 clusters, 

138 documents, 

139 strategy, 

140 processing_time, 

141 total_matched_docs, 

142 total_requested_docs, 

143 ) 

144 

145 cluster_relationships = engine.analyze_cluster_relationships(clusters, documents) 

146 

147 engine.logger.info( 

148 f"Clustering completed: {len(clusters)} clusters, " 

149 f"{total_matched_docs}/{total_requested_docs} documents matched, " 

150 f"{len(cluster_relationships)} relationships identified" 

151 ) 

152 

153 return { 

154 "clusters": cluster_data, 

155 "clustering_metadata": clustering_metadata, 

156 "cluster_relationships": cluster_relationships, 

157 }