Coverage for src/qdrant_loader/core/chunking/strategy/default_strategy.py: 79%

156 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Default chunking strategy for text documents. 

2 

3This strategy uses character-based chunking for consistency with other strategies. 

4When a tokenizer is available, it's used for better boundary detection to avoid 

5splitting in the middle of tokens, but the chunk size limits are still based on 

6character count. 

7""" 

8 

9import re 

10 

11import structlog 

12 

13from qdrant_loader.config import Settings 

14from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

15from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

16from qdrant_loader.core.document import Document 

17 

18logger = structlog.get_logger(__name__) 

19 

20# Maximum number of chunks to process to prevent performance issues 

21MAX_CHUNKS_TO_PROCESS = 1000 

22 

23 

24class DefaultChunkingStrategy(BaseChunkingStrategy): 

25 """Default text chunking strategy using character-based splitting. 

26  

27 This strategy splits text into chunks based on character count, ensuring 

28 consistency with other chunking strategies like the markdown strategy. 

29 When a tokenizer is available, it's used to find better split boundaries 

30 (avoiding splits in the middle of tokens/words), but the size limits 

31 are always based on character count. 

32 """ 

33 

34 def __init__(self, settings: Settings): 

35 super().__init__(settings) 

36 self.progress_tracker = ChunkingProgressTracker(logger) 

37 

38 # Log configuration for debugging 

39 logger.info( 

40 "DefaultChunkingStrategy initialized", 

41 chunk_size=self.chunk_size, 

42 chunk_overlap=self.chunk_overlap, 

43 tokenizer=self.tokenizer, 

44 has_encoding=self.encoding is not None, 

45 chunking_method="character-based" + (" with token boundary detection" if self.encoding is not None else ""), 

46 ) 

47 

48 # Warn about suspiciously small chunk sizes 

49 if self.chunk_size < 100: 

50 logger.warning( 

51 f"Very small chunk_size detected: {self.chunk_size} characters. " 

52 f"This may cause performance issues and excessive chunking. " 

53 f"Consider using a larger value (e.g., 1000-1500 characters)." 

54 ) 
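
    # Illustrative note: chunk_size and chunk_overlap are interpreted as
    # character counts, not token counts. For example, with chunk_size=1500
    # every chunk is capped at 1500 characters even when a tokenizer encoding
    # is available; the encoding only refines where a chunk ends.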

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks using sentence boundaries and size limits.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if not text.strip():
            return [""]

        # Use tokenizer-based chunking if available
        if self.encoding is not None:
            return self._split_text_with_tokenizer(text)
        else:
            return self._split_text_without_tokenizer(text)

    def _split_text_with_tokenizer(self, text: str) -> list[str]:
        """Split text using tokenizer for accurate token boundary detection but character-based sizing.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if self.encoding is None:
            # Fallback to character-based chunking
            return self._split_text_without_tokenizer(text)

        # Use character-based size limit (consistent with markdown strategy)
        if len(text) <= self.chunk_size:
            return [text]

        chunks = []
        start_char = 0

        while start_char < len(text) and len(chunks) < MAX_CHUNKS_TO_PROCESS:
            # Calculate end position based on character count
            end_char = min(start_char + self.chunk_size, len(text))

            # Get the chunk text
            chunk_text = text[start_char:end_char]

            # If we're not at the end of the text, try to find a good token boundary
            # to avoid splitting in the middle of words/tokens
            if end_char < len(text):
                # Try to encode/decode to find a clean token boundary
                try:
                    # Get tokens for the chunk
                    tokens = self.encoding.encode(chunk_text)

                    # If the chunk ends mid-token, back up to the last complete token
                    # by decoding and checking if we get the same text
                    decoded_text = self.encoding.decode(tokens)
                    if len(decoded_text) < len(chunk_text):
                        # The last token was incomplete, use the decoded text
                        chunk_text = decoded_text
                        end_char = start_char + len(chunk_text)

                    # Alternatively, try to find a word boundary near the target end
                    remaining_chars = self.chunk_size - len(chunk_text)
                    if remaining_chars > 0 and end_char < len(text):
                        # Look ahead a bit to find a word boundary
                        lookahead_end = min(end_char + min(100, remaining_chars), len(text))
                        lookahead_text = text[end_char:lookahead_end]

                        # Find the first word boundary (space, newline, punctuation)
                        word_boundary_match = re.search(r'[\s\n\.\!\?\;]', lookahead_text)
                        if word_boundary_match:
                            boundary_pos = word_boundary_match.start()
                            end_char = end_char + boundary_pos + 1
                            chunk_text = text[start_char:end_char]

                except Exception:
                    # If tokenizer operations fail, stick with character-based splitting
                    pass
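
            # Illustrative note: if the character cut lands mid-word (say inside
            # "chunking"), the boundary handling above either backs end_char up to
            # the last complete token or extends it to the next whitespace or
            # punctuation found within the remaining character budget, so the
            # chunk ends on a whole word rather than a fragment.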

            chunks.append(chunk_text)

            # Calculate overlap in characters and move start position forward
            if self.chunk_overlap > 0 and end_char < len(text):
                # Calculate how much to advance (chunk_size - overlap)
                advance = max(1, self.chunk_size - self.chunk_overlap)
                start_char += advance
            else:
                # No overlap, advance by full chunk size
                start_char = end_char

            # If we're near the end and the remaining text is small, include it in the last chunk
            if start_char < len(text) and (len(text) - start_char) <= self.chunk_overlap:
                # Create final chunk with remaining text
                final_chunk_text = text[start_char:]
                if final_chunk_text.strip():  # Only add if there's meaningful content
                    chunks.append(final_chunk_text)
                break

        # Log warning if we hit the chunk limit
        if len(chunks) >= MAX_CHUNKS_TO_PROCESS and start_char < len(text):
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated."
            )

        return chunks
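
    # Worked example (illustrative): with chunk_size=1000 and chunk_overlap=200,
    # the loop above advances start_char by max(1, 1000 - 200) = 800 characters
    # per iteration, so chunks nominally start at 0, 800, 1600, ... and adjacent
    # chunks share roughly 200 characters of context (a little more or less once
    # the token/word-boundary adjustment has moved end_char).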

    def _split_text_without_tokenizer(self, text: str) -> list[str]:
        """Split text without tokenizer using character-based chunking.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        # Safety check: if chunk_size is invalid, use a reasonable default
        if self.chunk_size <= 0:
            logger.warning(f"Invalid chunk_size {self.chunk_size}, using default 1000")
            effective_chunk_size = 1000
        else:
            effective_chunk_size = self.chunk_size

        if len(text) <= effective_chunk_size:
            return [text]

        # First, try to split by paragraphs (double newlines)
        paragraphs = re.split(r"\n\s*\n", text.strip())
        chunks = []
        current_chunk = ""

        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # If adding this paragraph would exceed chunk size, finalize current chunk
            if (
                current_chunk
                and len(current_chunk) + len(paragraph) + 2 > effective_chunk_size
            ):
                chunks.append(current_chunk.strip())
                current_chunk = paragraph
            else:
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph

        # Add the last chunk if it exists
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # If we still have chunks that are too large, split them further
        final_chunks = []
        for chunk in chunks:
            if len(chunk) <= effective_chunk_size:
                final_chunks.append(chunk)
            else:
                # Split large chunks by sentences
                sentences = re.split(r"(?<=[.!?])\s+", chunk)
                current_subchunk = ""

                for sentence in sentences:
                    if (
                        current_subchunk
                        and len(current_subchunk) + len(sentence) + 1
                        > effective_chunk_size
                    ):
                        if current_subchunk.strip():
                            final_chunks.append(current_subchunk.strip())
                        current_subchunk = sentence
                    else:
                        if current_subchunk:
                            current_subchunk += " " + sentence
                        else:
                            current_subchunk = sentence

                if current_subchunk.strip():
                    final_chunks.append(current_subchunk.strip())

        # Next fallback: if chunks are still too large, split by character count with word boundaries
        result_chunks = []
        for chunk in final_chunks:
            if len(chunk) <= effective_chunk_size:
                result_chunks.append(chunk)
            else:
                # Split by character count with word boundaries
                words = chunk.split()
                current_word_chunk = ""

                for word in words:
                    if (
                        current_word_chunk
                        and len(current_word_chunk) + len(word) + 1
                        > effective_chunk_size
                    ):
                        if current_word_chunk.strip():
                            result_chunks.append(current_word_chunk.strip())
                        current_word_chunk = word
                    else:
                        if current_word_chunk:
                            current_word_chunk += " " + word
                        else:
                            current_word_chunk = word

                if current_word_chunk.strip():
                    result_chunks.append(current_word_chunk.strip())

        # Ultimate fallback: if chunks are still too large (no word boundaries), split by character count
        final_result_chunks = []
        for chunk in result_chunks:
            if len(chunk) <= effective_chunk_size:
                final_result_chunks.append(chunk)
            else:
                # Split by pure character count as last resort
                for i in range(0, len(chunk), effective_chunk_size):
                    char_chunk = chunk[i : i + effective_chunk_size]
                    if char_chunk.strip():
                        final_result_chunks.append(char_chunk)

        # Safety check: if we somehow generated too many chunks from a small document, something is wrong
        if len(text) < 1000 and len(final_result_chunks) > 100:
            logger.error(
                f"Suspicious chunking result: {len(text)} chars generated {len(final_result_chunks)} chunks. "
                f"Chunk size: {effective_chunk_size}. Returning single chunk as fallback."
            )
            return [text]

        # Apply chunk limit
        if len(final_result_chunks) > MAX_CHUNKS_TO_PROCESS:
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated. Text length: {len(text)}, Chunk size: {effective_chunk_size}"
            )
            final_result_chunks = final_result_chunks[:MAX_CHUNKS_TO_PROCESS]

        return [chunk for chunk in final_result_chunks if chunk.strip()]
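
    # Summary (illustrative) of the fallback cascade above: paragraphs are packed
    # first; any oversized result is re-split on sentence boundaries, then on word
    # boundaries, and finally into raw effective_chunk_size character slices, so
    # no emitted chunk exceeds effective_chunk_size.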

    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        logger.debug(
            "Starting default chunking",
            document_id=document.id,
            content_length=len(document.content),
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        try:
            # Split the text into chunks
            text_chunks = self._split_text(document.content)

            if not text_chunks:
                self.progress_tracker.finish_chunking(document.id, 0, "default")
                return []

            # Apply chunk limit at document level too
            if len(text_chunks) > MAX_CHUNKS_TO_PROCESS:
                logger.warning(
                    f"Document {document.id} generated {len(text_chunks)} chunks, "
                    f"limiting to {MAX_CHUNKS_TO_PROCESS}"
                )
                text_chunks = text_chunks[:MAX_CHUNKS_TO_PROCESS]

            # Create Document objects for each chunk using base class method
            chunk_documents = []
            for i, chunk_text in enumerate(text_chunks):
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_text,
                    chunk_index=i,
                    total_chunks=len(text_chunks),
                    skip_nlp=False,
                )

                # Generate unique chunk ID
                chunk_doc.id = Document.generate_chunk_id(document.id, i)

                # Add strategy-specific metadata
                chunk_doc.metadata.update(
                    {
                        "chunking_strategy": "default",
                        "parent_document_id": document.id,
                    }
                )

                chunk_documents.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunk_documents), "default"
            )

            logger.debug(
                "Successfully chunked document",
                document_id=document.id,
                num_chunks=len(chunk_documents),
                strategy="default",
            )

            return chunk_documents

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            raise
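

# Illustrative usage sketch: a minimal sketch of how this strategy is driven,
# assuming a configured Settings instance and a Document are already available
# elsewhere in qdrant_loader (their construction is out of scope here):
#
#     strategy = DefaultChunkingStrategy(settings)
#     chunks = strategy.chunk_document(document)
#     for chunk in chunks:
#         print(chunk.id, len(chunk.content), chunk.metadata["chunking_strategy"])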