Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 90%

217 statements  

coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Semantic analysis module for text processing.""" 

2 

3import hashlib 

4import logging 

5import threading 

6from dataclasses import dataclass 

7from typing import Any 

8 

9import spacy 

10from gensim import corpora 

11from gensim.models import LdaModel 

12from gensim.parsing.preprocessing import preprocess_string 

13from spacy.cli.download import download as spacy_download 

14from spacy.tokens import Doc 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19def is_meaningful_text(text: str) -> bool: 

20 """Check if text contains meaningful content (letters or digits). 

21 

22 Returns False for text that only contains: 

23 - Punctuation marks: ., #, @, |, -, _, etc. 

24 - Whitespace characters 

25 - Special symbols without semantic meaning (---, ..., |||, etc.) 

26 

27 """ 

28 # Check if text contains at least one alphanumeric character 

29 return any(c.isalnum() for c in text) 
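
# Illustrative behaviour (follows directly from str.isalnum):
#     is_meaningful_text("---")       -> False  (symbols only)
#     is_meaningful_text("   ")       -> False  (whitespace only)
#     is_meaningful_text("Q3 report") -> True   (contains letters/digits)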


@dataclass
class SemanticAnalysisResult:
    """Container for semantic analysis results."""

    entities: list[dict[str, Any]]
    pos_tags: list[dict[str, Any]]
    dependencies: list[dict[str, Any]]
    topics: list[dict[str, Any]]
    key_phrases: list[str]
    document_similarity: dict[str, float]


class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache: dict = {}
        self._doc_cache_lock = threading.Lock()

    def _build_cache_key(
        self, text: str, doc_id: str | None, include_enhanced: bool
    ) -> tuple[str, bool, str] | None:
        """Build a cache key that includes a content fingerprint.

        Including a fingerprint prevents stale cache hits when the same doc_id
        is reused with different content.
        """
        if not doc_id:
            return None

        text_fingerprint = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return (doc_id, include_enhanced, text_fingerprint)
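
    # Illustrative shape of a cache key (digest value is hypothetical):
    #     _build_cache_key("hello world", "doc-1", False)
    #     -> ("doc-1", False, "<sha256 hex digest of the UTF-8 encoded text>")
    # A missing doc_id yields None, which analyze_text treats as "do not cache".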

    def analyze_text(
        self,
        text: str,
        doc_id: str | None = None,
        include_enhanced: bool = False,
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching
            include_enhanced: Whether to compute enhanced NLP fields
                (pos_tags, dependencies, document_similarity)

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        cache_key = self._build_cache_key(text, doc_id, include_enhanced)

        # Protected read
        with self._doc_cache_lock:
            cached = self._doc_cache.get(cache_key) if cache_key else None

        if cached is not None:
            if include_enhanced:
                # Compute similarity OUTSIDE the lock (can be slow)
                doc_similarity = self._calculate_document_similarity(
                    text, doc_id=doc_id
                )
                refreshed = SemanticAnalysisResult(
                    entities=cached.entities,
                    pos_tags=cached.pos_tags,
                    dependencies=cached.dependencies,
                    topics=cached.topics,
                    key_phrases=cached.key_phrases,
                    document_similarity=doc_similarity,
                )
                # Protected write-back
                with self._doc_cache_lock:
                    self._doc_cache[cache_key] = refreshed
                return refreshed
            return cached

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        if include_enhanced:
            # Get part-of-speech tags
            pos_tags = self._get_pos_tags(doc)

            # Get dependency parse
            dependencies = self._get_dependencies(doc)
        else:
            pos_tags = []
            dependencies = []

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = (
            self._calculate_document_similarity(text, doc_id=doc_id)
            if include_enhanced
            else {}
        )

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Protected write
        if cache_key:
            with self._doc_cache_lock:
                self._doc_cache[cache_key] = result

        return result
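
    # Cache semantics, as implemented above: results are keyed by
    # (doc_id, include_enhanced, sha256(text)), so re-analyzing the same doc_id
    # with changed text deliberately misses the cache, calls without a doc_id
    # are never cached, and enhanced requests refresh document_similarity on
    # every call because other documents may have been cached since.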

    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking, filtering garbage entities.

        Filters out entities that:
        - Only contain punctuation/symbols (., #, |, etc.)
        - Don't have any alphanumeric characters
        - Are just whitespace

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Filter entities that only contain punctuation/symbols
            if not is_meaningful_text(ent.text):
                continue

            # Get entity context
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get entity description
            description = self.nlp.vocab.strings[ent.label_]

            # Get related entities (also filter meaningless ones)
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    # Only add related entities with meaningful text
                    if is_meaningful_text(token.text):
                        related.append(
                            {
                                "text": token.text,
                                "type": token.ent_type_,
                                "relation": token.dep_,
                            }
                        )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities

    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information, filtering noise tokens.

        Filters out multiple types of noise:
        - Whitespace tokens (is_space=True)
        - Punctuation tokens (is_punct=True)
        - Symbol-only tokens without alphanumeric content (e.g., ---, ..., |||)

        This is especially important for Excel tables and structured data.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries (excluding spaces, punctuation, and symbols)
        """
        pos_tags = []
        for token in doc:
            # Skip whitespace and punctuation - they pollute metadata
            if token.is_space or token.is_punct:
                continue

            # Also skip tokens with no meaningful content (e.g., ---, ...)
            # This catches edge cases where spaCy doesn't mark them as punctuation
            if not is_meaningful_text(token.text):
                continue

            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                }
            )
        return pos_tags
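
    # Rough example (exact tokenization depends on the loaded spaCy model):
    # for a flattened table row such as "--- | Total | 42", the "---" and "|"
    # tokens are filtered out and only "Total" and "42" receive POS entries.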

    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information with filtering.

        Filters out:
        - Whitespace tokens (is_space=True)
        - Punctuation tokens (is_punct=True)
        - Symbol-only tokens without alphanumeric content
        - Children that are punctuation or meaningless symbols

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries (excluding noise tokens)
        """
        dependencies = []
        for token in doc:
            # Skip whitespace and punctuation tokens
            if token.is_space or token.is_punct:
                continue

            # Skip tokens with no meaningful content (e.g., ---, ...)
            if not is_meaningful_text(token.text):
                continue

            # Filter children to only include meaningful tokens
            meaningful_children = [
                child.text
                for child in token.children
                if not child.is_space
                and not child.is_punct
                and is_meaningful_text(child.text)
            ]

            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": meaningful_children,
                }
            )
        return dependencies

    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    num_topics=min(
                        self.num_topics, len(processed_text) // 2
                    ),  # Ensure reasonable topic count
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01,  # Fixed positive value for topic-word density
                )

            # Get topics
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return (
                topics
                if topics
                else [
                    {
                        "id": 0,
                        "terms": [{"term": "general", "weight": 1.0}],
                        "coherence": 0.5,
                    }
                ]
            )

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return [
                {
                    "id": 0,
                    "terms": [{"term": "general", "weight": 1.0}],
                    "coherence": 0.5,
                }
            ]
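
    # For reference: gensim's print_topics typically yields strings such as
    #     '0.067*"pipeline" + 0.053*"vector" + ...'
    # which the parser above splits on "+" and "*" into
    #     [{"term": "pipeline", "weight": 0.067}, {"term": "vector", "weight": 0.053}, ...]
    # (terms and weights here are made up for illustration).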

    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates
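
    # Rough example (actual spans depend on the loaded spaCy model): for
    # "Acme Corp released the quarterly report", the multi-word noun chunk
    # "the quarterly report" and the ORG entity "Acme Corp" would both be
    # candidate key phrases, while single-word noun chunks are skipped.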

    def _calculate_document_similarity(
        self, text: str, doc_id: str | None = None
    ) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare
            doc_id: Optional current document ID to exclude from results

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        skipped_ids = {doc_id} if doc_id else set()

        doc = self.nlp(text)

        # Check if the model has word vectors
        has_vectors = self.nlp.vocab.vectors_length > 0

        with self._doc_cache_lock:
            cached_items = list(self._doc_cache.items())

        for cache_key, cached_result in cached_items:
            cached_doc_id = cache_key[0] if isinstance(cache_key, tuple) else cache_key
            if cached_doc_id is None or cached_doc_id in skipped_ids:
                continue

            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get(
                "context"
            ):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])

            if has_vectors:
                # Use spaCy's built-in similarity which uses word vectors
                similarity = doc.similarity(cached_doc)
            else:
                # Use alternative similarity calculation for models without word vectors
                # This avoids the spaCy warning about missing word vectors
                similarity = self._calculate_alternative_similarity(doc, cached_doc)

            similarities[cached_doc_id] = float(similarity)
            skipped_ids.add(cached_doc_id)

        return similarities
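
    # Note: as written, similarity is computed against the first cached
    # entity's sentence context (the cache does not retain full document
    # text), so the scores approximate document-level similarity rather than
    # comparing complete documents.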

    def _calculate_alternative_similarity(self, doc1: Doc, doc2: Doc) -> float:
        """Calculate similarity for models without word vectors.

        Uses token overlap and shared entities as similarity metrics.

        Args:
            doc1: First document
            doc2: Second document

        Returns:
            Similarity score between 0 and 1
        """
        # Extract lemmatized tokens (excluding stop words and punctuation)
        tokens1 = {
            token.lemma_.lower()
            for token in doc1
            if not token.is_stop and not token.is_punct and token.is_alpha
        }
        tokens2 = {
            token.lemma_.lower()
            for token in doc2
            if not token.is_stop and not token.is_punct and token.is_alpha
        }

        # Calculate token overlap (Jaccard similarity)
        if not tokens1 and not tokens2:
            return 1.0  # Both empty
        if not tokens1 or not tokens2:
            return 0.0  # One empty

        intersection = len(tokens1.intersection(tokens2))
        union = len(tokens1.union(tokens2))
        token_similarity = intersection / union if union > 0 else 0.0

        # Extract named entities
        entities1 = {ent.text.lower() for ent in doc1.ents}
        entities2 = {ent.text.lower() for ent in doc2.ents}

        # Calculate entity overlap
        entity_similarity = 0.0
        if entities1 or entities2:
            entity_intersection = len(entities1.intersection(entities2))
            entity_union = len(entities1.union(entities2))
            entity_similarity = (
                entity_intersection / entity_union if entity_union > 0 else 0.0
            )

        # Combine token and entity similarities (weighted average)
        # Token similarity gets more weight as it's more comprehensive
        combined_similarity = 0.7 * token_similarity + 0.3 * entity_similarity

        return combined_similarity
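
    # Worked example (hypothetical numbers): with a token Jaccard overlap of
    # 0.40 and an entity Jaccard overlap of 0.20, the combined score is
    #     0.7 * 0.40 + 0.3 * 0.20 = 0.28 + 0.06 = 0.34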

    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0
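
    # E.g. (hypothetical weights): [0.3, 0.1] averages to 0.2; an empty term
    # list falls back to 0.0.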

    def clear_cache(self):
        """Clear the document cache and release all resources."""
        # Clear document cache
        with self._doc_cache_lock:
            self._doc_cache.clear()

        # Release LDA model resources
        if hasattr(self, "lda_model") and self.lda_model is not None:
            try:
                # Clear LDA model
                self.lda_model = None
            except Exception as e:
                logger.warning(f"Error releasing LDA model: {e}")

        # Release dictionary
        if hasattr(self, "dictionary") and self.dictionary is not None:
            try:
                self.dictionary = None
            except Exception as e:
                logger.warning(f"Error releasing dictionary: {e}")

        # Release spaCy model resources
        if hasattr(self, "nlp") and self.nlp is not None:
            try:
                # Clear spaCy caches and release memory
                if hasattr(self.nlp, "vocab") and hasattr(self.nlp.vocab, "strings"):
                    # Try different methods to clear spaCy caches
                    if hasattr(self.nlp.vocab.strings, "_map") and hasattr(
                        self.nlp.vocab.strings._map, "clear"
                    ):
                        self.nlp.vocab.strings._map.clear()
                    elif hasattr(self.nlp.vocab.strings, "clear"):
                        self.nlp.vocab.strings.clear()
                # Additional cleanup for different spaCy versions
                if hasattr(self.nlp.vocab, "_vectors") and hasattr(
                    self.nlp.vocab._vectors, "clear"
                ):
                    self.nlp.vocab._vectors.clear()
                # Note: we don't set nlp to None as it might be needed for other
                # operations, but we clear its internal caches
            except Exception as e:
                logger.debug(f"spaCy cache clearing skipped (version-specific): {e}")

        logger.debug("Semantic analyzer resources cleared")

    def shutdown(self):
        """Shut down the semantic analyzer and release all resources.

        This method should be called when the analyzer is no longer needed
        to ensure proper cleanup of all resources.
        """
        self.clear_cache()

        # More aggressive cleanup for shutdown
        if hasattr(self, "nlp"):
            try:
                # Release the spaCy model completely
                del self.nlp
            except Exception as e:
                logger.warning(f"Error releasing spaCy model: {e}")

        logger.debug("Semantic analyzer shutdown completed")
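

# Minimal usage sketch (illustrative only, not part of the module's public
# API). It assumes the "en_core_web_md" spaCy model is installed or
# downloadable and that gensim is available in the environment; the sample
# text and doc_id are made up.
if __name__ == "__main__":
    analyzer = SemanticAnalyzer()
    try:
        result = analyzer.analyze_text(
            "Acme Corp released the quarterly report for its vector search product.",
            doc_id="example-doc",
            include_enhanced=True,
        )
        print("entities:", [e["text"] for e in result.entities])
        print("key phrases:", result.key_phrases)
        print("topics:", [t["id"] for t in result.topics])
    finally:
        analyzer.shutdown()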