Coverage for src/qdrant_loader/core/chunking/strategy/default_strategy.py: 78%

139 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Default chunking strategy for text documents.""" 

2 

3import re 

4from typing import Any 

5 

6import structlog 

7 

8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

9from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.config import Settings 

12 

13logger = structlog.get_logger(__name__) 

14 

15# Maximum number of chunks to process to prevent performance issues 

16MAX_CHUNKS_TO_PROCESS = 1000 
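# The limit is enforced in _split_text_with_tokenizer,
# _split_text_without_tokenizer and chunk_document below.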


class DefaultChunkingStrategy(BaseChunkingStrategy):
    """Default text chunking strategy using simple text splitting."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Log configuration for debugging
        logger.info(
            "DefaultChunkingStrategy initialized",
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            tokenizer=self.tokenizer,
            has_encoding=self.encoding is not None,
        )

        # Warn about suspiciously small chunk sizes
        if self.chunk_size < 100:
            logger.warning(
                f"Very small chunk_size detected: {self.chunk_size}. "
                f"This may cause performance issues and excessive chunking. "
                f"Consider using a larger value (e.g., 1000-1500 characters)."
            )
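
    # Note: chunk_size, chunk_overlap, tokenizer and encoding used below are
    # provided by BaseChunkingStrategy (initialized in super().__init__).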

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks, using the tokenizer when available and
        sentence/size-based splitting otherwise.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if not text.strip():
            return [""]

        # Use tokenizer-based chunking if available
        if self.encoding is not None:
            return self._split_text_with_tokenizer(text)
        else:
            return self._split_text_without_tokenizer(text)
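
    # Behaviour sketch of the dispatch above (illustrative, not executed):
    #   _split_text("   ")     -> [""]  (whitespace-only input short-circuits)
    #   _split_text(long_text) -> token-window chunks when self.encoding is set
    #                             (e.g. a tiktoken encoding), otherwise the
    #                             character-based paragraph/sentence splitter.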

    def _split_text_with_tokenizer(self, text: str) -> list[str]:
        """Split text using the tokenizer for accurate token counting.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if self.encoding is None:
            # Fallback to character-based chunking
            return self._split_text_without_tokenizer(text)

        tokens = self.encoding.encode(text)

        if len(tokens) <= self.chunk_size:
            return [text]

        chunks = []
        start = 0

        while start < len(tokens) and len(chunks) < MAX_CHUNKS_TO_PROCESS:
            # Calculate end position
            end = min(start + self.chunk_size, len(tokens))

            # Extract chunk tokens
            chunk_tokens = tokens[start:end]

            # Decode tokens back to text
            chunk_text = self.encoding.decode(chunk_tokens)
            chunks.append(chunk_text)

            # Move start position forward, accounting for overlap
            # Ensure we always make progress by advancing at least 1 token
            advance = max(1, self.chunk_size - self.chunk_overlap)
            start += advance

            # If the remaining tail is small (at most chunk_overlap tokens),
            # emit it as one final chunk and stop
            if start < len(tokens) and (len(tokens) - start) <= self.chunk_overlap:
                # Create final chunk with remaining tokens
                final_chunk_tokens = tokens[start:]
                if final_chunk_tokens:  # Only add if there are tokens
                    final_chunk_text = self.encoding.decode(final_chunk_tokens)
                    chunks.append(final_chunk_text)
                break

        # Log warning if we hit the chunk limit
        if len(chunks) >= MAX_CHUNKS_TO_PROCESS and start < len(tokens):
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated."
            )

        return chunks
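
    # Illustrative walk-through of the window arithmetic above (not executed).
    # Assume chunk_size=4 and chunk_overlap=1, so advance = max(1, 4 - 1) = 3.
    # For 11 tokens t0..t10 the emitted windows are:
    #   tokens[0:4]  -> t0 t1 t2 t3
    #   tokens[3:7]  -> t3 t4 t5 t6
    #   tokens[6:10] -> t6 t7 t8 t9
    #   tokens[9:11] -> t9 t10
    # Each window repeats the last chunk_overlap token(s) of its predecessor,
    # and advance >= 1 guarantees termination even when
    # chunk_overlap >= chunk_size.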

    def _split_text_without_tokenizer(self, text: str) -> list[str]:
        """Split text without a tokenizer using character-based chunking.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        # Safety check: if chunk_size is invalid, use a reasonable default
        if self.chunk_size <= 0:
            logger.warning(f"Invalid chunk_size {self.chunk_size}, using default 1000")
            effective_chunk_size = 1000
        else:
            effective_chunk_size = self.chunk_size

        if len(text) <= effective_chunk_size:
            return [text]

        # First, try to split by paragraphs (double newlines)
        paragraphs = re.split(r"\n\s*\n", text.strip())
        chunks = []
        current_chunk = ""

        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # If adding this paragraph would exceed the chunk size, finalize the current chunk
            if (
                current_chunk
                and len(current_chunk) + len(paragraph) + 2 > effective_chunk_size
            ):
                chunks.append(current_chunk.strip())
                current_chunk = paragraph
            else:
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph

        # Add the last chunk if it exists
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # If we still have chunks that are too large, split them further
        final_chunks = []
        for chunk in chunks:
            if len(chunk) <= effective_chunk_size:
                final_chunks.append(chunk)
            else:
                # Split large chunks by sentences
                sentences = re.split(r"(?<=[.!?])\s+", chunk)
                current_subchunk = ""

                for sentence in sentences:
                    if (
                        current_subchunk
                        and len(current_subchunk) + len(sentence) + 1
                        > effective_chunk_size
                    ):
                        if current_subchunk.strip():
                            final_chunks.append(current_subchunk.strip())
                        current_subchunk = sentence
                    else:
                        if current_subchunk:
                            current_subchunk += " " + sentence
                        else:
                            current_subchunk = sentence

                if current_subchunk.strip():
                    final_chunks.append(current_subchunk.strip())

        # Next fallback: if chunks are still too large, split on word boundaries
        result_chunks = []
        for chunk in final_chunks:
            if len(chunk) <= effective_chunk_size:
                result_chunks.append(chunk)
            else:
                # Split by character count with word boundaries
                words = chunk.split()
                current_word_chunk = ""

                for word in words:
                    if (
                        current_word_chunk
                        and len(current_word_chunk) + len(word) + 1
                        > effective_chunk_size
                    ):
                        if current_word_chunk.strip():
                            result_chunks.append(current_word_chunk.strip())
                        current_word_chunk = word
                    else:
                        if current_word_chunk:
                            current_word_chunk += " " + word
                        else:
                            current_word_chunk = word

                if current_word_chunk.strip():
                    result_chunks.append(current_word_chunk.strip())

        # Ultimate fallback: if chunks are still too large (no word boundaries),
        # split by raw character count
        final_result_chunks = []
        for chunk in result_chunks:
            if len(chunk) <= effective_chunk_size:
                final_result_chunks.append(chunk)
            else:
                # Split by pure character count as a last resort
                for i in range(0, len(chunk), effective_chunk_size):
                    char_chunk = chunk[i : i + effective_chunk_size]
                    if char_chunk.strip():
                        final_result_chunks.append(char_chunk)

        # Safety check: if a small document somehow generated too many chunks, something is wrong
        if len(text) < 1000 and len(final_result_chunks) > 100:
            logger.error(
                f"Suspicious chunking result: {len(text)} chars generated {len(final_result_chunks)} chunks. "
                f"Chunk size: {effective_chunk_size}. Returning single chunk as fallback."
            )
            return [text]

        # Apply chunk limit
        if len(final_result_chunks) > MAX_CHUNKS_TO_PROCESS:
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated. Text length: {len(text)}, Chunk size: {effective_chunk_size}"
            )
            final_result_chunks = final_result_chunks[:MAX_CHUNKS_TO_PROCESS]

        return [chunk for chunk in final_result_chunks if chunk.strip()]
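
    # Illustrative walk-through of the cascade above (not executed), assuming
    # an effective_chunk_size of 40 characters:
    #   text = "First sentence is right here. Second one follows now."  # 53 chars
    #   1. The paragraph pass leaves one oversized 53-char chunk.
    #   2. The sentence pass splits it into "First sentence is right here." (29)
    #      and "Second one follows now." (23), both within the limit.
    #   3. The word and raw-character passes only fire when a single sentence
    #      or word still exceeds the limit (e.g. an unbroken 100-char token
    #      would be cut into 40/40/20-char slices).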

    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        logger.debug(
            "Starting default chunking",
            document_id=document.id,
            content_length=len(document.content),
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        try:
            # Split the text into chunks
            text_chunks = self._split_text(document.content)

            if not text_chunks:
                self.progress_tracker.finish_chunking(document.id, 0, "default")
                return []

            # Apply chunk limit at document level too
            if len(text_chunks) > MAX_CHUNKS_TO_PROCESS:
                logger.warning(
                    f"Document {document.id} generated {len(text_chunks)} chunks, "
                    f"limiting to {MAX_CHUNKS_TO_PROCESS}"
                )
                text_chunks = text_chunks[:MAX_CHUNKS_TO_PROCESS]

            # Create Document objects for each chunk using base class method
            chunk_documents = []
            for i, chunk_text in enumerate(text_chunks):
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_text,
                    chunk_index=i,
                    total_chunks=len(text_chunks),
                    skip_nlp=False,
                )

                # Generate unique chunk ID
                chunk_doc.id = Document.generate_chunk_id(document.id, i)

                # Add strategy-specific metadata
                chunk_doc.metadata.update(
                    {
                        "chunking_strategy": "default",
                        "parent_document_id": document.id,
                    }
                )

                chunk_documents.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunk_documents), "default"
            )

            logger.debug(
                "Successfully chunked document",
                document_id=document.id,
                num_chunks=len(chunk_documents),
                strategy="default",
            )

            return chunk_documents

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            raise
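
# Illustrative usage sketch (not part of the module; `get_settings` and the
# `document` value are hypothetical stand-ins):
#
#   settings = get_settings()  # hypothetical: load the project's Settings
#   strategy = DefaultChunkingStrategy(settings)
#   chunks = strategy.chunk_document(document)  # document: a Document instance
#   for chunk in chunks:
#       print(chunk.id, chunk.metadata["parent_document_id"])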