Coverage for src/qdrant_loader_mcp_server/search/nlp/linguistic_preprocessor.py: 93%
128 statements
coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""Linguistic query preprocessing for improved search accuracy."""
3import re
4import time
5from dataclasses import dataclass
6from typing import Any
8from ...utils.logging import LoggingConfig
9from .spacy_analyzer import SpaCyQueryAnalyzer
11logger = LoggingConfig.get_logger(__name__)
14@dataclass
15class PreprocessingResult:
16 """Container for query preprocessing results."""
18 original_query: str
19 preprocessed_query: str
20 lemmatized_tokens: list[str]
21 filtered_tokens: list[str]
22 removed_stopwords: list[str]
23 normalized_tokens: list[str]
24 processing_steps: list[str]
25 processing_time_ms: float
28class LinguisticPreprocessor:
29 """Linguistic query preprocessing using spaCy for lemmatization and filtering."""
31 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
32 """Initialize the linguistic preprocessor.
34 Args:
35 spacy_analyzer: SpaCy analyzer instance for linguistic processing
36 """
37 self.spacy_analyzer = spacy_analyzer
38 self.logger = LoggingConfig.get_logger(__name__)
40 # Preprocessing configuration
41 self.min_token_length = 2
42 self.max_token_length = 50
43 self.preserve_entities = True
44 self.preserve_numbers = True
46 # Custom stop words (in addition to spaCy's)
47 self.custom_stopwords = {
48 "find",
49 "show",
50 "give",
51 "tell",
52 "search",
53 "look",
54 "get",
55 "want",
56 "need",
57 "please",
58 "help",
59 "can",
60 "could",
61 "would",
62 "should",
63 "may",
64 "might",
65 "also",
66 "just",
67 "really",
68 "very",
69 "quite",
70 "rather",
71 "pretty",
72 "much",
73 }
75 # Technical terms to preserve (don't lemmatize)
76 self.preserve_terms = {
77 "api",
78 "apis",
79 "database",
80 "databases",
81 "server",
82 "servers",
83 "authentication",
84 "authorization",
85 "oauth",
86 "jwt",
87 "ssl",
88 "tls",
89 "crud",
90 "rest",
91 "restful",
92 "graphql",
93 "json",
94 "xml",
95 "yaml",
96 "git",
97 "github",
98 "gitlab",
99 "jira",
100 "confluence",
101 "jenkins",
102 }
104 # Cache for preprocessing results
105 self._preprocessing_cache: dict[str, PreprocessingResult] = {}
107 def preprocess_query(
108 self, query: str, preserve_structure: bool = False
109 ) -> PreprocessingResult:
110 """Preprocess query with lemmatization, stop word removal, and normalization.
112 Args:
113 query: The original query to preprocess
114 preserve_structure: If True, preserve query structure for phrase queries
116 Returns:
117 PreprocessingResult containing preprocessed query and metadata
118 """
119 start_time = time.time()
121 # Check cache first
122 cache_key = f"{query}:{preserve_structure}"
123 if cache_key in self._preprocessing_cache:
124 cached = self._preprocessing_cache[cache_key]
125 logger.debug(f"Using cached preprocessing for: {query[:50]}...")
126 return cached
128 processing_steps = []
130 try:
131 # Step 1: Initial cleaning
132 cleaned_query = self._initial_cleaning(query)
133 processing_steps.append("initial_cleaning")
135 # Step 2: Process with spaCy
136 doc = self.spacy_analyzer.nlp(cleaned_query)
137 processing_steps.append("spacy_processing")
139 # Step 3: Extract and process tokens
140 tokens_info = self._extract_tokens(doc)
141 processing_steps.append("token_extraction")
143 # Step 4: Lemmatization with preservation rules
144 lemmatized_tokens = self._lemmatize_tokens(tokens_info)
145 processing_steps.append("lemmatization")
147 # Step 5: Stop word filtering
148 filtered_tokens, removed_stopwords = self._filter_stopwords(
149 lemmatized_tokens, doc
150 )
151 processing_steps.append("stopword_filtering")
153 # Step 6: Normalization
154 normalized_tokens = self._normalize_tokens(filtered_tokens)
155 processing_steps.append("normalization")
157 # Step 7: Build preprocessed query
158 if preserve_structure:
159 preprocessed_query = self._rebuild_structured_query(
160 normalized_tokens, doc
161 )
162 else:
163 preprocessed_query = " ".join(normalized_tokens)
164 processing_steps.append("query_reconstruction")
166 # Create result
167 processing_time_ms = (time.time() - start_time) * 1000
169 result = PreprocessingResult(
170 original_query=query,
171 preprocessed_query=preprocessed_query,
172 lemmatized_tokens=lemmatized_tokens,
173 filtered_tokens=filtered_tokens,
174 removed_stopwords=removed_stopwords,
175 normalized_tokens=normalized_tokens,
176 processing_steps=processing_steps,
177 processing_time_ms=processing_time_ms,
178 )
180 # Cache the result
181 self._preprocessing_cache[cache_key] = result
183 logger.debug(
184 "🔥 Query preprocessing completed",
185 original_query=query[:50],
186 preprocessed_query=preprocessed_query[:50],
187 tokens_removed=len(removed_stopwords),
188 processing_time_ms=processing_time_ms,
189 )
191 return result
193 except Exception as e:
194 logger.warning(f"Query preprocessing failed: {e}")
195 # Return minimal preprocessing
196 processing_time_ms = (time.time() - start_time) * 1000
197 return PreprocessingResult(
198 original_query=query,
199 preprocessed_query=query,
200 lemmatized_tokens=[],
201 filtered_tokens=[],
202 removed_stopwords=[],
203 normalized_tokens=[],
204 processing_steps=["error"],
205 processing_time_ms=processing_time_ms,
206 )
208 def _initial_cleaning(self, query: str) -> str:
209 """Perform initial query cleaning."""
210 # Remove extra whitespace
211 cleaned = re.sub(r"\s+", " ", query.strip())
213 # Normalize punctuation
214 cleaned = re.sub(r"[^\w\s\?\!\-\.]", " ", cleaned)
216 # Handle contractions
217 contractions = {
218 "don't": "do not",
219 "won't": "will not",
220 "can't": "cannot",
221 "n't": " not",
222 "'re": " are",
223 "'ve": " have",
224 "'ll": " will",
225 "'d": " would",
226 "'m": " am",
227 }
229 for contraction, expansion in contractions.items():
230 cleaned = cleaned.replace(contraction, expansion)
232 return cleaned
234 def _extract_tokens(self, doc) -> list[dict[str, Any]]:
235 """Extract token information from spaCy doc."""
236 tokens_info = []
238 for token in doc:
239 if not token.is_space: # Skip whitespace tokens
240 token_info = {
241 "text": token.text,
242 "lemma": token.lemma_,
243 "pos": token.pos_,
244 "tag": token.tag_,
245 "is_alpha": token.is_alpha,
246 "is_stop": token.is_stop,
247 "is_punct": token.is_punct,
248 "is_digit": token.like_num,
249 "ent_type": token.ent_type_,
250 "ent_iob": token.ent_iob_,
251 }
252 tokens_info.append(token_info)
254 return tokens_info
256 def _lemmatize_tokens(self, tokens_info: list[dict[str, Any]]) -> list[str]:
257 """Lemmatize tokens with preservation rules."""
258 lemmatized = []
260 for token_info in tokens_info:
261 text = token_info["text"].lower()
262 lemma = token_info["lemma"].lower()
264 # Preserve certain technical terms
265 if text in self.preserve_terms:
266 lemmatized.append(text)
267 # Preserve entities
268 elif self.preserve_entities and token_info["ent_type"]:
269 lemmatized.append(text)
270 # Preserve numbers if configured
271 elif self.preserve_numbers and token_info["is_digit"]:
272 lemmatized.append(text)
273 # Skip punctuation
274 elif token_info["is_punct"]:
275 continue
276 # Use lemma for other words
277 elif token_info["is_alpha"] and len(lemma) >= self.min_token_length:
278 lemmatized.append(lemma)
279 elif len(text) >= self.min_token_length:
280 lemmatized.append(text)
282 return lemmatized
284 def _filter_stopwords(
285 self, lemmatized_tokens: list[str], doc
286 ) -> tuple[list[str], list[str]]:
287 """Filter stop words while preserving important terms."""
288 filtered_tokens = []
289 removed_stopwords = []
291 # Get spaCy stop words
292 spacy_stopwords = self.spacy_analyzer.nlp.Defaults.stop_words
293 all_stopwords = spacy_stopwords.union(self.custom_stopwords)
295 for token in lemmatized_tokens:
296 # Always preserve technical terms
297 if token in self.preserve_terms:
298 filtered_tokens.append(token)
299 # Filter stop words
300 elif token in all_stopwords:
301 removed_stopwords.append(token)
302 # Keep other tokens
303 else:
304 filtered_tokens.append(token)
306 return filtered_tokens, removed_stopwords
308 def _normalize_tokens(self, filtered_tokens: list[str]) -> list[str]:
309 """Normalize tokens for consistent matching."""
310 normalized = []
312 for token in filtered_tokens:
313 # Convert to lowercase
314 normalized_token = token.lower()
316 # Remove very short or very long tokens
317 if (
318 self.min_token_length <= len(normalized_token) <= self.max_token_length
319 and normalized_token.isalpha()
320 ):
321 normalized.append(normalized_token)
323 return normalized
325 def _rebuild_structured_query(self, normalized_tokens: list[str], doc) -> str:
326 """Rebuild query preserving some structure for phrase queries."""
327 # For now, just join tokens with spaces
328 # Future enhancement: preserve quoted phrases, operators, etc.
329 return " ".join(normalized_tokens)
331 def preprocess_for_search(
332 self, query: str, search_type: str = "hybrid"
333 ) -> dict[str, Any]:
334 """Preprocess query specifically for search operations.
336 Args:
337 query: The original query
338 search_type: Type of search ("vector", "keyword", "hybrid")
340 Returns:
341 Dictionary with preprocessed variants for different search types
342 """
343 try:
344 # Standard preprocessing
345 standard_result = self.preprocess_query(query, preserve_structure=False)
347 # Structured preprocessing (preserves more structure)
348 structured_result = self.preprocess_query(query, preserve_structure=True)
350 return {
351 "original_query": query,
352 "standard_preprocessed": standard_result.preprocessed_query,
353 "structured_preprocessed": structured_result.preprocessed_query,
354 "semantic_keywords": standard_result.normalized_tokens,
355 "search_variants": {
356 "vector_search": structured_result.preprocessed_query, # Preserve structure for vector
357 "keyword_search": standard_result.preprocessed_query, # Normalize for BM25
358 "hybrid_search": standard_result.preprocessed_query, # Default to normalized
359 },
360 "preprocessing_metadata": {
361 "removed_stopwords_count": len(standard_result.removed_stopwords),
362 "processing_time_ms": standard_result.processing_time_ms,
363 "processing_steps": standard_result.processing_steps,
364 },
365 }
367 except Exception as e:
368 logger.warning(f"Search preprocessing failed: {e}")
369 return {
370 "original_query": query,
371 "standard_preprocessed": query,
372 "structured_preprocessed": query,
373 "semantic_keywords": query.lower().split(),
374 "search_variants": {
375 "vector_search": query,
376 "keyword_search": query,
377 "hybrid_search": query,
378 },
379 "preprocessing_metadata": {
380 "removed_stopwords_count": 0,
381 "processing_time_ms": 0,
382 "processing_steps": ["error"],
383 },
384 }
386 def clear_cache(self):
387 """Clear preprocessing cache."""
388 self._preprocessing_cache.clear()
389 logger.debug("Cleared linguistic preprocessing cache")
391 def get_cache_stats(self) -> dict[str, int]:
392 """Get cache statistics."""
393 return {
394 "preprocessing_cache_size": len(self._preprocessing_cache),
395 }
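
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the 128 covered statements above).
# It shows how the preprocessor might be driven end to end. The argument-free
# SpaCyQueryAnalyzer() construction is an assumption; check spacy_analyzer.py
# for the real constructor signature before copying this. Expected outputs in
# the comments are examples only and depend on the loaded spaCy model.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the analyzer/preprocessor pair (constructor arguments assumed).
    analyzer = SpaCyQueryAnalyzer()
    preprocessor = LinguisticPreprocessor(analyzer)

    # Plain preprocessing: lemmatized, stop-word-filtered, normalized tokens.
    result = preprocessor.preprocess_query("Can you find the API authentication docs?")
    print(result.preprocessed_query)  # e.g. "api authentication doc"
    print(result.removed_stopwords)   # e.g. ["can", "you", "find", "the"]

    # Search-oriented preprocessing: one query variant per search type.
    variants = preprocessor.preprocess_for_search(
        "Can you find the API authentication docs?", search_type="hybrid"
    )
    print(variants["search_variants"]["keyword_search"])
    print(variants["preprocessing_metadata"]["processing_steps"])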