Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 93%

123 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Semantic analysis module for text processing.""" 

2 

3import logging 

4import warnings 

5from dataclasses import dataclass 

6from typing import Any 

7 

8import spacy 

9from gensim import corpora 

10from gensim.models import LdaModel 

11from gensim.parsing.preprocessing import preprocess_string 

12from spacy.cli.download import download as spacy_download 

13from spacy.tokens import Doc 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18@dataclass 

19class SemanticAnalysisResult: 

20 """Container for semantic analysis results.""" 

21 

22 entities: list[dict[str, Any]] 

23 pos_tags: list[dict[str, Any]] 

24 dependencies: list[dict[str, Any]] 

25 topics: list[dict[str, Any]] 

26 key_phrases: list[str] 

27 document_similarity: dict[str, float] 

28 

29 

class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache = {}

    def analyze_text(
        self, text: str, doc_id: str | None = None
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        if doc_id and doc_id in self._doc_cache:
            return self._doc_cache[doc_id]

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        # Get part-of-speech tags
        pos_tags = self._get_pos_tags(doc)

        # Get dependency parse
        dependencies = self._get_dependencies(doc)

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = self._calculate_document_similarity(text)

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Cache result
        if doc_id:
            self._doc_cache[doc_id] = result

        return result

    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking.

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Get entity context (the full sentence containing the entity)
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get a human-readable description of the entity label
            # (spacy.explain returns None for labels it cannot describe,
            # so fall back to the raw label)
            description = spacy.explain(ent.label_) or ent.label_

            # Get related entities from the same sentence
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    related.append(
                        {
                            "text": token.text,
                            "type": token.ent_type_,
                            "relation": token.dep_,
                        }
                    )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities
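
    # Illustrative sketch of a single entity dictionary produced by
    # _extract_entities (hypothetical values; actual labels, offsets,
    # descriptions, and dependency relations depend on the loaded spaCy model;
    # tokens of a multi-word entity can also appear in related_entities,
    # because the filter compares each token's text against the full entity text):
    #
    #   {
    #       "text": "Acme Corp",
    #       "label": "ORG",
    #       "start": 0,
    #       "end": 9,
    #       "description": "Companies, agencies, institutions, etc.",
    #       "context": "Acme Corp hired Jane Doe in 2021.",
    #       "related_entities": [
    #           {"text": "Jane", "type": "PERSON", "relation": "compound"},
    #           {"text": "2021", "type": "DATE", "relation": "pobj"},
    #       ],
    #   }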

    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries
        """
        pos_tags = []
        for token in doc:
            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_space": token.is_space,
                }
            )
        return pos_tags

    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information.

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries
        """
        dependencies = []
        for token in doc:
            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": [child.text for child in token.children],
                }
            )
        return dependencies

    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return [{"id": 0, "terms": [{"term": "general", "weight": 1.0}], "coherence": 0.5}]

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    num_topics=min(self.num_topics, len(processed_text) // 2),  # Ensure reasonable topic count
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01  # Fixed positive value for topic-word density
                )

            # Get topics
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return topics if topics else [{"id": 0, "terms": [{"term": "general", "weight": 1.0}], "coherence": 0.5}]

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return [{"id": 0, "terms": [{"term": "general", "weight": 1.0}], "coherence": 0.5}]
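
    # Illustrative sketch of the parsing above (assumed gensim output format):
    # LdaModel.print_topics() yields (topic_id, topic_string) pairs such as
    #
    #   (0, '0.057*"vector" + 0.043*"search" + 0.035*"index"')
    #
    # which the loop in _extract_topics turns into
    #
    #   {"id": 0,
    #    "terms": [{"term": "vector", "weight": 0.057},
    #              {"term": "search", "weight": 0.043},
    #              {"term": "index", "weight": 0.035}],
    #    "coherence": 0.045}  # mean of the term weights
    #
    # Terms that do not match the weight*"word" pattern are skipped.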

    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates

    def _calculate_document_similarity(self, text: str) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        doc = self.nlp(text)

        for doc_id, cached_result in self._doc_cache.items():
            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get("context"):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])
            similarity = doc.similarity(cached_doc)
            similarities[doc_id] = float(similarity)

        return similarities

    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0

    def clear_cache(self):
        """Clear the document cache."""
        self._doc_cache.clear()
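

# Example usage (illustrative sketch; assumes the en_core_web_md model and
# gensim are installed, or that downloading the model at runtime is acceptable):
if __name__ == "__main__":
    analyzer = SemanticAnalyzer(num_topics=3)
    result = analyzer.analyze_text(
        "Qdrant is a vector database written in Rust. "
        "It powers semantic search for many applications.",
        doc_id="doc-1",
    )
    print([e["text"] for e in result.entities])  # named entities found
    print(result.key_phrases)  # multi-word noun phrases and notable entities
    print(result.topics[0]["terms"])  # top LDA terms for the first topic
    analyzer.clear_cache()  # drop cached analysis results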