Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/utils.py: 76% (119 statements)

from __future__ import annotations

from collections import Counter
from collections.abc import Iterable
from typing import TypeVar

import numpy as np

T = TypeVar("T")


def jaccard_similarity(a: Iterable[T], b: Iterable[T]) -> float:
    """Compute Jaccard similarity for two iterables (as sets).

    Provided as a pure helper. Not wired into the legacy module yet.
    """
    set_a = set(a)
    set_b = set(b)
    if not set_a or not set_b:
        return 0.0
    intersection = len(set_a & set_b)
    union = len(set_a | set_b)
    return intersection / union if union > 0 else 0.0
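
# Illustrative usage (a hedged sketch, not part of the module; the value
# follows directly from the definition above):
#   >>> jaccard_similarity(["a", "b"], ["b", "c"])
#   0.3333333333333333
# One shared element out of three distinct elements overall: |{b}| / |{a, b, c}|.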


def extract_texts_from_mixed(
    items: Iterable[dict | str] | None, key: str = "text"
) -> list[str]:
    """Extract lowercase texts from a mixed list of dicts or strings.

    Mirrors legacy behavior used in CDI but provides a shared, tested helper.
    """
    if items is None:
        return []
    texts: list[str] = []
    for item in items:
        if isinstance(item, dict):
            value = str(item.get(key, "")).strip().lower()
            if value:
                texts.append(value)
        elif isinstance(item, str):
            value = item.strip().lower()
            if value:
                texts.append(value)
    return texts
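
# Illustrative usage (hedged sketch): dicts are read via `key`, strings are
# taken as-is, and blanks/missing keys are dropped after strip().lower().
#   >>> extract_texts_from_mixed([{"text": " Hello "}, "WORLD", {"other": "x"}, ""])
#   ['hello', 'world']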


def weighted_average(scores: dict[str, float], weights: dict[str, float]) -> float:
    """Compute a weighted average for named scores with default weights.

    Any missing weights default to 0.1 to mirror lenient legacy combining rules.
    """
    if not scores:
        return 0.0
    total = 0.0
    total_w = 0.0
    for name, score in scores.items():
        w = weights.get(name, 0.1)
        total += score * w
        total_w += w
    return total / total_w if total_w > 0 else 0.0
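
# Illustrative usage (hedged sketch): "entity" has no explicit weight, so it
# falls back to the lenient 0.1 default.
#   >>> round(weighted_average({"semantic": 0.8, "entity": 0.4}, {"semantic": 0.7}), 6)
#   0.75
# (0.8 * 0.7 + 0.4 * 0.1) / (0.7 + 0.1) = 0.60 / 0.8 = 0.75.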


# Common stop words used across similarity functions
STOP_WORDS_BASIC: set[str] = {
    "the",
    "and",
    "or",
    "but",
    "in",
    "on",
    "at",
    "to",
    "for",
    "of",
    "with",
    "by",
    "a",
    "an",
    "is",
    "are",
    "was",
    "were",
    "be",
    "been",
    "being",
}

# Title-specific stop words used when extracting common words
TITLE_STOP_WORDS: set[str] = {
    "documentation",
    "guide",
    "overview",
    "introduction",
    "the",
    "and",
    "for",
    "with",
}

# Domain and architecture keyword groups
DOMAIN_KEYWORDS: list[list[str]] = [
    ["healthcare", "medical", "patient", "clinical"],
    ["finance", "payment", "banking", "financial"],
    ["ecommerce", "retail", "shopping", "commerce"],
    ["education", "learning", "student", "academic"],
    ["iot", "device", "sensor", "embedded"],
    ["mobile", "app", "ios", "android"],
]

ARCHITECTURE_PATTERNS: list[list[str]] = [
    ["microservices", "service", "microservice"],
    ["api", "rest", "graphql", "endpoint"],
    ["database", "data", "storage", "persistence"],
    ["authentication", "auth", "identity", "oauth"],
    ["messaging", "queue", "event", "pub-sub"],
    ["cache", "caching", "redis", "memory"],
    ["monitoring", "logging", "observability", "metrics"],
]

# Technology keyword lists (two variants to preserve legacy behavior)
TECH_KEYWORDS_SHARED: list[str] = [
    "react",
    "angular",
    "vue",
    "node",
    "node.js",
    "python",
    "java",
    "golang",
    "docker",
    "kubernetes",
    "aws",
    "azure",
    "gcp",
    "postgres",
    "mysql",
    "mongodb",
    "jwt",
    "oauth",
    "rest",
    "graphql",
    "grpc",
]

TECH_KEYWORDS_COUNT: list[str] = [
    "react",
    "angular",
    "vue",
    "node",
    "node.js",
    "python",
    "java",
    "docker",
    "kubernetes",
    "aws",
    "azure",
    "postgres",
    "mysql",
    "mongodb",
    "jwt",
    "oauth",
]


def compute_common_title_words(titles: list[str], top_k: int = 10) -> list[str]:
    """Return top_k common title words, excluding title stop words and short tokens."""
    title_words: list[str] = []
    for title in titles:
        title_words.extend((title or "").lower().split())
    common = [
        word
        for word, count in Counter(title_words).most_common(top_k)
        if len(word) > 3 and word not in TITLE_STOP_WORDS
    ]
    return common
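
# Illustrative usage (hedged sketch): "api" is too short (len <= 3) and
# "guide"/"overview" are title stop words, so only "payment" survives.
#   >>> compute_common_title_words(["API Guide", "API Overview", "Payment API"])
#   ['payment']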


def cosine_similarity(
    vec1: list[float] | Iterable[float], vec2: list[float] | Iterable[float]
) -> float:
    """Compute cosine similarity with numpy, guarding against zero vectors.

    Mirrors legacy behavior in CDI.
    """
    try:
        a = np.array(list(vec1), dtype=float)
        b = np.array(list(vec2), dtype=float)
        mag_a = np.linalg.norm(a)
        mag_b = np.linalg.norm(b)
        if mag_a == 0.0 or mag_b == 0.0:
            return 0.0
        return float(np.dot(a, b) / (mag_a * mag_b))
    except Exception:
        return 0.0
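
# Illustrative usage (hedged sketch): orthogonal vectors score 0.0, and the
# zero-vector guard avoids division by zero.
#   >>> cosine_similarity([1.0, 0.0], [0.0, 1.0])
#   0.0
#   >>> cosine_similarity([0.0, 0.0], [1.0, 2.0])
#   0.0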


def hierarchical_distance_from_breadcrumbs(
    breadcrumb1: str | None, breadcrumb2: str | None
) -> float:
    """Compute a hierarchical relatedness score from breadcrumb overlap.

    Returns 0.7 for sibling docs (same parent, different leaf), otherwise the
    Jaccard overlap of the breadcrumb part sets. Returns 0.0 if unavailable.
    """
    if not breadcrumb1 or not breadcrumb2:
        return 0.0
    parts1 = breadcrumb1.split(" > ")
    parts2 = breadcrumb2.split(" > ")
    if (
        len(parts1) == len(parts2)
        and len(parts1) > 1
        and parts1[:-1] == parts2[:-1]
        and parts1[-1] != parts2[-1]
    ):
        return 0.7
    set1 = set(parts1)
    set2 = set(parts2)
    if not set1 or not set2:
        return 0.0
    inter = len(set1 & set2)
    union = len(set1 | set2)
    return inter / union if union > 0 else 0.0
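
# Illustrative usage (hedged sketch): same parent with different leaves hits
# the 0.7 sibling branch; otherwise plain Jaccard overlap of the parts applies.
#   >>> hierarchical_distance_from_breadcrumbs("Docs > API > Auth", "Docs > API > Tokens")
#   0.7
#   >>> round(hierarchical_distance_from_breadcrumbs("Docs > API > Auth", "Docs > API"), 6)
#   0.666667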


def split_breadcrumb(breadcrumb: str | None) -> list[str]:
    """Split a breadcrumb string into parts using the ' > ' delimiter, safely."""
    if not breadcrumb:
        return []
    return [part for part in breadcrumb.split(" > ") if part]


def cluster_key_from_breadcrumb(breadcrumb: str | None, levels: int = 2) -> str:
    """Build a cluster key from the first N breadcrumb levels."""
    parts = split_breadcrumb(breadcrumb)
    if not parts:
        return "root"
    if levels <= 0:
        return " > ".join(parts)
    return " > ".join(parts[:levels])
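
# Illustrative usage (hedged sketch) for the two breadcrumb helpers above:
#   >>> split_breadcrumb("Docs > API > Auth")
#   ['Docs', 'API', 'Auth']
#   >>> cluster_key_from_breadcrumb("Docs > API > Auth")  # default levels=2
#   'Docs > API'
#   >>> cluster_key_from_breadcrumb(None)
#   'root'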


def format_hierarchy_cluster_name(context_key: str) -> str:
    """Format a human-readable cluster name for hierarchy-based clusters."""
    if context_key == "root":
        return "Root Documentation"
    parts = split_breadcrumb(context_key)
    if len(parts) == 1:
        return f"{parts[0]} Section"
    if len(parts) >= 2:
        return f"{parts[0]} > {parts[1]}"
    return f"{context_key} Hierarchy"
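
# Illustrative usage (hedged sketch) covering the three naming branches:
#   >>> format_hierarchy_cluster_name("root")
#   'Root Documentation'
#   >>> format_hierarchy_cluster_name("Docs")
#   'Docs Section'
#   >>> format_hierarchy_cluster_name("Docs > API > Auth")
#   'Docs > API'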


def clean_topic_name(topic: str) -> str:
    """Normalize a topic name for display: trim, and Title-Case if all lowercase."""
    if not topic:
        return ""
    topic = topic.strip()
    if topic.islower():
        return topic.title()
    return topic
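
# Illustrative usage (hedged sketch): all-lowercase input is Title-Cased,
# while mixed-case input is only trimmed.
#   >>> clean_topic_name("  machine learning ")
#   'Machine Learning'
#   >>> clean_topic_name("OAuth flows")
#   'OAuth flows'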


def categorize_cluster_size(size: int) -> str:
    """Map a cluster size to a human-readable bucket.

    Follows legacy thresholds: individual <= 1, small <= 3, medium <= 8,
    large <= 15, else very large.
    """
    if size <= 1:
        return "individual"
    if size <= 3:
        return "small"
    if size <= 8:
        return "medium"
    if size <= 15:
        return "large"
    return "very large"
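
# Illustrative usage (hedged sketch of the bucket thresholds):
#   >>> [categorize_cluster_size(n) for n in (1, 3, 8, 15, 16)]
#   ['individual', 'small', 'medium', 'large', 'very large']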


def normalize_acronym(token: str | None) -> str:
    """Normalize common acronyms for display, consistently across CDI modules.

    Falls back to Title Case when the token is not in the mapping, and
    tolerates None/empty input.
    """
    mapping = {
        "oauth": "OAuth",
        "jwt": "JWT",
        "api": "API",
        "ui": "UI",
        "ux": "UX",
        "sql": "SQL",
    }
    t = (token or "").strip()
    lower = t.lower()
    return mapping.get(lower, t.title())
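
# Illustrative usage (hedged sketch): mapped tokens get canonical casing,
# unmapped tokens fall back to Title Case, and None/empty stays empty.
#   >>> normalize_acronym("jwt")
#   'JWT'
#   >>> normalize_acronym("redis")
#   'Redis'
#   >>> normalize_acronym(None)
#   ''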