Coverage for src/qdrant_loader/core/text_processing/topic_modeler.py: 64%

75 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

"""Topic modeling module for document analysis."""

from typing import Any

import spacy
from gensim import corpora, models
from spacy.cli.download import download

from qdrant_loader.utils.logging import LoggingConfig

# Module-level structured logger; accepts keyword context (see usage below).
logger = LoggingConfig.get_logger(__name__)

class TopicModeler:
    """Handles batched LDA topic modeling for document analysis.

    Maintains an incrementally-trained gensim ``LdaModel`` plus its
    ``Dictionary``, and caches per-text inference results until the next
    training update.
    """

    def __init__(self, num_topics: int = 3, passes: int = 10):
        """Initialize the topic modeler.

        Args:
            num_topics: Number of topics to extract
            passes: Number of passes for LDA training
        """
        self.num_topics = num_topics
        self.passes = passes
        self.dictionary = None  # gensim corpora.Dictionary; built lazily on first train
        self.lda_model = None  # gensim models.LdaModel; built lazily on first train
        self._cached_topics = {}  # Cache for topic inference results (text -> result)
        # Track processed texts to avoid retraining on duplicates.
        # NOTE(review): both this set and the cache grow without bound over the
        # modeler's lifetime — acceptable for batch pipelines, but worth
        # confirming for long-lived processes.
        self._processed_texts = set()

        # Initialize spaCy for text preprocessing; fetch the model on first use.
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            logger.info("Downloading spaCy model...")
            download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")

    def _preprocess_text(self, text: str) -> list[str]:
        """Preprocess text for topic modeling.

        Args:
            text: Input text

        Returns:
            Lowercased tokens with stop words, punctuation, and whitespace
            removed; empty list when the text is under 5 words.
        """
        # Check if text is too short for meaningful topic modeling
        if len(text.split()) < 5:
            return []

        doc = self.nlp(text)
        return [
            token.text.lower()
            for token in doc
            # Fix: also drop whitespace tokens (newlines, runs of spaces),
            # which previously leaked into the vocabulary as spurious "words".
            if not token.is_stop and not token.is_punct and not token.is_space
        ]

    def train_model(self, texts: list[str]) -> None:
        """Train or incrementally update the LDA model on a batch of texts.

        Skips texts already seen and texts shorter than 5 words; clears the
        inference cache whenever the model changes.

        Args:
            texts: List of texts to train on
        """
        if not texts:
            logger.warning("No texts provided for training")
            return

        # Filter out short texts and already processed ones
        new_texts = [
            text
            for text in texts
            if text not in self._processed_texts and len(text.split()) >= 5
        ]
        if not new_texts:
            logger.info("No new texts to process")
            return

        # Preprocess all texts, dropping any that tokenize to nothing.
        processed_texts = [self._preprocess_text(text) for text in new_texts]
        processed_texts = [text for text in processed_texts if text]

        if not processed_texts:
            logger.warning("No valid texts after preprocessing")
            return

        # Create or update dictionary
        if self.dictionary is None:
            self.dictionary = corpora.Dictionary(processed_texts)
        else:
            # NOTE(review): add_documents may introduce token ids unknown to an
            # already-trained LdaModel; gensim does not grow the model's
            # vocabulary on update() — confirm this is acceptable for the
            # incremental-training path.
            self.dictionary.add_documents(processed_texts)

        # Create document-term matrix (bag-of-words per document).
        corpus = [self.dictionary.doc2bow(text) for text in processed_texts]

        # Train LDA model with optimized settings
        if self.lda_model is None:
            self.lda_model = models.LdaModel(
                corpus,
                num_topics=self.num_topics,
                id2word=self.dictionary,
                passes=self.passes,
                chunksize=2000,
                update_every=1,
                alpha="auto",
                eta="auto",
                decay=0.5,
                offset=1.0,
                eval_every=10,
                iterations=50,
                gamma_threshold=0.001,
                minimum_probability=0.01,
            )
        else:
            # Update existing model incrementally (online LDA).
            self.lda_model.update(corpus)

        # Update processed texts set
        self._processed_texts.update(new_texts)

        # Clear cache when model is updated — cached inferences are stale.
        self._cached_topics.clear()

        logger.info(
            "Trained/Updated LDA model",
            num_documents=len(new_texts),
            num_topics=self.num_topics,
            dictionary_size=len(self.dictionary),
        )

    def infer_topics(self, text: str) -> dict[str, Any]:
        """Infer topics for a single text using the trained model.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with "topics" (model-wide top words per topic),
            "doc_topics" (this document's topic distribution; absent on the
            untrained/empty-token paths), and "coherence" (c_v score, 0.0
            when not computed or on failure).
        """
        if not self.lda_model or not self.dictionary:
            logger.warning("LDA model not trained")
            return {"topics": [], "coherence": 0.0}

        # Check cache first — inference and coherence are expensive.
        if text in self._cached_topics:
            return self._cached_topics[text]

        try:
            # Preprocess text
            tokens = self._preprocess_text(text)
            if not tokens:
                logger.debug("No tokens found for topic analysis")
                return {"topics": [], "coherence": 0.0}

            # Create document-term matrix
            doc_bow = self.dictionary.doc2bow(tokens)

            # Get topic distribution for this document
            doc_topics = self.lda_model[doc_bow]

            # Get topics without coherence calculation for speed
            topics = self.lda_model.print_topics(num_words=5)

            # Calculate topic coherence only for longer texts
            coherence = 0.0
            if len(tokens) > 20:  # Increased threshold for coherence calculation
                try:
                    coherence_model = models.CoherenceModel(
                        model=self.lda_model,
                        texts=[tokens],
                        dictionary=self.dictionary,
                        coherence="c_v",
                        processes=1,  # Use single process for stability
                    )
                    coherence = coherence_model.get_coherence()
                except Exception as e:
                    logger.warning("Failed to calculate coherence", error=str(e))

            result = {
                "topics": topics,
                "doc_topics": doc_topics,
                "coherence": coherence,
            }

            # Cache the result
            self._cached_topics[text] = result

            return result

        except Exception as e:
            logger.error(
                "Error in topic inference",
                error=str(e),
                error_type=type(e).__name__,
                text_length=len(text),
            )
            return {"topics": [], "coherence": 0.0}