Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%
76 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Topic modeling module for document analysis."""
import math
from typing import Any

import spacy
from gensim import corpora, models
from qdrant_loader.utils.logging import LoggingConfig
from spacy.cli.download import download
10logger = LoggingConfig.get_logger(__name__)
class TopicModeler:
    """Handles batched LDA topic modeling for document analysis.

    Maintains an incrementally trained gensim dictionary and LDA model:
    repeated calls to :meth:`train_model` extend the same model with new
    documents, and :meth:`infer_topics` caches per-text inference results
    until the next training update invalidates them.
    """

    def __init__(
        self, num_topics: int = 3, passes: int = 10, spacy_model: str = "en_core_web_md"
    ):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract
            passes: Number of passes for LDA training
            spacy_model: spaCy model to use for text preprocessing
        """
        self.num_topics = num_topics
        self.passes = passes
        self.spacy_model = spacy_model
        # Built lazily on the first train_model() call.
        self.dictionary: corpora.Dictionary | None = None
        self.lda_model: models.LdaModel | None = None
        # NOTE(review): both caches grow without bound over the instance's
        # lifetime; fine for batch ingestion, revisit for long-lived services.
        self._cached_topics: dict[str, dict[str, Any]] = {}  # Cache for topic inference results
        self._processed_texts: set[str] = set()  # Track processed texts

        # Initialize spaCy for text preprocessing; download the model on
        # first use if it is not installed locally.
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            logger.info(f"Downloading spaCy model {spacy_model}...")
            download(spacy_model)
            self.nlp = spacy.load(spacy_model)

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Args:
            text: Input text

        Returns:
            List of lowercased, filtered tokens; empty when the text is too
            short for meaningful topic modeling.
        """
        # Texts under 5 words carry too little signal for LDA.
        if len(text.split()) < 5:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            # Drop stop words, punctuation, and bare-whitespace tokens
            # (spaCy emits explicit space tokens for runs of whitespace,
            # which would otherwise pollute the dictionary).
            if not token.is_stop and not token.is_punct and not token.is_space
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train or incrementally update the LDA model on a batch of texts.

        Texts already seen by a previous call, and texts under 5 words, are
        skipped. On success the inference cache is cleared because topic
        assignments from the old model are stale.

        Args:
            texts: List of texts to train on
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts and len(text.split()) >= 5
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts and drop any that reduced to nothing.
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [text for text in processed_texts if text]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create the dictionary on first use; extend it thereafter so
        # token ids stay stable for the existing model.
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            self.dictionary.add_documents(processed_texts)

        # Create document-term matrix (bag-of-words per document).
        corpus = [self.dictionary.doc2bow(text) for text in processed_texts]

        # Train LDA model with optimized settings
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha=0.1,  # Fixed positive value for document-topic density
                eta=0.01,  # Fixed positive value for topic-word density
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            # Update existing model
            self.lda_model.update(corpus)

        # Update processed texts set
        self._processed_texts.update(new_texts)

        # Clear cache when model is updated
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze

        Returns:
            Dictionary containing topic analysis results: "topics" (top words
            per topic), "doc_topics" (this document's topic distribution) and
            "coherence" (c_v score, always a finite float). Returns an empty
            result when the model is untrained or the text yields no tokens.
        """
        if not self.lda_model or not self.dictionary:
            logger.warning("LDA model not trained")
            return {"topics": [], "coherence": 0.0}

        # Check cache first
        if text in self._cached_topics:
            return self._cached_topics[text]

        try:
            # Preprocess text
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return {"topics": [], "coherence": 0.0}

            # Create document-term matrix
            doc_bow = self.dictionary.doc2bow(tokens)

            # Get topic distribution for this document
            doc_topics = self.lda_model[doc_bow]

            # Get topics without coherence calculation for speed
            topics = self.lda_model.print_topics(num_words=5)

            # Calculate topic coherence only for longer texts
            coherence = 0.0
            if len(tokens) > 20:  # Increased threshold for coherence calculation
                try:
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Use single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                    # c_v coherence on a single document can come back as
                    # NaN rather than raising; normalize so callers always
                    # receive a finite score as documented.
                    if coherence is None or math.isnan(coherence):
                        coherence = 0.0
                except Exception as e:
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache the result
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return {"topics": [], "coherence": 0.0}