Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 87%
174 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Semantic analysis module for text processing."""
3import logging
4from dataclasses import dataclass
5from typing import Any
7import spacy
8from gensim import corpora
9from gensim.models import LdaModel
10from gensim.parsing.preprocessing import preprocess_string
11from spacy.cli.download import download as spacy_download
12from spacy.tokens import Doc
14logger = logging.getLogger(__name__)
17@dataclass
18class SemanticAnalysisResult:
19 """Container for semantic analysis results."""
21 entities: list[dict[str, Any]]
22 pos_tags: list[dict[str, Any]]
23 dependencies: list[dict[str, Any]]
24 topics: list[dict[str, Any]]
25 key_phrases: list[str]
26 document_similarity: dict[str, float]


class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache = {}

    def analyze_text(
        self, text: str, doc_id: str | None = None
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        if doc_id and doc_id in self._doc_cache:
            return self._doc_cache[doc_id]

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        # Get part-of-speech tags
        pos_tags = self._get_pos_tags(doc)

        # Get dependency parse
        dependencies = self._get_dependencies(doc)

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = self._calculate_document_similarity(text)

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Cache result
        if doc_id:
            self._doc_cache[doc_id] = result

        return result
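
    # Note: results are cached by doc_id alone, so a repeated call with the same doc_id
    # returns the previously stored analysis even if the text argument differs.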

    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking.

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Get entity context
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get entity description
            description = self.nlp.vocab.strings[ent.label_]

            # Get related entities
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    related.append(
                        {
                            "text": token.text,
                            "type": token.ent_type_,
                            "relation": token.dep_,
                        }
                    )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities

    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries
        """
        pos_tags = []
        for token in doc:
            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_space": token.is_space,
                }
            )
        return pos_tags

    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information.

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries
        """
        dependencies = []
        for token in doc:
            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": [child.text for child in token.children],
                }
            )
        return dependencies

    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    num_topics=min(
                        self.num_topics, len(processed_text) // 2
                    ),  # Ensure reasonable topic count
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01,  # Fixed positive value for topic-word density
                )

            # Get topics
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return (
                topics
                if topics
                else [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]
            )

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return [
                {
                    "id": 0,
                    "terms": [{"term": "general", "weight": 1.0}],
                    "coherence": 0.5,
                }
            ]
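
    # Parsing note for _extract_topics above: gensim's LdaModel.print_topics() yields one
    # formatted string per topic, e.g. '0.030*"vector" + 0.025*"search" + ...', so
    # splitting each topic string on "+" and then on "*" recovers (weight, term) pairs.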

    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates

    def _calculate_document_similarity(self, text: str) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        doc = self.nlp(text)

        # Check if the model has word vectors
        has_vectors = self.nlp.vocab.vectors_length > 0

        for doc_id, cached_result in self._doc_cache.items():
            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get(
                "context"
            ):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])

            if has_vectors:
                # Use spaCy's built-in similarity which uses word vectors
                similarity = doc.similarity(cached_doc)
            else:
                # Use alternative similarity calculation for models without word vectors
                # This avoids the spaCy warning about missing word vectors
                similarity = self._calculate_alternative_similarity(doc, cached_doc)

            similarities[doc_id] = float(similarity)

        return similarities

    def _calculate_alternative_similarity(self, doc1: Doc, doc2: Doc) -> float:
        """Calculate similarity for models without word vectors.

        Uses token overlap and shared entities as similarity metrics.

        Args:
            doc1: First document
            doc2: Second document

        Returns:
            Similarity score between 0 and 1
        """
        # Extract lemmatized tokens (excluding stop words and punctuation)
        tokens1 = {
            token.lemma_.lower()
            for token in doc1
            if not token.is_stop and not token.is_punct and token.is_alpha
        }
        tokens2 = {
            token.lemma_.lower()
            for token in doc2
            if not token.is_stop and not token.is_punct and token.is_alpha
        }

        # Calculate token overlap (Jaccard similarity)
        if not tokens1 and not tokens2:
            return 1.0  # Both empty
        if not tokens1 or not tokens2:
            return 0.0  # One empty

        intersection = len(tokens1.intersection(tokens2))
        union = len(tokens1.union(tokens2))
        token_similarity = intersection / union if union > 0 else 0.0

        # Extract named entities
        entities1 = {ent.text.lower() for ent in doc1.ents}
        entities2 = {ent.text.lower() for ent in doc2.ents}

        # Calculate entity overlap
        entity_similarity = 0.0
        if entities1 or entities2:
            entity_intersection = len(entities1.intersection(entities2))
            entity_union = len(entities1.union(entities2))
            entity_similarity = (
                entity_intersection / entity_union if entity_union > 0 else 0.0
            )

        # Combine token and entity similarities (weighted average)
        # Token similarity gets more weight as it's more comprehensive
        combined_similarity = 0.7 * token_similarity + 0.3 * entity_similarity

        return combined_similarity
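
    # Worked example (illustrative values): for lemma sets {"vector", "search", "index"}
    # and {"vector", "search", "query"}, the Jaccard token similarity is 2 / 4 = 0.5;
    # with no shared entities the combined score is 0.7 * 0.5 + 0.3 * 0.0 = 0.35.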

    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0
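
    # Example (illustrative values): weights [0.03, 0.02, 0.01] give a coherence of
    # (0.03 + 0.02 + 0.01) / 3 = 0.02, i.e. the mean term weight.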

    def clear_cache(self):
        """Clear the document cache and release all resources."""
        # Clear document cache
        self._doc_cache.clear()

        # Release LDA model resources
        if hasattr(self, "lda_model") and self.lda_model is not None:
            try:
                # Clear LDA model
                self.lda_model = None
            except Exception as e:
                logger.warning(f"Error releasing LDA model: {e}")

        # Release dictionary
        if hasattr(self, "dictionary") and self.dictionary is not None:
            try:
                self.dictionary = None
            except Exception as e:
                logger.warning(f"Error releasing dictionary: {e}")

        # Release spaCy model resources
        if hasattr(self, "nlp") and self.nlp is not None:
            try:
                # Clear spaCy caches and release memory
                if hasattr(self.nlp, "vocab") and hasattr(self.nlp.vocab, "strings"):
                    # Try different methods to clear spaCy caches
                    if hasattr(self.nlp.vocab.strings, "_map") and hasattr(
                        self.nlp.vocab.strings._map, "clear"
                    ):
                        self.nlp.vocab.strings._map.clear()
                    elif hasattr(self.nlp.vocab.strings, "clear"):
                        self.nlp.vocab.strings.clear()
                # Additional cleanup for different spaCy versions
                if hasattr(self.nlp.vocab, "_vectors") and hasattr(
                    self.nlp.vocab._vectors, "clear"
                ):
                    self.nlp.vocab._vectors.clear()
                # Note: We don't set nlp to None as it might be needed for other operations
                # but we clear its internal caches
            except Exception as e:
                logger.debug(f"spaCy cache clearing skipped (version-specific): {e}")

        logger.debug("Semantic analyzer resources cleared")

    def shutdown(self):
        """Shutdown the semantic analyzer and release all resources.

        This method should be called when the analyzer is no longer needed
        to ensure proper cleanup of all resources.
        """
        self.clear_cache()

        # More aggressive cleanup for shutdown
        if hasattr(self, "nlp"):
            try:
                # Release the spaCy model completely
                del self.nlp
            except Exception as e:
                logger.warning(f"Error releasing spaCy model: {e}")

        logger.debug("Semantic analyzer shutdown completed")
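

# Usage sketch (illustrative): roughly how SemanticAnalyzer might be exercised end to
# end. The sample text and doc_id below are made-up values; running this requires
# spaCy, gensim, and the "en_core_web_md" model to be installed or downloadable.
if __name__ == "__main__":
    analyzer = SemanticAnalyzer(num_topics=3)
    sample = (
        "Qdrant is an open-source vector database written in Rust. "
        "Teams use it to build semantic search and recommendation features."
    )
    result = analyzer.analyze_text(sample, doc_id="example-doc")
    print("Entities:", [(e["text"], e["label"]) for e in result.entities])
    print("Key phrases:", result.key_phrases)
    print("Topics:", result.topics)
    analyzer.shutdown()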