Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%
76 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Topic modeling module for document analysis."""
import math
from typing import Any

import spacy
from gensim import corpora, models
from spacy.cli.download import download

from qdrant_loader.utils.logging import LoggingConfig
10logger = LoggingConfig.get_logger(__name__)
class TopicModeler:
    """Handles batched LDA topic modeling for document analysis."""

    # Upper bound on cached inference results so a long-running process
    # does not grow memory without limit (evicted FIFO).
    _MAX_CACHE_SIZE = 1024

    def __init__(self, num_topics: int = 3, passes: int = 10, spacy_model: str = "en_core_web_md"):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract
            passes: Number of passes for LDA training
            spacy_model: spaCy model to use for text preprocessing
        """
        self.num_topics = num_topics
        self.passes = passes
        self.spacy_model = spacy_model
        self.dictionary = None  # gensim corpora.Dictionary, built lazily by train_model
        self.lda_model = None  # gensim models.LdaModel, built lazily by train_model
        self._cached_topics = {}  # text -> inference result (bounded FIFO cache)
        # Raw texts already folded into the model. NOTE(review): this set is
        # unbounded by design — bounding it would silently re-train on repeats.
        self._processed_texts = set()

        # Initialize spaCy for text preprocessing; download the model on
        # first use if it is not installed locally.
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            logger.info(f"Downloading spaCy model {spacy_model}...")
            download(spacy_model)
            self.nlp = spacy.load(spacy_model)

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Args:
            text: Input text

        Returns:
            List of preprocessed tokens (lowercased, stop words and
            punctuation removed); empty list for texts under 5 words.
        """
        # Texts shorter than 5 words carry too little signal for LDA.
        if len(text.split()) < 5:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            if not token.is_stop and not token.is_punct
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train LDA model on a batch of texts.

        Creates the dictionary/model on first call; subsequent calls do an
        incremental update with only the texts not seen before.

        Args:
            texts: List of texts to train on
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts and len(text.split()) >= 5
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts, dropping any that tokenize to nothing.
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [text for text in processed_texts if text]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create or update dictionary
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            self.dictionary.add_documents(processed_texts)

        # Create document-term matrix
        corpus = [self.dictionary.doc2bow(text) for text in processed_texts]

        # Train LDA model with optimized settings
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha=0.1,  # Fixed positive value for document-topic density
                eta=0.01,  # Fixed positive value for topic-word density
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            # Incremental update of the existing model.
            # NOTE(review): tokens added to the dictionary after initial
            # training may not be represented in the model — confirm gensim
            # behavior for out-of-vocabulary ids on update.
            self.lda_model.update(corpus)

        # Remember which raw texts have been consumed.
        self._processed_texts.update(new_texts)

        # Cached inferences are stale once the model changes.
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze

        Returns:
            Dictionary containing topic analysis results
        """
        # `is None`, not truthiness: gensim Dictionary/LdaModel implement
        # __len__, so an empty-but-present object would otherwise be
        # misreported as "not trained".
        if self.lda_model is None or self.dictionary is None:
            logger.warning("LDA model not trained")
            return {"topics": [], "coherence": 0.0}

        # Check cache first
        cached = self._cached_topics.get(text)
        if cached is not None:
            return cached

        try:
            # Preprocess text
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return {"topics": [], "coherence": 0.0}

            # Create document-term matrix
            doc_bow = self.dictionary.doc2bow(tokens)

            # Get topic distribution for this document
            doc_topics = self.lda_model[doc_bow]

            # Get topics without coherence calculation for speed
            topics = self.lda_model.print_topics(num_words=5)

            # Calculate topic coherence only for longer texts
            coherence = 0.0
            if len(tokens) > 20:  # Increased threshold for coherence calculation
                try:
                    # NOTE(review): c_v coherence over a single document
                    # (texts=[tokens]) is of limited statistical value —
                    # confirm this is intentional.
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Use single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                    # get_coherence() can yield NaN on tiny corpora; never
                    # cache/propagate NaN to callers.
                    if math.isnan(coherence):
                        coherence = 0.0
                except Exception as e:
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache the result, evicting the oldest entry once the cache is
            # full (dicts preserve insertion order, so this is FIFO).
            if len(self._cached_topics) >= self._MAX_CACHE_SIZE:
                self._cached_topics.pop(next(iter(self._cached_topics)))
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return {"topics": [], "coherence": 0.0}