Coverage for src / qdrant_loader / core / chunking / strategy / default / text_chunk_processor.py: 99%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""Text-specific chunk processor for document creation and management."""
3from typing import Any
5from qdrant_loader.config import Settings
6from qdrant_loader.core.chunking.strategy.base.chunk_processor import BaseChunkProcessor
7from qdrant_loader.core.document import Document
10class TextChunkProcessor(BaseChunkProcessor):
11 """Chunk processor for text documents with enhanced text-specific processing."""
13 def __init__(self, settings: Settings):
14 super().__init__(settings)
15 # Get strategy-specific configuration
16 self.default_config = settings.global_config.chunking.strategies.default
17 self._semantic_analysis_enabled = (
18 settings.global_config.chunking.enable_semantic_analysis
19 )
21 def create_chunk_document(
22 self,
23 original_doc: Document,
24 chunk_content: str,
25 chunk_index: int,
26 total_chunks: int,
27 chunk_metadata: dict[str, Any],
28 skip_nlp: bool = False,
29 ) -> Document:
30 """Create a document for a text chunk with enhanced metadata."""
32 # Generate unique chunk ID
33 chunk_id = self.generate_chunk_id(original_doc, chunk_index)
35 # Create base metadata
36 base_metadata = self.create_base_chunk_metadata(
37 original_doc, chunk_index, total_chunks, chunk_metadata
38 )
40 # Add text-specific metadata
41 text_metadata = self._create_text_specific_metadata(chunk_content, original_doc)
42 base_metadata.update(text_metadata)
44 # Always set NLP-state metadata so consumers can rely on these keys
45 if not self._semantic_analysis_enabled:
46 base_metadata.update(
47 {
48 "entities": [],
49 "pos_tags": [],
50 "nlp_skipped": True,
51 "skip_reason": "semantic_analysis_disabled",
52 }
53 )
54 else:
55 # Initialize with defaults when semantic analysis is enabled
56 # These will be updated if actual NLP processing occurs
57 base_metadata.update(
58 {
59 "entities": [],
60 "pos_tags": [],
61 "nlp_skipped": False,
62 "skip_reason": None,
63 }
64 )
66 # Create chunk document
67 chunk_doc = Document(
68 id=chunk_id,
69 content=chunk_content,
70 metadata=base_metadata,
71 source=original_doc.source,
72 source_type=original_doc.source_type,
73 url=original_doc.url,
74 content_type=original_doc.content_type,
75 title=f"{original_doc.title} - Chunk {chunk_index + 1}",
76 )
78 return chunk_doc
80 def _create_text_specific_metadata(
81 self, content: str, original_doc: Document
82 ) -> dict[str, Any]:
83 """Create text-specific metadata for the chunk."""
84 metadata = {
85 "chunk_strategy": "text",
86 "processing_method": "intelligent_splitting",
87 "content_analysis": self._analyze_chunk_content(content),
88 "quality_metrics": self._calculate_quality_metrics(content),
89 "text_characteristics": self._extract_text_characteristics(content),
90 }
92 # Add semantic analysis indicators if enabled
93 if self._semantic_analysis_enabled:
94 metadata["semantic_analysis_enabled"] = True
95 metadata["semantic_indicators"] = self._extract_semantic_indicators(content)
97 # Add entity extraction indicators if enabled
98 if self.default_config.enable_entity_extraction:
99 metadata["entity_extraction_enabled"] = True
100 metadata["entity_hints"] = self._extract_entity_hints(content)
102 return metadata
104 def _analyze_chunk_content(self, content: str) -> dict[str, Any]:
105 """Analyze the content structure and characteristics of the chunk."""
106 words = content.split()
107 sentences = content.split(".")
108 paragraphs = content.split("\n\n")
110 return {
111 "word_count": len(words),
112 "sentence_count": len([s for s in sentences if s.strip()]),
113 "paragraph_count": len([p for p in paragraphs if p.strip()]),
114 "avg_words_per_sentence": len(words)
115 / max(1, len([s for s in sentences if s.strip()])),
116 "character_count": len(content),
117 "content_density": (
118 len(content.replace(" ", "")) / len(content) if content else 0
119 ),
120 }
122 def _calculate_quality_metrics(self, content: str) -> dict[str, Any]:
123 """Calculate quality metrics for the chunk."""
124 words = content.split()
126 # Content completeness (does it end with proper punctuation?)
127 ends_properly = content.strip().endswith((".", "!", "?", ":", ";"))
129 # Content coherence (rough estimate based on word repetition)
130 unique_words = len({word.lower() for word in words})
131 word_diversity = unique_words / len(words) if words else 0
133 # Content readability (simple metric based on sentence structure)
134 sentences = [s.strip() for s in content.split(".") if s.strip()]
135 avg_sentence_length = (
136 sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
137 )
139 return {
140 "ends_properly": ends_properly,
141 "word_diversity": round(word_diversity, 3),
142 "avg_sentence_length": round(avg_sentence_length, 1),
143 "readability_score": self._estimate_readability(
144 avg_sentence_length, word_diversity
145 ),
146 "chunk_completeness": self._assess_chunk_completeness(content),
147 }
149 def _extract_text_characteristics(self, content: str) -> dict[str, Any]:
150 """Extract various text characteristics from the chunk."""
151 import re
153 return {
154 "has_numbers": bool(re.search(r"\d+", content)),
155 "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
156 "has_urls": bool(re.search(r"https?://\S+", content)),
157 "has_email": bool(
158 re.search(
159 r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", content
160 )
161 ),
162 "has_phone": bool(re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)),
163 "has_currency": bool(re.search(r"\$\d+(?:\.\d{2})?", content)),
164 "has_percentages": bool(re.search(r"\b\d+(?:\.\d+)?%\b", content)),
165 "has_quotes": '"' in content or "'" in content,
166 "has_parentheses": "(" in content and ")" in content,
167 "has_formatting": bool(re.search(r"[*_`#]", content)),
168 "language_indicators": self._detect_language_indicators(content),
169 }
171 def _extract_semantic_indicators(self, content: str) -> dict[str, Any]:
172 """Extract indicators for semantic analysis."""
173 import re
175 # Topic indicators (simple keyword-based)
176 business_keywords = [
177 "company",
178 "business",
179 "market",
180 "revenue",
181 "profit",
182 "customer",
183 ]
184 tech_keywords = [
185 "software",
186 "technology",
187 "system",
188 "data",
189 "algorithm",
190 "code",
191 ]
192 academic_keywords = [
193 "research",
194 "study",
195 "analysis",
196 "theory",
197 "methodology",
198 "conclusion",
199 ]
201 content_lower = content.lower()
203 return {
204 "topic_indicators": {
205 "business": sum(1 for kw in business_keywords if kw in content_lower),
206 "technology": sum(1 for kw in tech_keywords if kw in content_lower),
207 "academic": sum(1 for kw in academic_keywords if kw in content_lower),
208 },
209 "discourse_markers": {
210 "enumeration": bool(
211 re.search(r"\b(first|second|third|finally|lastly)\b", content_lower)
212 ),
213 "causation": bool(
214 re.search(
215 r"\b(because|therefore|thus|consequently|as a result)\b",
216 content_lower,
217 )
218 ),
219 "contrast": bool(
220 re.search(
221 r"\b(however|although|despite|nevertheless|on the other hand)\b",
222 content_lower,
223 )
224 ),
225 "comparison": bool(
226 re.search(
227 r"\b(similarly|likewise|compared to|in contrast)\b",
228 content_lower,
229 )
230 ),
231 },
232 "complexity_indicators": {
233 "has_subordinate_clauses": bool(
234 re.search(r"\b(which|that|who|whom|whose)\b", content_lower)
235 ),
236 "has_conditionals": bool(
237 re.search(r"\b(if|unless|provided|assuming)\b", content_lower)
238 ),
239 "has_temporal_references": bool(
240 re.search(r"\b(when|while|before|after|during)\b", content_lower)
241 ),
242 },
243 }
245 def _extract_entity_hints(self, content: str) -> dict[str, Any]:
246 """Extract hints for entity extraction."""
247 import re
249 # Potential entity patterns
250 proper_nouns = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", content)
251 acronyms = re.findall(r"\b[A-Z]{2,}\b", content)
253 return {
254 "proper_noun_count": len(proper_nouns),
255 "acronym_count": len(acronyms),
256 "capitalized_words": len(re.findall(r"\b[A-Z][a-z]+\b", content)),
257 "potential_names": len(
258 [noun for noun in proper_nouns if len(noun.split()) <= 3]
259 ),
260 "potential_organizations": len(
261 [noun for noun in proper_nouns if len(noun.split()) > 1]
262 ),
263 "has_titles": bool(
264 re.search(r"\b(Dr|Mr|Mrs|Ms|Prof|CEO|CTO|VP)\b\.?\s+[A-Z]", content)
265 ),
266 "has_locations": bool(
267 re.search(
268 r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:City|State|Country|Street|Ave|Road))\b",
269 content,
270 )
271 ),
272 }
274 def _estimate_readability(
275 self, avg_sentence_length: float, word_diversity: float
276 ) -> str:
277 """Estimate readability level based on simple metrics."""
278 if avg_sentence_length < 10 and word_diversity > 0.7:
279 return "easy"
280 elif avg_sentence_length < 20 and word_diversity > 0.5:
281 return "moderate"
282 elif avg_sentence_length < 30:
283 return "difficult"
284 else:
285 return "very_difficult"
287 def _assess_chunk_completeness(self, content: str) -> float:
288 """Assess how complete/coherent the chunk appears to be."""
289 score = 0.0
291 # Check for proper sentence endings
292 if content.strip().endswith((".", "!", "?")):
293 score += 0.3
295 # Check for proper sentence beginnings
296 if content.strip() and content.strip()[0].isupper():
297 score += 0.2
299 # Check for balanced punctuation
300 open_parens = content.count("(")
301 close_parens = content.count(")")
302 open_quotes = content.count('"') + content.count("'")
304 if open_parens == close_parens:
305 score += 0.2
306 if open_quotes % 2 == 0: # Even number of quotes
307 score += 0.1
309 # Check for paragraph structure
310 if "\n\n" in content or len(content.split(".")) > 1:
311 score += 0.2
313 return min(1.0, score)
315 def _detect_language_indicators(self, content: str) -> dict[str, Any]:
316 """Detect language indicators in the content."""
317 content_lower = content.lower()
319 # Common English function words
320 english_indicators = [
321 "the",
322 "and",
323 "or",
324 "but",
325 "in",
326 "on",
327 "at",
328 "to",
329 "for",
330 "of",
331 "with",
332 ]
333 english_count = sum(
334 1 for word in english_indicators if f" {word} " in f" {content_lower} "
335 )
337 return {
338 "english_function_words": english_count,
339 "likely_english": english_count >= 3,
340 "punctuation_style": "american" if ". " in content else "other",
341 }