Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/extractors/similarity_helpers.py: 67%

175 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

from __future__ import annotations

import logging
import re

from ....models import SearchResult
from ..models import SimilarityMetric
from ..utils import (
    ARCHITECTURE_PATTERNS,
    DOMAIN_KEYWORDS,
    STOP_WORDS_BASIC,
    TECH_KEYWORDS_COUNT,
    TECH_KEYWORDS_SHARED,
    extract_texts_from_mixed,
    weighted_average,
)


def _normalize_runtime(value: str) -> str:
    """Normalize runtime/technology variants to canonical names.

    - Returns "node.js" for any of {"node", "nodejs", "node.js"}
    - Otherwise returns the lowercased input
    """
    v = (value or "").lower()
    return "node.js" if v in {"node", "nodejs", "node.js"} else v
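
# Illustrative sketch (not in the original module): the canonicalization folds
# Node.js spelling variants together and lowercases everything else.
#   >>> _normalize_runtime("NodeJS")
#   'node.js'
#   >>> _normalize_runtime("Python")
#   'python'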


def get_shared_entities(doc1: SearchResult, doc2: SearchResult) -> list[str]:
    """Return the entity texts that appear in both documents."""
    ents1 = extract_texts_from_mixed(doc1.entities)
    ents2 = extract_texts_from_mixed(doc2.entities)
    return list(set(ents1) & set(ents2))


def get_shared_topics(doc1: SearchResult, doc2: SearchResult) -> list[str]:
    """Return the topic texts that appear in both documents."""
    topics1 = extract_texts_from_mixed(doc1.topics)
    topics2 = extract_texts_from_mixed(doc2.topics)
    return list(set(topics1) & set(topics2))
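
# Illustrative sketch (hypothetical documents; assumes extract_texts_from_mixed
# yields plain strings): if doc1.entities yields {"OAuth", "Redis"} and
# doc2.entities yields {"Redis", "Postgres"}, get_shared_entities returns
# ["Redis"]; get_shared_topics behaves identically over topics.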


def combine_metric_scores(metric_scores: dict[SimilarityMetric, float]) -> float:
    """Combine per-metric scores into a single score using fixed weights."""
    if not metric_scores:
        return 0.0
    weights = {
        SimilarityMetric.ENTITY_OVERLAP: 0.25,
        SimilarityMetric.TOPIC_OVERLAP: 0.25,
        SimilarityMetric.METADATA_SIMILARITY: 0.20,
        SimilarityMetric.CONTENT_FEATURES: 0.15,
        SimilarityMetric.HIERARCHICAL_DISTANCE: 0.10,
        SimilarityMetric.SEMANTIC_SIMILARITY: 0.05,
    }
    # Convert enum keys to string names for the generic helper
    scores_as_named = {m.value: s for m, s in metric_scores.items()}
    return weighted_average(scores_as_named, {k.value: v for k, v in weights.items()})
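
# Worked sketch (assumes weighted_average normalizes by the sum of the weights
# it receives, which is defined in ..utils, not here): with only two metrics,
#   combine_metric_scores({
#       SimilarityMetric.ENTITY_OVERLAP: 0.8,
#       SimilarityMetric.TOPIC_OVERLAP: 0.4,
#   })
# would yield (0.25 * 0.8 + 0.25 * 0.4) / (0.25 + 0.25) = 0.6.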


def calculate_semantic_similarity_spacy(
    spacy_analyzer, text1: str, text2: str
) -> float:
    """Compute spaCy vector similarity on truncated texts, mirroring legacy behavior.

    Only expected, recoverable errors are handled; unexpected exceptions propagate.
    """
    logger = logging.getLogger(__name__)
    try:
        doc1_analyzed = spacy_analyzer.nlp((text1 or "")[:500])
        doc2_analyzed = spacy_analyzer.nlp((text2 or "")[:500])
        return float(doc1_analyzed.similarity(doc2_analyzed))
    except (AttributeError, ValueError, OSError) as e:
        logger.error(
            "spaCy similarity failed (recoverable): %s | len1=%d len2=%d",
            e,
            len(text1 or ""),
            len(text2 or ""),
        )
        return 0.0
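
# Usage sketch (hypothetical analyzer; assumes spacy_analyzer wraps a pipeline
# with word vectors, e.g. spacy.load("en_core_web_md"), exposed as .nlp):
#   score = calculate_semantic_similarity_spacy(
#       analyzer, "reset a password", "change login credentials"
#   )
# Texts are truncated to 500 characters first; recoverable spaCy errors are
# logged and scored as 0.0, anything else propagates.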


def calculate_text_similarity(doc1: SearchResult, doc2: SearchResult) -> float:
    """Legacy-style text similarity using stopword-filtered Jaccard."""
    text1 = (doc1.text or "").lower()
    text2 = (doc2.text or "").lower()
    words1 = set(text1.split()) - STOP_WORDS_BASIC
    words2 = set(text2.split()) - STOP_WORDS_BASIC
    if not words1 or not words2:
        return 0.0
    intersection = len(words1 & words2)
    union = len(words1 | words2)
    return intersection / union if union > 0 else 0.0
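
# Worked sketch (assumes "the" is in STOP_WORDS_BASIC): comparing
# "the quick brown fox" with "the quick red fox" leaves {quick, brown, fox}
# and {quick, red, fox}, so Jaccard = |{quick, fox}| / |{quick, brown, red,
# fox}| = 2 / 4 = 0.5.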


def extract_context_snippet(text: str, keyword: str, max_length: int = 150) -> str:
    """Extract a context snippet around a keyword from text (legacy-compatible)."""
    text = text or ""
    keyword_lower = (keyword or "").lower()
    pattern = r"\b" + re.escape(keyword_lower) + r"\b"

    match = re.search(pattern, text, re.IGNORECASE)
    if not match:
        # Fall back to matching any single word of a multi-word keyword
        for word in keyword_lower.split():
            word_pattern = r"\b" + re.escape(word) + r"\b"
            match = re.search(word_pattern, text, re.IGNORECASE)
            if match:
                keyword = word
                break

    if not match:
        return text[:max_length].strip()

    keyword_start = match.start()
    snippet_start = max(0, keyword_start - max_length // 2)
    snippet_end = min(len(text), keyword_start + len(keyword) + max_length // 2)
    snippet = text[snippet_start:snippet_end].strip()

    # Prefer whole sentences: keep the sentence containing the keyword plus
    # one sentence of context on each side when possible
    sentences = snippet.split(".")
    if len(sentences) > 1:
        for i, sentence in enumerate(sentences):
            if keyword.lower() in sentence.lower():
                start_idx = max(0, i - 1)
                end_idx = min(len(sentences), i + 2)
                snippet = ".".join(
                    s.strip() for s in sentences[start_idx:end_idx]
                ).strip()
                break

    return snippet
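
# Illustrative sketch: extract_context_snippet(
#     "Install the package. Then run the auth server. Check the logs.", "auth"
# ) matches "auth" on a word boundary and returns the containing sentence plus
# one sentence of context on each side; if neither the keyword nor any of its
# words matches, the first max_length characters are returned instead.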


def have_semantic_similarity(doc1: SearchResult, doc2: SearchResult) -> bool:
    """Heuristic semantic similarity based on title overlap and key terms."""
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    if title1 and title2:
        title_words1 = set(title1.split())
        title_words2 = set(title2.split())
        if title_words1 & title_words2:
            return True

    key_terms = [
        "authentication",
        "security",
        "login",
        "password",
        "access",
        "user",
        "interface",
        "design",
        "app",
        "mobile",
    ]
    text1_lower = (doc1.text or "").lower()
    text2_lower = (doc2.text or "").lower()
    terms_in_doc1 = [term for term in key_terms if term in text1_lower]
    terms_in_doc2 = [term for term in key_terms if term in text2_lower]
    return len(set(terms_in_doc1) & set(terms_in_doc2)) >= 2
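
# Illustrative sketch (hypothetical titles): "Mobile login flow" and
# "Login error handling" share the title word "login" and match immediately;
# without a title overlap, the two bodies must share at least two key terms
# (e.g. both mention "password" and "security").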


def has_shared_entities(doc1: SearchResult, doc2: SearchResult) -> bool:
    """True when the documents share at least one entity."""
    return bool(get_shared_entities(doc1, doc2))


def has_shared_topics(doc1: SearchResult, doc2: SearchResult) -> bool:
    """True when the documents share at least one topic."""
    return bool(get_shared_topics(doc1, doc2))


def get_shared_entities_count(doc1: SearchResult, doc2: SearchResult) -> int:
    """Number of distinct entities shared by the documents."""
    return len(get_shared_entities(doc1, doc2))


def get_shared_topics_count(doc1: SearchResult, doc2: SearchResult) -> int:
    """Number of distinct topics shared by the documents."""
    return len(get_shared_topics(doc1, doc2))


def has_transferable_domain_knowledge(doc1: SearchResult, doc2: SearchResult) -> bool:
    """True when both titles mention keywords from the same domain group."""
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return any(
        any(k in title1 for k in domain) and any(k in title2 for k in domain)
        for domain in DOMAIN_KEYWORDS
    )


def has_reusable_architecture_patterns(doc1: SearchResult, doc2: SearchResult) -> bool:
    """True when both titles mention keywords from the same architecture pattern."""
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return any(
        any(k in title1 for k in pattern) and any(k in title2 for k in pattern)
        for pattern in ARCHITECTURE_PATTERNS
    )
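
# Illustrative sketch (assumes DOMAIN_KEYWORDS contains a group such as
# {"auth", "login", "sso"}): titles "SSO rollout plan" and "Auth service notes"
# both hit that group, so has_transferable_domain_knowledge returns True.
# has_reusable_architecture_patterns works the same way over
# ARCHITECTURE_PATTERNS groups (e.g. a hypothetical {"microservice", "mesh"}).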


def has_shared_technologies(doc1: SearchResult, doc2: SearchResult) -> bool:
    """True when the documents share a technology entity or title keyword."""
    # Reuse the shared extraction logic for consistency across helpers
    ents1 = set(extract_texts_from_mixed(getattr(doc1, "entities", []) or []))
    ents2 = set(extract_texts_from_mixed(getattr(doc2, "entities", []) or []))

    if {_normalize_runtime(e) for e in ents1} & {_normalize_runtime(e) for e in ents2}:
        return True

    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return any(k in title1 and k in title2 for k in TECH_KEYWORDS_SHARED)
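
# Illustrative sketch: entity lists ["Node", "Express"] and ["nodejs"] share a
# technology after normalization ({"node.js", "express"} & {"node.js"}), so
# the title-keyword fallback over TECH_KEYWORDS_SHARED is never reached.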


def get_shared_technologies_count(doc1: SearchResult, doc2: SearchResult) -> int:
    """Count shared technologies; falls back to title keywords when no entities match."""
    entities1 = set(extract_texts_from_mixed(getattr(doc1, "entities", []) or []))
    entities2 = set(extract_texts_from_mixed(getattr(doc2, "entities", []) or []))

    shared_entities = {_normalize_runtime(e) for e in entities1} & {
        _normalize_runtime(e) for e in entities2
    }
    if shared_entities:
        return len(shared_entities)

    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return sum(1 for k in TECH_KEYWORDS_COUNT if k in title1 and k in title2)


def calculate_entity_overlap(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate entity overlap between documents (Jaccard)."""
    entities1 = extract_texts_from_mixed(getattr(doc1, "entities", []) or [])
    entities2 = extract_texts_from_mixed(getattr(doc2, "entities", []) or [])
    if not entities1 or not entities2:
        return 0.0
    set1 = set(entities1)
    set2 = set(entities2)
    inter = len(set1 & set2)
    union = len(set1 | set2)
    return inter / union if union > 0 else 0.0


def calculate_topic_overlap(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate topic overlap between documents (Jaccard)."""
    topics1 = extract_texts_from_mixed(getattr(doc1, "topics", []) or [])
    topics2 = extract_texts_from_mixed(getattr(doc2, "topics", []) or [])
    if not topics1 or not topics2:
        return 0.0
    set1 = set(topics1)
    set2 = set(topics2)
    inter = len(set1 & set2)
    union = len(set1 | set2)
    return inter / union if union > 0 else 0.0
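
# Worked sketch: entities ["redis", "oauth", "jwt"] vs ["redis", "jwt"] give
# calculate_entity_overlap = |{redis, jwt}| / |{redis, oauth, jwt}| = 2 / 3;
# calculate_topic_overlap applies the same Jaccard to topic texts.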


def calculate_metadata_similarity(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate metadata similarity combining project/source/features/word count."""
    similarity_factors: list[float] = []

    # Project match is all-or-nothing, counted only when both ids are set
    if getattr(doc1, "project_id", None) and getattr(doc2, "project_id", None):
        similarity_factors.append(1.0 if doc1.project_id == doc2.project_id else 0.0)

    # A source_type match counts for half of a project match
    similarity_factors.append(0.5 if doc1.source_type == doc2.source_type else 0.0)

    features1 = [
        getattr(doc1, "has_code_blocks", False),
        getattr(doc1, "has_tables", False),
        getattr(doc1, "has_images", False),
        getattr(doc1, "has_links", False),
    ]
    features2 = [
        getattr(doc2, "has_code_blocks", False),
        getattr(doc2, "has_tables", False),
        getattr(doc2, "has_images", False),
        getattr(doc2, "has_links", False),
    ]
    min_len = min(len(features1), len(features2))
    if min_len == 0:
        feature_similarity = 0.0
    else:
        feature_similarity = sum(
            f1 == f2 for f1, f2 in zip(features1, features2, strict=False)
        ) / float(min_len)
    similarity_factors.append(feature_similarity)

    # Word-count factor is the ratio of the shorter to the longer document
    if getattr(doc1, "word_count", None) and getattr(doc2, "word_count", None):
        min_words = min(doc1.word_count, doc2.word_count)
        max_words = max(doc1.word_count, doc2.word_count)
        similarity_factors.append(min_words / max_words if max_words > 0 else 0.0)

    return (
        (sum(similarity_factors) / len(similarity_factors))
        if similarity_factors
        else 0.0
    )
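
# Worked sketch (hypothetical field values): same project (1.0), same
# source_type (0.5), all four content flags matching (1.0), and word counts
# 300 vs 600 (0.5) average to (1.0 + 0.5 + 1.0 + 0.5) / 4 = 0.75. Because the
# source_type factor is capped at 0.5, even otherwise identical documents
# score below 1.0.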


def calculate_content_features_similarity(
    doc1: SearchResult, doc2: SearchResult
) -> float:
    """Calculate content features similarity from read time and hierarchy depth."""
    # Read-time factor: ratio of the shorter estimate to the longer one
    read_time_similarity = 0.0
    if getattr(doc1, "estimated_read_time", None) and getattr(
        doc2, "estimated_read_time", None
    ):
        min_time = min(doc1.estimated_read_time, doc2.estimated_read_time)
        max_time = max(doc1.estimated_read_time, doc2.estimated_read_time)
        read_time_similarity = min_time / max_time if max_time > 0 else 0.0

    # Depth factor: linear falloff, reaching 0.0 at a depth difference of 5
    depth_similarity = 0.0
    if (
        getattr(doc1, "depth", None) is not None
        and getattr(doc2, "depth", None) is not None
    ):
        depth_diff = abs(doc1.depth - doc2.depth)
        depth_similarity = max(0.0, 1.0 - depth_diff / 5.0)

    feature_factors = [read_time_similarity, depth_similarity]
    return sum(feature_factors) / len(feature_factors) if feature_factors else 0.0
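
# Worked sketch (hypothetical values): read times of 4 vs 8 minutes give 0.5,
# depths 2 vs 3 give 1.0 - 1/5 = 0.8, so the result is (0.5 + 0.8) / 2 = 0.65.
# A depth difference of 5 or more floors the depth factor at 0.0.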