Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 96%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:48 +0000
1"""Base abstract class for chunking strategies."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6import tiktoken
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.text_processing.text_processor import TextProcessor
10from qdrant_loader.utils.logging import LoggingConfig
12if TYPE_CHECKING:
13 from qdrant_loader.config import Settings
15logger = LoggingConfig.get_logger(__name__)
class BaseChunkingStrategy(ABC):
    """Base abstract class for all chunking strategies.

    This class defines the interface that all chunking strategies must implement.
    Each strategy should provide its own implementation of how to split documents
    into chunks while preserving their semantic meaning and structure.
    """

    # Performance guardrails for NLP processing (used by _should_apply_nlp
    # and _create_chunk_document).
    _NLP_CONTENT_SIZE_LIMIT = 20000  # skip NLP for content larger than 20KB
    _NLP_CHUNK_SIZE_LIMIT = 10000  # skip NLP for individual chunks above 10KB
    _NLP_MAX_CHUNKS = 50  # skip NLP when a document yields more chunks than this

    # Extension sets are hoisted to class-level frozensets so they are built
    # once at class creation instead of on every _should_apply_nlp call.
    # Source-code files: NLP is skipped (except extracted comments/docstrings).
    _CODE_EXTENSIONS = frozenset({
        ".py", ".pyx", ".pyi", ".java", ".js", ".jsx", ".mjs", ".ts", ".tsx",
        ".go", ".rs", ".cpp", ".cc", ".cxx", ".c", ".h", ".cs", ".php", ".rb",
        ".kt", ".scala", ".swift", ".dart", ".sh", ".bash", ".zsh", ".sql",
        ".r", ".m", ".pl", ".lua", ".vim", ".asm",
    })
    # Structured-data files: NLP adds no value.
    _STRUCTURED_EXTENSIONS = frozenset({
        ".json", ".xml", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
        ".csv", ".tsv", ".log", ".properties",
    })
    # Binary/encoded formats: never run NLP on these.
    _BINARY_EXTENSIONS = frozenset({
        ".pdf", ".docx", ".xls", ".xlsx", ".pptx", ".zip", ".tar", ".gz",
        ".bz2", ".7z", ".rar", ".jpg", ".jpeg", ".png", ".gif", ".bmp",
        ".svg", ".mp3", ".mp4", ".avi", ".mov", ".wav", ".flac",
    })
    # Documentation/text formats: NLP is always worthwhile.
    _TEXT_EXTENSIONS = frozenset({".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"})

    def __init__(
        self,
        settings: "Settings",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ):
        """Initialize the chunking strategy.

        Args:
            settings: Application settings containing configuration for the strategy
            chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value)
            chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value)

        Raises:
            ValueError: If the effective chunk overlap is not strictly smaller
                than the effective chunk size.
        """
        self.settings = settings
        self.logger = LoggingConfig.get_logger(self.__class__.__name__)

        # Initialize token-based chunking parameters.
        # BUGFIX: compare against None instead of using `or`, so an explicit
        # 0 passed by the caller (a valid overlap meaning "no overlap") is
        # not silently replaced by the settings default.
        self.chunk_size = (
            chunk_size
            if chunk_size is not None
            else settings.global_config.chunking.chunk_size
        )
        self.chunk_overlap = (
            chunk_overlap
            if chunk_overlap is not None
            else settings.global_config.chunking.chunk_overlap
        )
        self.tokenizer = settings.global_config.embedding.tokenizer

        # Initialize tokenizer based on configuration. "none" — or any name
        # tiktoken cannot load — degrades gracefully to character counting
        # (self.encoding stays None; see _count_tokens).
        if self.tokenizer == "none":
            self.encoding = None
        else:
            try:
                self.encoding = tiktoken.get_encoding(self.tokenizer)
            except Exception as e:
                logger.warning(
                    "Failed to initialize tokenizer, falling back to simple character counting",
                    error=str(e),
                    tokenizer=self.tokenizer,
                )
                self.encoding = None

        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

        # Initialize text processor
        self.text_processor = TextProcessor(settings)

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string.

        Falls back to a plain character count when no tokenizer is available
        (tokenizer configured as "none" or failed to initialize).
        """
        if self.encoding is None:
            # Fallback to character count if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > self._NLP_CONTENT_SIZE_LIMIT:
            return False

        # Derive the lowercased file extension (with leading dot).
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        if ext in self._CODE_EXTENSIONS:
            return False

        # Skip NLP for structured data files
        if ext in self._STRUCTURED_EXTENSIONS:
            return False

        # Skip NLP for binary/encoded content
        if ext in self._BINARY_EXTENSIONS:
            return False

        # Apply NLP for documentation and text files
        if ext in self._TEXT_EXTENSIONS:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Line-based heuristic scan: handles `#` comments, `//` comments,
        `/* ... */` blocks, and triple-quoted Python docstrings. It does not
        parse string literals, so comment markers inside strings may be
        picked up as comments.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings, one fragment per line
        """
        extracted_text = []
        lines = code_content.split("\n")

        # Scanner state: inside a /* */ block, or inside a triple-quoted
        # docstring (tracking which delimiter opened it).
        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                # The block may also close on this same line.
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                # Drop leading decoration ("* " continuation markers).
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        # Evaluate the (pure) content-based NLP decision once; it is reused
        # below for the skip-reason, avoiding a duplicate call.
        content_nlp_ok = self._should_apply_nlp(chunk_content, file_path, content_type)

        should_apply_nlp = (
            not skip_nlp
            and len(chunk_content) <= self._NLP_CHUNK_SIZE_LIMIT  # Size limit
            and total_chunks <= self._NLP_MAX_CHUNKS  # Chunk count limit
            and content_nlp_ok
        )

        if not should_apply_nlp:
            # Skip NLP processing; record the most specific reason available.
            skip_reason = "performance_optimization"
            if len(chunk_content) > self._NLP_CHUNK_SIZE_LIMIT:
                skip_reason = "chunk_too_large"
            elif total_chunks > self._NLP_MAX_CHUNKS:
                skip_reason = "too_many_chunks"
            elif not content_nlp_ok:
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                # Best-effort: NLP failure must not break chunking itself.
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )