Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/utils.py: 76%

119 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3from collections import Counter 

4from collections.abc import Iterable 

5from typing import TypeVar 

6 

7import numpy as np 

8 

9T = TypeVar("T") 

10 

11 

def jaccard_similarity(a: Iterable[T], b: Iterable[T]) -> float:
    """Compute Jaccard similarity for two iterables (as sets).

    Provided as a pure helper. Not wired into the legacy module yet.

    Returns 0.0 when either input is empty — including when both are empty,
    matching the legacy convention rather than treating two empty sets as
    identical.
    """
    set_a = set(a)
    set_b = set(b)
    # A single emptiness check suffices: it also covers the both-empty case,
    # and guarantees the union below is non-empty (no division-by-zero guard
    # needed).
    if not set_a or not set_b:
        return 0.0
    return len(set_a & set_b) / len(set_a | set_b)

26 

27 

def extract_texts_from_mixed(
    items: Iterable[dict | str], key: str = "text"
) -> list[str]:
    """Extract lowercase texts from a mixed list of dicts or strings.

    Mirrors legacy behavior used in CDI but provides a shared, tested helper.
    Dict entries contribute the value under ``key`` (stringified); plain
    strings contribute themselves. Anything else, and any entry that is
    empty after stripping, is skipped.
    """
    if items is None:
        return []
    collected: list[str] = []
    for entry in items:
        if isinstance(entry, dict):
            candidate = str(entry.get(key, "")).strip().lower()
        elif isinstance(entry, str):
            candidate = entry.strip().lower()
        else:
            # Unsupported entry type — silently skipped, as legacy did.
            continue
        if candidate:
            collected.append(candidate)
    return collected

48 

49 

def weighted_average(scores: dict[str, float], weights: dict[str, float]) -> float:
    """Compute a weighted average for named scores with default weights.

    Any missing weights default to 0.1 to mirror lenient legacy combining
    rules. Returns 0.0 for empty scores or when the weights sum to zero.
    """
    if not scores:
        return 0.0
    # Pair each score with its (possibly defaulted) weight up front.
    pairs = [(value, weights.get(name, 0.1)) for name, value in scores.items()]
    weight_sum = sum(weight for _, weight in pairs)
    if weight_sum <= 0:
        return 0.0
    return sum(value * weight for value, weight in pairs) / weight_sum

64 

65 

# Common stop words used across similarity functions.
# Kept as a set for O(1) membership tests inside tokenized-text loops.
STOP_WORDS_BASIC: set[str] = {
    "the",
    "and",
    "or",
    "but",
    "in",
    "on",
    "at",
    "to",
    "for",
    "of",
    "with",
    "by",
    "a",
    "an",
    "is",
    "are",
    "was",
    "were",
    "be",
    "been",
    "being",
}

90 

# Title-specific stop words used when extracting common words.
# Mixes generic stop words with documentation boilerplate terms that carry
# no discriminating value in titles.
TITLE_STOP_WORDS: set[str] = {
    "documentation",
    "guide",
    "overview",
    "introduction",
    "the",
    "and",
    "for",
    "with",
}

102 

# Domain and architecture keyword groups.
# Each inner list is one thematic group; matching any member of a group is
# treated as a hit for that domain.
DOMAIN_KEYWORDS: list[list[str]] = [
    ["healthcare", "medical", "patient", "clinical"],
    ["finance", "payment", "banking", "financial"],
    ["ecommerce", "retail", "shopping", "commerce"],
    ["education", "learning", "student", "academic"],
    ["iot", "device", "sensor", "embedded"],
    ["mobile", "app", "ios", "android"],
]

112 

# Architecture pattern keyword groups; same group-of-synonyms structure as
# DOMAIN_KEYWORDS above.
ARCHITECTURE_PATTERNS: list[list[str]] = [
    ["microservices", "service", "microservice"],
    ["api", "rest", "graphql", "endpoint"],
    ["database", "data", "storage", "persistence"],
    ["authentication", "auth", "identity", "oauth"],
    ["messaging", "queue", "event", "pub-sub"],
    ["cache", "caching", "redis", "memory"],
    ["monitoring", "logging", "observability", "metrics"],
]

122 

# Technology keyword lists (two variants to preserve legacy behavior).
# NOTE(review): this variant is a superset of TECH_KEYWORDS_COUNT (adds
# "golang", "gcp", "rest", "graphql", "grpc"); the two lists are kept
# separate deliberately to mirror legacy call sites.
TECH_KEYWORDS_SHARED: list[str] = [
    "react",
    "angular",
    "vue",
    "node",
    "node.js",
    "python",
    "java",
    "golang",
    "docker",
    "kubernetes",
    "aws",
    "azure",
    "gcp",
    "postgres",
    "mysql",
    "mongodb",
    "jwt",
    "oauth",
    "rest",
    "graphql",
    "grpc",
]

147 

# Smaller technology keyword variant used by legacy counting logic; kept
# distinct from TECH_KEYWORDS_SHARED to preserve legacy behavior exactly.
TECH_KEYWORDS_COUNT: list[str] = [
    "react",
    "angular",
    "vue",
    "node",
    "node.js",
    "python",
    "java",
    "docker",
    "kubernetes",
    "aws",
    "azure",
    "postgres",
    "mysql",
    "mongodb",
    "jwt",
    "oauth",
]

166 

167 

def compute_common_title_words(titles: list[str], top_k: int = 10) -> list[str]:
    """Return up to top_k common title words, excluding title stop words and short tokens.

    Fix: filtering now happens BEFORE truncation. Previously
    ``most_common(top_k)`` ran first, so stop words and short tokens
    consumed top-k slots and the helper could return fewer than ``top_k``
    qualifying words even when more existed.

    Args:
        titles: Title strings; None entries are tolerated and treated as "".
        top_k: Maximum number of words to return.

    Returns:
        The most frequent qualifying words (length > 3, not in
        TITLE_STOP_WORDS), in descending frequency order.
    """
    qualifying_words = [
        word
        for title in titles
        for word in (title or "").lower().split()
        if len(word) > 3 and word not in TITLE_STOP_WORDS
    ]
    return [word for word, _count in Counter(qualifying_words).most_common(top_k)]

179 

180 

def cosine_similarity(
    vec1: list[float] | Iterable[float], vec2: list[float] | Iterable[float]
) -> float:
    """Compute cosine similarity with numpy, guarding zero vectors.

    Mirrors legacy behavior in CDI: a zero-magnitude vector on either side
    yields 0.0, and any failure (bad input, shape mismatch) is swallowed
    and reported as 0.0 rather than raised.
    """
    try:
        arr1 = np.asarray(list(vec1), dtype=float)
        arr2 = np.asarray(list(vec2), dtype=float)
        norm1 = np.linalg.norm(arr1)
        norm2 = np.linalg.norm(arr2)
        # Guard each magnitude separately to avoid dividing by zero.
        if norm1 == 0.0 or norm2 == 0.0:
            return 0.0
        return float(np.dot(arr1, arr2) / (norm1 * norm2))
    except Exception:
        # Deliberate best-effort: legacy callers expect 0.0 on any failure.
        return 0.0

198 

199 

def hierarchical_distance_from_breadcrumbs(
    breadcrumb1: str | None, breadcrumb2: str | None
) -> float:
    """Compute hierarchical relatedness score using breadcrumb overlap.

    Returns 0.7 for sibling docs (same parent, different leaf), otherwise Jaccard
    overlap of breadcrumb sets. Returns 0.0 if unavailable.
    """
    if not breadcrumb1 or not breadcrumb2:
        return 0.0
    path_a = breadcrumb1.split(" > ")
    path_b = breadcrumb2.split(" > ")
    # Siblings: equal depth (> 1), identical parent chain, different leaf.
    are_siblings = (
        len(path_a) > 1
        and len(path_a) == len(path_b)
        and path_a[:-1] == path_b[:-1]
        and path_a[-1] != path_b[-1]
    )
    if are_siblings:
        return 0.7
    nodes_a = set(path_a)
    nodes_b = set(path_b)
    if not nodes_a or not nodes_b:
        return 0.0
    union_size = len(nodes_a | nodes_b)
    if union_size == 0:
        return 0.0
    return len(nodes_a & nodes_b) / union_size

226 

227 

def split_breadcrumb(breadcrumb: str | None) -> list[str]:
    """Split a breadcrumb string into parts using ' > ' delimiter, safely.

    None or empty input yields an empty list; empty segments are dropped.
    """
    if not breadcrumb:
        return []
    # filter(None, ...) drops falsy (empty) segments.
    return list(filter(None, breadcrumb.split(" > ")))

233 

234 

def cluster_key_from_breadcrumb(breadcrumb: str | None, levels: int = 2) -> str:
    """Build a cluster key from the first N breadcrumb levels.

    Returns "root" when the breadcrumb is missing or has no non-empty parts;
    levels <= 0 keeps the full path.
    """
    # Inline breadcrumb split: ' > '-delimited, empty segments dropped.
    segments = (
        [seg for seg in breadcrumb.split(" > ") if seg] if breadcrumb else []
    )
    if not segments:
        return "root"
    chosen = segments if levels <= 0 else segments[:levels]
    return " > ".join(chosen)

243 

244 

def format_hierarchy_cluster_name(context_key: str) -> str:
    """Format a human-readable cluster name for hierarchy-based clusters.

    "root" maps to a fixed label; one-level keys get a " Section" suffix;
    deeper keys show the first two levels; anything else falls back to
    "<key> Hierarchy".
    """
    if context_key == "root":
        return "Root Documentation"
    # Inline breadcrumb split: ' > '-delimited, empty segments dropped.
    segments = (
        [seg for seg in context_key.split(" > ") if seg] if context_key else []
    )
    if len(segments) >= 2:
        return f"{segments[0]} > {segments[1]}"
    if len(segments) == 1:
        return f"{segments[0]} Section"
    return f"{context_key} Hierarchy"

255 

256 

def clean_topic_name(topic: str) -> str:
    """Normalize topic names for display: trim and Title-Case if lowercase.

    Empty/None input yields ""; already-cased topics are left untouched.
    """
    if not topic:
        return ""
    trimmed = topic.strip()
    # Only all-lowercase topics are re-cased; mixed case is preserved.
    return trimmed.title() if trimmed.islower() else trimmed

265 

266 

def categorize_cluster_size(size: int) -> str:
    """Map cluster size to human-readable bucket.

    Follows legacy thresholds: individual<=1, small<=3, medium<=8, large<=15, else very large.
    """
    # Ordered (upper-bound, label) table; first matching bound wins.
    thresholds = (
        (1, "individual"),
        (3, "small"),
        (8, "medium"),
        (15, "large"),
    )
    for upper_bound, label in thresholds:
        if size <= upper_bound:
            return label
    return "very large"

281 

282 

# Display-form overrides for acronyms that Title Case would mangle
# (e.g. "jwt" -> "Jwt"). Hoisted to module level so the dict is not
# rebuilt on every call.
_ACRONYM_DISPLAY: dict[str, str] = {
    "oauth": "OAuth",
    "jwt": "JWT",
    "api": "API",
    "ui": "UI",
    "ux": "UX",
    "sql": "SQL",
}


def normalize_acronym(token: str) -> str:
    """Normalize common acronyms for display consistently across CDI modules.

    Falls back to Title Case when not in mapping and tolerates None/empty input.

    Args:
        token: Raw token; None is treated as "".

    Returns:
        The canonical display form from _ACRONYM_DISPLAY (matched
        case-insensitively), or the Title-Cased stripped token.
    """
    stripped = (token or "").strip()
    return _ACRONYM_DISPLAY.get(stripped.lower(), stripped.title())