Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Topic modeling module for document analysis.""" 

2 

3from typing import Any 

4 

5import spacy 

6from gensim import corpora, models 

7from qdrant_loader.utils.logging import LoggingConfig 

8from spacy.cli.download import download 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12 

class TopicModeler:
    """Handles batched LDA topic modeling for document analysis."""

    # Minimum number of whitespace-separated words a text must contain to be
    # worth modeling; shorter texts carry too little signal for LDA. Used by
    # both train_model() and _preprocess_text() so the threshold stays in sync.
    _MIN_WORDS = 5

    def __init__(self, num_topics: int = 3, passes: int = 10, spacy_model: str = "en_core_web_md"):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract
            passes: Number of passes for LDA training
            spacy_model: spaCy model to use for text preprocessing
        """
        self.num_topics = num_topics
        self.passes = passes
        self.spacy_model = spacy_model
        self.dictionary = None  # gensim corpora.Dictionary, built lazily on first train
        self.lda_model = None  # gensim models.LdaModel, built lazily on first train
        # NOTE(review): both caches below grow without bound over the life of
        # the object and key/store full text strings — consider a size-capped
        # LRU (or hashing the keys) for long-running processes.
        self._cached_topics = {}  # Cache for topic inference results
        self._processed_texts = set()  # Track processed texts

        # Initialize spaCy for text preprocessing; download the model on
        # first use if it is not installed locally, then retry the load.
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            logger.info(f"Downloading spaCy model {spacy_model}...")
            download(spacy_model)
            self.nlp = spacy.load(spacy_model)

    @staticmethod
    def _empty_result() -> dict[str, Any]:
        """Return the canonical empty analysis result.

        Every failure path of infer_topics() shares this schema so callers can
        index "topics", "doc_topics", and "coherence" without a KeyError.
        """
        return {"topics": [], "doc_topics": [], "coherence": 0.0}

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Args:
            text: Input text

        Returns:
            List of lowercased, non-stopword, non-punctuation tokens; empty
            when the text is too short for meaningful topic modeling.
        """
        # Check if text is too short for meaningful topic modeling
        if len(text.split()) < self._MIN_WORDS:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            if not token.is_stop and not token.is_punct
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train (or incrementally update) the LDA model on a batch of texts.

        Texts already seen in a previous call, and texts shorter than
        ``_MIN_WORDS`` words, are skipped. Updating the model invalidates the
        inference cache.

        Args:
            texts: List of texts to train on
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts and len(text.split()) >= self._MIN_WORDS
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts, dropping any that tokenize to nothing
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [text for text in processed_texts if text]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create the dictionary on first use, extend it afterwards
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            self.dictionary.add_documents(processed_texts)

        # Create document-term matrix (bag-of-words per document)
        corpus = [self.dictionary.doc2bow(text) for text in processed_texts]

        # Train a fresh LDA model on first call; incrementally update after
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha=0.1,  # Fixed positive value for document-topic density
                eta=0.01,  # Fixed positive value for topic-word density
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            # Update existing model
            self.lda_model.update(corpus)

        # Remember every new text (even ones dropped during preprocessing)
        # so it is not re-examined on the next batch.
        self._processed_texts.update(new_texts)

        # The model changed, so previously cached inferences are stale.
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with keys "topics" (top words per topic),
            "doc_topics" (topic distribution for this document), and
            "coherence" (c_v coherence, 0.0 when not computed). All keys
            are present on every path, including failures.
        """
        if not self.lda_model or not self.dictionary:
            logger.warning("LDA model not trained")
            return self._empty_result()

        # Check cache first
        if text in self._cached_topics:
            return self._cached_topics[text]

        try:
            # Preprocess text
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return self._empty_result()

            # Create document-term matrix
            doc_bow = self.dictionary.doc2bow(tokens)

            # Get topic distribution for this document
            doc_topics = self.lda_model[doc_bow]

            # Get topics without coherence calculation for speed
            topics = self.lda_model.print_topics(num_words=5)

            # Calculate topic coherence only for longer texts — coherence is
            # expensive and unreliable on very short token lists.
            coherence = 0.0
            if len(tokens) > 20:  # Increased threshold for coherence calculation
                try:
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Use single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                except Exception as e:
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache the result
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return self._empty_result()