Coverage for src/qdrant_loader/core/chunking/strategy/default/text_section_splitter.py: 92%

141 statements  

coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""Text-specific section splitter for intelligent text chunking.""" 

2 

3import re 

4from typing import Any 

5 

6from qdrant_loader.config import Settings 

7from qdrant_loader.core.chunking.strategy.base.section_splitter import ( 

8 BaseSectionSplitter, 

9) 

10from qdrant_loader.core.document import Document 

11 

12 

13class TextSectionSplitter(BaseSectionSplitter): 

14 """Section splitter for text documents with intelligent boundary detection.""" 

15 

16 def __init__(self, settings: Settings): 

17 super().__init__(settings) 

18 # Get strategy-specific configuration 

19 self.default_config = settings.global_config.chunking.strategies.default 

20 self.min_chunk_size = self.default_config.min_chunk_size 

21 

22 def split_sections( 

23 self, content: str, document: Document | None = None 

24 ) -> list[dict[str, Any]]: 

25 """Split text content into intelligent sections.""" 

26 if not content.strip(): 

27 # For empty content, return a single empty section for compatibility 

28 if content == "": 

29 return [ 

30 { 

31 "content": "", 

32 "metadata": { 

33 "section_type": "empty", 

34 "paragraph_index": 0, 

35 "word_count": 0, 

36 "has_formatting": False, 

37 "content_characteristics": { 

38 "sentence_count": 0, 

39 "avg_sentence_length": 0, 

40 "has_questions": False, 

41 "has_exclamations": False, 

42 "capitalization_ratio": 0, 

43 "number_count": 0, 

44 }, 

45 }, 

46 } 

47 ] 

48 return [] 

49 

50 # First, try to split by natural boundaries (paragraphs) 

51 sections = self._split_by_paragraphs(content) 

52 

53 # If sections are too large, split them further 

54 final_sections = [] 

55 for section in sections: 

56 if len(section["content"]) > self.chunk_size: 

57 subsections = self._split_large_section(section["content"]) 

58 for i, subsection in enumerate(subsections): 

59 final_sections.append( 

60 { 

61 "content": subsection, 

62 "metadata": { 

63 **section["metadata"], 

64 "subsection_index": i, 

65 "is_subsection": True, 

66 "original_section_size": len(section["content"]), 

67 }, 

68 } 

69 ) 

70 else: 

71 final_sections.append(section) 

72 

73 # Merge small adjacent sections if beneficial 

74 merged_sections = self._merge_small_sections(final_sections) 

75 

76 return merged_sections[: self.max_chunks_per_document] 

77 

    def _split_by_paragraphs(self, content: str) -> list[dict[str, Any]]:
        """Split content by paragraph boundaries."""
        paragraphs = re.split(r"\n\s*\n", content)
        sections = []

        for i, paragraph in enumerate(paragraphs):
            if not paragraph.strip():
                continue

            sections.append(
                {
                    "content": paragraph.strip(),
                    "metadata": {
                        "section_type": "paragraph",
                        "paragraph_index": i,
                        "word_count": len(paragraph.split()),
                        "has_formatting": self._has_special_formatting(paragraph),
                        "content_characteristics": self._analyze_paragraph_content(
                            paragraph
                        ),
                    },
                }
            )

        return sections

    def _split_large_section(self, content: str) -> list[str]:
        """Split large sections using intelligent boundary detection."""
        if len(content) <= self.chunk_size:
            return [content]

        chunks = []
        remaining = content
        previous_length = len(remaining)

        while len(remaining) > self.chunk_size:
            # Find the best split point within the chunk size limit
            split_point = self._find_best_split_point(remaining, self.chunk_size)

            if split_point <= 0:
                # Fallback: split at chunk size boundary
                split_point = self.chunk_size

            chunk = remaining[:split_point].strip()
            if chunk:
                chunks.append(chunk)

            # Move to next chunk with overlap if configured
            # Ensure we always make meaningful progress to prevent infinite loops
            overlap_start = max(0, split_point - self.chunk_overlap)

            # Safety check: ensure we advance at least min_chunk_size characters
            # This prevents infinite loops when overlap is too large
            min_advance = max(self.min_chunk_size, split_point // 2)
            overlap_start = min(overlap_start, split_point - min_advance)

            remaining = remaining[overlap_start:].strip()

            # Prevent infinite loops - ensure we're making progress
            if len(remaining) >= previous_length:
                # Force progress by advancing more aggressively
                remaining = remaining[min_advance:].strip()

            previous_length = len(remaining)

            # Additional safety: break if remaining content is small
            if len(remaining) <= self.min_chunk_size:
                break

        # Add remaining content if substantial
        if remaining.strip() and len(remaining.strip()) >= self.min_chunk_size:
            chunks.append(remaining.strip())

        return chunks
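
    # Worked example (illustrative only, assuming hypothetical settings of
    # chunk_size=1000, chunk_overlap=200 and min_chunk_size=100): if
    # _find_best_split_point returns split_point=950, the loop above computes
    #     overlap_start = max(0, 950 - 200) = 750
    #     min_advance   = max(100, 950 // 2) = 475
    #     overlap_start = min(750, 950 - 475) = 475
    # so the next window starts 475 characters into the current one, and the text
    # between offsets 475 and 950 is carried over into the following chunk.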

    def _find_best_split_point(self, content: str, max_size: int) -> int:
        """Find the best point to split content within the size limit."""
        if len(content) <= max_size:
            return len(content)

        # Try tokenizer-based boundary detection if available
        tokenizer_split = self._find_tokenizer_boundary(content, max_size)
        if tokenizer_split > 0:
            return tokenizer_split

        # Search window for optimal split point
        search_start = max(0, max_size - 200)
        search_end = min(len(content), max_size)
        search_text = content[search_start:search_end]

        # Priority order for split points
        split_patterns = [
            (r"\.\s+(?=[A-Z])", "sentence_end"),  # Sentence boundaries
            (r"\n\s*\n", "paragraph_break"),  # Paragraph breaks
            (r"\n(?=\s*[•\-\*\d])", "list_item"),  # List item boundaries
            (r"\.\s", "sentence_fragment"),  # Sentence fragments
            (r"[,;]\s+", "clause_boundary"),  # Clause boundaries
            (r"\s+", "word_boundary"),  # Word boundaries
        ]

        best_split = 0
        best_score = -1

        for pattern, split_type in split_patterns:
            matches = list(re.finditer(pattern, search_text))
            if not matches:
                continue

            for match in reversed(matches):  # Start from the end
                split_pos = search_start + match.end()

                # Score the split point
                score = self._score_split_point(content, split_pos, split_type)

                if score > best_score:
                    best_score = score
                    best_split = split_pos

        return best_split if best_split > 0 else max_size

    def _find_tokenizer_boundary(self, content: str, max_size: int) -> int:
        """Use tokenizer to find optimal boundary if available."""
        try:
            # Access the encoding from the parent strategy if available
            parent_strategy = getattr(self, "_parent_strategy", None)
            if (
                not parent_strategy
                or not hasattr(parent_strategy, "encoding")
                or not parent_strategy.encoding
            ):
                return 0

            encoding = parent_strategy.encoding

            # Get tokens for the content up to max_size
            text_to_encode = content[:max_size]
            tokens = encoding.encode(text_to_encode)

            # Find a good boundary by decoding back from slightly fewer tokens
            if len(tokens) > 10:  # Only if we have enough tokens
                boundary_tokens = tokens[
                    :-5
                ]  # Remove last few tokens to find clean boundary
                decoded_text = encoding.decode(boundary_tokens)

                # Find where the decoded text ends in the original content
                if decoded_text and decoded_text in content:
                    return len(decoded_text)

            return 0
        except Exception:
            # If tokenizer boundary detection fails, fall back to regex patterns
            return 0

    def _score_split_point(
        self, content: str, split_pos: int, split_type: str
    ) -> float:
        """Score a potential split point based on quality criteria."""
        if split_pos <= 0 or split_pos >= len(content):
            return 0.0

        score = 0.0

        # Base score by split type quality
        type_scores = {
            "sentence_end": 1.0,
            "paragraph_break": 0.9,
            "list_item": 0.8,
            "sentence_fragment": 0.6,
            "clause_boundary": 0.4,
            "word_boundary": 0.2,
        }
        score += type_scores.get(split_type, 0.1)

        # Bonus for balanced chunk sizes
        left_size = split_pos
        right_size = len(content) - split_pos
        size_ratio = min(left_size, right_size) / max(left_size, right_size)
        score += size_ratio * 0.3

        # Penalty for very small chunks
        if left_size < self.min_chunk_size:
            score -= 0.5

        return score
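
    # Worked example (illustrative only, assuming a hypothetical min_chunk_size=100):
    # scoring a "sentence_end" split at position 800 of a 1500-character section gives
    #     base score          = 1.0
    #     size-balance bonus  = (700 / 800) * 0.3 = 0.2625
    #     small-chunk penalty = 0.0 (800 >= 100)
    # for a total of about 1.26, which outranks any word_boundary candidate
    # (at most 0.2 + 0.3 = 0.5).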

    def _merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small adjacent sections for better chunk utilization."""
        if not sections:
            return []

        merged = []
        current_content = ""
        current_metadata = None
        accumulated_word_count = 0

        for section in sections:
            content = section["content"]
            word_count = len(content.split())

            # If current section is large enough or we have no accumulated content
            if (len(content) >= self.min_chunk_size and not current_content) or len(
                current_content + " " + content
            ) > self.chunk_size:

                # Save current accumulated content if any
                if current_content:
                    merged.append(
                        {
                            "content": current_content.strip(),
                            "metadata": {
                                **current_metadata,
                                "merged_sections": True,
                                "total_word_count": accumulated_word_count,
                            },
                        }
                    )

                # Start new section
                current_content = content
                current_metadata = section["metadata"].copy()
                accumulated_word_count = word_count
            else:
                # Merge with current content
                if current_content:
                    current_content += "\n\n" + content
                    current_metadata["merged_sections"] = True
                    accumulated_word_count += word_count
                else:
                    current_content = content
                    current_metadata = section["metadata"].copy()
                    accumulated_word_count = word_count

        # Add final accumulated content
        if current_content:
            merged.append(
                {
                    "content": current_content.strip(),
                    "metadata": {
                        **current_metadata,
                        "total_word_count": accumulated_word_count,
                    },
                }
            )

        return merged

    def _has_special_formatting(self, text: str) -> bool:
        """Check if text has special formatting indicators."""
        formatting_patterns = [
            r"^\s*[•\-\*]\s",  # Bullet points
            r"^\s*\d+\.\s",  # Numbered lists
            r"[A-Z][A-Z\s]{2,}",  # All caps (headings)
            r"\*\*.*?\*\*",  # Bold text
            r"_.*?_",  # Italic text
            r"`.*?`",  # Code formatting
        ]

        return any(
            re.search(pattern, text, re.MULTILINE) for pattern in formatting_patterns
        )

    def _analyze_paragraph_content(self, paragraph: str) -> dict[str, Any]:
        """Analyze paragraph content characteristics."""
        return {
            "sentence_count": len(re.split(r"[.!?]+", paragraph)),
            "avg_sentence_length": len(paragraph)
            / max(1, len(re.split(r"[.!?]+", paragraph))),
            "has_questions": "?" in paragraph,
            "has_exclamations": "!" in paragraph,
            "capitalization_ratio": len(re.findall(r"[A-Z]", paragraph))
            / max(1, len(paragraph)),
            "number_count": len(re.findall(r"\d+", paragraph)),
        }
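
A minimal usage sketch (illustrative only; it assumes a fully initialized Settings
object is already available as settings, and the helper name preview_sections is
hypothetical, not part of the library):

    from qdrant_loader.core.chunking.strategy.default.text_section_splitter import (
        TextSectionSplitter,
    )


    def preview_sections(settings, text: str) -> None:
        """Print a one-line summary of each section produced from plain text."""
        splitter = TextSectionSplitter(settings)
        for section in splitter.split_sections(text):
            meta = section["metadata"]
            # Merged sections report total_word_count; plain paragraphs report word_count
            words = meta.get("total_word_count", meta.get("word_count", 0))
            print(
                f"{meta.get('section_type', '?')}: "
                f"{words} words, {len(section['content'])} chars"
            )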