Coverage for src/qdrant_loader_mcp_server/search/nlp/spacy_analyzer.py: 93%

183 statements  

coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

1"""spaCy-powered query analysis for intelligent search.""" 

2 

3import logging 

4from dataclasses import dataclass 

5from typing import Any, Dict, List, Optional, Set, Tuple 

6 

7import spacy 

8from spacy.cli.download import download as spacy_download 

9from spacy.tokens import Doc 

10 

11from ...utils.logging import LoggingConfig 

12 

13logger = LoggingConfig.get_logger(__name__) 

14 

15 

@dataclass
class QueryAnalysis:
    """Container for spaCy-based query analysis results."""

    # Core linguistic analysis
    entities: List[Tuple[str, str]]  # (text, label)
    pos_patterns: List[str]  # Part-of-speech tags
    semantic_keywords: List[str]  # Lemmatized, filtered keywords
    intent_signals: Dict[str, Any]  # Intent detection based on linguistic patterns
    main_concepts: List[str]  # Noun chunks representing main concepts

    # Semantic understanding
    query_vector: Any  # spaCy Doc vector for similarity matching
    semantic_similarity_cache: Dict[str, float]  # Cache for similarity scores

    # Query characteristics
    is_question: bool
    is_technical: bool
    complexity_score: float  # 0-1 score based on linguistic complexity

    # Processing metadata
    processed_tokens: int
    processing_time_ms: float
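
For orientation, here is a QueryAnalysis populated by hand to show the field shapes. All values below are invented for illustration; they are not real analyzer output.

# Illustrative only -- invented values showing the shape of the result.
example = QueryAnalysis(
    entities=[("Qdrant", "ORG")],
    pos_patterns=["ADV", "AUX", "PRON", "VERB", "DET", "NOUN", "NOUN", "PUNCT"],
    semantic_keywords=["configure", "database", "api"],
    intent_signals={"primary_intent": "technical_lookup", "confidence": 0.4, "all_scores": {}},
    main_concepts=["the database API"],
    query_vector=None,  # normally the spaCy Doc for the query
    semantic_similarity_cache={},
    is_question=True,
    is_technical=True,
    complexity_score=0.35,
    processed_tokens=8,
    processing_time_ms=4.2,
)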

class SpaCyQueryAnalyzer:
    """Enhanced query analysis using spaCy NLP with en_core_web_md model."""

    def __init__(self, spacy_model: str = "en_core_web_md"):
        """Initialize the spaCy query analyzer.

        Args:
            spacy_model: spaCy model to use (default: en_core_web_md with 20k word vectors)
        """
        self.spacy_model = spacy_model
        self.nlp = self._load_spacy_model()
        self.logger = LoggingConfig.get_logger(__name__)

        # Intent pattern definitions using POS tags and linguistic features
        self.intent_patterns = {
            "technical_lookup": {
                "entities": {"ORG", "PRODUCT", "PERSON", "GPE"},
                "pos_sequences": [["NOUN", "NOUN"], ["ADJ", "NOUN"], ["VERB", "NOUN"]],
                "keywords": {"api", "database", "architecture", "implementation", "system", "code", "function"},
                "question_words": set(),
            },
            "business_context": {
                "entities": {"ORG", "MONEY", "PERCENT", "CARDINAL"},
                "pos_sequences": [["NOUN", "NOUN"], ["ADJ", "NOUN", "NOUN"]],
                "keywords": {"requirements", "objectives", "strategy", "business", "scope", "goals"},
                "question_words": {"what", "why", "how"},
            },
            "vendor_evaluation": {
                "entities": {"ORG", "MONEY", "PERSON"},
                "pos_sequences": [["NOUN", "NOUN"], ["VERB", "NOUN"], ["ADJ", "NOUN"]],
                "keywords": {"proposal", "criteria", "cost", "vendor", "comparison", "evaluation"},
                "question_words": {"which", "what", "how much"},
            },
            "procedural": {
                "entities": set(),
                "pos_sequences": [["VERB", "NOUN"], ["VERB", "DET", "NOUN"]],
                "keywords": {"how", "steps", "process", "procedure", "guide", "tutorial"},
                "question_words": {"how", "when", "where"},
            },
            "informational": {
                "entities": set(),
                "pos_sequences": [["NOUN"], ["ADJ", "NOUN"]],
                "keywords": {"what", "definition", "meaning", "overview", "about"},
                "question_words": {"what", "who", "when", "where"},
            },
        }

        # Cache for processed queries to improve performance
        self._analysis_cache: Dict[str, QueryAnalysis] = {}
        self._similarity_cache: Dict[Tuple[str, str], float] = {}
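
Each pattern bundles four signal types (entity labels, POS sequences, keyword lemmas, and question words) that _detect_intent_patterns later weights. Because intent_patterns is a plain instance dictionary, extra patterns can be registered the same way. A hypothetical customization sketch; "troubleshooting" is not a built-in intent:

# Hypothetical extension -- not part of this module.
analyzer = SpaCyQueryAnalyzer()
analyzer.intent_patterns["troubleshooting"] = {
    "entities": {"ORG", "PRODUCT"},
    "pos_sequences": [["VERB", "NOUN"], ["NOUN", "VERB"]],
    "keywords": {"error", "bug", "fix", "crash", "debug"},
    "question_words": {"why", "how"},
}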

    def _load_spacy_model(self) -> spacy.Language:
        """Load spaCy model with error handling and auto-download."""
        try:
            nlp = spacy.load(self.spacy_model)
            # Verify model has vectors for semantic similarity
            if not nlp.meta.get("vectors", {}).get("vectors", 0):
                logger.warning(
                    f"spaCy model {self.spacy_model} loaded but has no word vectors. "
                    "Semantic similarity features will be limited."
                )
            else:
                logger.info(
                    f"spaCy model {self.spacy_model} loaded successfully with "
                    f"{nlp.meta['vectors']['vectors']} word vectors"
                )
            return nlp
        except OSError:
            logger.info(f"spaCy model {self.spacy_model} not found. Downloading...")
            try:
                spacy_download(self.spacy_model)
                nlp = spacy.load(self.spacy_model)
                logger.info(f"Successfully downloaded and loaded {self.spacy_model}")
                return nlp
            except Exception as e:
                logger.error(f"Failed to download spaCy model {self.spacy_model}: {e}")
                # Fallback to a basic model
                try:
                    logger.warning("Falling back to en_core_web_sm model")
                    spacy_download("en_core_web_sm")
                    return spacy.load("en_core_web_sm")
                except Exception as fallback_error:
                    logger.error(f"Failed to load fallback model: {fallback_error}")
                    raise RuntimeError(
                        f"Could not load any spaCy model. Please install {self.spacy_model} manually."
                    )
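
To avoid the runtime download path entirely, the model can be installed ahead of time (for example with `python -m spacy download en_core_web_md`) and its vector table inspected before constructing the analyzer. A minimal verification sketch, assuming the model is already installed:

import spacy

nlp = spacy.load("en_core_web_md")
vectors_meta = nlp.meta.get("vectors", {})
# A zero vector count would push the analyzer onto its token-overlap fallback.
print(vectors_meta.get("vectors", 0), "vectors,", vectors_meta.get("width", 0), "dimensions")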

    def analyze_query_semantic(self, query: str) -> QueryAnalysis:
        """Enhanced query analysis using spaCy NLP.

        Args:
            query: The search query to analyze

        Returns:
            QueryAnalysis containing comprehensive linguistic analysis
        """
        import time

        start_time = time.time()

        # Check cache first
        if query in self._analysis_cache:
            cached = self._analysis_cache[query]
            logger.debug(f"Using cached analysis for query: {query[:50]}...")
            return cached

        # Process query with spaCy
        doc = self.nlp(query)

        # Extract named entities as (text, label) pairs
        entities = [(ent.text, ent.label_) for ent in doc.ents]

        # Get POS patterns
        pos_patterns = [token.pos_ for token in doc if not token.is_space]

        # Extract semantic keywords (lemmatized, filtered)
        semantic_keywords = [
            token.lemma_.lower()
            for token in doc
            if (token.is_alpha and
                not token.is_stop and
                not token.is_punct and
                len(token.text) > 2)
        ]

        # Extract main concepts (noun chunks)
        main_concepts = [chunk.text.strip() for chunk in doc.noun_chunks if len(chunk.text.strip()) > 2]

        # Detect intent using linguistic patterns
        intent_signals = self._detect_intent_patterns(doc, entities, pos_patterns, semantic_keywords)

        # Query characteristics
        is_question = self._is_question(doc)
        is_technical = self._is_technical_query(doc, entities, semantic_keywords)
        complexity_score = self._calculate_complexity_score(doc)

        # Processing metadata
        processing_time_ms = (time.time() - start_time) * 1000

        # Create analysis result
        analysis = QueryAnalysis(
            entities=entities,
            pos_patterns=pos_patterns,
            semantic_keywords=semantic_keywords,
            intent_signals=intent_signals,
            main_concepts=main_concepts,
            query_vector=doc,  # Store the spaCy Doc for similarity calculations
            semantic_similarity_cache={},
            is_question=is_question,
            is_technical=is_technical,
            complexity_score=complexity_score,
            processed_tokens=len(doc),
            processing_time_ms=processing_time_ms,
        )

        # Cache the result
        self._analysis_cache[query] = analysis

        logger.debug(
            f"Analyzed query in {processing_time_ms:.2f}ms",
            query_length=len(query),
            entities_found=len(entities),
            keywords_extracted=len(semantic_keywords),
            intent=intent_signals.get("primary_intent", "unknown"),
        )

        return analysis
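
A minimal usage sketch, assuming en_core_web_md is available; the printed values depend on the model, so the comments below are only indicative:

analyzer = SpaCyQueryAnalyzer()
analysis = analyzer.analyze_query_semantic("How do we evaluate vendor proposals for the new API?")

print(analysis.intent_signals["primary_intent"])  # e.g. "vendor_evaluation"
print(analysis.semantic_keywords)                 # e.g. ["evaluate", "vendor", "proposal", "new", "api"]
print(analysis.is_question, analysis.is_technical)

# Repeating the same query string returns the cached QueryAnalysis object.
assert analyzer.analyze_query_semantic("How do we evaluate vendor proposals for the new API?") is analysis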

    def semantic_similarity_matching(self, query_analysis: QueryAnalysis, entity_text: str) -> float:
        """Calculate semantic similarity using spaCy word vectors.

        Args:
            query_analysis: Analyzed query containing the query vector
            entity_text: Text to compare similarity with

        Returns:
            Similarity score between 0.0 and 1.0
        """
        # Check cache first
        cache_key = (str(query_analysis.query_vector), entity_text)
        if cache_key in self._similarity_cache:
            return self._similarity_cache[cache_key]

        try:
            # Process entity text
            entity_doc = self.nlp(entity_text)

            # Calculate similarity using spaCy vectors
            if query_analysis.query_vector.has_vector and entity_doc.has_vector:
                similarity = query_analysis.query_vector.similarity(entity_doc)
            else:
                # Fallback to token-based similarity if no vectors
                similarity = self._token_similarity_fallback(
                    query_analysis.semantic_keywords,
                    entity_text.lower()
                )

            # Cache the result
            self._similarity_cache[cache_key] = similarity

            return similarity

        except Exception as e:
            logger.warning(f"Error calculating similarity for '{entity_text}': {e}")
            return 0.0
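
One way to use the score is to rank candidate section titles against an analyzed query. The sketch below is illustrative; the actual ordering depends on the model's vectors:

analyzer = SpaCyQueryAnalyzer()
analysis = analyzer.analyze_query_semantic("database schema migration")
candidates = ["schema changes", "holiday calendar", "table migration guide"]

ranked = sorted(
    candidates,
    key=lambda text: analyzer.semantic_similarity_matching(analysis, text),
    reverse=True,
)
# Expected (roughly): the schema/migration titles first, the unrelated title last.
print(ranked)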

    def _detect_intent_patterns(
        self,
        doc: Doc,
        entities: List[Tuple[str, str]],
        pos_patterns: List[str],
        semantic_keywords: List[str],
    ) -> Dict[str, Any]:
        """Detect query intent using POS patterns and linguistic features."""
        intent_scores = {}

        # Convert entities and keywords to sets for faster lookup
        entity_labels = {label for _, label in entities}
        keyword_set = set(semantic_keywords)

        # Score each intent pattern
        for intent_name, pattern in self.intent_patterns.items():
            score = 0.0

            # Entity type matching
            entity_match = len(entity_labels.intersection(pattern["entities"])) / max(len(pattern["entities"]), 1)
            score += entity_match * 0.3

            # POS sequence matching
            pos_match = self._match_pos_sequences(pos_patterns, pattern["pos_sequences"])
            score += pos_match * 0.3

            # Keyword matching
            keyword_match = len(keyword_set.intersection(pattern["keywords"])) / max(len(pattern["keywords"]), 1)
            score += keyword_match * 0.2

            # Question word matching
            question_match = self._match_question_words(doc, pattern["question_words"])
            score += question_match * 0.2

            intent_scores[intent_name] = score

        # Find primary intent
        primary_intent = max(intent_scores, key=intent_scores.get) if intent_scores else "general"
        primary_score = intent_scores.get(primary_intent, 0.0)

        # Only use intent if confidence is above threshold
        if primary_score < 0.3:
            primary_intent = "general"

        return {
            "primary_intent": primary_intent,
            "confidence": primary_score,
            "all_scores": intent_scores,
            "linguistic_features": {
                "has_entities": len(entities) > 0,
                "has_question_words": any(token.text.lower() in {"what", "how", "why", "when", "who", "where"} for token in doc),
                "verb_count": sum(1 for pos in pos_patterns if pos in {"VERB", "AUX"}),
                "noun_count": sum(1 for pos in pos_patterns if pos == "NOUN"),
            },
        }
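
The four weights (0.3 entity, 0.3 POS, 0.2 keyword, 0.2 question word) cap the raw score at 1.0, and anything below 0.3 falls back to "general". A small inspection sketch; the query and the resulting scores are illustrative, since they depend on the model's tagging and lemmatization:

analyzer = SpaCyQueryAnalyzer()
signals = analyzer.analyze_query_semantic("what is the business strategy and scope?").intent_signals

print(signals["primary_intent"], round(signals["confidence"], 2))
for intent, score in sorted(signals["all_scores"].items(), key=lambda kv: kv[1], reverse=True):
    print(f"{intent:20s} {score:.2f}")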

    def _match_pos_sequences(self, pos_patterns: List[str], target_sequences: List[List[str]]) -> float:
        """Match POS tag sequences in the query."""
        if not target_sequences or not pos_patterns:
            return 0.0

        matches = 0
        total_sequences = len(target_sequences)

        for sequence in target_sequences:
            if self._contains_sequence(pos_patterns, sequence):
                matches += 1

        return matches / total_sequences

    def _contains_sequence(self, pos_patterns: List[str], sequence: List[str]) -> bool:
        """Check if POS patterns contain a specific sequence."""
        if len(sequence) > len(pos_patterns):
            return False

        for i in range(len(pos_patterns) - len(sequence) + 1):
            if pos_patterns[i:i + len(sequence)] == sequence:
                return True

        return False
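
The sequence check is a plain sliding-window comparison over the POS tag list, so it can be hand-checked without running the model. Given an analyzer instance (these are internal helpers, shown here only to make the arithmetic concrete):

pos = ["ADJ", "NOUN", "NOUN", "VERB"]

analyzer._contains_sequence(pos, ["NOUN", "NOUN"])                        # True  (positions 1-2)
analyzer._contains_sequence(pos, ["VERB", "NOUN"])                        # False (no NOUN after the VERB)
analyzer._match_pos_sequences(pos, [["ADJ", "NOUN"], ["VERB", "NOUN"]])   # 0.5   (1 of 2 sequences found)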

    def _match_question_words(self, doc: Doc, question_words: Set[str]) -> float:
        """Match question words in the query."""
        if not question_words:
            return 0.0

        found_words = {token.text.lower() for token in doc if token.text.lower() in question_words}
        return len(found_words) / len(question_words)

    def _is_question(self, doc: Doc) -> bool:
        """Detect if the query is a question using linguistic features."""
        # Check for question marks
        if "?" in doc.text:
            return True

        # Check for question words at the beginning
        question_words = {"what", "how", "why", "when", "who", "where", "which", "whose", "whom"}
        first_token = doc[0] if doc else None
        if first_token and first_token.text.lower() in question_words:
            return True

        # Check for auxiliary verbs at the beginning (e.g., "Can you", "Do we", "Is there")
        if len(doc) >= 2:
            first_two = [token.text.lower() for token in doc[:2]]
            aux_patterns = {("can", "you"), ("do", "we"), ("is", "there"), ("are", "there"), ("will", "you")}
            if tuple(first_two) in aux_patterns:
                return True

        return False

    def _is_technical_query(self, doc: Doc, entities: List[Tuple[str, str]], keywords: List[str]) -> bool:
        """Detect if the query is technical in nature."""
        technical_indicators = {
            "api", "database", "system", "code", "function", "architecture",
            "implementation", "framework", "library", "server", "client",
            "protocol", "algorithm", "data", "query", "schema", "endpoint",
        }

        # Check keywords
        keyword_set = set(keywords)
        if keyword_set.intersection(technical_indicators):
            return True

        # Check for technical entity types
        technical_entities = {"ORG", "PRODUCT", "LANGUAGE"}  # Often technical in this context
        entity_labels = {label for _, label in entities}
        if entity_labels.intersection(technical_entities):
            return True

        return False

    def _calculate_complexity_score(self, doc: Doc) -> float:
        """Calculate query complexity based on linguistic features."""
        if not doc:
            return 0.0

        # Factors that contribute to complexity
        factors = {
            "length": min(len(doc) / 20, 1.0),  # Longer queries are more complex
            "entities": min(len(doc.ents) / 5, 1.0),  # More entities = more complex
            "noun_chunks": min(len(list(doc.noun_chunks)) / 5, 1.0),  # More concepts = more complex
            "question_words": min(sum(1 for token in doc if token.text.lower() in
                                      {"what", "how", "why", "when", "who", "where", "which"}) / 3, 1.0),
            "dependency_depth": min(self._max_dependency_depth(doc) / 5, 1.0),
        }

        # Weighted average
        weights = {"length": 0.2, "entities": 0.3, "noun_chunks": 0.2, "question_words": 0.15, "dependency_depth": 0.15}

        complexity = sum(factors[key] * weights[key] for key in factors)
        return min(complexity, 1.0)
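
As a worked check of the weighting, a hypothetical query with 10 tokens, 1 entity, 2 noun chunks, 1 question word, and dependency depth 3 scores about 0.38 (the exact counts for a real query depend on the parser):

# Hand-worked check of the complexity weighting for hypothetical counts.
factors = {"length": 10 / 20, "entities": 1 / 5, "noun_chunks": 2 / 5,
           "question_words": 1 / 3, "dependency_depth": 3 / 5}
weights = {"length": 0.2, "entities": 0.3, "noun_chunks": 0.2,
           "question_words": 0.15, "dependency_depth": 0.15}
print(sum(factors[k] * weights[k] for k in factors))  # ≈ 0.38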

    def _max_dependency_depth(self, doc: Doc) -> int:
        """Calculate maximum dependency tree depth."""
        max_depth = 0

        def get_depth(token, current_depth=0):
            nonlocal max_depth
            max_depth = max(max_depth, current_depth)
            for child in token.children:
                get_depth(child, current_depth + 1)

        for token in doc:
            if token.head == token:  # Root token
                get_depth(token)

        return max_depth

    def _token_similarity_fallback(self, query_keywords: List[str], entity_text: str) -> float:
        """Fallback similarity calculation when word vectors are unavailable."""
        if not query_keywords:
            return 0.0

        entity_words = set(entity_text.lower().split())
        query_word_set = set(query_keywords)

        # Simple Jaccard similarity
        intersection = query_word_set.intersection(entity_words)
        union = query_word_set.union(entity_words)

        return len(intersection) / len(union) if union else 0.0
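
The fallback is plain Jaccard overlap between the lemmatized query keywords and the entity text's whitespace-split words. A hand-checkable example with made-up inputs:

# Jaccard fallback: |intersection| / |union|
keywords = ["database", "schema", "migration"]
entity_words = set("schema migration guide".split())
# intersection = {"schema", "migration"} (2 items), union has 4 items -> 0.5
print(len(set(keywords) & entity_words) / len(set(keywords) | entity_words))  # 0.5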

    def clear_cache(self):
        """Clear analysis and similarity caches."""
        self._analysis_cache.clear()
        self._similarity_cache.clear()
        logger.debug("Cleared spaCy analyzer caches")

    def get_cache_stats(self) -> Dict[str, int]:
        """Get cache statistics for monitoring."""
        return {
            "analysis_cache_size": len(self._analysis_cache),
            "similarity_cache_size": len(self._similarity_cache),
        }
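
Both caches are unbounded in-process dictionaries, so a long-running server may want to monitor and reset them. A minimal sketch; the threshold is illustrative, not a recommendation from this module:

analyzer = SpaCyQueryAnalyzer()
analyzer.analyze_query_semantic("api endpoint schema")

stats = analyzer.get_cache_stats()
print(stats)  # e.g. {"analysis_cache_size": 1, "similarity_cache_size": 0}

# Clear if the caches grow too large for the process (10_000 is an arbitrary example).
if stats["analysis_cache_size"] > 10_000:
    analyzer.clear_cache()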