Coverage for src/qdrant_loader_mcp_server/search/nlp/spacy_analyzer.py: 93%
183 statements
coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""spaCy-powered query analysis for intelligent search."""
3import logging
4from dataclasses import dataclass
5from typing import Any, Dict, List, Optional, Set, Tuple
7import spacy
8from spacy.cli.download import download as spacy_download
9from spacy.tokens import Doc
11from ...utils.logging import LoggingConfig
13logger = LoggingConfig.get_logger(__name__)

@dataclass
class QueryAnalysis:
    """Container for spaCy-based query analysis results."""

    # Core linguistic analysis
    entities: List[Tuple[str, str]]  # (text, label)
    pos_patterns: List[str]  # Part-of-speech tags
    semantic_keywords: List[str]  # Lemmatized, filtered keywords
    intent_signals: Dict[str, Any]  # Intent detection based on linguistic patterns
    main_concepts: List[str]  # Noun chunks representing main concepts

    # Semantic understanding
    query_vector: Any  # spaCy Doc vector for similarity matching
    semantic_similarity_cache: Dict[str, float]  # Cache for similarity scores

    # Query characteristics
    is_question: bool
    is_technical: bool
    complexity_score: float  # 0-1 score based on linguistic complexity

    # Processing metadata
    processed_tokens: int
    processing_time_ms: float

class SpaCyQueryAnalyzer:
    """Enhanced query analysis using spaCy NLP with the en_core_web_md model."""

    def __init__(self, spacy_model: str = "en_core_web_md"):
        """Initialize the spaCy query analyzer.

        Args:
            spacy_model: spaCy model to use (default: en_core_web_md with 20k word vectors)
        """
        self.spacy_model = spacy_model
        self.nlp = self._load_spacy_model()
        self.logger = LoggingConfig.get_logger(__name__)

        # Intent pattern definitions using POS tags and linguistic features
        self.intent_patterns = {
            "technical_lookup": {
                "entities": {"ORG", "PRODUCT", "PERSON", "GPE"},
                "pos_sequences": [["NOUN", "NOUN"], ["ADJ", "NOUN"], ["VERB", "NOUN"]],
                "keywords": {"api", "database", "architecture", "implementation", "system", "code", "function"},
                "question_words": set(),
            },
            "business_context": {
                "entities": {"ORG", "MONEY", "PERCENT", "CARDINAL"},
                "pos_sequences": [["NOUN", "NOUN"], ["ADJ", "NOUN", "NOUN"]],
                "keywords": {"requirements", "objectives", "strategy", "business", "scope", "goals"},
                "question_words": {"what", "why", "how"},
            },
            "vendor_evaluation": {
                "entities": {"ORG", "MONEY", "PERSON"},
                "pos_sequences": [["NOUN", "NOUN"], ["VERB", "NOUN"], ["ADJ", "NOUN"]],
                "keywords": {"proposal", "criteria", "cost", "vendor", "comparison", "evaluation"},
                "question_words": {"which", "what", "how much"},
            },
            "procedural": {
                "entities": set(),
                "pos_sequences": [["VERB", "NOUN"], ["VERB", "DET", "NOUN"]],
                "keywords": {"how", "steps", "process", "procedure", "guide", "tutorial"},
                "question_words": {"how", "when", "where"},
            },
            "informational": {
                "entities": set(),
                "pos_sequences": [["NOUN"], ["ADJ", "NOUN"]],
                "keywords": {"what", "definition", "meaning", "overview", "about"},
                "question_words": {"what", "who", "when", "where"},
            },
        }

        # Cache for processed queries to improve performance
        self._analysis_cache: Dict[str, QueryAnalysis] = {}
        self._similarity_cache: Dict[Tuple[str, str], float] = {}

    def _load_spacy_model(self) -> spacy.Language:
        """Load the spaCy model with error handling and auto-download."""
        try:
            nlp = spacy.load(self.spacy_model)
            # Verify the model has vectors for semantic similarity
            if not nlp.meta.get("vectors", {}).get("vectors", 0):
                logger.warning(
                    f"spaCy model {self.spacy_model} loaded but has no word vectors. "
                    "Semantic similarity features will be limited."
                )
            else:
                logger.info(
                    f"spaCy model {self.spacy_model} loaded successfully with "
                    f"{nlp.meta['vectors']['vectors']} word vectors"
                )
            return nlp
        except OSError:
            logger.info(f"spaCy model {self.spacy_model} not found. Downloading...")
            try:
                spacy_download(self.spacy_model)
                nlp = spacy.load(self.spacy_model)
                logger.info(f"Successfully downloaded and loaded {self.spacy_model}")
                return nlp
            except Exception as e:
                logger.error(f"Failed to download spaCy model {self.spacy_model}: {e}")
                # Fall back to a basic model
                try:
                    logger.warning("Falling back to en_core_web_sm model")
                    spacy_download("en_core_web_sm")
                    return spacy.load("en_core_web_sm")
                except Exception as fallback_error:
                    logger.error(f"Failed to load fallback model: {fallback_error}")
                    raise RuntimeError(
                        f"Could not load any spaCy model. Please install {self.spacy_model} manually."
                    ) from fallback_error
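
    # Note (editorial): if auto-download is blocked (e.g. in offline environments),
    # the model can also be installed manually with `python -m spacy download en_core_web_md`.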

    def analyze_query_semantic(self, query: str) -> QueryAnalysis:
        """Enhanced query analysis using spaCy NLP.

        Args:
            query: The search query to analyze

        Returns:
            QueryAnalysis containing comprehensive linguistic analysis
        """
        import time

        start_time = time.time()

        # Check cache first
        if query in self._analysis_cache:
            cached = self._analysis_cache[query]
            logger.debug(f"Using cached analysis for query: {query[:50]}...")
            return cached

        # Process query with spaCy
        doc = self.nlp(query)

        # Extract named entities as (text, label) pairs
        entities = [(ent.text, ent.label_) for ent in doc.ents]

        # Get POS patterns
        pos_patterns = [token.pos_ for token in doc if not token.is_space]

        # Extract semantic keywords (lemmatized, filtered)
        semantic_keywords = [
            token.lemma_.lower()
            for token in doc
            if (
                token.is_alpha
                and not token.is_stop
                and not token.is_punct
                and len(token.text) > 2
            )
        ]

        # Extract main concepts (noun chunks)
        main_concepts = [
            chunk.text.strip() for chunk in doc.noun_chunks if len(chunk.text.strip()) > 2
        ]

        # Detect intent using linguistic patterns
        intent_signals = self._detect_intent_patterns(
            doc, entities, pos_patterns, semantic_keywords
        )

        # Query characteristics
        is_question = self._is_question(doc)
        is_technical = self._is_technical_query(doc, entities, semantic_keywords)
        complexity_score = self._calculate_complexity_score(doc)

        # Processing metadata
        processing_time_ms = (time.time() - start_time) * 1000

        # Create analysis result
        analysis = QueryAnalysis(
            entities=entities,
            pos_patterns=pos_patterns,
            semantic_keywords=semantic_keywords,
            intent_signals=intent_signals,
            main_concepts=main_concepts,
            query_vector=doc,  # Store the spaCy Doc for similarity calculations
            semantic_similarity_cache={},
            is_question=is_question,
            is_technical=is_technical,
            complexity_score=complexity_score,
            processed_tokens=len(doc),
            processing_time_ms=processing_time_ms,
        )

        # Cache the result
        self._analysis_cache[query] = analysis

        logger.debug(
            f"Analyzed query in {processing_time_ms:.2f}ms",
            query_length=len(query),
            entities_found=len(entities),
            keywords_extracted=len(semantic_keywords),
            intent=intent_signals.get("primary_intent", "unknown"),
        )

        return analysis

    def semantic_similarity_matching(
        self, query_analysis: QueryAnalysis, entity_text: str
    ) -> float:
        """Calculate semantic similarity using spaCy word vectors.

        Args:
            query_analysis: Analyzed query containing the query vector
            entity_text: Text to compare similarity with

        Returns:
            Similarity score between 0.0 and 1.0
        """
        # Check cache first
        cache_key = (str(query_analysis.query_vector), entity_text)
        if cache_key in self._similarity_cache:
            return self._similarity_cache[cache_key]

        try:
            # Process entity text
            entity_doc = self.nlp(entity_text)

            # Calculate similarity using spaCy vectors
            if query_analysis.query_vector.has_vector and entity_doc.has_vector:
                similarity = query_analysis.query_vector.similarity(entity_doc)
            else:
                # Fall back to token-based similarity if no vectors are available
                similarity = self._token_similarity_fallback(
                    query_analysis.semantic_keywords, entity_text.lower()
                )

            # Cache the result
            self._similarity_cache[cache_key] = similarity

            return similarity

        except Exception as e:
            logger.warning(f"Error calculating similarity for '{entity_text}': {e}")
            return 0.0

    def _detect_intent_patterns(
        self,
        doc: Doc,
        entities: List[Tuple[str, str]],
        pos_patterns: List[str],
        semantic_keywords: List[str],
    ) -> Dict[str, Any]:
        """Detect query intent using POS patterns and linguistic features."""
        intent_scores = {}

        # Convert entities and keywords to sets for faster lookup
        entity_labels = {label for _, label in entities}
        keyword_set = set(semantic_keywords)

        # Score each intent pattern
        for intent_name, pattern in self.intent_patterns.items():
            score = 0.0

            # Entity type matching
            entity_match = len(entity_labels.intersection(pattern["entities"])) / max(
                len(pattern["entities"]), 1
            )
            score += entity_match * 0.3

            # POS sequence matching
            pos_match = self._match_pos_sequences(pos_patterns, pattern["pos_sequences"])
            score += pos_match * 0.3

            # Keyword matching
            keyword_match = len(keyword_set.intersection(pattern["keywords"])) / max(
                len(pattern["keywords"]), 1
            )
            score += keyword_match * 0.2

            # Question word matching
            question_match = self._match_question_words(doc, pattern["question_words"])
            score += question_match * 0.2
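
            # Illustrative arithmetic (editorial note, hypothetical values): a
            # pattern that matches 1 of 4 entity labels, 2 of 3 POS sequences,
            # 2 of 7 keywords, and 1 of 3 question words would score
            #   0.25 * 0.3 + 0.667 * 0.3 + 0.286 * 0.2 + 0.333 * 0.2 ≈ 0.40,
            # which clears the 0.3 confidence threshold applied below.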
            intent_scores[intent_name] = score

        # Find primary intent
        primary_intent = max(intent_scores, key=intent_scores.get) if intent_scores else "general"
        primary_score = intent_scores.get(primary_intent, 0.0)

        # Only use intent if confidence is above threshold
        if primary_score < 0.3:
            primary_intent = "general"

        return {
            "primary_intent": primary_intent,
            "confidence": primary_score,
            "all_scores": intent_scores,
            "linguistic_features": {
                "has_entities": len(entities) > 0,
                "has_question_words": any(
                    token.text.lower() in {"what", "how", "why", "when", "who", "where"}
                    for token in doc
                ),
                "verb_count": sum(1 for pos in pos_patterns if pos in {"VERB", "AUX"}),
                "noun_count": sum(1 for pos in pos_patterns if pos == "NOUN"),
            },
        }

    def _match_pos_sequences(
        self, pos_patterns: List[str], target_sequences: List[List[str]]
    ) -> float:
        """Match POS tag sequences in the query."""
        if not target_sequences or not pos_patterns:
            return 0.0

        matches = 0
        total_sequences = len(target_sequences)

        for sequence in target_sequences:
            if self._contains_sequence(pos_patterns, sequence):
                matches += 1

        return matches / total_sequences

    def _contains_sequence(self, pos_patterns: List[str], sequence: List[str]) -> bool:
        """Check if POS patterns contain a specific sequence."""
        if len(sequence) > len(pos_patterns):
            return False

        for i in range(len(pos_patterns) - len(sequence) + 1):
            if pos_patterns[i : i + len(sequence)] == sequence:
                return True

        return False

    def _match_question_words(self, doc: Doc, question_words: Set[str]) -> float:
        """Match question words in the query."""
        if not question_words:
            return 0.0

        found_words = {token.text.lower() for token in doc if token.text.lower() in question_words}
        return len(found_words) / len(question_words)

    def _is_question(self, doc: Doc) -> bool:
        """Detect if the query is a question using linguistic features."""
        # Check for question marks
        if "?" in doc.text:
            return True

        # Check for question words at the beginning
        question_words = {"what", "how", "why", "when", "who", "where", "which", "whose", "whom"}
        first_token = doc[0] if doc else None
        if first_token and first_token.text.lower() in question_words:
            return True

        # Check for auxiliary verbs at the beginning (e.g., "Can you", "Do we", "Is there")
        if len(doc) >= 2:
            first_two = [token.text.lower() for token in doc[:2]]
            aux_patterns = {("can", "you"), ("do", "we"), ("is", "there"), ("are", "there"), ("will", "you")}
            if tuple(first_two) in aux_patterns:
                return True

        return False

    def _is_technical_query(
        self, doc: Doc, entities: List[Tuple[str, str]], keywords: List[str]
    ) -> bool:
        """Detect if the query is technical in nature."""
        technical_indicators = {
            "api", "database", "system", "code", "function", "architecture",
            "implementation", "framework", "library", "server", "client",
            "protocol", "algorithm", "data", "query", "schema", "endpoint",
        }

        # Check keywords
        keyword_set = set(keywords)
        if keyword_set.intersection(technical_indicators):
            return True

        # Check for technical entity types
        technical_entities = {"ORG", "PRODUCT", "LANGUAGE"}  # Often technical in this context
        entity_labels = {label for _, label in entities}
        if entity_labels.intersection(technical_entities):
            return True

        return False

    def _calculate_complexity_score(self, doc: Doc) -> float:
        """Calculate query complexity based on linguistic features."""
        if not doc:
            return 0.0

        # Factors that contribute to complexity
        factors = {
            "length": min(len(doc) / 20, 1.0),  # Longer queries are more complex
            "entities": min(len(doc.ents) / 5, 1.0),  # More entities = more complex
            "noun_chunks": min(len(list(doc.noun_chunks)) / 5, 1.0),  # More concepts = more complex
            "question_words": min(
                sum(
                    1
                    for token in doc
                    if token.text.lower()
                    in {"what", "how", "why", "when", "who", "where", "which"}
                )
                / 3,
                1.0,
            ),
            "dependency_depth": min(self._max_dependency_depth(doc) / 5, 1.0),
        }

        # Weighted average
        weights = {
            "length": 0.2,
            "entities": 0.3,
            "noun_chunks": 0.2,
            "question_words": 0.15,
            "dependency_depth": 0.15,
        }
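
        # Illustrative arithmetic (editorial note, hypothetical values): a
        # 10-token query with 1 entity, 2 noun chunks, 1 question word, and a
        # dependency depth of 3 yields
        #   0.5 * 0.2 + 0.2 * 0.3 + 0.4 * 0.2 + 0.333 * 0.15 + 0.6 * 0.15 ≈ 0.38.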
        complexity = sum(factors[key] * weights[key] for key in factors)
        return min(complexity, 1.0)

    def _max_dependency_depth(self, doc: Doc) -> int:
        """Calculate maximum dependency tree depth."""
        max_depth = 0

        def get_depth(token, current_depth=0):
            nonlocal max_depth
            max_depth = max(max_depth, current_depth)
            for child in token.children:
                get_depth(child, current_depth + 1)

        for token in doc:
            if token.head == token:  # Root token
                get_depth(token)

        return max_depth

    def _token_similarity_fallback(self, query_keywords: List[str], entity_text: str) -> float:
        """Fallback similarity calculation when word vectors are unavailable."""
        if not query_keywords:
            return 0.0

        entity_words = set(entity_text.lower().split())
        query_word_set = set(query_keywords)

        # Simple Jaccard similarity
        intersection = query_word_set.intersection(entity_words)
        union = query_word_set.union(entity_words)

        return len(intersection) / len(union) if union else 0.0

    def clear_cache(self):
        """Clear analysis and similarity caches."""
        self._analysis_cache.clear()
        self._similarity_cache.clear()
        logger.debug("Cleared spaCy analyzer caches")

    def get_cache_stats(self) -> Dict[str, int]:
        """Get cache statistics for monitoring."""
        return {
            "analysis_cache_size": len(self._analysis_cache),
            "similarity_cache_size": len(self._similarity_cache),
        }
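
# ---------------------------------------------------------------------------
# Illustrative usage sketch (editorial addition, not part of the original
# module). Assumes the package is installed so the relative imports above
# resolve, e.g. when run via
#   python -m qdrant_loader_mcp_server.search.nlp.spacy_analyzer
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    analyzer = SpaCyQueryAnalyzer()
    result = analyzer.analyze_query_semantic("How do we implement the API endpoint?")
    print("primary intent:", result.intent_signals["primary_intent"])
    print("keywords:", result.semantic_keywords)
    print("is_question:", result.is_question, "is_technical:", result.is_technical)
    print(
        "similarity vs. 'REST API implementation':",
        analyzer.semantic_similarity_matching(result, "REST API implementation"),
    )
    print("cache stats:", analyzer.get_cache_stats())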