Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 90%
217 statements
1"""Semantic analysis module for text processing."""
3import hashlib
4import logging
5import threading
6from dataclasses import dataclass
7from typing import Any
9import spacy
10from gensim import corpora
11from gensim.models import LdaModel
12from gensim.parsing.preprocessing import preprocess_string
13from spacy.cli.download import download as spacy_download
14from spacy.tokens import Doc
16logger = logging.getLogger(__name__)
19def is_meaningful_text(text: str) -> bool:
20 """Check if text contains meaningful content (letters or digits).
22 Returns False for text that only contains:
23 - Punctuation marks: ., #, @, |, -, _, etc.
24 - Whitespace characters
25 - Special symbols without semantic meaning (---, ..., |||, etc.)
27 """
28 # Check if text contains at least one alphanumeric character
29 return any(c.isalnum() for c in text)
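
# Illustrative behaviour sketch (the sample strings are assumptions, not taken
# from the project): any letter or digit makes the text "meaningful".
#
#     >>> is_meaningful_text("Q3 revenue")
#     True
#     >>> is_meaningful_text("--- ... |||")
#     False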


@dataclass
class SemanticAnalysisResult:
    """Container for semantic analysis results."""

    entities: list[dict[str, Any]]
    pos_tags: list[dict[str, Any]]
    dependencies: list[dict[str, Any]]
    topics: list[dict[str, Any]]
    key_phrases: list[str]
    document_similarity: dict[str, float]


class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache: dict = {}
        self._doc_cache_lock = threading.Lock()

    def _build_cache_key(
        self, text: str, doc_id: str | None, include_enhanced: bool
    ) -> tuple[str, bool, str] | None:
        """Build a cache key that includes a content fingerprint.

        Including a fingerprint prevents stale cache hits when the same doc_id
        is reused with different content.
        """
        if not doc_id:
            return None

        text_fingerprint = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return (doc_id, include_enhanced, text_fingerprint)
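
        # Shape of the resulting key (illustrative; "doc-1" is an assumed ID):
        #
        #     ("doc-1", False, "b94d27b9...")
        #
        # where the last element is the full SHA-256 hex digest of the text.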

    def analyze_text(
        self,
        text: str,
        doc_id: str | None = None,
        include_enhanced: bool = False,
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching
            include_enhanced: Whether to compute enhanced NLP fields
                (pos_tags, dependencies, document_similarity)

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        cache_key = self._build_cache_key(text, doc_id, include_enhanced)

        # Protected read
        with self._doc_cache_lock:
            cached = self._doc_cache.get(cache_key) if cache_key else None

        if cached is not None:
            if include_enhanced:
                # Compute similarity OUTSIDE the lock (can be slow)
                doc_similarity = self._calculate_document_similarity(
                    text, doc_id=doc_id
                )
                refreshed = SemanticAnalysisResult(
                    entities=cached.entities,
                    pos_tags=cached.pos_tags,
                    dependencies=cached.dependencies,
                    topics=cached.topics,
                    key_phrases=cached.key_phrases,
                    document_similarity=doc_similarity,
                )
                # Protected write-back
                with self._doc_cache_lock:
                    self._doc_cache[cache_key] = refreshed
                return refreshed
            return cached

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        if include_enhanced:
            # Get part-of-speech tags
            pos_tags = self._get_pos_tags(doc)

            # Get dependency parse
            dependencies = self._get_dependencies(doc)
        else:
            pos_tags = []
            dependencies = []

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = (
            self._calculate_document_similarity(text, doc_id=doc_id)
            if include_enhanced
            else {}
        )

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Protected write
        if cache_key:
            with self._doc_cache_lock:
                self._doc_cache[cache_key] = result

        return result
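
        # Minimal caller sketch (illustrative; the sample text and doc_id are
        # assumptions, not taken from the project):
        #
        #     analyzer = SemanticAnalyzer()
        #     result = analyzer.analyze_text(
        #         "Acme Corp hired Jane Doe in Berlin.", doc_id="doc-1"
        #     )
        #     result.entities     # ORG / PERSON / GPE style entries
        #     result.key_phrases  # multi-word noun chunks plus selected entities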

    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking, filtering garbage entities.

        Filters out entities that:
        - Only contain punctuation/symbols (., #, |, etc.)
        - Don't have any alphanumeric characters
        - Are just whitespace

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Filter entities that only contain punctuation/symbols
            if not is_meaningful_text(ent.text):
                continue

            # Get entity context
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get a human-readable entity description (falls back to the raw
            # label when spaCy has no explanation for it)
            description = spacy.explain(ent.label_) or ent.label_

            # Get related entities (also filter meaningless ones)
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    # Only add related entities with meaningful text
                    if is_meaningful_text(token.text):
                        related.append(
                            {
                                "text": token.text,
                                "type": token.ent_type_,
                                "relation": token.dep_,
                            }
                        )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities
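
        # Sketch of one returned entry (illustrative values only):
        #
        #     {
        #         "text": "Acme Corp",
        #         "label": "ORG",
        #         "start": 0,
        #         "end": 9,
        #         "description": "Companies, agencies, institutions, etc.",
        #         "context": "Acme Corp hired Jane Doe in Berlin.",
        #         "related_entities": [
        #             {"text": "Doe", "type": "PERSON", "relation": "dobj"}
        #         ],
        #     }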

    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information, filtering noise tokens.

        Filters out multiple types of noise:
        - Whitespace tokens (is_space=True)
        - Punctuation tokens (is_punct=True)
        - Symbol-only tokens without alphanumeric content (e.g., ---, ..., |||)

        This is especially important for Excel tables and structured data.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries (excluding spaces, punctuation, and symbols)
        """
        pos_tags = []
        for token in doc:
            # Skip whitespace and punctuation - they pollute metadata
            if token.is_space or token.is_punct:
                continue

            # Also skip tokens with no meaningful content (e.g., ---, ...)
            # This catches edge cases where spaCy doesn't mark them as punctuation
            if not is_meaningful_text(token.text):
                continue

            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                }
            )
        return pos_tags

    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information with filtering.

        Filters out:
        - Whitespace tokens (is_space=True)
        - Punctuation tokens (is_punct=True)
        - Symbol-only tokens without alphanumeric content
        - Children that are punctuation or meaningless symbols

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries (excluding noise tokens)
        """
        dependencies = []
        for token in doc:
            # Skip whitespace and punctuation tokens
            if token.is_space or token.is_punct:
                continue

            # Skip tokens with no meaningful content (e.g., ---, ...)
            if not is_meaningful_text(token.text):
                continue

            # Filter children to only include meaningful tokens
            meaningful_children = [
                child.text
                for child in token.children
                if not child.is_space
                and not child.is_punct
                and is_meaningful_text(child.text)
            ]

            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": meaningful_children,
                }
            )
        return dependencies

    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    num_topics=min(
                        self.num_topics, len(processed_text) // 2
                    ),  # Ensure reasonable topic count
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01,  # Fixed positive value for topic-word density
                )

            # Get topics
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return (
                topics
                if topics
                else [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]
            )

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return [
                {
                    "id": 0,
                    "terms": [{"term": "general", "weight": 1.0}],
                    "coherence": 0.5,
                }
            ]
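
        # Parsing sketch (illustrative): print_topics() yields strings such as
        # '0.067*"pipeline" + 0.052*"vector" + ...'; splitting on "+" and then
        # "*" produces (weight, quoted term) pairs, so a parsed topic could be:
        #
        #     {"id": 0,
        #      "terms": [{"term": "pipeline", "weight": 0.067},
        #                {"term": "vector", "weight": 0.052}],
        #      "coherence": 0.0595}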

    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates

    def _calculate_document_similarity(
        self, text: str, doc_id: str | None = None
    ) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare
            doc_id: Optional current document ID to exclude from results

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        skipped_ids = {doc_id} if doc_id else set()

        doc = self.nlp(text)

        # Check if the model has word vectors
        has_vectors = self.nlp.vocab.vectors_length > 0

        with self._doc_cache_lock:
            cached_items = list(self._doc_cache.items())

        for cache_key, cached_result in cached_items:
            cached_doc_id = cache_key[0] if isinstance(cache_key, tuple) else cache_key
            if cached_doc_id is None or cached_doc_id in skipped_ids:
                continue

            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get(
                "context"
            ):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])

            if has_vectors:
                # Use spaCy's built-in similarity which uses word vectors
                similarity = doc.similarity(cached_doc)
            else:
                # Use alternative similarity calculation for models without word vectors
                # This avoids the spaCy warning about missing word vectors
                similarity = self._calculate_alternative_similarity(doc, cached_doc)

            similarities[cached_doc_id] = float(similarity)
            skipped_ids.add(cached_doc_id)

        return similarities

    def _calculate_alternative_similarity(self, doc1: Doc, doc2: Doc) -> float:
        """Calculate similarity for models without word vectors.

        Uses token overlap and shared entities as similarity metrics.

        Args:
            doc1: First document
            doc2: Second document

        Returns:
            Similarity score between 0 and 1
        """
        # Extract lemmatized tokens (excluding stop words and punctuation)
        tokens1 = {
            token.lemma_.lower()
            for token in doc1
            if not token.is_stop and not token.is_punct and token.is_alpha
        }
        tokens2 = {
            token.lemma_.lower()
            for token in doc2
            if not token.is_stop and not token.is_punct and token.is_alpha
        }

        # Calculate token overlap (Jaccard similarity)
        if not tokens1 and not tokens2:
            return 1.0  # Both empty
        if not tokens1 or not tokens2:
            return 0.0  # One empty

        intersection = len(tokens1.intersection(tokens2))
        union = len(tokens1.union(tokens2))
        token_similarity = intersection / union if union > 0 else 0.0

        # Extract named entities
        entities1 = {ent.text.lower() for ent in doc1.ents}
        entities2 = {ent.text.lower() for ent in doc2.ents}

        # Calculate entity overlap
        entity_similarity = 0.0
        if entities1 or entities2:
            entity_intersection = len(entities1.intersection(entities2))
            entity_union = len(entities1.union(entities2))
            entity_similarity = (
                entity_intersection / entity_union if entity_union > 0 else 0.0
            )

        # Combine token and entity similarities (weighted average)
        # Token similarity gets more weight as it's more comprehensive
        combined_similarity = 0.7 * token_similarity + 0.3 * entity_similarity

        return combined_similarity
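
        # Worked example (illustrative numbers): 4 shared lemmas out of a union
        # of 10 gives a Jaccard of 0.4; 1 shared entity out of 2 gives 0.5; the
        # combined score is 0.7 * 0.4 + 0.3 * 0.5 = 0.43.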

    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0
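
        # Example (illustrative weights): terms weighted 0.10, 0.06 and 0.02
        # give a coherence of (0.10 + 0.06 + 0.02) / 3 = 0.06.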

    def clear_cache(self):
        """Clear the document cache and release all resources."""
        # Clear document cache
        with self._doc_cache_lock:
            self._doc_cache.clear()

        # Release LDA model resources
        if hasattr(self, "lda_model") and self.lda_model is not None:
            try:
                # Clear LDA model
                self.lda_model = None
            except Exception as e:
                logger.warning(f"Error releasing LDA model: {e}")

        # Release dictionary
        if hasattr(self, "dictionary") and self.dictionary is not None:
            try:
                self.dictionary = None
            except Exception as e:
                logger.warning(f"Error releasing dictionary: {e}")

        # Release spaCy model resources
        if hasattr(self, "nlp") and self.nlp is not None:
            try:
                # Clear spaCy caches and release memory
                if hasattr(self.nlp, "vocab") and hasattr(self.nlp.vocab, "strings"):
                    # Try different methods to clear spaCy caches
                    if hasattr(self.nlp.vocab.strings, "_map") and hasattr(
                        self.nlp.vocab.strings._map, "clear"
                    ):
                        self.nlp.vocab.strings._map.clear()
                    elif hasattr(self.nlp.vocab.strings, "clear"):
                        self.nlp.vocab.strings.clear()
                # Additional cleanup for different spaCy versions
                if hasattr(self.nlp.vocab, "_vectors") and hasattr(
                    self.nlp.vocab._vectors, "clear"
                ):
                    self.nlp.vocab._vectors.clear()
                # Note: We don't set nlp to None as it might be needed for other
                # operations, but we clear its internal caches
            except Exception as e:
                logger.debug(f"spaCy cache clearing skipped (version-specific): {e}")

        logger.debug("Semantic analyzer resources cleared")

    def shutdown(self):
        """Shutdown the semantic analyzer and release all resources.

        This method should be called when the analyzer is no longer needed
        to ensure proper cleanup of all resources.
        """
        self.clear_cache()

        # More aggressive cleanup for shutdown
        if hasattr(self, "nlp"):
            try:
                # Release the spaCy model completely
                del self.nlp
            except Exception as e:
                logger.warning(f"Error releasing spaCy model: {e}")

        logger.debug("Semantic analyzer shutdown completed")
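

# Minimal end-to-end sketch (not part of the original module): how a caller
# might drive the analyzer. The model name and sample text are assumptions;
# "en_core_web_sm" is used only to keep the example lightweight.
if __name__ == "__main__":
    analyzer = SemanticAnalyzer(spacy_model="en_core_web_sm")
    try:
        result = analyzer.analyze_text(
            "Acme Corp hired Jane Doe to build the Berlin search pipeline.",
            doc_id="example-doc",
            include_enhanced=True,
        )
        print("entities:", [e["text"] for e in result.entities])
        print("key phrases:", result.key_phrases)
        print("topics:", [t["id"] for t in result.topics])
    finally:
        analyzer.shutdown()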