Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Topic modeling module for document analysis.""" 

2 

3from typing import Any 

4 

5import spacy 

6from gensim import corpora, models 

7from qdrant_loader.utils.logging import LoggingConfig 

8from spacy.cli.download import download 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12 

class TopicModeler:
    """Handles batched LDA topic modeling for document analysis.

    The model is trained incrementally: each call to :meth:`train_model`
    either creates a new LDA model from the batch or updates the existing
    one, and :meth:`infer_topics` caches per-text inference results until
    the model changes (the cache is cleared on every training update).
    """

    # Minimum number of whitespace-separated words a text must contain to be
    # worth modeling. Shared by train_model() and _preprocess_text() so the
    # two filters cannot drift apart.
    MIN_DOC_WORDS = 5

    def __init__(
        self, num_topics: int = 3, passes: int = 10, spacy_model: str = "en_core_web_md"
    ):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract
            passes: Number of passes for LDA training
            spacy_model: spaCy model to use for text preprocessing
        """
        self.num_topics = num_topics
        self.passes = passes
        self.spacy_model = spacy_model
        # gensim Dictionary and LdaModel are built lazily on the first
        # train_model() call and updated on subsequent batches.
        self.dictionary = None
        self.lda_model = None
        self._cached_topics: dict[str, dict[str, Any]] = {}  # text -> inference result
        self._processed_texts: set[str] = set()  # raw texts already used for training

        # Initialize spaCy for text preprocessing; download the model on
        # demand if it is not installed locally.
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            logger.info(f"Downloading spaCy model {spacy_model}...")
            download(spacy_model)
            self.nlp = spacy.load(spacy_model)

    @staticmethod
    def _empty_result() -> dict[str, Any]:
        """Return the canonical empty topic-analysis result.

        All no-model / no-token / error paths share this schema so callers
        can rely on the "doc_topics" key always being present (previously
        the fallback returns omitted it while the success path included it).
        """
        return {"topics": [], "doc_topics": [], "coherence": 0.0}

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Args:
            text: Input text

        Returns:
            List of lowercased, non-stopword, non-punctuation tokens;
            empty list when the text is too short for meaningful modeling.
        """
        # Short texts produce degenerate topic distributions; skip them.
        if len(text.split()) < self.MIN_DOC_WORDS:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            if not token.is_stop and not token.is_punct
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train or incrementally update the LDA model on a batch of texts.

        Texts that were already seen in a previous batch, or that are
        shorter than ``MIN_DOC_WORDS`` words, are skipped. A successful
        update clears the inference cache.

        Args:
            texts: List of texts to train on
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones.
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts
            and len(text.split()) >= self.MIN_DOC_WORDS
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts and drop any that tokenize to nothing
        # (e.g. all stopwords/punctuation).
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [text for text in processed_texts if text]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create the dictionary on the first batch; extend it afterwards.
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            self.dictionary.add_documents(processed_texts)

        # Create the bag-of-words corpus for this batch.
        corpus = [self.dictionary.doc2bow(text) for text in processed_texts]

        # Train a new LDA model, or update the existing one in place.
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha=0.1,  # Fixed positive value for document-topic density
                eta=0.01,  # Fixed positive value for topic-word density
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            self.lda_model.update(corpus)

        # Remember these texts so later batches skip them.
        self._processed_texts.update(new_texts)

        # Cached inference results are stale once the model changes.
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with keys ``"topics"`` (global topic descriptions),
            ``"doc_topics"`` (this document's topic distribution; empty on
            any fallback path) and ``"coherence"`` (c_v coherence, 0.0
            when not computed).
        """
        if not self.lda_model or not self.dictionary:
            logger.warning("LDA model not trained")
            return self._empty_result()

        # Serve repeated queries from the cache (cleared on retraining).
        if text in self._cached_topics:
            return self._cached_topics[text]

        try:
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return self._empty_result()

            # Bag-of-words representation restricted to the known vocabulary.
            doc_bow = self.dictionary.doc2bow(tokens)

            # Per-document topic distribution.
            doc_topics = self.lda_model[doc_bow]

            # Global topic descriptions; cheap compared to coherence.
            topics = self.lda_model.print_topics(num_words=5)

            # Coherence is expensive, so compute it only for longer texts.
            coherence = 0.0
            if len(tokens) > 20:
                try:
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                except Exception as e:
                    # Best-effort metric: log and fall back to 0.0.
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache for subsequent identical queries.
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return self._empty_result()