Coverage for src/qdrant_loader/core/text_processing/semantic_analyzer.py: 93% (123 statements)
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Semantic analysis module for text processing."""
3import logging
4import warnings
5from dataclasses import dataclass
6from typing import Any
8import spacy
9from gensim import corpora
10from gensim.models import LdaModel
11from gensim.parsing.preprocessing import preprocess_string
12from spacy.cli.download import download as spacy_download
13from spacy.tokens import Doc
15logger = logging.getLogger(__name__)
@dataclass
class SemanticAnalysisResult:
    """Container for semantic analysis results."""

    entities: list[dict[str, Any]]
    pos_tags: list[dict[str, Any]]
    dependencies: list[dict[str, Any]]
    topics: list[dict[str, Any]]
    key_phrases: list[str]
    document_similarity: dict[str, float]
class SemanticAnalyzer:
    """Advanced semantic analysis for text processing."""

    def __init__(
        self,
        spacy_model: str = "en_core_web_md",
        num_topics: int = 5,
        passes: int = 10,
        min_topic_freq: int = 2,
    ):
        """Initialize the semantic analyzer.

        Args:
            spacy_model: Name of the spaCy model to use
            num_topics: Number of topics for LDA
            passes: Number of passes for LDA training
            min_topic_freq: Minimum frequency for topic terms
        """
        self.logger = logging.getLogger(__name__)

        # Initialize spaCy
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            self.logger.info(f"Downloading spaCy model {spacy_model}...")
            spacy_download(spacy_model)
            self.nlp = spacy.load(spacy_model)

        # Initialize LDA parameters
        self.num_topics = num_topics
        self.passes = passes
        self.min_topic_freq = min_topic_freq

        # Initialize LDA model
        self.lda_model = None
        self.dictionary = None

        # Cache for processed documents
        self._doc_cache = {}
    def analyze_text(
        self, text: str, doc_id: str | None = None
    ) -> SemanticAnalysisResult:
        """Perform comprehensive semantic analysis on text.

        Args:
            text: Text to analyze
            doc_id: Optional document ID for caching

        Returns:
            SemanticAnalysisResult containing all analysis results
        """
        # Check cache
        if doc_id and doc_id in self._doc_cache:
            return self._doc_cache[doc_id]

        # Process with spaCy
        doc = self.nlp(text)

        # Extract entities with linking
        entities = self._extract_entities(doc)

        # Get part-of-speech tags
        pos_tags = self._get_pos_tags(doc)

        # Get dependency parse
        dependencies = self._get_dependencies(doc)

        # Extract topics
        topics = self._extract_topics(text)

        # Extract key phrases
        key_phrases = self._extract_key_phrases(doc)

        # Calculate document similarity
        doc_similarity = self._calculate_document_similarity(text)

        # Create result
        result = SemanticAnalysisResult(
            entities=entities,
            pos_tags=pos_tags,
            dependencies=dependencies,
            topics=topics,
            key_phrases=key_phrases,
            document_similarity=doc_similarity,
        )

        # Cache result
        if doc_id:
            self._doc_cache[doc_id] = result

        return result
    def _extract_entities(self, doc: Doc) -> list[dict[str, Any]]:
        """Extract named entities with linking.

        Args:
            doc: spaCy document

        Returns:
            List of entity dictionaries with linking information
        """
        entities = []
        for ent in doc.ents:
            # Get entity context
            start_sent = ent.sent.start
            end_sent = ent.sent.end
            context = doc[start_sent:end_sent].text

            # Get entity description; spacy.explain returns a human-readable
            # description of the label (fall back to the raw label if unknown)
            description = spacy.explain(ent.label_) or ent.label_

            # Get related entities
            related = []
            for token in ent.sent:
                if token.ent_type_ and token.text != ent.text:
                    related.append(
                        {
                            "text": token.text,
                            "type": token.ent_type_,
                            "relation": token.dep_,
                        }
                    )

            entities.append(
                {
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "description": description,
                    "context": context,
                    "related_entities": related,
                }
            )

        return entities
    def _get_pos_tags(self, doc: Doc) -> list[dict[str, Any]]:
        """Get part-of-speech tags with detailed information.

        Args:
            doc: spaCy document

        Returns:
            List of POS tag dictionaries
        """
        pos_tags = []
        for token in doc:
            pos_tags.append(
                {
                    "text": token.text,
                    "pos": token.pos_,
                    "tag": token.tag_,
                    "lemma": token.lemma_,
                    "is_stop": token.is_stop,
                    "is_punct": token.is_punct,
                    "is_space": token.is_space,
                }
            )
        return pos_tags
    def _get_dependencies(self, doc: Doc) -> list[dict[str, Any]]:
        """Get dependency parse information.

        Args:
            doc: spaCy document

        Returns:
            List of dependency dictionaries
        """
        dependencies = []
        for token in doc:
            dependencies.append(
                {
                    "text": token.text,
                    "dep": token.dep_,
                    "head": token.head.text,
                    "head_pos": token.head.pos_,
                    "children": [child.text for child in token.children],
                }
            )
        return dependencies
    def _extract_topics(self, text: str) -> list[dict[str, Any]]:
        """Extract topics using LDA.

        Args:
            text: Text to analyze

        Returns:
            List of topic dictionaries
        """
        # Fallback topic returned when extraction is skipped or fails
        fallback_topics = [
            {"id": 0, "terms": [{"term": "general", "weight": 1.0}], "coherence": 0.5}
        ]

        try:
            # Preprocess text
            processed_text = preprocess_string(text)

            # Skip topic extraction for very short texts
            if len(processed_text) < 5:
                self.logger.debug("Text too short for topic extraction")
                return fallback_topics

            # If we have existing models, use and update them
            if self.dictionary is not None and self.lda_model is not None:
                # Add new documents to existing dictionary
                self.dictionary.add_documents([processed_text])

                # Create corpus for the new text
                corpus = [self.dictionary.doc2bow(processed_text)]

                # Update existing LDA model
                self.lda_model.update(corpus)

                # Use the updated model for topic extraction
                current_lda_model = self.lda_model
            else:
                # Create fresh models for first use or when models aren't available
                temp_dictionary = corpora.Dictionary([processed_text])
                corpus = [temp_dictionary.doc2bow(processed_text)]

                # Create a fresh LDA model for this specific text
                current_lda_model = LdaModel(
                    corpus,
                    # Ensure a reasonable topic count for short documents
                    num_topics=min(self.num_topics, len(processed_text) // 2),
                    passes=self.passes,
                    id2word=temp_dictionary,
                    random_state=42,  # For reproducibility
                    alpha=0.1,  # Fixed positive value for document-topic density
                    eta=0.01,  # Fixed positive value for topic-word density
                )

            # Get topics; print_topics() yields strings like '0.100*"term" + 0.080*"other"'
            topics = []
            for topic_id, topic in current_lda_model.print_topics():
                # Parse topic terms
                terms = []
                for term in topic.split("+"):
                    try:
                        weight, word = term.strip().split("*")
                        terms.append({"term": word.strip('"'), "weight": float(weight)})
                    except ValueError:
                        # Skip malformed terms
                        continue

                topics.append(
                    {
                        "id": topic_id,
                        "terms": terms,
                        "coherence": self._calculate_topic_coherence(terms),
                    }
                )

            return topics if topics else fallback_topics

        except Exception as e:
            self.logger.warning(f"Topic extraction failed: {e}", exc_info=True)
            # Return fallback topic
            return fallback_topics
    def _extract_key_phrases(self, doc: Doc) -> list[str]:
        """Extract key phrases from text.

        Args:
            doc: spaCy document

        Returns:
            List of key phrases
        """
        key_phrases = []

        # Extract noun phrases
        for chunk in doc.noun_chunks:
            if len(chunk.text.split()) >= 2:  # Only multi-word phrases
                key_phrases.append(chunk.text)

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PRODUCT", "WORK_OF_ART", "LAW"]:
                key_phrases.append(ent.text)

        return list(set(key_phrases))  # Remove duplicates
    def _calculate_document_similarity(self, text: str) -> dict[str, float]:
        """Calculate similarity with other processed documents.

        Args:
            text: Text to compare

        Returns:
            Dictionary of document similarities
        """
        similarities = {}
        doc = self.nlp(text)

        for doc_id, cached_result in self._doc_cache.items():
            # Check if cached_result has entities and the first entity has context
            if not cached_result.entities or not cached_result.entities[0].get("context"):
                continue

            cached_doc = self.nlp(cached_result.entities[0]["context"])
            similarity = doc.similarity(cached_doc)
            similarities[doc_id] = float(similarity)

        return similarities
    def _calculate_topic_coherence(self, terms: list[dict[str, Any]]) -> float:
        """Calculate topic coherence score.

        Args:
            terms: List of topic terms with weights

        Returns:
            Coherence score between 0 and 1
        """
        # Simple coherence based on term weights
        weights = [term["weight"] for term in terms]
        return sum(weights) / len(weights) if weights else 0.0
    def clear_cache(self):
        """Clear the document cache."""
        self._doc_cache.clear()
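
# A minimal usage sketch (illustrative only): it assumes the en_core_web_md model
# is installed or can be downloaded on first use, and the sample sentence and
# doc_id below are made up for demonstration.
if __name__ == "__main__":
    analyzer = SemanticAnalyzer(spacy_model="en_core_web_md", num_topics=3)
    result = analyzer.analyze_text(
        "Acme Corp announced a new analytics product in Berlin.", doc_id="doc-1"
    )
    print(result.entities)       # named entities with context and related entities
    print(result.key_phrases)    # multi-word noun phrases and selected entity types
    print(result.topics)         # LDA topics, or the "general" fallback topic
    print(result.document_similarity)  # empty on the first call; filled once other docs are cached
    analyzer.clear_cache()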