Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%
75 statements
« prev ^ index » next — coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
"""Topic modeling module for document analysis."""

from typing import Any

import spacy
from gensim import corpora, models
from spacy.cli.download import download

from qdrant_loader.utils.logging import LoggingConfig

# Module-level logger; log calls in this module pass structured keyword
# fields (structlog-style), so LoggingConfig presumably wraps structlog.
logger = LoggingConfig.get_logger(__name__)
class TopicModeler:
    """Handles batched LDA topic modeling for document analysis.

    Maintains a gensim dictionary and LDA model that are built lazily on the
    first call to :meth:`train_model` and incrementally updated on later
    calls. Per-text inference results are cached and the cache is invalidated
    whenever the model is retrained.
    """

    # Minimum number of whitespace-separated words required for a text to be
    # worth modeling. Shared by train_model's batch filter and
    # _preprocess_text so the two thresholds cannot drift apart.
    MIN_WORDS = 5

    def __init__(self, num_topics: int = 3, passes: int = 10):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract.
            passes: Number of passes for LDA training.
        """
        self.num_topics = num_topics
        self.passes = passes
        self.dictionary = None  # gensim corpora.Dictionary, created lazily
        self.lda_model = None  # gensim models.LdaModel, created lazily
        # Cache for topic inference results, keyed by the raw input text.
        # NOTE(review): unbounded — grows with every distinct text analyzed
        # between retrainings; consider capping if inputs are open-ended.
        self._cached_topics: dict[str, dict[str, Any]] = {}
        # Raw texts already used for training; skipped on later batches.
        # NOTE(review): also unbounded for the lifetime of the instance.
        self._processed_texts: set[str] = set()

        # Initialize spaCy for text preprocessing, downloading the small
        # English model on first use if it is not installed locally.
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            logger.info("Downloading spaCy model...")
            download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Lowercases tokens and drops stop words and punctuation.

        Args:
            text: Input text.

        Returns:
            List of preprocessed tokens; empty when the text has fewer than
            ``MIN_WORDS`` whitespace-separated words.
        """
        # Too short for meaningful topic modeling.
        if len(text.split()) < self.MIN_WORDS:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            if not token.is_stop and not token.is_punct
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train or incrementally update the LDA model on a batch of texts.

        Texts that were seen in a previous batch, or that are shorter than
        ``MIN_WORDS`` words, are skipped. Successful training clears the
        inference cache.

        Args:
            texts: List of texts to train on.
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones.
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts
            and len(text.split()) >= self.MIN_WORDS
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts and drop any that tokenize to nothing
        # (e.g. all stop words / punctuation).
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [tokens for tokens in processed_texts if tokens]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create or update the token dictionary.
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            # NOTE(review): ids added here are unknown to an already-trained
            # LdaModel — gensim's update() does not grow the model's
            # vocabulary, so tokens first seen after the initial training
            # effectively do not contribute to later updates. Confirm whether
            # periodic full retraining is intended.
            self.dictionary.add_documents(processed_texts)

        # Bag-of-words representation per document.
        corpus = [self.dictionary.doc2bow(tokens) for tokens in processed_texts]

        # Train a fresh LDA model with tuned settings, or fold the new batch
        # into the existing one.
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha="auto",
                eta="auto",
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            # Online update with the new batch only.
            self.lda_model.update(corpus)

        # Remember the raw texts so future batches skip them.
        self._processed_texts.update(new_texts)

        # Cached inferences are stale once the model changes.
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze.

        Returns:
            On success, a dict with:
              - "topics": the model-wide top topics from
                ``LdaModel.print_topics`` (NOT specific to this text);
              - "doc_topics": this document's topic distribution;
              - "coherence": c_v coherence, computed only for texts with
                more than 20 tokens, else 0.0.
            On failure or before training: ``{"topics": [], "coherence": 0.0}``
            (note: no "doc_topics" key in this case).
        """
        if not self.lda_model or not self.dictionary:
            logger.warning("LDA model not trained")
            return {"topics": [], "coherence": 0.0}

        # Serve repeated queries from the cache.
        if text in self._cached_topics:
            return self._cached_topics[text]

        try:
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return {"topics": [], "coherence": 0.0}

            # Bag-of-words for this document.
            doc_bow = self.dictionary.doc2bow(tokens)

            # Per-document topic distribution.
            doc_topics = self.lda_model[doc_bow]

            # Model-wide topics, skipping coherence here for speed.
            topics = self.lda_model.print_topics(num_words=5)

            # Calculate topic coherence only for longer texts; it is
            # expensive and unreliable on very short documents.
            coherence = 0.0
            if len(tokens) > 20:
                try:
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                except Exception as e:
                    # Best-effort: coherence failure degrades to 0.0.
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache the result for repeated queries on the same text.
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return {"topics": [], "coherence": 0.0}