Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 96%
159 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Base abstract class for chunking strategies."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6import tiktoken
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.text_processing.text_processor import TextProcessor
10from qdrant_loader.utils.logging import LoggingConfig
12if TYPE_CHECKING:
13 from qdrant_loader.config import Settings
15logger = LoggingConfig.get_logger(__name__)


class BaseChunkingStrategy(ABC):
    """Base abstract class for all chunking strategies.

    This class defines the interface that all chunking strategies must implement.
    Each strategy should provide its own implementation of how to split documents
    into chunks while preserving their semantic meaning and structure.
    """

    def __init__(
        self,
        settings: "Settings",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ):
        """Initialize the chunking strategy.

        Args:
            settings: Application settings containing configuration for the strategy
            chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value)
            chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value)
        """
        self.settings = settings
        self.logger = LoggingConfig.get_logger(self.__class__.__name__)

        # Initialize token-based chunking parameters
        self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size
        self.chunk_overlap = (
            chunk_overlap or settings.global_config.chunking.chunk_overlap
        )
        self.tokenizer = settings.global_config.embedding.tokenizer

        # Initialize tokenizer based on configuration
        if self.tokenizer == "none":
            self.encoding = None
        else:
            try:
                self.encoding = tiktoken.get_encoding(self.tokenizer)
            except Exception as e:
                logger.warning(
                    "Failed to initialize tokenizer, falling back to simple character counting",
                    error=str(e),
                    tokenizer=self.tokenizer,
                )
                self.encoding = None

        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

        # Initialize text processor
        self.text_processor = TextProcessor(settings)

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string."""
        if self.encoding is None:
            # Fall back to character count if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py",
            ".pyx",
            ".pyi",
            ".java",
            ".js",
            ".jsx",
            ".mjs",
            ".ts",
            ".tsx",
            ".go",
            ".rs",
            ".cpp",
            ".cc",
            ".cxx",
            ".c",
            ".h",
            ".cs",
            ".php",
            ".rb",
            ".kt",
            ".scala",
            ".swift",
            ".dart",
            ".sh",
            ".bash",
            ".zsh",
            ".sql",
            ".r",
            ".m",
            ".pl",
            ".lua",
            ".vim",
            ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json",
            ".xml",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".cfg",
            ".conf",
            ".csv",
            ".tsv",
            ".log",
            ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf",
            ".doc",
            ".docx",
            ".xls",
            ".xlsx",
            ".ppt",
            ".pptx",
            ".zip",
            ".tar",
            ".gz",
            ".bz2",
            ".7z",
            ".rar",
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".svg",
            ".mp3",
            ".mp4",
            ".avi",
            ".mov",
            ".wav",
            ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        should_apply_nlp = (
            not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and self._should_apply_nlp(chunk_content, file_path, content_type)
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not self._should_apply_nlp(chunk_content, file_path, content_type):
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )
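

# --- Illustrative sketch (not part of this module) ---------------------------
# A minimal concrete strategy showing how a subclass might satisfy the
# chunk_document contract above: split the content, then delegate metadata
# preservation and chunk metadata to _create_chunk_document. The class name
# FixedSizeChunkingStrategy is hypothetical, and the `settings`/`document`
# objects in the usage note are assumed to come from the application.


class FixedSizeChunkingStrategy(BaseChunkingStrategy):
    """Hypothetical strategy: fixed-size windows with token (or character) overlap."""

    def chunk_document(self, document: Document) -> list[Document]:
        step = self.chunk_size - self.chunk_overlap  # > 0, enforced in __init__

        if self.encoding is not None:
            # Token-based windows with overlap, decoded back to text.
            tokens = self.encoding.encode(document.content)
            pieces = [
                self.encoding.decode(tokens[i : i + self.chunk_size])
                for i in range(0, len(tokens), step)
            ]
        else:
            # Character-based fallback, mirroring the _count_tokens fallback.
            text = document.content
            pieces = [text[i : i + self.chunk_size] for i in range(0, len(text), step)]

        # Trailing windows may be shorter than chunk_size; that is acceptable here.
        return [
            self._create_chunk_document(
                original_doc=document,
                chunk_content=piece,
                chunk_index=index,
                total_chunks=len(pieces),
            )
            for index, piece in enumerate(pieces)
        ]


# Example call (assumes `settings` and `document` are provided by the caller):
#     strategy = FixedSizeChunkingStrategy(settings, chunk_size=500, chunk_overlap=50)
#     chunks = strategy.chunk_document(document)
#     print(len(chunks), chunks[0].metadata["chunk_index"], chunks[0].metadata["total_chunks"])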