Coverage for src/qdrant_loader_mcp_server/search/nlp/linguistic_preprocessor.py: 93%
128 statements
coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""Linguistic query preprocessing for improved search accuracy."""
3import re
4import time
5from dataclasses import dataclass
6from typing import List, Set, Dict, Any, Optional
8from ...utils.logging import LoggingConfig
9from .spacy_analyzer import SpaCyQueryAnalyzer
11logger = LoggingConfig.get_logger(__name__)


@dataclass
class PreprocessingResult:
    """Container for query preprocessing results."""

    original_query: str
    preprocessed_query: str
    lemmatized_tokens: List[str]
    filtered_tokens: List[str]
    removed_stopwords: List[str]
    normalized_tokens: List[str]
    processing_steps: List[str]
    processing_time_ms: float


class LinguisticPreprocessor:
    """Linguistic query preprocessing using spaCy for lemmatization and filtering."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
        """Initialize the linguistic preprocessor.

        Args:
            spacy_analyzer: SpaCy analyzer instance for linguistic processing
        """
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        # Preprocessing configuration
        self.min_token_length = 2
        self.max_token_length = 50
        self.preserve_entities = True
        self.preserve_numbers = True

        # Custom stop words (in addition to spaCy's)
        self.custom_stopwords = {
            "find", "show", "give", "tell", "search", "look", "get", "want", "need",
            "please", "help", "can", "could", "would", "should", "may", "might",
            "also", "just", "really", "very", "quite", "rather", "pretty", "much",
        }

        # Technical terms to preserve (don't lemmatize)
        self.preserve_terms = {
            "api", "apis", "database", "databases", "server", "servers",
            "authentication", "authorization", "oauth", "jwt", "ssl", "tls",
            "crud", "rest", "restful", "graphql", "json", "xml", "yaml",
            "git", "github", "gitlab", "jira", "confluence", "jenkins",
        }

        # Cache for preprocessing results
        self._preprocessing_cache: Dict[str, PreprocessingResult] = {}

    def preprocess_query(
        self,
        query: str,
        preserve_structure: bool = False
    ) -> PreprocessingResult:
        """Preprocess query with lemmatization, stop word removal, and normalization.

        Args:
            query: The original query to preprocess
            preserve_structure: If True, preserve query structure for phrase queries

        Returns:
            PreprocessingResult containing preprocessed query and metadata
        """
        start_time = time.time()

        # Check cache first
        cache_key = f"{query}:{preserve_structure}"
        if cache_key in self._preprocessing_cache:
            cached = self._preprocessing_cache[cache_key]
            logger.debug(f"Using cached preprocessing for: {query[:50]}...")
            return cached

        processing_steps = []

        try:
            # Step 1: Initial cleaning
            cleaned_query = self._initial_cleaning(query)
            processing_steps.append("initial_cleaning")

            # Step 2: Process with spaCy
            doc = self.spacy_analyzer.nlp(cleaned_query)
            processing_steps.append("spacy_processing")

            # Step 3: Extract and process tokens
            tokens_info = self._extract_tokens(doc)
            processing_steps.append("token_extraction")

            # Step 4: Lemmatization with preservation rules
            lemmatized_tokens = self._lemmatize_tokens(tokens_info)
            processing_steps.append("lemmatization")

            # Step 5: Stop word filtering
            filtered_tokens, removed_stopwords = self._filter_stopwords(lemmatized_tokens, doc)
            processing_steps.append("stopword_filtering")

            # Step 6: Normalization
            normalized_tokens = self._normalize_tokens(filtered_tokens)
            processing_steps.append("normalization")

            # Step 7: Build preprocessed query
            if preserve_structure:
                preprocessed_query = self._rebuild_structured_query(normalized_tokens, doc)
            else:
                preprocessed_query = " ".join(normalized_tokens)
            processing_steps.append("query_reconstruction")

            # Create result
            processing_time_ms = (time.time() - start_time) * 1000

            result = PreprocessingResult(
                original_query=query,
                preprocessed_query=preprocessed_query,
                lemmatized_tokens=lemmatized_tokens,
                filtered_tokens=filtered_tokens,
                removed_stopwords=removed_stopwords,
                normalized_tokens=normalized_tokens,
                processing_steps=processing_steps,
                processing_time_ms=processing_time_ms,
            )

            # Cache the result
            self._preprocessing_cache[cache_key] = result

            logger.debug(
                "🔥 Query preprocessing completed",
                original_query=query[:50],
                preprocessed_query=preprocessed_query[:50],
                tokens_removed=len(removed_stopwords),
                processing_time_ms=processing_time_ms,
            )

            return result

        except Exception as e:
            logger.warning(f"Query preprocessing failed: {e}")
            # Return minimal preprocessing
            processing_time_ms = (time.time() - start_time) * 1000
            return PreprocessingResult(
                original_query=query,
                preprocessed_query=query,
                lemmatized_tokens=[],
                filtered_tokens=[],
                removed_stopwords=[],
                normalized_tokens=[],
                processing_steps=["error"],
                processing_time_ms=processing_time_ms,
            )

    def _initial_cleaning(self, query: str) -> str:
        """Perform initial query cleaning."""
        # Remove extra whitespace
        cleaned = re.sub(r'\s+', ' ', query.strip())

        # Expand contractions before punctuation is normalized; otherwise the
        # apostrophes are stripped first and these replacements never match
        contractions = {
            "don't": "do not",
            "won't": "will not",
            "can't": "cannot",
            "n't": " not",
            "'re": " are",
            "'ve": " have",
            "'ll": " will",
            "'d": " would",
            "'m": " am",
        }
        for contraction, expansion in contractions.items():
            cleaned = cleaned.replace(contraction, expansion)

        # Normalize punctuation
        cleaned = re.sub(r'[^\w\s\?\!\-\.]', ' ', cleaned)

        return cleaned

    def _extract_tokens(self, doc) -> List[Dict[str, Any]]:
        """Extract token information from spaCy doc."""
        tokens_info = []

        for token in doc:
            if not token.is_space:  # Skip whitespace tokens
                token_info = {
                    "text": token.text,
                    "lemma": token.lemma_,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "is_alpha": token.is_alpha,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_digit": token.like_num,
                    "ent_type": token.ent_type_,
                    "ent_iob": token.ent_iob_,
                }
                tokens_info.append(token_info)

        return tokens_info

    def _lemmatize_tokens(self, tokens_info: List[Dict[str, Any]]) -> List[str]:
        """Lemmatize tokens with preservation rules."""
        lemmatized = []

        for token_info in tokens_info:
            text = token_info["text"].lower()
            lemma = token_info["lemma"].lower()

            # Preserve certain technical terms
            if text in self.preserve_terms:
                lemmatized.append(text)
            # Preserve entities
            elif self.preserve_entities and token_info["ent_type"]:
                lemmatized.append(text)
            # Preserve numbers if configured
            elif self.preserve_numbers and token_info["is_digit"]:
                lemmatized.append(text)
            # Skip punctuation
            elif token_info["is_punct"]:
                continue
            # Use lemma for other words
            elif token_info["is_alpha"] and len(lemma) >= self.min_token_length:
                lemmatized.append(lemma)
            elif len(text) >= self.min_token_length:
                lemmatized.append(text)

        return lemmatized

    def _filter_stopwords(
        self,
        lemmatized_tokens: List[str],
        doc
    ) -> tuple[List[str], List[str]]:
        """Filter stop words while preserving important terms."""
        filtered_tokens = []
        removed_stopwords = []

        # Get spaCy stop words
        spacy_stopwords = self.spacy_analyzer.nlp.Defaults.stop_words
        all_stopwords = spacy_stopwords.union(self.custom_stopwords)

        for token in lemmatized_tokens:
            # Always preserve technical terms
            if token in self.preserve_terms:
                filtered_tokens.append(token)
            # Filter stop words
            elif token in all_stopwords:
                removed_stopwords.append(token)
            # Keep other tokens
            else:
                filtered_tokens.append(token)

        return filtered_tokens, removed_stopwords

    def _normalize_tokens(self, filtered_tokens: List[str]) -> List[str]:
        """Normalize tokens for consistent matching."""
        normalized = []

        for token in filtered_tokens:
            # Convert to lowercase
            normalized_token = token.lower()

            # Remove very short or very long tokens
            if (self.min_token_length <= len(normalized_token) <= self.max_token_length
                    and normalized_token.isalpha()):
                normalized.append(normalized_token)

        return normalized

    def _rebuild_structured_query(
        self,
        normalized_tokens: List[str],
        doc
    ) -> str:
        """Rebuild query preserving some structure for phrase queries."""
        # For now, just join tokens with spaces
        # Future enhancement: preserve quoted phrases, operators, etc.
        return " ".join(normalized_tokens)

    def preprocess_for_search(
        self,
        query: str,
        search_type: str = "hybrid"
    ) -> Dict[str, Any]:
        """Preprocess query specifically for search operations.

        Args:
            query: The original query
            search_type: Type of search ("vector", "keyword", "hybrid")

        Returns:
            Dictionary with preprocessed variants for different search types
        """
        try:
            # Standard preprocessing
            standard_result = self.preprocess_query(query, preserve_structure=False)

            # Structured preprocessing (preserves more structure)
            structured_result = self.preprocess_query(query, preserve_structure=True)

            return {
                "original_query": query,
                "standard_preprocessed": standard_result.preprocessed_query,
                "structured_preprocessed": structured_result.preprocessed_query,
                "semantic_keywords": standard_result.normalized_tokens,
                "search_variants": {
                    "vector_search": structured_result.preprocessed_query,  # Preserve structure for vector
                    "keyword_search": standard_result.preprocessed_query,  # Normalize for BM25
                    "hybrid_search": standard_result.preprocessed_query,  # Default to normalized
                },
                "preprocessing_metadata": {
                    "removed_stopwords_count": len(standard_result.removed_stopwords),
                    "processing_time_ms": standard_result.processing_time_ms,
                    "processing_steps": standard_result.processing_steps,
                },
            }

        except Exception as e:
            logger.warning(f"Search preprocessing failed: {e}")
            return {
                "original_query": query,
                "standard_preprocessed": query,
                "structured_preprocessed": query,
                "semantic_keywords": query.lower().split(),
                "search_variants": {
                    "vector_search": query,
                    "keyword_search": query,
                    "hybrid_search": query,
                },
                "preprocessing_metadata": {
                    "removed_stopwords_count": 0,
                    "processing_time_ms": 0,
                    "processing_steps": ["error"],
                },
            }

    def clear_cache(self):
        """Clear preprocessing cache."""
        self._preprocessing_cache.clear()
        logger.debug("Cleared linguistic preprocessing cache")

    def get_cache_stats(self) -> Dict[str, int]:
        """Get cache statistics."""
        return {
            "preprocessing_cache_size": len(self._preprocessing_cache),
        }
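

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module). The only attribute
# of SpaCyQueryAnalyzer that this file relies on is ``.nlp`` (a loaded spaCy
# pipeline); how the analyzer itself is constructed is not shown here, so its
# setup below is an assumption.
#
#     analyzer = SpaCyQueryAnalyzer(...)              # see spacy_analyzer.py
#     preprocessor = LinguisticPreprocessor(analyzer)
#
#     result = preprocessor.preprocess_query("Show me the API docs for JWT auth")
#     result.preprocessed_query      # e.g. "api doc jwt auth"
#     result.removed_stopwords       # e.g. ["show", "me", "the", "for"]
#
#     variants = preprocessor.preprocess_for_search("Show me the API docs", "hybrid")
#     variants["search_variants"]["keyword_search"]   # normalized form for BM25
# ---------------------------------------------------------------------------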