Coverage for src/qdrant_loader_mcp_server/search/nlp/semantic_expander.py: 85%
168 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""Semantic query expansion using spaCy word vectors and entity matching."""
3import logging
4from dataclasses import dataclass
5from typing import Dict, List, Set, Tuple, Any, Optional
7from ...utils.logging import LoggingConfig
8from .spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis
10logger = LoggingConfig.get_logger(__name__)
13@dataclass
14class ExpansionResult:
15 """Container for query expansion results."""
17 original_query: str
18 expanded_query: str
19 expansion_terms: List[str]
20 semantic_terms: List[str]
21 entity_terms: List[str]
22 concept_terms: List[str]
23 expansion_weight: float # Weight given to expansion terms (0-1)
24 processing_time_ms: float
27class EntityQueryExpander:
28 """Semantic query expansion using spaCy entities and word vectors."""
30 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
31 """Initialize the entity query expander.
33 Args:
34 spacy_analyzer: SpaCy analyzer instance for semantic analysis
35 """
36 self.spacy_analyzer = spacy_analyzer
37 self.logger = LoggingConfig.get_logger(__name__)
39 # Expansion configuration
40 self.max_semantic_expansions = 3 # Max semantic terms to add
41 self.max_entity_expansions = 2 # Max entity-related terms to add
42 self.max_concept_expansions = 2 # Max concept terms to add
43 self.similarity_threshold = 0.6 # Minimum similarity for expansion
45 # Domain-specific expansion dictionaries
46 self.domain_expansions = {
47 # Technical terms
48 "api": ["interface", "endpoint", "service", "restful"],
49 "database": ["db", "storage", "persistence", "data"],
50 "authentication": ["auth", "login", "credentials", "security"],
51 "authorization": ["access", "permissions", "roles", "security"],
52 "architecture": ["design", "structure", "pattern", "system"],
53 "performance": ["optimization", "speed", "efficiency", "tuning"],
55 # Business terms
56 "requirements": ["specs", "specifications", "needs", "criteria"],
57 "documentation": ["docs", "guide", "manual", "reference"],
58 "proposal": ["offer", "bid", "submission", "plan"],
59 "evaluation": ["assessment", "review", "analysis", "comparison"],
60 "vendor": ["supplier", "provider", "contractor", "partner"],
62 # Content types
63 "code": ["implementation", "function", "method", "script"],
64 "table": ["data", "spreadsheet", "grid", "matrix"],
65 "image": ["picture", "diagram", "screenshot", "visual"],
66 "document": ["file", "paper", "report", "text"],
67 }
69 # Cache for expansion results
70 self._expansion_cache: Dict[str, ExpansionResult] = {}
72 def expand_query(
73 self,
74 original_query: str,
75 search_context: Optional[Dict[str, Any]] = None
76 ) -> ExpansionResult:
77 """Expand query using spaCy entities and document metadata.
79 Args:
80 original_query: The original search query
81 search_context: Optional context containing document entities and metadata
83 Returns:
84 ExpansionResult containing the expanded query and metadata
85 """
86 import time
87 start_time = time.time()
89 # Check cache first
90 cache_key = f"{original_query}:{str(search_context)}"
91 if cache_key in self._expansion_cache:
92 cached = self._expansion_cache[cache_key]
93 logger.debug(f"Using cached expansion for: {original_query[:50]}...")
94 return cached
96 try:
97 # Analyze the original query
98 query_analysis = self.spacy_analyzer.analyze_query_semantic(original_query)
100 # Collect expansion terms from different sources
101 expansion_terms = []
102 semantic_terms = []
103 entity_terms = []
104 concept_terms = []
106 # 1. Semantic expansion using spaCy similarity
107 semantic_terms = self._expand_with_semantic_similarity(
108 query_analysis, search_context
109 )
110 expansion_terms.extend(semantic_terms)
112 # 2. Entity-based expansion
113 entity_terms = self._expand_with_entities(
114 query_analysis, search_context
115 )
116 expansion_terms.extend(entity_terms)
118 # 3. Concept-based expansion using noun chunks
119 concept_terms = self._expand_with_concepts(
120 query_analysis, search_context
121 )
122 expansion_terms.extend(concept_terms)
124 # 4. Domain-specific expansion
125 domain_terms = self._expand_with_domain_knowledge(query_analysis)
126 expansion_terms.extend(domain_terms)
128 # Remove duplicates and filter
129 expansion_terms = self._filter_expansion_terms(
130 expansion_terms, query_analysis.semantic_keywords
131 )
133 # Build expanded query with appropriate weighting
134 expanded_query, expansion_weight = self._build_expanded_query(
135 original_query, expansion_terms, query_analysis
136 )
138 # Create result
139 processing_time_ms = (time.time() - start_time) * 1000
141 result = ExpansionResult(
142 original_query=original_query,
143 expanded_query=expanded_query,
144 expansion_terms=expansion_terms,
145 semantic_terms=semantic_terms,
146 entity_terms=entity_terms,
147 concept_terms=concept_terms,
148 expansion_weight=expansion_weight,
149 processing_time_ms=processing_time_ms
150 )
152 # Cache the result
153 self._expansion_cache[cache_key] = result
155 logger.debug(
156 "🔥 Query expansion completed",
157 original_query=original_query[:50],
158 expansion_terms_count=len(expansion_terms),
159 semantic_terms_count=len(semantic_terms),
160 entity_terms_count=len(entity_terms),
161 concept_terms_count=len(concept_terms),
162 processing_time_ms=processing_time_ms,
163 )
165 return result
167 except Exception as e:
168 logger.warning(f"Query expansion failed: {e}")
169 # Return minimal expansion
170 processing_time_ms = (time.time() - start_time) * 1000
171 return ExpansionResult(
172 original_query=original_query,
173 expanded_query=original_query,
174 expansion_terms=[],
175 semantic_terms=[],
176 entity_terms=[],
177 concept_terms=[],
178 expansion_weight=0.0,
179 processing_time_ms=processing_time_ms
180 )
182 def _expand_with_semantic_similarity(
183 self,
184 query_analysis: QueryAnalysis,
185 search_context: Optional[Dict[str, Any]]
186 ) -> List[str]:
187 """Expand using semantic similarity with spaCy word vectors."""
188 semantic_terms = []
190 if not search_context or "document_entities" not in search_context:
191 return semantic_terms
193 try:
194 document_entities = search_context["document_entities"]
196 # Find semantically similar entities
197 for entity in document_entities[:20]: # Limit to avoid performance issues
198 entity_text = entity if isinstance(entity, str) else entity.get("text", str(entity))
200 # Calculate similarity with query
201 similarity = self.spacy_analyzer.semantic_similarity_matching(
202 query_analysis, entity_text
203 )
205 # Add if above threshold
206 if similarity >= self.similarity_threshold:
207 # Extract meaningful words from entity
208 entity_words = self._extract_entity_words(entity_text)
209 semantic_terms.extend(entity_words)
211 if len(semantic_terms) >= self.max_semantic_expansions:
212 break
214 except Exception as e:
215 logger.warning(f"Semantic similarity expansion failed: {e}")
217 return semantic_terms[:self.max_semantic_expansions]
219 def _expand_with_entities(
220 self,
221 query_analysis: QueryAnalysis,
222 search_context: Optional[Dict[str, Any]]
223 ) -> List[str]:
224 """Expand using related entities from the query and context."""
225 entity_terms = []
227 try:
228 # Use entities from the query itself
229 for entity_text, entity_type in query_analysis.entities:
230 # Add synonyms based on entity type
231 synonyms = self._get_entity_synonyms(entity_text, entity_type)
232 entity_terms.extend(synonyms)
234 # Use entities from search context if available
235 if search_context and "related_entities" in search_context:
236 related_entities = search_context["related_entities"]
237 for entity in related_entities[:5]: # Limit for performance
238 entity_text = entity if isinstance(entity, str) else entity.get("text", str(entity))
239 entity_words = self._extract_entity_words(entity_text)
240 entity_terms.extend(entity_words)
242 except Exception as e:
243 logger.warning(f"Entity-based expansion failed: {e}")
245 return entity_terms[:self.max_entity_expansions]
247 def _expand_with_concepts(
248 self,
249 query_analysis: QueryAnalysis,
250 search_context: Optional[Dict[str, Any]]
251 ) -> List[str]:
252 """Expand using main concepts and noun chunks."""
253 concept_terms = []
255 try:
256 # Use main concepts from query analysis
257 for concept in query_analysis.main_concepts:
258 # Extract individual words from concepts
259 concept_words = self._extract_concept_words(concept)
260 concept_terms.extend(concept_words)
262 # Add related concepts if available in context
263 if search_context and "related_concepts" in search_context:
264 related_concepts = search_context["related_concepts"]
265 for concept in related_concepts[:3]:
266 concept_words = self._extract_concept_words(str(concept))
267 concept_terms.extend(concept_words)
269 except Exception as e:
270 logger.warning(f"Concept-based expansion failed: {e}")
272 return concept_terms[:self.max_concept_expansions]
274 def _expand_with_domain_knowledge(self, query_analysis: QueryAnalysis) -> List[str]:
275 """Expand using domain-specific knowledge."""
276 domain_terms = []
278 try:
279 # Check if any query keywords match our domain expansions
280 for keyword in query_analysis.semantic_keywords:
281 if keyword in self.domain_expansions:
282 domain_terms.extend(self.domain_expansions[keyword])
284 # Check main concepts for domain matches
285 for concept in query_analysis.main_concepts:
286 concept_lower = concept.lower().strip()
287 if concept_lower in self.domain_expansions:
288 domain_terms.extend(self.domain_expansions[concept_lower])
290 except Exception as e:
291 logger.warning(f"Domain knowledge expansion failed: {e}")
293 return domain_terms[:3] # Limit domain expansions
295 def _extract_entity_words(self, entity_text: str) -> List[str]:
296 """Extract meaningful words from entity text."""
297 # Simple extraction - split and filter
298 words = entity_text.lower().split()
299 return [word for word in words if len(word) > 2 and word.isalpha()]
301 def _extract_concept_words(self, concept_text: str) -> List[str]:
302 """Extract meaningful words from concept text."""
303 # Use spaCy to process and extract meaningful terms
304 try:
305 doc = self.spacy_analyzer.nlp(concept_text)
306 return [
307 token.lemma_.lower()
308 for token in doc
309 if (token.is_alpha and
310 not token.is_stop and
311 len(token.text) > 2 and
312 token.pos_ in {"NOUN", "VERB", "ADJ"})
313 ]
314 except:
315 # Fallback to simple splitting
316 words = concept_text.lower().split()
317 return [word for word in words if len(word) > 2 and word.isalpha()]
319 def _get_entity_synonyms(self, entity_text: str, entity_type: str) -> List[str]:
320 """Get synonyms for entities based on their type."""
321 synonyms = []
323 # Type-specific synonym mapping
324 type_synonyms = {
325 "ORG": lambda text: [text.lower(), f"{text.lower()} company", f"{text.lower()} organization"],
326 "PRODUCT": lambda text: [text.lower(), f"{text.lower()} software", f"{text.lower()} tool"],
327 "PERSON": lambda text: [text.lower(), f"{text.lower()} developer", f"{text.lower()} author"],
328 "GPE": lambda text: [text.lower(), f"{text.lower()} location"],
329 }
331 if entity_type in type_synonyms:
332 try:
333 synonyms = type_synonyms[entity_type](entity_text)
334 except:
335 synonyms = [entity_text.lower()]
336 else:
337 synonyms = [entity_text.lower()]
339 return synonyms[:2] # Limit synonyms
341 def _filter_expansion_terms(
342 self,
343 expansion_terms: List[str],
344 original_keywords: List[str]
345 ) -> List[str]:
346 """Filter and deduplicate expansion terms."""
347 # Remove duplicates and original keywords
348 original_set = set(original_keywords)
349 filtered_terms = []
350 seen = set()
352 for term in expansion_terms:
353 term_clean = term.lower().strip()
354 if (term_clean not in original_set and
355 term_clean not in seen and
356 len(term_clean) > 2 and
357 term_clean.isalpha()):
358 filtered_terms.append(term_clean)
359 seen.add(term_clean)
361 return filtered_terms[:5] # Limit total expansions
363 def _build_expanded_query(
364 self,
365 original_query: str,
366 expansion_terms: List[str],
367 query_analysis: QueryAnalysis
368 ) -> Tuple[str, float]:
369 """Build the expanded query with appropriate weighting."""
370 if not expansion_terms:
371 return original_query, 0.0
373 # Determine expansion weight based on query characteristics
374 if query_analysis.complexity_score > 0.5:
375 # Complex queries get less expansion to avoid noise
376 expansion_weight = 0.2
377 max_terms = 2
378 elif query_analysis.is_technical:
379 # Technical queries benefit from more expansion
380 expansion_weight = 0.4
381 max_terms = 3
382 else:
383 # General queries get moderate expansion
384 expansion_weight = 0.3
385 max_terms = 3
387 # Select best expansion terms
388 selected_terms = expansion_terms[:max_terms]
390 # Build expanded query
391 expansion_part = " ".join(selected_terms)
392 expanded_query = f"{original_query} {expansion_part}"
394 return expanded_query, expansion_weight
396 def clear_cache(self):
397 """Clear expansion cache."""
398 self._expansion_cache.clear()
399 logger.debug("Cleared query expansion cache")
401 def get_cache_stats(self) -> Dict[str, int]:
402 """Get cache statistics."""
403 return {
404 "expansion_cache_size": len(self._expansion_cache),
405 }