Coverage for src/qdrant_loader/core/chunking/strategy/default_strategy.py: 79%
156 statements
1"""Default chunking strategy for text documents.
3This strategy uses character-based chunking for consistency with other strategies.
4When a tokenizer is available, it's used for better boundary detection to avoid
5splitting in the middle of tokens, but the chunk size limits are still based on
6character count.
7"""
9import re
11import structlog
13from qdrant_loader.config import Settings
14from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
15from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
16from qdrant_loader.core.document import Document
18logger = structlog.get_logger(__name__)
20# Maximum number of chunks to process to prevent performance issues
21MAX_CHUNKS_TO_PROCESS = 1000


class DefaultChunkingStrategy(BaseChunkingStrategy):
    """Default text chunking strategy using character-based splitting.

    This strategy splits text into chunks based on character count, ensuring
    consistency with other chunking strategies like the markdown strategy.
    When a tokenizer is available, it's used to find better split boundaries
    (avoiding splits in the middle of tokens/words), but the size limits
    are always based on character count.
    """

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Log configuration for debugging
        logger.info(
            "DefaultChunkingStrategy initialized",
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            tokenizer=self.tokenizer,
            has_encoding=self.encoding is not None,
            chunking_method="character-based"
            + (" with token boundary detection" if self.encoding is not None else ""),
        )

        # Warn about suspiciously small chunk sizes
        if self.chunk_size < 100:
            logger.warning(
                f"Very small chunk_size detected: {self.chunk_size} characters. "
                f"This may cause performance issues and excessive chunking. "
                f"Consider using a larger value (e.g., 1000-1500 characters)."
            )

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks using sentence boundaries and size limits.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if not text.strip():
            return [""]

        # Use tokenizer-based chunking if available
        if self.encoding is not None:
            return self._split_text_with_tokenizer(text)
        else:
            return self._split_text_without_tokenizer(text)

    def _split_text_with_tokenizer(self, text: str) -> list[str]:
        """Split text using the tokenizer for token boundary detection while keeping character-based sizing.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if self.encoding is None:
            # Fallback to character-based chunking
            return self._split_text_without_tokenizer(text)

        # Use character-based size limit (consistent with markdown strategy)
        if len(text) <= self.chunk_size:
            return [text]

        chunks = []
        start_char = 0

        while start_char < len(text) and len(chunks) < MAX_CHUNKS_TO_PROCESS:
            # Calculate end position based on character count
            end_char = min(start_char + self.chunk_size, len(text))

            # Get the chunk text
            chunk_text = text[start_char:end_char]

            # If we're not at the end of the text, try to find a good token boundary
            # to avoid splitting in the middle of words/tokens
            if end_char < len(text):
                # Try to encode/decode to find a clean token boundary
                try:
                    # Get tokens for the chunk
                    tokens = self.encoding.encode(chunk_text)

                    # If the chunk ends mid-token, back up to the last complete token
                    # by decoding and checking if we get the same text
                    decoded_text = self.encoding.decode(tokens)
                    if len(decoded_text) < len(chunk_text):
                        # The last token was incomplete, use the decoded text
                        chunk_text = decoded_text
                        end_char = start_char + len(chunk_text)

                    # Alternatively, try to find a word boundary near the target end
                    remaining_chars = self.chunk_size - len(chunk_text)
                    if remaining_chars > 0 and end_char < len(text):
                        # Look ahead a bit to find a word boundary
                        lookahead_end = min(end_char + min(100, remaining_chars), len(text))
                        lookahead_text = text[end_char:lookahead_end]

                        # Find the first word boundary (space, newline, punctuation)
                        import re
                        word_boundary_match = re.search(r"[\s\n\.\!\?\;]", lookahead_text)
                        if word_boundary_match:
                            boundary_pos = word_boundary_match.start()
                            end_char = end_char + boundary_pos + 1
                            chunk_text = text[start_char:end_char]

                except Exception:
                    # If tokenizer operations fail, stick with character-based splitting
                    pass

            chunks.append(chunk_text)

            # Calculate overlap in characters and move start position forward
            if self.chunk_overlap > 0 and end_char < len(text):
                # Calculate how much to advance (chunk_size - overlap)
                advance = max(1, self.chunk_size - self.chunk_overlap)
                start_char += advance
            else:
                # No overlap, advance by full chunk size
                start_char = end_char

            # If we're near the end and the remaining text is small, include it in the last chunk
            if start_char < len(text) and (len(text) - start_char) <= self.chunk_overlap:
                # Create final chunk with remaining text
                final_chunk_text = text[start_char:]
                if final_chunk_text.strip():  # Only add if there's meaningful content
                    chunks.append(final_chunk_text)
                break

        # Log warning if we hit the chunk limit
        if len(chunks) >= MAX_CHUNKS_TO_PROCESS and start_char < len(text):
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated."
            )

        return chunks

    def _split_text_without_tokenizer(self, text: str) -> list[str]:
        """Split text without a tokenizer using character-based chunking.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        # Safety check: if chunk_size is invalid, use a reasonable default
        if self.chunk_size <= 0:
            logger.warning(f"Invalid chunk_size {self.chunk_size}, using default 1000")
            effective_chunk_size = 1000
        else:
            effective_chunk_size = self.chunk_size

        if len(text) <= effective_chunk_size:
            return [text]

        # First, try to split by paragraphs (double newlines)
        paragraphs = re.split(r"\n\s*\n", text.strip())
        chunks = []
        current_chunk = ""

        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # If adding this paragraph would exceed chunk size, finalize current chunk
            if (
                current_chunk
                and len(current_chunk) + len(paragraph) + 2 > effective_chunk_size
            ):
                chunks.append(current_chunk.strip())
                current_chunk = paragraph
            else:
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph

        # Add the last chunk if it exists
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # If we still have chunks that are too large, split them further
        final_chunks = []
        for chunk in chunks:
            if len(chunk) <= effective_chunk_size:
                final_chunks.append(chunk)
            else:
                # Split large chunks by sentences
                sentences = re.split(r"(?<=[.!?])\s+", chunk)
                current_subchunk = ""

                for sentence in sentences:
                    if (
                        current_subchunk
                        and len(current_subchunk) + len(sentence) + 1
                        > effective_chunk_size
                    ):
                        if current_subchunk.strip():
                            final_chunks.append(current_subchunk.strip())
                        current_subchunk = sentence
                    else:
                        if current_subchunk:
                            current_subchunk += " " + sentence
                        else:
                            current_subchunk = sentence

                if current_subchunk.strip():
                    final_chunks.append(current_subchunk.strip())

        # Final fallback: if chunks are still too large, split by character count
        result_chunks = []
        for chunk in final_chunks:
            if len(chunk) <= effective_chunk_size:
                result_chunks.append(chunk)
            else:
                # Split by character count with word boundaries
                words = chunk.split()
                current_word_chunk = ""

                for word in words:
                    if (
                        current_word_chunk
                        and len(current_word_chunk) + len(word) + 1
                        > effective_chunk_size
                    ):
                        if current_word_chunk.strip():
                            result_chunks.append(current_word_chunk.strip())
                        current_word_chunk = word
                    else:
                        if current_word_chunk:
                            current_word_chunk += " " + word
                        else:
                            current_word_chunk = word

                if current_word_chunk.strip():
                    result_chunks.append(current_word_chunk.strip())

        # Ultimate fallback: if chunks are still too large (no word boundaries), split by character count
        final_result_chunks = []
        for chunk in result_chunks:
            if len(chunk) <= effective_chunk_size:
                final_result_chunks.append(chunk)
            else:
                # Split by pure character count as last resort
                for i in range(0, len(chunk), effective_chunk_size):
                    char_chunk = chunk[i : i + effective_chunk_size]
                    if char_chunk.strip():
                        final_result_chunks.append(char_chunk)

        # Safety check: if we somehow generated too many chunks from a small document, something is wrong
        if len(text) < 1000 and len(final_result_chunks) > 100:
            logger.error(
                f"Suspicious chunking result: {len(text)} chars generated {len(final_result_chunks)} chunks. "
                f"Chunk size: {effective_chunk_size}. Returning single chunk as fallback."
            )
            return [text]

        # Apply chunk limit
        if len(final_result_chunks) > MAX_CHUNKS_TO_PROCESS:
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated. Text length: {len(text)}, Chunk size: {effective_chunk_size}"
            )
            final_result_chunks = final_result_chunks[:MAX_CHUNKS_TO_PROCESS]

        return [chunk for chunk in final_result_chunks if chunk.strip()]

    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        logger.debug(
            "Starting default chunking",
            document_id=document.id,
            content_length=len(document.content),
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        try:
            # Split the text into chunks
            text_chunks = self._split_text(document.content)

            if not text_chunks:
                self.progress_tracker.finish_chunking(document.id, 0, "default")
                return []

            # Apply chunk limit at document level too
            if len(text_chunks) > MAX_CHUNKS_TO_PROCESS:
                logger.warning(
                    f"Document {document.id} generated {len(text_chunks)} chunks, "
                    f"limiting to {MAX_CHUNKS_TO_PROCESS}"
                )
                text_chunks = text_chunks[:MAX_CHUNKS_TO_PROCESS]

            # Create Document objects for each chunk using base class method
            chunk_documents = []
            for i, chunk_text in enumerate(text_chunks):
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_text,
                    chunk_index=i,
                    total_chunks=len(text_chunks),
                    skip_nlp=False,
                )

                # Generate unique chunk ID
                chunk_doc.id = Document.generate_chunk_id(document.id, i)

                # Add strategy-specific metadata
                chunk_doc.metadata.update(
                    {
                        "chunking_strategy": "default",
                        "parent_document_id": document.id,
                    }
                )

                chunk_documents.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunk_documents), "default"
            )

            logger.debug(
                "Successfully chunked document",
                document_id=document.id,
                num_chunks=len(chunk_documents),
                strategy="default",
            )

            return chunk_documents

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            raise
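
Usage sketch (editor's addition, not part of the module above): a minimal example of how this strategy might be invoked. The Settings construction and the Document constructor arguments shown here are assumptions for illustration only; consult qdrant_loader.config and qdrant_loader.core.document for the real signatures.

from qdrant_loader.config import Settings
from qdrant_loader.core.document import Document
from qdrant_loader.core.chunking.strategy.default_strategy import DefaultChunkingStrategy

# Assumption: in practice, Settings is loaded from the project's configuration,
# which supplies chunk_size and chunk_overlap to the base strategy.
settings = Settings()

strategy = DefaultChunkingStrategy(settings)

# Assumption: Document accepts these keyword arguments; the real constructor may differ.
doc = Document(
    content="First paragraph.\n\nSecond paragraph. " * 200,
    source="example.txt",
    source_type="localfile",
    title="Example document",
    metadata={},
)

chunks = strategy.chunk_document(doc)
print(f"{len(chunks)} chunks; first chunk has {len(chunks[0].content)} characters")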