Coverage for src/qdrant_loader_mcp_server/search/nlp/semantic_expander.py: 85%
167 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""Semantic query expansion using spaCy word vectors and entity matching."""
3from dataclasses import dataclass
4from typing import Any
6from ...utils.logging import LoggingConfig
7from .spacy_analyzer import QueryAnalysis, SpaCyQueryAnalyzer
9logger = LoggingConfig.get_logger(__name__)
12@dataclass
13class ExpansionResult:
14 """Container for query expansion results."""
16 original_query: str
17 expanded_query: str
18 expansion_terms: list[str]
19 semantic_terms: list[str]
20 entity_terms: list[str]
21 concept_terms: list[str]
22 expansion_weight: float # Weight given to expansion terms (0-1)
23 processing_time_ms: float
26class EntityQueryExpander:
27 """Semantic query expansion using spaCy entities and word vectors."""
29 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
30 """Initialize the entity query expander.
32 Args:
33 spacy_analyzer: SpaCy analyzer instance for semantic analysis
34 """
35 self.spacy_analyzer = spacy_analyzer
36 self.logger = LoggingConfig.get_logger(__name__)
38 # Expansion configuration
39 self.max_semantic_expansions = 3 # Max semantic terms to add
40 self.max_entity_expansions = 2 # Max entity-related terms to add
41 self.max_concept_expansions = 2 # Max concept terms to add
42 self.similarity_threshold = 0.6 # Minimum similarity for expansion
44 # Domain-specific expansion dictionaries
45 self.domain_expansions = {
46 # Technical terms
47 "api": ["interface", "endpoint", "service", "restful"],
48 "database": ["db", "storage", "persistence", "data"],
49 "authentication": ["auth", "login", "credentials", "security"],
50 "authorization": ["access", "permissions", "roles", "security"],
51 "architecture": ["design", "structure", "pattern", "system"],
52 "performance": ["optimization", "speed", "efficiency", "tuning"],
53 # Business terms
54 "requirements": ["specs", "specifications", "needs", "criteria"],
55 "documentation": ["docs", "guide", "manual", "reference"],
56 "proposal": ["offer", "bid", "submission", "plan"],
57 "evaluation": ["assessment", "review", "analysis", "comparison"],
58 "vendor": ["supplier", "provider", "contractor", "partner"],
59 # Content types
60 "code": ["implementation", "function", "method", "script"],
61 "table": ["data", "spreadsheet", "grid", "matrix"],
62 "image": ["picture", "diagram", "screenshot", "visual"],
63 "document": ["file", "paper", "report", "text"],
64 }
66 # Cache for expansion results
67 self._expansion_cache: dict[str, ExpansionResult] = {}
69 def expand_query(
70 self, original_query: str, search_context: dict[str, Any] | None = None
71 ) -> ExpansionResult:
72 """Expand query using spaCy entities and document metadata.
74 Args:
75 original_query: The original search query
76 search_context: Optional context containing document entities and metadata
78 Returns:
79 ExpansionResult containing the expanded query and metadata
80 """
81 import time
83 start_time = time.time()
85 # Check cache first
86 cache_key = f"{original_query}:{str(search_context)}"
87 if cache_key in self._expansion_cache:
88 cached = self._expansion_cache[cache_key]
89 logger.debug(f"Using cached expansion for: {original_query[:50]}...")
90 return cached
92 try:
93 # Analyze the original query
94 query_analysis = self.spacy_analyzer.analyze_query_semantic(original_query)
96 # Collect expansion terms from different sources
97 expansion_terms = []
98 semantic_terms = []
99 entity_terms = []
100 concept_terms = []
102 # 1. Semantic expansion using spaCy similarity
103 semantic_terms = self._expand_with_semantic_similarity(
104 query_analysis, search_context
105 )
106 expansion_terms.extend(semantic_terms)
108 # 2. Entity-based expansion
109 entity_terms = self._expand_with_entities(query_analysis, search_context)
110 expansion_terms.extend(entity_terms)
112 # 3. Concept-based expansion using noun chunks
113 concept_terms = self._expand_with_concepts(query_analysis, search_context)
114 expansion_terms.extend(concept_terms)
116 # 4. Domain-specific expansion
117 domain_terms = self._expand_with_domain_knowledge(query_analysis)
118 expansion_terms.extend(domain_terms)
120 # Remove duplicates and filter
121 expansion_terms = self._filter_expansion_terms(
122 expansion_terms, query_analysis.semantic_keywords
123 )
125 # Build expanded query with appropriate weighting
126 expanded_query, expansion_weight = self._build_expanded_query(
127 original_query, expansion_terms, query_analysis
128 )
130 # Create result
131 processing_time_ms = (time.time() - start_time) * 1000
133 result = ExpansionResult(
134 original_query=original_query,
135 expanded_query=expanded_query,
136 expansion_terms=expansion_terms,
137 semantic_terms=semantic_terms,
138 entity_terms=entity_terms,
139 concept_terms=concept_terms,
140 expansion_weight=expansion_weight,
141 processing_time_ms=processing_time_ms,
142 )
144 # Cache the result
145 self._expansion_cache[cache_key] = result
147 logger.debug(
148 "🔥 Query expansion completed",
149 original_query=original_query[:50],
150 expansion_terms_count=len(expansion_terms),
151 semantic_terms_count=len(semantic_terms),
152 entity_terms_count=len(entity_terms),
153 concept_terms_count=len(concept_terms),
154 processing_time_ms=processing_time_ms,
155 )
157 return result
159 except Exception as e:
160 logger.warning(f"Query expansion failed: {e}")
161 # Return minimal expansion
162 processing_time_ms = (time.time() - start_time) * 1000
163 return ExpansionResult(
164 original_query=original_query,
165 expanded_query=original_query,
166 expansion_terms=[],
167 semantic_terms=[],
168 entity_terms=[],
169 concept_terms=[],
170 expansion_weight=0.0,
171 processing_time_ms=processing_time_ms,
172 )
174 def _expand_with_semantic_similarity(
175 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None
176 ) -> list[str]:
177 """Expand using semantic similarity with spaCy word vectors."""
178 semantic_terms = []
180 if not search_context or "document_entities" not in search_context:
181 return semantic_terms
183 try:
184 document_entities = search_context["document_entities"]
186 # Find semantically similar entities
187 for entity in document_entities[:20]: # Limit to avoid performance issues
188 entity_text = (
189 entity
190 if isinstance(entity, str)
191 else entity.get("text", str(entity))
192 )
194 # Calculate similarity with query
195 similarity = self.spacy_analyzer.semantic_similarity_matching(
196 query_analysis, entity_text
197 )
199 # Add if above threshold
200 if similarity >= self.similarity_threshold:
201 # Extract meaningful words from entity
202 entity_words = self._extract_entity_words(entity_text)
203 semantic_terms.extend(entity_words)
205 if len(semantic_terms) >= self.max_semantic_expansions:
206 break
208 except Exception as e:
209 logger.warning(f"Semantic similarity expansion failed: {e}")
211 return semantic_terms[: self.max_semantic_expansions]
213 def _expand_with_entities(
214 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None
215 ) -> list[str]:
216 """Expand using related entities from the query and context."""
217 entity_terms = []
219 try:
220 # Use entities from the query itself
221 for entity_text, entity_type in query_analysis.entities:
222 # Add synonyms based on entity type
223 synonyms = self._get_entity_synonyms(entity_text, entity_type)
224 entity_terms.extend(synonyms)
226 # Use entities from search context if available
227 if search_context and "related_entities" in search_context:
228 related_entities = search_context["related_entities"]
229 for entity in related_entities[:5]: # Limit for performance
230 entity_text = (
231 entity
232 if isinstance(entity, str)
233 else entity.get("text", str(entity))
234 )
235 entity_words = self._extract_entity_words(entity_text)
236 entity_terms.extend(entity_words)
238 except Exception as e:
239 logger.warning(f"Entity-based expansion failed: {e}")
241 return entity_terms[: self.max_entity_expansions]
243 def _expand_with_concepts(
244 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None
245 ) -> list[str]:
246 """Expand using main concepts and noun chunks."""
247 concept_terms = []
249 try:
250 # Use main concepts from query analysis
251 for concept in query_analysis.main_concepts:
252 # Extract individual words from concepts
253 concept_words = self._extract_concept_words(concept)
254 concept_terms.extend(concept_words)
256 # Add related concepts if available in context
257 if search_context and "related_concepts" in search_context:
258 related_concepts = search_context["related_concepts"]
259 for concept in related_concepts[:3]:
260 concept_words = self._extract_concept_words(str(concept))
261 concept_terms.extend(concept_words)
263 except Exception as e:
264 logger.warning(f"Concept-based expansion failed: {e}")
266 return concept_terms[: self.max_concept_expansions]
268 def _expand_with_domain_knowledge(self, query_analysis: QueryAnalysis) -> list[str]:
269 """Expand using domain-specific knowledge."""
270 domain_terms = []
272 try:
273 # Check if any query keywords match our domain expansions
274 for keyword in query_analysis.semantic_keywords:
275 if keyword in self.domain_expansions:
276 domain_terms.extend(self.domain_expansions[keyword])
278 # Check main concepts for domain matches
279 for concept in query_analysis.main_concepts:
280 concept_lower = concept.lower().strip()
281 if concept_lower in self.domain_expansions:
282 domain_terms.extend(self.domain_expansions[concept_lower])
284 except Exception as e:
285 logger.warning(f"Domain knowledge expansion failed: {e}")
287 return domain_terms[:3] # Limit domain expansions
289 def _extract_entity_words(self, entity_text: str) -> list[str]:
290 """Extract meaningful words from entity text."""
291 # Simple extraction - split and filter
292 words = entity_text.lower().split()
293 return [word for word in words if len(word) > 2 and word.isalpha()]
295 def _extract_concept_words(self, concept_text: str) -> list[str]:
296 """Extract meaningful words from concept text."""
297 # Use spaCy to process and extract meaningful terms
298 try:
299 doc = self.spacy_analyzer.nlp(concept_text)
300 return [
301 token.lemma_.lower()
302 for token in doc
303 if (
304 token.is_alpha
305 and not token.is_stop
306 and len(token.text) > 2
307 and token.pos_ in {"NOUN", "VERB", "ADJ"}
308 )
309 ]
310 except Exception:
311 # Fallback to simple splitting
312 words = concept_text.lower().split()
313 return [word for word in words if len(word) > 2 and word.isalpha()]
315 def _get_entity_synonyms(self, entity_text: str, entity_type: str) -> list[str]:
316 """Get synonyms for entities based on their type."""
317 synonyms = []
319 # Type-specific synonym mapping
320 type_synonyms = {
321 "ORG": lambda text: [
322 text.lower(),
323 f"{text.lower()} company",
324 f"{text.lower()} organization",
325 ],
326 "PRODUCT": lambda text: [
327 text.lower(),
328 f"{text.lower()} software",
329 f"{text.lower()} tool",
330 ],
331 "PERSON": lambda text: [
332 text.lower(),
333 f"{text.lower()} developer",
334 f"{text.lower()} author",
335 ],
336 "GPE": lambda text: [text.lower(), f"{text.lower()} location"],
337 }
339 if entity_type in type_synonyms:
340 try:
341 synonyms = type_synonyms[entity_type](entity_text)
342 except Exception:
343 synonyms = [entity_text.lower()]
344 else:
345 synonyms = [entity_text.lower()]
347 return synonyms[:2] # Limit synonyms
349 def _filter_expansion_terms(
350 self, expansion_terms: list[str], original_keywords: list[str]
351 ) -> list[str]:
352 """Filter and deduplicate expansion terms."""
353 # Remove duplicates and original keywords
354 original_set = set(original_keywords)
355 filtered_terms = []
356 seen = set()
358 for term in expansion_terms:
359 term_clean = term.lower().strip()
360 if (
361 term_clean not in original_set
362 and term_clean not in seen
363 and len(term_clean) > 2
364 and term_clean.isalpha()
365 ):
366 filtered_terms.append(term_clean)
367 seen.add(term_clean)
369 return filtered_terms[:5] # Limit total expansions
371 def _build_expanded_query(
372 self,
373 original_query: str,
374 expansion_terms: list[str],
375 query_analysis: QueryAnalysis,
376 ) -> tuple[str, float]:
377 """Build the expanded query with appropriate weighting."""
378 if not expansion_terms:
379 return original_query, 0.0
381 # Determine expansion weight based on query characteristics
382 if query_analysis.complexity_score > 0.5:
383 # Complex queries get less expansion to avoid noise
384 expansion_weight = 0.2
385 max_terms = 2
386 elif query_analysis.is_technical:
387 # Technical queries benefit from more expansion
388 expansion_weight = 0.4
389 max_terms = 3
390 else:
391 # General queries get moderate expansion
392 expansion_weight = 0.3
393 max_terms = 3
395 # Select best expansion terms
396 selected_terms = expansion_terms[:max_terms]
398 # Build expanded query
399 expansion_part = " ".join(selected_terms)
400 expanded_query = f"{original_query} {expansion_part}"
402 return expanded_query, expansion_weight
404 def clear_cache(self):
405 """Clear expansion cache."""
406 self._expansion_cache.clear()
407 logger.debug("Cleared query expansion cache")
409 def get_cache_stats(self) -> dict[str, int]:
410 """Get cache statistics."""
411 return {
412 "expansion_cache_size": len(self._expansion_cache),
413 }