Coverage for src/qdrant_loader_mcp_server/search/nlp/linguistic_preprocessor.py: 93%

128 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""Linguistic query preprocessing for improved search accuracy.""" 

2 

3import re 

4import time 

5from dataclasses import dataclass 

6from typing import Any 

7 

8from ...utils.logging import LoggingConfig 

9from .spacy_analyzer import SpaCyQueryAnalyzer 

10 

11logger = LoggingConfig.get_logger(__name__) 

12 

13 

@dataclass
class PreprocessingResult:
    """Container for query preprocessing results."""

    original_query: str
    preprocessed_query: str
    lemmatized_tokens: list[str]
    filtered_tokens: list[str]
    removed_stopwords: list[str]
    normalized_tokens: list[str]
    processing_steps: list[str]
    processing_time_ms: float


class LinguisticPreprocessor:
    """Linguistic query preprocessing using spaCy for lemmatization and filtering."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
        """Initialize the linguistic preprocessor.

        Args:
            spacy_analyzer: SpaCy analyzer instance for linguistic processing
        """
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        # Preprocessing configuration
        self.min_token_length = 2
        self.max_token_length = 50
        self.preserve_entities = True
        self.preserve_numbers = True

        # Custom stop words (in addition to spaCy's)
        self.custom_stopwords = {
            "find",
            "show",
            "give",
            "tell",
            "search",
            "look",
            "get",
            "want",
            "need",
            "please",
            "help",
            "can",
            "could",
            "would",
            "should",
            "may",
            "might",
            "also",
            "just",
            "really",
            "very",
            "quite",
            "rather",
            "pretty",
            "much",
        }

        # Technical terms to preserve (don't lemmatize)
        self.preserve_terms = {
            "api",
            "apis",
            "database",
            "databases",
            "server",
            "servers",
            "authentication",
            "authorization",
            "oauth",
            "jwt",
            "ssl",
            "tls",
            "crud",
            "rest",
            "restful",
            "graphql",
            "json",
            "xml",
            "yaml",
            "git",
            "github",
            "gitlab",
            "jira",
            "confluence",
            "jenkins",
        }

        # Cache for preprocessing results
        self._preprocessing_cache: dict[str, PreprocessingResult] = {}

    def preprocess_query(
        self, query: str, preserve_structure: bool = False
    ) -> PreprocessingResult:
        """Preprocess query with lemmatization, stop word removal, and normalization.

        Args:
            query: The original query to preprocess
            preserve_structure: If True, preserve query structure for phrase queries

        Returns:
            PreprocessingResult containing preprocessed query and metadata
        """
        start_time = time.time()

        # Check cache first
        cache_key = f"{query}:{preserve_structure}"
        if cache_key in self._preprocessing_cache:
            cached = self._preprocessing_cache[cache_key]
            logger.debug(f"Using cached preprocessing for: {query[:50]}...")
            return cached

        processing_steps = []

        try:
            # Step 1: Initial cleaning
            cleaned_query = self._initial_cleaning(query)
            processing_steps.append("initial_cleaning")

            # Step 2: Process with spaCy
            doc = self.spacy_analyzer.nlp(cleaned_query)
            processing_steps.append("spacy_processing")

            # Step 3: Extract and process tokens
            tokens_info = self._extract_tokens(doc)
            processing_steps.append("token_extraction")

            # Step 4: Lemmatization with preservation rules
            lemmatized_tokens = self._lemmatize_tokens(tokens_info)
            processing_steps.append("lemmatization")

            # Step 5: Stop word filtering
            filtered_tokens, removed_stopwords = self._filter_stopwords(
                lemmatized_tokens, doc
            )
            processing_steps.append("stopword_filtering")

            # Step 6: Normalization
            normalized_tokens = self._normalize_tokens(filtered_tokens)
            processing_steps.append("normalization")

            # Step 7: Build preprocessed query
            if preserve_structure:
                preprocessed_query = self._rebuild_structured_query(
                    normalized_tokens, doc
                )
            else:
                preprocessed_query = " ".join(normalized_tokens)
            processing_steps.append("query_reconstruction")

            # Create result
            processing_time_ms = (time.time() - start_time) * 1000

            result = PreprocessingResult(
                original_query=query,
                preprocessed_query=preprocessed_query,
                lemmatized_tokens=lemmatized_tokens,
                filtered_tokens=filtered_tokens,
                removed_stopwords=removed_stopwords,
                normalized_tokens=normalized_tokens,
                processing_steps=processing_steps,
                processing_time_ms=processing_time_ms,
            )

            # Cache the result
            self._preprocessing_cache[cache_key] = result

            logger.debug(
                "🔥 Query preprocessing completed",
                original_query=query[:50],
                preprocessed_query=preprocessed_query[:50],
                tokens_removed=len(removed_stopwords),
                processing_time_ms=processing_time_ms,
            )

            return result

        except Exception as e:
            logger.warning(f"Query preprocessing failed: {e}")
            # Return minimal preprocessing
            processing_time_ms = (time.time() - start_time) * 1000
            return PreprocessingResult(
                original_query=query,
                preprocessed_query=query,
                lemmatized_tokens=[],
                filtered_tokens=[],
                removed_stopwords=[],
                normalized_tokens=[],
                processing_steps=["error"],
                processing_time_ms=processing_time_ms,
            )
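    # Illustrative walk-through (editor's note, not part of the original module): for a
    # query like "Please find the authentication APIs", the steps above would roughly yield
    #   lemmatized_tokens  ~ ["please", "find", "the", "authentication", "apis"]
    #   removed_stopwords  ~ ["please", "find", "the"]   ("please"/"find" via custom_stopwords, "the" via spaCy)
    #   normalized_tokens  ~ ["authentication", "apis"]  (both protected by preserve_terms)
    #   preprocessed_query ~ "authentication apis"
    # Exact output depends on the spaCy model loaded by SpaCyQueryAnalyzer.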

    def _initial_cleaning(self, query: str) -> str:
        """Perform initial query cleaning."""
        # Remove extra whitespace
        cleaned = re.sub(r"\s+", " ", query.strip())

        # Normalize punctuation
        cleaned = re.sub(r"[^\w\s\?\!\-\.]", " ", cleaned)

        # Handle contractions
        contractions = {
            "don't": "do not",
            "won't": "will not",
            "can't": "cannot",
            "n't": " not",
            "'re": " are",
            "'ve": " have",
            "'ll": " will",
            "'d": " would",
            "'m": " am",
        }

        for contraction, expansion in contractions.items():
            cleaned = cleaned.replace(contraction, expansion)

        return cleaned

    def _extract_tokens(self, doc) -> list[dict[str, Any]]:
        """Extract token information from spaCy doc."""
        tokens_info = []

        for token in doc:
            if not token.is_space:  # Skip whitespace tokens
                token_info = {
                    "text": token.text,
                    "lemma": token.lemma_,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "is_alpha": token.is_alpha,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_digit": token.like_num,
                    "ent_type": token.ent_type_,
                    "ent_iob": token.ent_iob_,
                }
                tokens_info.append(token_info)

        return tokens_info

    def _lemmatize_tokens(self, tokens_info: list[dict[str, Any]]) -> list[str]:
        """Lemmatize tokens with preservation rules."""
        lemmatized = []

        for token_info in tokens_info:
            text = token_info["text"].lower()
            lemma = token_info["lemma"].lower()

            # Preserve certain technical terms
            if text in self.preserve_terms:
                lemmatized.append(text)
            # Preserve entities
            elif self.preserve_entities and token_info["ent_type"]:
                lemmatized.append(text)
            # Preserve numbers if configured
            elif self.preserve_numbers and token_info["is_digit"]:
                lemmatized.append(text)
            # Skip punctuation
            elif token_info["is_punct"]:
                continue
            # Use lemma for other words
            elif token_info["is_alpha"] and len(lemma) >= self.min_token_length:
                lemmatized.append(lemma)
            elif len(text) >= self.min_token_length:
                lemmatized.append(text)

        return lemmatized

    def _filter_stopwords(
        self, lemmatized_tokens: list[str], doc
    ) -> tuple[list[str], list[str]]:
        """Filter stop words while preserving important terms."""
        filtered_tokens = []
        removed_stopwords = []

        # Get spaCy stop words
        spacy_stopwords = self.spacy_analyzer.nlp.Defaults.stop_words
        all_stopwords = spacy_stopwords.union(self.custom_stopwords)

        for token in lemmatized_tokens:
            # Always preserve technical terms
            if token in self.preserve_terms:
                filtered_tokens.append(token)
            # Filter stop words
            elif token in all_stopwords:
                removed_stopwords.append(token)
            # Keep other tokens
            else:
                filtered_tokens.append(token)

        return filtered_tokens, removed_stopwords
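    # Example (editor's note, illustrative): given tokens ["find", "the", "oauth", "token"],
    # "find" is dropped via custom_stopwords and "the" via spaCy's stop list, while "oauth"
    # is kept unconditionally via preserve_terms, giving (["oauth", "token"], ["find", "the"]).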

    def _normalize_tokens(self, filtered_tokens: list[str]) -> list[str]:
        """Normalize tokens for consistent matching."""
        normalized = []

        for token in filtered_tokens:
            # Convert to lowercase
            normalized_token = token.lower()

            # Remove very short or very long tokens
            if (
                self.min_token_length <= len(normalized_token) <= self.max_token_length
                and normalized_token.isalpha()
            ):
                normalized.append(normalized_token)

        return normalized

    def _rebuild_structured_query(self, normalized_tokens: list[str], doc) -> str:
        """Rebuild query preserving some structure for phrase queries."""
        # For now, just join tokens with spaces
        # Future enhancement: preserve quoted phrases, operators, etc.
        return " ".join(normalized_tokens)

    def preprocess_for_search(
        self, query: str, search_type: str = "hybrid"
    ) -> dict[str, Any]:
        """Preprocess query specifically for search operations.

        Args:
            query: The original query
            search_type: Type of search ("vector", "keyword", "hybrid")

        Returns:
            Dictionary with preprocessed variants for different search types
        """
        try:
            # Standard preprocessing
            standard_result = self.preprocess_query(query, preserve_structure=False)

            # Structured preprocessing (preserves more structure)
            structured_result = self.preprocess_query(query, preserve_structure=True)

            return {
                "original_query": query,
                "standard_preprocessed": standard_result.preprocessed_query,
                "structured_preprocessed": structured_result.preprocessed_query,
                "semantic_keywords": standard_result.normalized_tokens,
                "search_variants": {
                    "vector_search": structured_result.preprocessed_query,  # Preserve structure for vector
                    "keyword_search": standard_result.preprocessed_query,  # Normalize for BM25
                    "hybrid_search": standard_result.preprocessed_query,  # Default to normalized
                },
                "preprocessing_metadata": {
                    "removed_stopwords_count": len(standard_result.removed_stopwords),
                    "processing_time_ms": standard_result.processing_time_ms,
                    "processing_steps": standard_result.processing_steps,
                },
            }

        except Exception as e:
            logger.warning(f"Search preprocessing failed: {e}")
            return {
                "original_query": query,
                "standard_preprocessed": query,
                "structured_preprocessed": query,
                "semantic_keywords": query.lower().split(),
                "search_variants": {
                    "vector_search": query,
                    "keyword_search": query,
                    "hybrid_search": query,
                },
                "preprocessing_metadata": {
                    "removed_stopwords_count": 0,
                    "processing_time_ms": 0,
                    "processing_steps": ["error"],
                },
            }

    def clear_cache(self):
        """Clear preprocessing cache."""
        self._preprocessing_cache.clear()
        logger.debug("Cleared linguistic preprocessing cache")

    def get_cache_stats(self) -> dict[str, int]:
        """Get cache statistics."""
        return {
            "preprocessing_cache_size": len(self._preprocessing_cache),
        }
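
A minimal usage sketch of the API above, for orientation only. It assumes SpaCyQueryAnalyzer can be constructed with its defaults (its real constructor may require a spaCy model name) and that the package plus a spaCy English model are installed; the printed values are rough expectations, not verified output.

from qdrant_loader_mcp_server.search.nlp.linguistic_preprocessor import (
    LinguisticPreprocessor,
)
from qdrant_loader_mcp_server.search.nlp.spacy_analyzer import SpaCyQueryAnalyzer

# Assumed default construction; pass a model name here if the constructor requires one.
analyzer = SpaCyQueryAnalyzer()
preprocessor = LinguisticPreprocessor(analyzer)

# Single-query preprocessing returns a PreprocessingResult dataclass.
result = preprocessor.preprocess_query("Please find the authentication APIs")
print(result.preprocessed_query)   # roughly "authentication apis"
print(result.removed_stopwords)    # stop words dropped along the way

# Search-oriented preprocessing returns variants keyed by search type plus metadata.
variants = preprocessor.preprocess_for_search(
    "How do I configure OAuth?", search_type="hybrid"
)
print(variants["search_variants"]["keyword_search"])
print(variants["preprocessing_metadata"]["processing_time_ms"])

# Results are cached per (query, preserve_structure) pair; clear the cache when needed.
print(preprocessor.get_cache_stats())
preprocessor.clear_cache()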