Coverage for src/qdrant_loader/core/chunking/strategy/default/text_chunk_processor.py: 99%
81 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Text-specific chunk processor for document creation and management."""
3from typing import Any
5from qdrant_loader.config import Settings
6from qdrant_loader.core.chunking.strategy.base.chunk_processor import BaseChunkProcessor
7from qdrant_loader.core.document import Document
10class TextChunkProcessor(BaseChunkProcessor):
11 """Chunk processor for text documents with enhanced text-specific processing."""
13 def __init__(self, settings: Settings):
14 super().__init__(settings)
15 # Get strategy-specific configuration
16 self.default_config = settings.global_config.chunking.strategies.default
18 def create_chunk_document(
19 self,
20 original_doc: Document,
21 chunk_content: str,
22 chunk_index: int,
23 total_chunks: int,
24 chunk_metadata: dict[str, Any],
25 skip_nlp: bool = False,
26 ) -> Document:
27 """Create a document for a text chunk with enhanced metadata."""
29 # Generate unique chunk ID
30 chunk_id = self.generate_chunk_id(original_doc, chunk_index)
32 # Create base metadata
33 base_metadata = self.create_base_chunk_metadata(
34 original_doc, chunk_index, total_chunks, chunk_metadata
35 )
37 # Add text-specific metadata
38 text_metadata = self._create_text_specific_metadata(chunk_content, original_doc)
39 base_metadata.update(text_metadata)
41 # Create chunk document
42 chunk_doc = Document(
43 id=chunk_id,
44 content=chunk_content,
45 metadata=base_metadata,
46 source=original_doc.source,
47 source_type=original_doc.source_type,
48 url=original_doc.url,
49 content_type=original_doc.content_type,
50 title=f"{original_doc.title} - Chunk {chunk_index + 1}",
51 )
53 return chunk_doc
55 def _create_text_specific_metadata(
56 self, content: str, original_doc: Document
57 ) -> dict[str, Any]:
58 """Create text-specific metadata for the chunk."""
59 metadata = {
60 "chunk_strategy": "text",
61 "processing_method": "intelligent_splitting",
62 "content_analysis": self._analyze_chunk_content(content),
63 "quality_metrics": self._calculate_quality_metrics(content),
64 "text_characteristics": self._extract_text_characteristics(content),
65 }
67 # Add semantic analysis indicators if enabled
68 if self.default_config.enable_semantic_analysis:
69 metadata["semantic_analysis_enabled"] = True
70 metadata["semantic_indicators"] = self._extract_semantic_indicators(content)
72 # Add entity extraction indicators if enabled
73 if self.default_config.enable_entity_extraction:
74 metadata["entity_extraction_enabled"] = True
75 metadata["entity_hints"] = self._extract_entity_hints(content)
77 return metadata
79 def _analyze_chunk_content(self, content: str) -> dict[str, Any]:
80 """Analyze the content structure and characteristics of the chunk."""
81 words = content.split()
82 sentences = content.split(".")
83 paragraphs = content.split("\n\n")
85 return {
86 "word_count": len(words),
87 "sentence_count": len([s for s in sentences if s.strip()]),
88 "paragraph_count": len([p for p in paragraphs if p.strip()]),
89 "avg_words_per_sentence": len(words)
90 / max(1, len([s for s in sentences if s.strip()])),
91 "character_count": len(content),
92 "content_density": (
93 len(content.replace(" ", "")) / len(content) if content else 0
94 ),
95 }
97 def _calculate_quality_metrics(self, content: str) -> dict[str, Any]:
98 """Calculate quality metrics for the chunk."""
99 words = content.split()
101 # Content completeness (does it end with proper punctuation?)
102 ends_properly = content.strip().endswith((".", "!", "?", ":", ";"))
104 # Content coherence (rough estimate based on word repetition)
105 unique_words = len({word.lower() for word in words})
106 word_diversity = unique_words / len(words) if words else 0
108 # Content readability (simple metric based on sentence structure)
109 sentences = [s.strip() for s in content.split(".") if s.strip()]
110 avg_sentence_length = (
111 sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
112 )
114 return {
115 "ends_properly": ends_properly,
116 "word_diversity": round(word_diversity, 3),
117 "avg_sentence_length": round(avg_sentence_length, 1),
118 "readability_score": self._estimate_readability(
119 avg_sentence_length, word_diversity
120 ),
121 "chunk_completeness": self._assess_chunk_completeness(content),
122 }
124 def _extract_text_characteristics(self, content: str) -> dict[str, Any]:
125 """Extract various text characteristics from the chunk."""
126 import re
128 return {
129 "has_numbers": bool(re.search(r"\d+", content)),
130 "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
131 "has_urls": bool(re.search(r"https?://\S+", content)),
132 "has_email": bool(
133 re.search(
134 r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", content
135 )
136 ),
137 "has_phone": bool(re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)),
138 "has_currency": bool(re.search(r"\$\d+(?:\.\d{2})?", content)),
139 "has_percentages": bool(re.search(r"\b\d+(?:\.\d+)?%\b", content)),
140 "has_quotes": '"' in content or "'" in content,
141 "has_parentheses": "(" in content and ")" in content,
142 "has_formatting": bool(re.search(r"[*_`#]", content)),
143 "language_indicators": self._detect_language_indicators(content),
144 }
146 def _extract_semantic_indicators(self, content: str) -> dict[str, Any]:
147 """Extract indicators for semantic analysis."""
148 import re
150 # Topic indicators (simple keyword-based)
151 business_keywords = [
152 "company",
153 "business",
154 "market",
155 "revenue",
156 "profit",
157 "customer",
158 ]
159 tech_keywords = [
160 "software",
161 "technology",
162 "system",
163 "data",
164 "algorithm",
165 "code",
166 ]
167 academic_keywords = [
168 "research",
169 "study",
170 "analysis",
171 "theory",
172 "methodology",
173 "conclusion",
174 ]
176 content_lower = content.lower()
178 return {
179 "topic_indicators": {
180 "business": sum(1 for kw in business_keywords if kw in content_lower),
181 "technology": sum(1 for kw in tech_keywords if kw in content_lower),
182 "academic": sum(1 for kw in academic_keywords if kw in content_lower),
183 },
184 "discourse_markers": {
185 "enumeration": bool(
186 re.search(r"\b(first|second|third|finally|lastly)\b", content_lower)
187 ),
188 "causation": bool(
189 re.search(
190 r"\b(because|therefore|thus|consequently|as a result)\b",
191 content_lower,
192 )
193 ),
194 "contrast": bool(
195 re.search(
196 r"\b(however|although|despite|nevertheless|on the other hand)\b",
197 content_lower,
198 )
199 ),
200 "comparison": bool(
201 re.search(
202 r"\b(similarly|likewise|compared to|in contrast)\b",
203 content_lower,
204 )
205 ),
206 },
207 "complexity_indicators": {
208 "has_subordinate_clauses": bool(
209 re.search(r"\b(which|that|who|whom|whose)\b", content_lower)
210 ),
211 "has_conditionals": bool(
212 re.search(r"\b(if|unless|provided|assuming)\b", content_lower)
213 ),
214 "has_temporal_references": bool(
215 re.search(r"\b(when|while|before|after|during)\b", content_lower)
216 ),
217 },
218 }
220 def _extract_entity_hints(self, content: str) -> dict[str, Any]:
221 """Extract hints for entity extraction."""
222 import re
224 # Potential entity patterns
225 proper_nouns = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", content)
226 acronyms = re.findall(r"\b[A-Z]{2,}\b", content)
228 return {
229 "proper_noun_count": len(proper_nouns),
230 "acronym_count": len(acronyms),
231 "capitalized_words": len(re.findall(r"\b[A-Z][a-z]+\b", content)),
232 "potential_names": len(
233 [noun for noun in proper_nouns if len(noun.split()) <= 3]
234 ),
235 "potential_organizations": len(
236 [noun for noun in proper_nouns if len(noun.split()) > 1]
237 ),
238 "has_titles": bool(
239 re.search(r"\b(Dr|Mr|Mrs|Ms|Prof|CEO|CTO|VP)\b\.?\s+[A-Z]", content)
240 ),
241 "has_locations": bool(
242 re.search(
243 r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:City|State|Country|Street|Ave|Road))\b",
244 content,
245 )
246 ),
247 }
249 def _estimate_readability(
250 self, avg_sentence_length: float, word_diversity: float
251 ) -> str:
252 """Estimate readability level based on simple metrics."""
253 if avg_sentence_length < 10 and word_diversity > 0.7:
254 return "easy"
255 elif avg_sentence_length < 20 and word_diversity > 0.5:
256 return "moderate"
257 elif avg_sentence_length < 30:
258 return "difficult"
259 else:
260 return "very_difficult"
262 def _assess_chunk_completeness(self, content: str) -> float:
263 """Assess how complete/coherent the chunk appears to be."""
264 score = 0.0
266 # Check for proper sentence endings
267 if content.strip().endswith((".", "!", "?")):
268 score += 0.3
270 # Check for proper sentence beginnings
271 if content.strip() and content.strip()[0].isupper():
272 score += 0.2
274 # Check for balanced punctuation
275 open_parens = content.count("(")
276 close_parens = content.count(")")
277 open_quotes = content.count('"') + content.count("'")
279 if open_parens == close_parens:
280 score += 0.2
281 if open_quotes % 2 == 0: # Even number of quotes
282 score += 0.1
284 # Check for paragraph structure
285 if "\n\n" in content or len(content.split(".")) > 1:
286 score += 0.2
288 return min(1.0, score)
290 def _detect_language_indicators(self, content: str) -> dict[str, Any]:
291 """Detect language indicators in the content."""
292 content_lower = content.lower()
294 # Common English function words
295 english_indicators = [
296 "the",
297 "and",
298 "or",
299 "but",
300 "in",
301 "on",
302 "at",
303 "to",
304 "for",
305 "of",
306 "with",
307 ]
308 english_count = sum(
309 1 for word in english_indicators if f" {word} " in f" {content_lower} "
310 )
312 return {
313 "english_function_words": english_count,
314 "likely_english": english_count >= 3,
315 "punctuation_style": "american" if ". " in content else "other",
316 }