Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 87%

174 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Semantic analysis module for text processing.""" 

2 

3import logging 

4from dataclasses import dataclass 

5from typing import Any 

6 

7import spacy 

8from gensim import corpora 

9from gensim.models import LdaModel 

10from gensim.parsing.preprocessing import preprocess_string 

11from spacy.cli.download import download as spacy_download 

12from spacy.tokens import Doc 

13 

14logger = logging.getLogger(__name__) 

15 

16 

@dataclass
class SemanticAnalysisResult:
    """Container for semantic analysis results."""

    entities: list[dict[str, Any]]
    pos_tags: list[dict[str, Any]]
    dependencies: list[dict[str, Any]]
    topics: list[dict[str, Any]]
    key_phrases: list[str]
    document_similarity: dict[str, float]
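
# Example shape of a populated result, with illustrative values only (the field
# contents mirror the dictionaries built by SemanticAnalyzer below):
# SemanticAnalysisResult(
#     entities=[{"text": "Qdrant", "label": "ORG", "start": 0, "end": 6, ...}],
#     pos_tags=[{"text": "Qdrant", "pos": "PROPN", ...}],
#     dependencies=[{"text": "Qdrant", "dep": "nsubj", "head": "is", ...}],
#     topics=[{"id": 0, "terms": [{"term": "vector", "weight": 0.12}], "coherence": 0.12}],
#     key_phrases=["vector database"],
#     document_similarity={"other-doc-id": 0.42},
# )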

class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache = {}

    def analyze_text(
        self, text: str, doc_id: str | None = None
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        if doc_id and doc_id in self._doc_cache:
            return self._doc_cache[doc_id]

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        # Get part-of-speech tags
        pos_tags = self._get_pos_tags(doc)

        # Get dependency parse
        dependencies = self._get_dependencies(doc)

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = self._calculate_document_similarity(text)

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Cache result
        if doc_id:
            self._doc_cache[doc_id] = result

        return result

    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking.

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Get entity context
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get a human-readable description of the entity label
            description = spacy.explain(ent.label_) or ent.label_

            # Get related entities
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    related.append(
                        {
                            "text": token.text,
                            "type": token.ent_type_,
                            "relation": token.dep_,
                        }
                    )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities

    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries
        """
        pos_tags = []
        for token in doc:
            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_space": token.is_space,
                }
            )
        return pos_tags

    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information.

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries
        """
        dependencies = []
        for token in doc:
            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": [child.text for child in token.children],
                }
            )
        return dependencies

    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    num_topics=min(
                        self.num_topics, len(processed_text) // 2
                    ),  # Ensure reasonable topic count
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01,  # Fixed positive value for topic-word density
                )

            # Get topics
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return (
                topics
                if topics
                else [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]
            )

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return [
                {
                    "id": 0,
                    "terms": [{"term": "general", "weight": 1.0}],
                    "coherence": 0.5,
                }
            ]
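
    # Note on the parsing in _extract_topics: LdaModel.print_topics() yields
    # (topic_id, topic_string) pairs where topic_string typically looks like
    # '0.120*"vector" + 0.080*"search"' (sample values for illustration only),
    # which is why each term is split on "+" and then on "*".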

    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates

    def _calculate_document_similarity(self, text: str) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        doc = self.nlp(text)

        # Check if the model has word vectors
        has_vectors = self.nlp.vocab.vectors_length > 0

        for doc_id, cached_result in self._doc_cache.items():
            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get(
                "context"
            ):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])

            if has_vectors:
                # Use spaCy's built-in similarity which uses word vectors
                similarity = doc.similarity(cached_doc)
            else:
                # Use alternative similarity calculation for models without word vectors
                # This avoids the spaCy warning about missing word vectors
                similarity = self._calculate_alternative_similarity(doc, cached_doc)

            similarities[doc_id] = float(similarity)

        return similarities

    def _calculate_alternative_similarity(self, doc1: Doc, doc2: Doc) -> float:
        """Calculate similarity for models without word vectors.

        Uses token overlap and shared entities as similarity metrics.

        Args:
            doc1: First document
            doc2: Second document

        Returns:
            Similarity score between 0 and 1
        """
        # Extract lemmatized tokens (excluding stop words and punctuation)
        tokens1 = {
            token.lemma_.lower()
            for token in doc1
            if not token.is_stop and not token.is_punct and token.is_alpha
        }
        tokens2 = {
            token.lemma_.lower()
            for token in doc2
            if not token.is_stop and not token.is_punct and token.is_alpha
        }

        # Calculate token overlap (Jaccard similarity)
        if not tokens1 and not tokens2:
            return 1.0  # Both empty
        if not tokens1 or not tokens2:
            return 0.0  # One empty

        intersection = len(tokens1.intersection(tokens2))
        union = len(tokens1.union(tokens2))
        token_similarity = intersection / union if union > 0 else 0.0

        # Extract named entities
        entities1 = {ent.text.lower() for ent in doc1.ents}
        entities2 = {ent.text.lower() for ent in doc2.ents}

        # Calculate entity overlap
        entity_similarity = 0.0
        if entities1 or entities2:
            entity_intersection = len(entities1.intersection(entities2))
            entity_union = len(entities1.union(entities2))
            entity_similarity = (
                entity_intersection / entity_union if entity_union > 0 else 0.0
            )

        # Combine token and entity similarities (weighted average)
        # Token similarity gets more weight as it's more comprehensive
        combined_similarity = 0.7 * token_similarity + 0.3 * entity_similarity

        return combined_similarity
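
    # Worked example of the combined score above, using made-up token and entity
    # sets: tokens1 = {"vector", "search", "index"} and tokens2 = {"vector",
    # "index", "store"} give a Jaccard token similarity of 2 / 4 = 0.5; with no
    # shared entities (entity similarity 0.0), the combined score is 0.7 * 0.5 = 0.35.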

    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0

    def clear_cache(self):
        """Clear the document cache and release all resources."""
        # Clear document cache
        self._doc_cache.clear()

        # Release LDA model resources
        if hasattr(self, "lda_model") and self.lda_model is not None:
            try:
                # Clear LDA model
                self.lda_model = None
            except Exception as e:
                logger.warning(f"Error releasing LDA model: {e}")

        # Release dictionary
        if hasattr(self, "dictionary") and self.dictionary is not None:
            try:
                self.dictionary = None
            except Exception as e:
                logger.warning(f"Error releasing dictionary: {e}")

        # Release spaCy model resources
        if hasattr(self, "nlp") and self.nlp is not None:
            try:
                # Clear spaCy caches and release memory
                if hasattr(self.nlp, "vocab") and hasattr(self.nlp.vocab, "strings"):
                    # Try different methods to clear spaCy caches
                    if hasattr(self.nlp.vocab.strings, "_map") and hasattr(
                        self.nlp.vocab.strings._map, "clear"
                    ):
                        self.nlp.vocab.strings._map.clear()
                    elif hasattr(self.nlp.vocab.strings, "clear"):
                        self.nlp.vocab.strings.clear()
                # Additional cleanup for different spaCy versions
                if hasattr(self.nlp.vocab, "_vectors") and hasattr(
                    self.nlp.vocab._vectors, "clear"
                ):
                    self.nlp.vocab._vectors.clear()
                # Note: We don't set nlp to None as it might be needed for other
                # operations, but we clear its internal caches
            except Exception as e:
                logger.debug(f"spaCy cache clearing skipped (version-specific): {e}")

        logger.debug("Semantic analyzer resources cleared")

    def shutdown(self):
        """Shutdown the semantic analyzer and release all resources.

        This method should be called when the analyzer is no longer needed
        to ensure proper cleanup of all resources.
        """
        self.clear_cache()

        # More aggressive cleanup for shutdown
        if hasattr(self, "nlp"):
            try:
                # Release the spaCy model completely
                del self.nlp
            except Exception as e:
                logger.warning(f"Error releasing spaCy model: {e}")

        logger.debug("Semantic analyzer shutdown completed")
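

# Minimal usage sketch, assuming the spaCy model named in the constructor is
# installed or can be downloaded; the sample text and document ID below are made
# up for illustration only.
if __name__ == "__main__":
    analyzer = SemanticAnalyzer(num_topics=3)
    sample_text = (
        "Qdrant is a vector database. It stores embeddings and supports "
        "semantic search over large document collections."
    )
    result = analyzer.analyze_text(sample_text, doc_id="sample-doc")
    print("Entities:", [entity["text"] for entity in result.entities])
    print("Key phrases:", result.key_phrases)
    print("Topics:", result.topics)
    analyzer.shutdown()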