Coverage for src/qdrant_loader/core/chunking/strategy/default/text_section_splitter.py: 92% (141 statements)
coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Text-specific section splitter for intelligent text chunking."""
3import re
4from typing import Any
6from qdrant_loader.config import Settings
7from qdrant_loader.core.chunking.strategy.base.section_splitter import (
8 BaseSectionSplitter,
9)
10from qdrant_loader.core.document import Document


class TextSectionSplitter(BaseSectionSplitter):
    """Section splitter for text documents with intelligent boundary detection."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        # Get strategy-specific configuration
        self.default_config = settings.global_config.chunking.strategies.default
        self.min_chunk_size = self.default_config.min_chunk_size

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split text content into intelligent sections."""
        if not content.strip():
            # For empty content, return a single empty section for compatibility
            if content == "":
                return [
                    {
                        "content": "",
                        "metadata": {
                            "section_type": "empty",
                            "paragraph_index": 0,
                            "word_count": 0,
                            "has_formatting": False,
                            "content_characteristics": {
                                "sentence_count": 0,
                                "avg_sentence_length": 0,
                                "has_questions": False,
                                "has_exclamations": False,
                                "capitalization_ratio": 0,
                                "number_count": 0,
                            },
                        },
                    }
                ]
            return []

        # First, try to split by natural boundaries (paragraphs)
        sections = self._split_by_paragraphs(content)

        # If sections are too large, split them further
        final_sections = []
        for section in sections:
            if len(section["content"]) > self.chunk_size:
                subsections = self._split_large_section(section["content"])
                for i, subsection in enumerate(subsections):
                    final_sections.append(
                        {
                            "content": subsection,
                            "metadata": {
                                **section["metadata"],
                                "subsection_index": i,
                                "is_subsection": True,
                                "original_section_size": len(section["content"]),
                            },
                        }
                    )
            else:
                final_sections.append(section)

        # Merge small adjacent sections if beneficial
        merged_sections = self._merge_small_sections(final_sections)

        return merged_sections[: self.max_chunks_per_document]

    def _split_by_paragraphs(self, content: str) -> list[dict[str, Any]]:
        """Split content by paragraph boundaries."""
        paragraphs = re.split(r"\n\s*\n", content)
        sections = []

        for i, paragraph in enumerate(paragraphs):
            if not paragraph.strip():
                continue

            sections.append(
                {
                    "content": paragraph.strip(),
                    "metadata": {
                        "section_type": "paragraph",
                        "paragraph_index": i,
                        "word_count": len(paragraph.split()),
                        "has_formatting": self._has_special_formatting(paragraph),
                        "content_characteristics": self._analyze_paragraph_content(
                            paragraph
                        ),
                    },
                }
            )

        return sections

    def _split_large_section(self, content: str) -> list[str]:
        """Split large sections using intelligent boundary detection."""
        if len(content) <= self.chunk_size:
            return [content]

        chunks = []
        remaining = content
        previous_length = len(remaining)

        while len(remaining) > self.chunk_size:
            # Find the best split point within the chunk size limit
            split_point = self._find_best_split_point(remaining, self.chunk_size)

            if split_point <= 0:
                # Fallback: split at chunk size boundary
                split_point = self.chunk_size

            chunk = remaining[:split_point].strip()
            if chunk:
                chunks.append(chunk)

            # Move to next chunk with overlap if configured
            # Ensure we always make meaningful progress to prevent infinite loops
            overlap_start = max(0, split_point - self.chunk_overlap)

            # Safety check: ensure we advance at least min_chunk_size characters
            # This prevents infinite loops when overlap is too large
            min_advance = max(self.min_chunk_size, split_point // 2)
            overlap_start = min(overlap_start, split_point - min_advance)

            remaining = remaining[overlap_start:].strip()

            # Prevent infinite loops - ensure we're making progress
            if len(remaining) >= previous_length:
                # Force progress by advancing more aggressively
                remaining = remaining[min_advance:].strip()

            previous_length = len(remaining)

            # Additional safety: break if remaining content is small
            if len(remaining) <= self.min_chunk_size:
                break

        # Add remaining content if substantial
        if remaining.strip() and len(remaining.strip()) >= self.min_chunk_size:
            chunks.append(remaining.strip())

        return chunks
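
    # Worked example of the overlap bookkeeping above (illustrative numbers,
    # assumed rather than taken from the source): with chunk_size=1000,
    # chunk_overlap=200, and min_chunk_size=100, a split at position 950 gives
    # overlap_start = max(0, 950 - 200) = 750; min_advance = max(100, 950 // 2)
    # = 475 then caps overlap_start at 950 - 475 = 475, so every iteration drops
    # at least min_advance characters from the front of the remaining text.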

    def _find_best_split_point(self, content: str, max_size: int) -> int:
        """Find the best point to split content within the size limit."""
        if len(content) <= max_size:
            return len(content)

        # Try tokenizer-based boundary detection if available
        tokenizer_split = self._find_tokenizer_boundary(content, max_size)
        if tokenizer_split > 0:
            return tokenizer_split

        # Search window for optimal split point
        search_start = max(0, max_size - 200)
        search_end = min(len(content), max_size)
        search_text = content[search_start:search_end]

        # Priority order for split points
        split_patterns = [
            (r"\.\s+(?=[A-Z])", "sentence_end"),  # Sentence boundaries
            (r"\n\s*\n", "paragraph_break"),  # Paragraph breaks
            (r"\n(?=\s*[•\-\*\d])", "list_item"),  # List item boundaries
            (r"\.\s", "sentence_fragment"),  # Sentence fragments
            (r"[,;]\s+", "clause_boundary"),  # Clause boundaries
            (r"\s+", "word_boundary"),  # Word boundaries
        ]

        best_split = 0
        best_score = -1

        for pattern, split_type in split_patterns:
            matches = list(re.finditer(pattern, search_text))
            if not matches:
                continue

            for match in reversed(matches):  # Start from the end
                split_pos = search_start + match.end()

                # Score the split point
                score = self._score_split_point(content, split_pos, split_type)

                if score > best_score:
                    best_score = score
                    best_split = split_pos

        return best_split if best_split > 0 else max_size

    def _find_tokenizer_boundary(self, content: str, max_size: int) -> int:
        """Use tokenizer to find optimal boundary if available."""
        try:
            # Access the encoding from the parent strategy if available
            parent_strategy = getattr(self, "_parent_strategy", None)
            if (
                not parent_strategy
                or not hasattr(parent_strategy, "encoding")
                or not parent_strategy.encoding
            ):
                return 0

            encoding = parent_strategy.encoding

            # Get tokens for the content up to max_size
            text_to_encode = content[:max_size]
            tokens = encoding.encode(text_to_encode)

            # Find a good boundary by decoding back from slightly fewer tokens
            if len(tokens) > 10:  # Only if we have enough tokens
                # Remove the last few tokens to find a clean boundary
                boundary_tokens = tokens[:-5]
                decoded_text = encoding.decode(boundary_tokens)

                # Find where the decoded text ends in the original content
                if decoded_text and decoded_text in content:
                    return len(decoded_text)

            return 0
        except Exception:
            # If tokenizer boundary detection fails, fall back to regex patterns
            return 0

    def _score_split_point(
        self, content: str, split_pos: int, split_type: str
    ) -> float:
        """Score a potential split point based on quality criteria."""
        if split_pos <= 0 or split_pos >= len(content):
            return 0.0

        score = 0.0

        # Base score by split type quality
        type_scores = {
            "sentence_end": 1.0,
            "paragraph_break": 0.9,
            "list_item": 0.8,
            "sentence_fragment": 0.6,
            "clause_boundary": 0.4,
            "word_boundary": 0.2,
        }
        score += type_scores.get(split_type, 0.1)

        # Bonus for balanced chunk sizes
        left_size = split_pos
        right_size = len(content) - split_pos
        size_ratio = min(left_size, right_size) / max(left_size, right_size)
        score += size_ratio * 0.3

        # Penalty for very small chunks
        if left_size < self.min_chunk_size:
            score -= 0.5

        return score
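
    # Illustrative scoring walk-through (assumed numbers, not from the source):
    # splitting a 1200-character section at position 900 on a "sentence_end"
    # match with min_chunk_size=100 scores 1.0 for the split type plus
    # min(900, 300) / max(900, 300) * 0.3 = 0.1 for size balance, with no
    # small-chunk penalty, for a total of 1.1.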

    def _merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small adjacent sections for better chunk utilization."""
        if not sections:
            return []

        merged = []
        current_content = ""
        current_metadata = None
        accumulated_word_count = 0

        for section in sections:
            content = section["content"]
            word_count = len(content.split())

            # If current section is large enough or we have no accumulated content
            if (len(content) >= self.min_chunk_size and not current_content) or len(
                current_content + " " + content
            ) > self.chunk_size:
                # Save current accumulated content if any
                if current_content:
                    merged.append(
                        {
                            "content": current_content.strip(),
                            "metadata": {
                                **current_metadata,
                                "merged_sections": True,
                                "total_word_count": accumulated_word_count,
                            },
                        }
                    )

                # Start new section
                current_content = content
                current_metadata = section["metadata"].copy()
                accumulated_word_count = word_count
            else:
                # Merge with current content
                if current_content:
                    current_content += "\n\n" + content
                    current_metadata["merged_sections"] = True
                    accumulated_word_count += word_count
                else:
                    current_content = content
                    current_metadata = section["metadata"].copy()
                    accumulated_word_count = word_count

        # Add final accumulated content
        if current_content:
            merged.append(
                {
                    "content": current_content.strip(),
                    "metadata": {
                        **current_metadata,
                        "total_word_count": accumulated_word_count,
                    },
                }
            )

        return merged

    def _has_special_formatting(self, text: str) -> bool:
        """Check if text has special formatting indicators."""
        formatting_patterns = [
            r"^\s*[•\-\*]\s",  # Bullet points
            r"^\s*\d+\.\s",  # Numbered lists
            r"[A-Z][A-Z\s]{2,}",  # All caps (headings)
            r"\*\*.*?\*\*",  # Bold text
            r"_.*?_",  # Italic text
            r"`.*?`",  # Code formatting
        ]

        return any(
            re.search(pattern, text, re.MULTILINE) for pattern in formatting_patterns
        )

    def _analyze_paragraph_content(self, paragraph: str) -> dict[str, Any]:
        """Analyze paragraph content characteristics."""
        return {
            "sentence_count": len(re.split(r"[.!?]+", paragraph)),
            "avg_sentence_length": len(paragraph)
            / max(1, len(re.split(r"[.!?]+", paragraph))),
            "has_questions": "?" in paragraph,
            "has_exclamations": "!" in paragraph,
            "capitalization_ratio": len(re.findall(r"[A-Z]", paragraph))
            / max(1, len(paragraph)),
            "number_count": len(re.findall(r"\d+", paragraph)),
        }
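
For reference, a minimal usage sketch follows. It assumes a fully loaded Settings instance (construction not shown here) and exercises only the split_sections API from the listing above; the helper name, sample text, and variable names are illustrative.

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.strategy.default.text_section_splitter import (
    TextSectionSplitter,
)


def preview_sections(settings: Settings, text: str) -> None:
    # Instantiate the splitter with project settings loaded elsewhere (assumed).
    splitter = TextSectionSplitter(settings)
    # split_sections returns a list of {"content": ..., "metadata": ...} dicts.
    for section in splitter.split_sections(text):
        print(section["metadata"].get("section_type"), len(section["content"]))


# Example call with two paragraphs separated by a blank line:
# preview_sections(settings, "First paragraph.\n\nSecond, longer paragraph.")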