Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/extractors/similarity_helpers.py: 67% (175 statements)
from __future__ import annotations

import logging

from ....models import SearchResult
from ..models import SimilarityMetric
from ..utils import (
    ARCHITECTURE_PATTERNS,
    DOMAIN_KEYWORDS,
    STOP_WORDS_BASIC,
    TECH_KEYWORDS_COUNT,
    TECH_KEYWORDS_SHARED,
    extract_texts_from_mixed,
    weighted_average,
)


def _normalize_runtime(value: str) -> str:
    """Normalize runtime/technology variants to canonical names.

    - Returns "node.js" for any of {"node", "nodejs", "node.js"}
    - Otherwise returns the lowercased input (or "" for a falsy value)
    """
    v = (value or "").lower()
    return "node.js" if v in {"node", "nodejs", "node.js"} else v
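
# Usage sketch (illustrative):
#     >>> _normalize_runtime("NodeJS")
#     'node.js'
#     >>> _normalize_runtime("Python")
#     'python'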


def get_shared_entities(doc1: SearchResult, doc2: SearchResult) -> list[str]:
    ents1 = extract_texts_from_mixed(doc1.entities)
    ents2 = extract_texts_from_mixed(doc2.entities)
    return list(set(ents1) & set(ents2))


def get_shared_topics(doc1: SearchResult, doc2: SearchResult) -> list[str]:
    topics1 = extract_texts_from_mixed(doc1.topics)
    topics2 = extract_texts_from_mixed(doc2.topics)
    return list(set(topics1) & set(topics2))


def combine_metric_scores(metric_scores: dict[SimilarityMetric, float]) -> float:
    if not metric_scores:
        return 0.0
    weights = {
        SimilarityMetric.ENTITY_OVERLAP: 0.25,
        SimilarityMetric.TOPIC_OVERLAP: 0.25,
        SimilarityMetric.METADATA_SIMILARITY: 0.20,
        SimilarityMetric.CONTENT_FEATURES: 0.15,
        SimilarityMetric.HIERARCHICAL_DISTANCE: 0.10,
        SimilarityMetric.SEMANTIC_SIMILARITY: 0.05,
    }
    # Convert Enum keys to string names for the generic helper
    scores_as_named = {m.value: s for m, s in metric_scores.items()}
    return weighted_average(scores_as_named, {k.value: v for k, v in weights.items()})
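
# Worked example (assumes weighted_average normalizes by the weights of the
# scores actually provided; check ..utils for the exact convention):
#     combine_metric_scores({
#         SimilarityMetric.ENTITY_OVERLAP: 0.8,  # weight 0.25
#         SimilarityMetric.TOPIC_OVERLAP: 0.4,   # weight 0.25
#     })
#     # -> (0.25 * 0.8 + 0.25 * 0.4) / (0.25 + 0.25) = 0.6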


def calculate_semantic_similarity_spacy(
    spacy_analyzer, text1: str, text2: str
) -> float:
    """Compute spaCy vector similarity on truncated texts, mirroring legacy behavior.

    Only expected, recoverable errors are handled; unexpected exceptions propagate.
    """
    logger = logging.getLogger(__name__)
    try:
        doc1_analyzed = spacy_analyzer.nlp((text1 or "")[:500])
        doc2_analyzed = spacy_analyzer.nlp((text2 or "")[:500])
        return float(doc1_analyzed.similarity(doc2_analyzed))
    except (AttributeError, ValueError, OSError) as e:
        logger.error(
            "spaCy similarity failed (recoverable): %s | len1=%d len2=%d",
            e,
            len(text1 or ""),
            len(text2 or ""),
        )
        return 0.0
    except Exception:
        # Let unexpected exceptions bubble up for visibility
        raise
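
# Usage sketch (illustrative; `analyzer` is a hypothetical wrapper exposing an
# `nlp` pipeline loaded from a model with word vectors, e.g. en_core_web_md;
# models without vectors make Doc.similarity unreliable):
#     score = calculate_semantic_similarity_spacy(
#         analyzer, "payment gateway integration", "billing API setup"
#     )
#     # Both inputs are truncated to their first 500 characters.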


def calculate_text_similarity(doc1: SearchResult, doc2: SearchResult) -> float:
    """Legacy-style text similarity using stopword-filtered Jaccard."""
    text1 = (doc1.text or "").lower()
    text2 = (doc2.text or "").lower()
    words1 = set(text1.split())
    words2 = set(text2.split())
    words1 -= STOP_WORDS_BASIC
    words2 -= STOP_WORDS_BASIC
    if not words1 or not words2:
        return 0.0
    intersection = len(words1 & words2)
    union = len(words1 | words2)
    return intersection / union if union > 0 else 0.0
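
# Worked example (assumes "the" and "a" are in STOP_WORDS_BASIC):
#     "the cat sat" -> {"cat", "sat"};  "a cat ran" -> {"cat", "ran"}
#     Jaccard = |{"cat"}| / |{"cat", "sat", "ran"}| = 1 / 3 ~= 0.33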


def extract_context_snippet(text: str, keyword: str, max_length: int = 150) -> str:
    """Extract a context snippet around a keyword from text (legacy-compatible)."""
    import re

    text = text or ""
    keyword_lower = (keyword or "").lower()
    pattern = r"\b" + re.escape(keyword_lower) + r"\b"

    match = re.search(pattern, text, re.IGNORECASE)
    if not match:
        # Fall back to the first individually matching word of the keyword
        words = keyword_lower.split()
        for word in words:
            word_pattern = r"\b" + re.escape(word) + r"\b"
            match = re.search(word_pattern, text, re.IGNORECASE)
            if match:
                keyword = word
                break

    if not match:
        return text[:max_length].strip()

    keyword_start = match.start()
    snippet_start = max(0, keyword_start - max_length // 2)
    snippet_end = min(len(text), keyword_start + len(keyword) + max_length // 2)
    snippet = text[snippet_start:snippet_end].strip()

    # Prefer whole sentences: keep the matched sentence plus its neighbors
    sentences = snippet.split(".")
    if len(sentences) > 1:
        for i, sentence in enumerate(sentences):
            if keyword.lower() in sentence.lower():
                start_idx = max(0, i - 1)
                end_idx = min(len(sentences), i + 2)
                snippet = ".".join(
                    s.strip() for s in sentences[start_idx:end_idx]
                ).strip()
                break

    return snippet
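
# Usage sketch (illustrative; `doc_text` is hypothetical): a whole-phrase match
# wins; otherwise the first matching word of the keyword is tried, and with no
# match at all the head of the text is returned.
#     snippet = extract_context_snippet(doc_text, "token refresh")
#     # Centers on the match, then trims to the matched sentence plus its
#     # immediate neighbors when the snippet spans several sentences.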


def have_semantic_similarity(doc1: SearchResult, doc2: SearchResult) -> bool:
    """Heuristic semantic similarity based on title overlap and key terms."""
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    if title1 and title2:
        title_words1 = set(title1.split())
        title_words2 = set(title2.split())
        if title_words1 & title_words2:
            return True

    key_terms = [
        "authentication",
        "security",
        "login",
        "password",
        "access",
        "user",
        "interface",
        "design",
        "app",
        "mobile",
    ]
    text1_lower = (doc1.text or "").lower()
    text2_lower = (doc2.text or "").lower()
    terms_in_doc1 = [term for term in key_terms if term in text1_lower]
    terms_in_doc2 = [term for term in key_terms if term in text2_lower]
    return len(set(terms_in_doc1) & set(terms_in_doc2)) >= 2
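
# Worked example (illustrative): titles "API Guide" and "SDK Manual" share no
# words, but if both texts mention "login" and "password" the key-term check
# finds two shared terms (>= 2), so the result is True.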


# get_shared_entities/get_shared_topics already de-duplicate, so plain len()
# suffices in the predicates and counters below.
def has_shared_entities(doc1: SearchResult, doc2: SearchResult) -> bool:
    return len(get_shared_entities(doc1, doc2)) > 0


def has_shared_topics(doc1: SearchResult, doc2: SearchResult) -> bool:
    return len(get_shared_topics(doc1, doc2)) > 0


def get_shared_entities_count(doc1: SearchResult, doc2: SearchResult) -> int:
    return len(get_shared_entities(doc1, doc2))


def get_shared_topics_count(doc1: SearchResult, doc2: SearchResult) -> int:
    return len(get_shared_topics(doc1, doc2))


def has_transferable_domain_knowledge(doc1: SearchResult, doc2: SearchResult) -> bool:
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    for domain in DOMAIN_KEYWORDS:
        if any(k in title1 for k in domain) and any(k in title2 for k in domain):
            return True
    return False


def has_reusable_architecture_patterns(doc1: SearchResult, doc2: SearchResult) -> bool:
    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    for pattern in ARCHITECTURE_PATTERNS:
        if any(k in title1 for k in pattern) and any(k in title2 for k in pattern):
            return True
    return False


def has_shared_technologies(doc1: SearchResult, doc2: SearchResult) -> bool:
    # Reuse the shared extraction logic for consistency across helpers
    ents1 = set(extract_texts_from_mixed(getattr(doc1, "entities", []) or []))
    ents2 = set(extract_texts_from_mixed(getattr(doc2, "entities", []) or []))

    if {_normalize_runtime(e) for e in ents1} & {_normalize_runtime(e) for e in ents2}:
        return True

    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return any(k in title1 and k in title2 for k in TECH_KEYWORDS_SHARED)


def get_shared_technologies_count(doc1: SearchResult, doc2: SearchResult) -> int:
    entities1 = set(extract_texts_from_mixed(getattr(doc1, "entities", []) or []))
    entities2 = set(extract_texts_from_mixed(getattr(doc2, "entities", []) or []))

    shared_entities = {_normalize_runtime(e) for e in entities1} & {
        _normalize_runtime(e) for e in entities2
    }
    if shared_entities:
        return len(shared_entities)

    title1 = (doc1.source_title or "").lower()
    title2 = (doc2.source_title or "").lower()
    return sum(1 for k in TECH_KEYWORDS_COUNT if k in title1 and k in title2)
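
# Worked example (assumes extract_texts_from_mixed yields the raw entity
# strings): entities {"Node", "React"} vs {"nodejs", "Vue"} normalize to
# {"node.js", "react"} and {"node.js", "vue"}, so the entity path returns 1
# and the title-keyword fallback is never consulted.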


def calculate_entity_overlap(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate entity overlap between documents (Jaccard)."""
    entities1 = extract_texts_from_mixed(getattr(doc1, "entities", []) or [])
    entities2 = extract_texts_from_mixed(getattr(doc2, "entities", []) or [])
    if not entities1 or not entities2:
        return 0.0
    set1 = set(entities1)
    set2 = set(entities2)
    inter = len(set1 & set2)
    union = len(set1 | set2)
    return inter / union if union > 0 else 0.0


def calculate_topic_overlap(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate topic overlap between documents (Jaccard)."""
    topics1 = extract_texts_from_mixed(getattr(doc1, "topics", []) or [])
    topics2 = extract_texts_from_mixed(getattr(doc2, "topics", []) or [])
    if not topics1 or not topics2:
        return 0.0
    set1 = set(topics1)
    set2 = set(topics2)
    inter = len(set1 & set2)
    union = len(set1 | set2)
    return inter / union if union > 0 else 0.0


def calculate_metadata_similarity(doc1: SearchResult, doc2: SearchResult) -> float:
    """Calculate metadata similarity combining project/source/features/word count."""
    similarity_factors: list[float] = []

    if getattr(doc1, "project_id", None) and getattr(doc2, "project_id", None):
        similarity_factors.append(1.0 if doc1.project_id == doc2.project_id else 0.0)

    similarity_factors.append(0.5 if doc1.source_type == doc2.source_type else 0.0)

    features1 = [
        getattr(doc1, "has_code_blocks", False),
        getattr(doc1, "has_tables", False),
        getattr(doc1, "has_images", False),
        getattr(doc1, "has_links", False),
    ]
    features2 = [
        getattr(doc2, "has_code_blocks", False),
        getattr(doc2, "has_tables", False),
        getattr(doc2, "has_images", False),
        getattr(doc2, "has_links", False),
    ]
    min_len = min(len(features1), len(features2))
    if min_len == 0:
        feature_similarity = 0.0
    else:
        feature_similarity = sum(
            f1 == f2 for f1, f2 in zip(features1, features2, strict=False)
        ) / float(min_len)
    similarity_factors.append(feature_similarity)

    if getattr(doc1, "word_count", None) and getattr(doc2, "word_count", None):
        min_words = min(doc1.word_count, doc2.word_count)
        max_words = max(doc1.word_count, doc2.word_count)
        similarity_factors.append(min_words / max_words if max_words > 0 else 0.0)

    return (
        (sum(similarity_factors) / len(similarity_factors))
        if similarity_factors
        else 0.0
    )
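
# Worked example (illustrative): same project (1.0), same source_type (0.5),
# three of four content flags matching (0.75), word counts 400 vs 800 (0.5):
#     (1.0 + 0.5 + 0.75 + 0.5) / 4 = 0.6875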


def calculate_content_features_similarity(
    doc1: SearchResult, doc2: SearchResult
) -> float:
    """Calculate content-features similarity from estimated read time and depth."""
    read_time_similarity = 0.0
    if getattr(doc1, "estimated_read_time", None) and getattr(
        doc2, "estimated_read_time", None
    ):
        min_time = min(doc1.estimated_read_time, doc2.estimated_read_time)
        max_time = max(doc1.estimated_read_time, doc2.estimated_read_time)
        read_time_similarity = min_time / max_time if max_time > 0 else 0.0

    depth_similarity = 0.0
    if (
        getattr(doc1, "depth", None) is not None
        and getattr(doc2, "depth", None) is not None
    ):
        depth_diff = abs(doc1.depth - doc2.depth)
        depth_similarity = max(0.0, 1.0 - depth_diff / 5.0)

    feature_factors = [read_time_similarity, depth_similarity]
    return sum(feature_factors) / len(feature_factors) if feature_factors else 0.0
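
# Worked example: read times 4 vs 8 minutes -> 4/8 = 0.5; depths 2 vs 3 ->
# 1 - 1/5 = 0.8; result = (0.5 + 0.8) / 2 = 0.65.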