Coverage for src/qdrant_loader/core/chunking/strategy/default_strategy.py: 78% (139 statements)
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Default chunking strategy for text documents."""
3import re
4from typing import Any
6import structlog
8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
9from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
10from qdrant_loader.core.document import Document
11from qdrant_loader.config import Settings
13logger = structlog.get_logger(__name__)
15# Maximum number of chunks to process to prevent performance issues
16MAX_CHUNKS_TO_PROCESS = 1000


class DefaultChunkingStrategy(BaseChunkingStrategy):
    """Default text chunking strategy using simple text splitting."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Log configuration for debugging
        logger.info(
            "DefaultChunkingStrategy initialized",
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            tokenizer=self.tokenizer,
            has_encoding=self.encoding is not None,
        )

        # Warn about suspiciously small chunk sizes
        if self.chunk_size < 100:
            logger.warning(
                f"Very small chunk_size detected: {self.chunk_size}. "
                f"This may cause performance issues and excessive chunking. "
                f"Consider using a larger value (e.g., 1000-1500 characters)."
            )

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks using sentence boundaries and size limits.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if not text.strip():
            return [""]

        # Use tokenizer-based chunking if available
        if self.encoding is not None:
            return self._split_text_with_tokenizer(text)
        else:
            return self._split_text_without_tokenizer(text)

    def _split_text_with_tokenizer(self, text: str) -> list[str]:
        """Split text using tokenizer for accurate token counting.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        if self.encoding is None:
            # Fallback to character-based chunking
            return self._split_text_without_tokenizer(text)

        tokens = self.encoding.encode(text)

        if len(tokens) <= self.chunk_size:
            return [text]

        chunks = []
        start = 0

        while start < len(tokens) and len(chunks) < MAX_CHUNKS_TO_PROCESS:
            # Calculate end position
            end = min(start + self.chunk_size, len(tokens))

            # Extract chunk tokens
            chunk_tokens = tokens[start:end]

            # Decode tokens back to text
            chunk_text = self.encoding.decode(chunk_tokens)
            chunks.append(chunk_text)

            # Move start position forward, accounting for overlap
            # Ensure we always make progress by advancing at least 1 token
            advance = max(1, self.chunk_size - self.chunk_overlap)
            start += advance

            # If we're near the end and the remaining tokens are small, include them in the last chunk
            if start < len(tokens) and (len(tokens) - start) <= self.chunk_overlap:
                # Create final chunk with remaining tokens
                final_chunk_tokens = tokens[start:]
                if final_chunk_tokens:  # Only add if there are tokens
                    final_chunk_text = self.encoding.decode(final_chunk_tokens)
                    chunks.append(final_chunk_text)
                break

        # Log warning if we hit the chunk limit
        if len(chunks) >= MAX_CHUNKS_TO_PROCESS and start < len(tokens):
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated."
            )

        return chunks
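
    # Worked example (illustrative values, not taken from the configuration):
    # with chunk_size=1000 and chunk_overlap=200, each window above covers up
    # to 1000 tokens and the start index advances by max(1, 1000 - 200) = 800,
    # so windows begin at tokens 0, 800, 1600, ... and consecutive chunks share
    # roughly 200 tokens of overlap.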

    def _split_text_without_tokenizer(self, text: str) -> list[str]:
        """Split text without tokenizer using character-based chunking.

        Args:
            text: The text to split

        Returns:
            List of text chunks
        """
        # Safety check: if chunk_size is invalid, use a reasonable default
        if self.chunk_size <= 0:
            logger.warning(f"Invalid chunk_size {self.chunk_size}, using default 1000")
            effective_chunk_size = 1000
        else:
            effective_chunk_size = self.chunk_size

        if len(text) <= effective_chunk_size:
            return [text]

        # First, try to split by paragraphs (double newlines)
        paragraphs = re.split(r"\n\s*\n", text.strip())
        chunks = []
        current_chunk = ""

        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # If adding this paragraph would exceed chunk size, finalize current chunk
            if (
                current_chunk
                and len(current_chunk) + len(paragraph) + 2 > effective_chunk_size
            ):
                chunks.append(current_chunk.strip())
                current_chunk = paragraph
            else:
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph

        # Add the last chunk if it exists
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # If we still have chunks that are too large, split them further
        final_chunks = []
        for chunk in chunks:
            if len(chunk) <= effective_chunk_size:
                final_chunks.append(chunk)
            else:
                # Split large chunks by sentences
                sentences = re.split(r"(?<=[.!?])\s+", chunk)
                current_subchunk = ""

                for sentence in sentences:
                    if (
                        current_subchunk
                        and len(current_subchunk) + len(sentence) + 1
                        > effective_chunk_size
                    ):
                        if current_subchunk.strip():
                            final_chunks.append(current_subchunk.strip())
                        current_subchunk = sentence
                    else:
                        if current_subchunk:
                            current_subchunk += " " + sentence
                        else:
                            current_subchunk = sentence

                if current_subchunk.strip():
                    final_chunks.append(current_subchunk.strip())

        # Final fallback: if chunks are still too large, split by character count
        result_chunks = []
        for chunk in final_chunks:
            if len(chunk) <= effective_chunk_size:
                result_chunks.append(chunk)
            else:
                # Split by character count with word boundaries
                words = chunk.split()
                current_word_chunk = ""

                for word in words:
                    if (
                        current_word_chunk
                        and len(current_word_chunk) + len(word) + 1
                        > effective_chunk_size
                    ):
                        if current_word_chunk.strip():
                            result_chunks.append(current_word_chunk.strip())
                        current_word_chunk = word
                    else:
                        if current_word_chunk:
                            current_word_chunk += " " + word
                        else:
                            current_word_chunk = word

                if current_word_chunk.strip():
                    result_chunks.append(current_word_chunk.strip())

        # Ultimate fallback: if chunks are still too large (no word boundaries), split by character count
        final_result_chunks = []
        for chunk in result_chunks:
            if len(chunk) <= effective_chunk_size:
                final_result_chunks.append(chunk)
            else:
                # Split by pure character count as last resort
                for i in range(0, len(chunk), effective_chunk_size):
                    char_chunk = chunk[i : i + effective_chunk_size]
                    if char_chunk.strip():
                        final_result_chunks.append(char_chunk)

        # Safety check: if we somehow generated too many chunks from a small document, something is wrong
        if len(text) < 1000 and len(final_result_chunks) > 100:
            logger.error(
                f"Suspicious chunking result: {len(text)} chars generated {len(final_result_chunks)} chunks. "
                f"Chunk size: {effective_chunk_size}. Returning single chunk as fallback."
            )
            return [text]

        # Apply chunk limit
        if len(final_result_chunks) > MAX_CHUNKS_TO_PROCESS:
            logger.warning(
                f"Reached maximum chunk limit of {MAX_CHUNKS_TO_PROCESS}. "
                f"Document may be truncated. Text length: {len(text)}, Chunk size: {effective_chunk_size}"
            )
            final_result_chunks = final_result_chunks[:MAX_CHUNKS_TO_PROCESS]

        return [chunk for chunk in final_result_chunks if chunk.strip()]
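
    # Illustrative summary of the fallback cascade above (no new behaviour):
    # a 5,000-character text with effective_chunk_size=1000 is first split on
    # blank lines; any paragraph-level chunk still longer than 1000 characters
    # is re-split on sentence boundaries, then on word boundaries, and finally
    # into raw 1000-character slices if a single unbroken run of text remains
    # over the limit.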

    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        logger.debug(
            "Starting default chunking",
            document_id=document.id,
            content_length=len(document.content),
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        try:
            # Split the text into chunks
            text_chunks = self._split_text(document.content)

            if not text_chunks:
                self.progress_tracker.finish_chunking(document.id, 0, "default")
                return []

            # Apply chunk limit at document level too
            if len(text_chunks) > MAX_CHUNKS_TO_PROCESS:
                logger.warning(
                    f"Document {document.id} generated {len(text_chunks)} chunks, "
                    f"limiting to {MAX_CHUNKS_TO_PROCESS}"
                )
                text_chunks = text_chunks[:MAX_CHUNKS_TO_PROCESS]

            # Create Document objects for each chunk using base class method
            chunk_documents = []
            for i, chunk_text in enumerate(text_chunks):
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_text,
                    chunk_index=i,
                    total_chunks=len(text_chunks),
                    skip_nlp=False,
                )

                # Generate unique chunk ID
                chunk_doc.id = Document.generate_chunk_id(document.id, i)

                # Add strategy-specific metadata
                chunk_doc.metadata.update(
                    {
                        "chunking_strategy": "default",
                        "parent_document_id": document.id,
                    }
                )

                chunk_documents.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunk_documents), "default"
            )

            logger.debug(
                "Successfully chunked document",
                document_id=document.id,
                num_chunks=len(chunk_documents),
                strategy="default",
            )

            return chunk_documents

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            raise
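
# Usage sketch (illustrative only, not part of the coverage-measured module):
# assuming `settings` is an already-loaded Settings instance and `doc` is a
# Document built by the loader, chunking with this strategy reduces to:
#
#     strategy = DefaultChunkingStrategy(settings)
#     chunks = strategy.chunk_document(doc)
#     for chunk in chunks:
#         print(chunk.id, chunk.metadata["chunking_strategy"], len(chunk.content))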