Coverage for src/qdrant_loader_mcp_server/search/nlp/linguistic_preprocessor.py: 93%

128 statements  

coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

1"""Linguistic query preprocessing for improved search accuracy.""" 

2 

3import re 

4import time 

5from dataclasses import dataclass 

6from typing import List, Set, Dict, Any, Optional 

7 

8from ...utils.logging import LoggingConfig 

9from .spacy_analyzer import SpaCyQueryAnalyzer 

10 

11logger = LoggingConfig.get_logger(__name__) 

12 

13 

14@dataclass 

15class PreprocessingResult: 

16 """Container for query preprocessing results.""" 

17 

18 original_query: str 

19 preprocessed_query: str 

20 lemmatized_tokens: List[str] 

21 filtered_tokens: List[str] 

22 removed_stopwords: List[str] 

23 normalized_tokens: List[str] 

24 processing_steps: List[str] 

25 processing_time_ms: float 

26 

27 

28class LinguisticPreprocessor: 

29 """Linguistic query preprocessing using spaCy for lemmatization and filtering.""" 

30 

31 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer): 

32 """Initialize the linguistic preprocessor. 

33  

34 Args: 

35 spacy_analyzer: SpaCy analyzer instance for linguistic processing 

36 """ 

37 self.spacy_analyzer = spacy_analyzer 

38 self.logger = LoggingConfig.get_logger(__name__) 

39 

40 # Preprocessing configuration 

41 self.min_token_length = 2 

42 self.max_token_length = 50 

43 self.preserve_entities = True 

44 self.preserve_numbers = True 

45 

46 # Custom stop words (in addition to spaCy's) 

47 self.custom_stopwords = { 

48 "find", "show", "give", "tell", "search", "look", "get", "want", "need", 

49 "please", "help", "can", "could", "would", "should", "may", "might", 

50 "also", "just", "really", "very", "quite", "rather", "pretty", "much" 

51 } 

52 

53 # Technical terms to preserve (don't lemmatize) 

54 self.preserve_terms = { 

55 "api", "apis", "database", "databases", "server", "servers", 

56 "authentication", "authorization", "oauth", "jwt", "ssl", "tls", 

57 "crud", "rest", "restful", "graphql", "json", "xml", "yaml", 

58 "git", "github", "gitlab", "jira", "confluence", "jenkins" 

59 } 

60 

61 # Cache for preprocessing results 

62 self._preprocessing_cache: Dict[str, PreprocessingResult] = {} 

63 

64 def preprocess_query( 

65 self, 

66 query: str, 

67 preserve_structure: bool = False 

68 ) -> PreprocessingResult: 

69 """Preprocess query with lemmatization, stop word removal, and normalization. 

70  

71 Args: 

72 query: The original query to preprocess 

73 preserve_structure: If True, preserve query structure for phrase queries 

74  

75 Returns: 

76 PreprocessingResult containing preprocessed query and metadata 

77 """ 

78 start_time = time.time() 

79 

80 # Check cache first 

81 cache_key = f"{query}:{preserve_structure}" 

82 if cache_key in self._preprocessing_cache: 

83 cached = self._preprocessing_cache[cache_key] 

84 logger.debug(f"Using cached preprocessing for: {query[:50]}...") 

85 return cached 

86 

87 processing_steps = [] 

88 

89 try: 

90 # Step 1: Initial cleaning 

91 cleaned_query = self._initial_cleaning(query) 

92 processing_steps.append("initial_cleaning") 

93 

94 # Step 2: Process with spaCy 

95 doc = self.spacy_analyzer.nlp(cleaned_query) 

96 processing_steps.append("spacy_processing") 

97 

98 # Step 3: Extract and process tokens 

99 tokens_info = self._extract_tokens(doc) 

100 processing_steps.append("token_extraction") 

101 

102 # Step 4: Lemmatization with preservation rules 

103 lemmatized_tokens = self._lemmatize_tokens(tokens_info) 

104 processing_steps.append("lemmatization") 

105 

106 # Step 5: Stop word filtering 

107 filtered_tokens, removed_stopwords = self._filter_stopwords(lemmatized_tokens, doc) 

108 processing_steps.append("stopword_filtering") 

109 

110 # Step 6: Normalization 

111 normalized_tokens = self._normalize_tokens(filtered_tokens) 

112 processing_steps.append("normalization") 

113 

114 # Step 7: Build preprocessed query 

115 if preserve_structure: 

116 preprocessed_query = self._rebuild_structured_query(normalized_tokens, doc) 

117 else: 

118 preprocessed_query = " ".join(normalized_tokens) 

119 processing_steps.append("query_reconstruction") 

120 

121 # Create result 

122 processing_time_ms = (time.time() - start_time) * 1000 

123 

124 result = PreprocessingResult( 

125 original_query=query, 

126 preprocessed_query=preprocessed_query, 

127 lemmatized_tokens=lemmatized_tokens, 

128 filtered_tokens=filtered_tokens, 

129 removed_stopwords=removed_stopwords, 

130 normalized_tokens=normalized_tokens, 

131 processing_steps=processing_steps, 

132 processing_time_ms=processing_time_ms 

133 ) 

134 

135 # Cache the result 

136 self._preprocessing_cache[cache_key] = result 

137 

138 logger.debug( 

139 "🔥 Query preprocessing completed", 

140 original_query=query[:50], 

141 preprocessed_query=preprocessed_query[:50], 

142 tokens_removed=len(removed_stopwords), 

143 processing_time_ms=processing_time_ms, 

144 ) 

145 

146 return result 

147 

148 except Exception as e: 

149 logger.warning(f"Query preprocessing failed: {e}") 

150 # Return minimal preprocessing 

151 processing_time_ms = (time.time() - start_time) * 1000 

152 return PreprocessingResult( 

153 original_query=query, 

154 preprocessed_query=query, 

155 lemmatized_tokens=[], 

156 filtered_tokens=[], 

157 removed_stopwords=[], 

158 normalized_tokens=[], 

159 processing_steps=["error"], 

160 processing_time_ms=processing_time_ms 

161 ) 

162 

163 def _initial_cleaning(self, query: str) -> str: 

164 """Perform initial query cleaning.""" 

165 # Remove extra whitespace 

166 cleaned = re.sub(r'\s+', ' ', query.strip()) 

167 

168 # Normalize punctuation 

169 cleaned = re.sub(r'[^\w\s\?\!\-\.]', ' ', cleaned) 

170 

171 # Handle contractions 

172 contractions = { 

173 "don't": "do not", 

174 "won't": "will not", 

175 "can't": "cannot", 

176 "n't": " not", 

177 "'re": " are", 

178 "'ve": " have", 

179 "'ll": " will", 

180 "'d": " would", 

181 "'m": " am" 

182 } 

183 

184 for contraction, expansion in contractions.items(): 

185 cleaned = cleaned.replace(contraction, expansion) 

186 

187 return cleaned 

188 

189 def _extract_tokens(self, doc) -> List[Dict[str, Any]]: 

190 """Extract token information from spaCy doc.""" 

191 tokens_info = [] 

192 

193 for token in doc: 

194 if not token.is_space: # Skip whitespace tokens 

195 token_info = { 

196 "text": token.text, 

197 "lemma": token.lemma_, 

198 "pos": token.pos_, 

199 "tag": token.tag_, 

200 "is_alpha": token.is_alpha, 

201 "is_stop": token.is_stop, 

202 "is_punct": token.is_punct, 

203 "is_digit": token.like_num, 

204 "ent_type": token.ent_type_, 

205 "ent_iob": token.ent_iob_, 

206 } 

207 tokens_info.append(token_info) 

208 

209 return tokens_info 

210 

211 def _lemmatize_tokens(self, tokens_info: List[Dict[str, Any]]) -> List[str]: 

212 """Lemmatize tokens with preservation rules.""" 

213 lemmatized = [] 

214 

215 for token_info in tokens_info: 

216 text = token_info["text"].lower() 

217 lemma = token_info["lemma"].lower() 

218 

219 # Preserve certain technical terms 

220 if text in self.preserve_terms: 

221 lemmatized.append(text) 

222 # Preserve entities 

223 elif self.preserve_entities and token_info["ent_type"]: 

224 lemmatized.append(text) 

225 # Preserve numbers if configured 

226 elif self.preserve_numbers and token_info["is_digit"]: 

227 lemmatized.append(text) 

228 # Skip punctuation 

229 elif token_info["is_punct"]: 

230 continue 

231 # Use lemma for other words 

232 elif token_info["is_alpha"] and len(lemma) >= self.min_token_length: 

233 lemmatized.append(lemma) 

234 elif len(text) >= self.min_token_length: 

235 lemmatized.append(text) 

236 

237 return lemmatized 

238 

239 def _filter_stopwords( 

240 self, 

241 lemmatized_tokens: List[str], 

242 doc 

243 ) -> tuple[List[str], List[str]]: 

244 """Filter stop words while preserving important terms.""" 

245 filtered_tokens = [] 

246 removed_stopwords = [] 

247 

248 # Get spaCy stop words 

249 spacy_stopwords = self.spacy_analyzer.nlp.Defaults.stop_words 

250 all_stopwords = spacy_stopwords.union(self.custom_stopwords) 

251 

252 for token in lemmatized_tokens: 

253 # Always preserve technical terms 

254 if token in self.preserve_terms: 

255 filtered_tokens.append(token) 

256 # Filter stop words 

257 elif token in all_stopwords: 

258 removed_stopwords.append(token) 

259 # Keep other tokens 

260 else: 

261 filtered_tokens.append(token) 

262 

263 return filtered_tokens, removed_stopwords 

264 

265 def _normalize_tokens(self, filtered_tokens: List[str]) -> List[str]: 

266 """Normalize tokens for consistent matching.""" 

267 normalized = [] 

268 

269 for token in filtered_tokens: 

270 # Convert to lowercase 

271 normalized_token = token.lower() 

272 

273 # Remove very short or very long tokens 

274 if (self.min_token_length <= len(normalized_token) <= self.max_token_length and 

275 normalized_token.isalpha()): 

276 normalized.append(normalized_token) 

277 

278 return normalized 

279 

280 def _rebuild_structured_query( 

281 self, 

282 normalized_tokens: List[str], 

283 doc 

284 ) -> str: 

285 """Rebuild query preserving some structure for phrase queries.""" 

286 # For now, just join tokens with spaces 

287 # Future enhancement: preserve quoted phrases, operators, etc. 

288 return " ".join(normalized_tokens) 

289 

290 def preprocess_for_search( 

291 self, 

292 query: str, 

293 search_type: str = "hybrid" 

294 ) -> Dict[str, Any]: 

295 """Preprocess query specifically for search operations. 

296  

297 Args: 

298 query: The original query 

299 search_type: Type of search ("vector", "keyword", "hybrid") 

300  

301 Returns: 

302 Dictionary with preprocessed variants for different search types 

303 """ 

304 try: 

305 # Standard preprocessing 

306 standard_result = self.preprocess_query(query, preserve_structure=False) 

307 

308 # Structured preprocessing (preserves more structure) 

309 structured_result = self.preprocess_query(query, preserve_structure=True) 

310 

311 return { 

312 "original_query": query, 

313 "standard_preprocessed": standard_result.preprocessed_query, 

314 "structured_preprocessed": structured_result.preprocessed_query, 

315 "semantic_keywords": standard_result.normalized_tokens, 

316 "search_variants": { 

317 "vector_search": structured_result.preprocessed_query, # Preserve structure for vector 

318 "keyword_search": standard_result.preprocessed_query, # Normalize for BM25 

319 "hybrid_search": standard_result.preprocessed_query, # Default to normalized 

320 }, 

321 "preprocessing_metadata": { 

322 "removed_stopwords_count": len(standard_result.removed_stopwords), 

323 "processing_time_ms": standard_result.processing_time_ms, 

324 "processing_steps": standard_result.processing_steps, 

325 } 

326 } 

327 

328 except Exception as e: 

329 logger.warning(f"Search preprocessing failed: {e}") 

330 return { 

331 "original_query": query, 

332 "standard_preprocessed": query, 

333 "structured_preprocessed": query, 

334 "semantic_keywords": query.lower().split(), 

335 "search_variants": { 

336 "vector_search": query, 

337 "keyword_search": query, 

338 "hybrid_search": query, 

339 }, 

340 "preprocessing_metadata": { 

341 "removed_stopwords_count": 0, 

342 "processing_time_ms": 0, 

343 "processing_steps": ["error"], 

344 } 

345 } 

346 

347 def clear_cache(self): 

348 """Clear preprocessing cache.""" 

349 self._preprocessing_cache.clear() 

350 logger.debug("Cleared linguistic preprocessing cache") 

351 

352 def get_cache_stats(self) -> Dict[str, int]: 

353 """Get cache statistics.""" 

354 return { 

355 "preprocessing_cache_size": len(self._preprocessing_cache), 

356 }
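For orientation, a minimal usage sketch follows. It only calls methods defined in this module; the SpaCyQueryAnalyzer constructor argument shown is an assumption (this file only requires that the analyzer expose an `nlp` pipeline and its stop-word defaults), so adjust instantiation to the actual class. The commented outputs are illustrative, not exact, since lemmatization depends on the loaded spaCy model.

# Illustrative sketch, not part of the covered module.
# Assumption: SpaCyQueryAnalyzer accepts a spaCy model name; this file only shows
# that it must provide an `nlp` pipeline used for tokenization and stop words.
from qdrant_loader_mcp_server.search.nlp.spacy_analyzer import SpaCyQueryAnalyzer
from qdrant_loader_mcp_server.search.nlp.linguistic_preprocessor import LinguisticPreprocessor

analyzer = SpaCyQueryAnalyzer("en_core_web_md")  # hypothetical constructor argument
preprocessor = LinguisticPreprocessor(analyzer)

# Full preprocessing with metadata (result is a PreprocessingResult dataclass)
result = preprocessor.preprocess_query("Please find the API authentication docs")
print(result.preprocessed_query)   # lemmatized, stop-word-filtered query string
print(result.removed_stopwords)    # e.g. custom stop words such as "please", "find"
print(result.processing_time_ms)   # wall-clock time spent preprocessing, in ms

# Per-backend variants: structured text for vector search, normalized text for BM25
variants = preprocessor.preprocess_for_search("Please find the API authentication docs")
print(variants["search_variants"]["keyword_search"])

# Cache management: results are memoized per (query, preserve_structure) pair
print(preprocessor.get_cache_stats())  # {"preprocessing_cache_size": ...}
preprocessor.clear_cache()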