Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 95% (165 statements), coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Base abstract class for chunking strategies."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6import tiktoken
8from qdrant_loader.config import Settings
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.text_processing.text_processor import TextProcessor
11from qdrant_loader.utils.logging import LoggingConfig
13if TYPE_CHECKING:
14 from qdrant_loader.config import Settings
16logger = LoggingConfig.get_logger(__name__)


class BaseChunkingStrategy(ABC):
    """Base abstract class for all chunking strategies.

    This class defines the interface that all chunking strategies must implement.
    Each strategy should provide its own implementation of how to split documents
    into chunks while preserving their semantic meaning and structure.
    """

    def __init__(
        self,
        settings: "Settings",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ):
        """Initialize the chunking strategy.

        Args:
            settings: Application settings containing configuration for the strategy
            chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value)
            chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value)
        """
        self.settings = settings
        self.logger = LoggingConfig.get_logger(self.__class__.__name__)

        # Initialize token-based chunking parameters
        self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size
        self.chunk_overlap = (
            chunk_overlap or settings.global_config.chunking.chunk_overlap
        )
        self.tokenizer = settings.global_config.embedding.tokenizer

        # Initialize tokenizer based on configuration
        if self.tokenizer == "none":
            self.encoding = None
        else:
            try:
                self.encoding = tiktoken.get_encoding(self.tokenizer)
            except Exception as e:
                logger.warning(
                    "Failed to initialize tokenizer, falling back to simple character counting",
                    error=str(e),
                    tokenizer=self.tokenizer,
                )
                self.encoding = None

        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

        # Initialize text processor
        self.text_processor = TextProcessor(settings)
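
    # Illustrative configuration behaviour: explicit arguments win over the values
    # in settings.global_config.chunking, and an overlap that is not strictly
    # smaller than the chunk size is rejected. `make_settings()` and
    # `SomeConcreteStrategy` below are hypothetical names used only for this sketch.
    #
    #     settings = make_settings(chunk_size=500, chunk_overlap=50)
    #     strategy = SomeConcreteStrategy(settings)  # uses 500 / 50 from settings
    #     strategy = SomeConcreteStrategy(settings, chunk_size=200, chunk_overlap=200)
    #     # -> ValueError: Chunk overlap must be less than chunk size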

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string."""
        if self.encoding is None:
            # Fallback to character count if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))
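
    # Illustrative behaviour: with a tiktoken encoding such as "cl100k_base" the
    # result is a real token count; with tokenizer="none" (or a failed tiktoken
    # initialisation) it degrades to a plain character count.
    #
    #     strategy._count_tokens("hello world")   # 2 with cl100k_base
    #     strategy._count_tokens("hello world")   # 11 with the len() fallback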

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py",
            ".pyx",
            ".pyi",
            ".java",
            ".js",
            ".jsx",
            ".mjs",
            ".ts",
            ".tsx",
            ".go",
            ".rs",
            ".cpp",
            ".cc",
            ".cxx",
            ".c",
            ".h",
            ".cs",
            ".php",
            ".rb",
            ".kt",
            ".scala",
            ".swift",
            ".dart",
            ".sh",
            ".bash",
            ".zsh",
            ".sql",
            ".r",
            ".m",
            ".pl",
            ".lua",
            ".vim",
            ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json",
            ".xml",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".cfg",
            ".conf",
            ".csv",
            ".tsv",
            ".log",
            ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf",
            ".doc",
            ".docx",
            ".xls",
            ".xlsx",
            ".ppt",
            ".pptx",
            ".zip",
            ".tar",
            ".gz",
            ".bz2",
            ".7z",
            ".rar",
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".svg",
            ".mp3",
            ".mp4",
            ".avi",
            ".mov",
            ".wav",
            ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True
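
    # Illustrative decisions, following the rules above ("strategy" stands for any
    # concrete subclass instance):
    #
    #     strategy._should_apply_nlp("def foo(): ...", "src/app.py")         # False: code file
    #     strategy._should_apply_nlp('{"a": 1}', "data.json")                # False: structured data
    #     strategy._should_apply_nlp("# Title\n\nSome prose.", "README.md")  # True: documentation
    #     strategy._should_apply_nlp("x" * 30000, "notes.txt")               # False: over the 20KB limit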

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        should_apply_nlp = (
            not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and self._should_apply_nlp(chunk_content, file_path, content_type)
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not self._should_apply_nlp(chunk_content, file_path, content_type):
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )
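
    # Illustrative result: a chunk built from a Markdown document would typically
    # carry metadata along these lines (keys come from the updates above; the
    # entity and POS values depend on the TextProcessor output):
    #
    #     {
    #         ...original document metadata...,
    #         "chunk_index": 0,
    #         "total_chunks": 3,
    #         "entities": [...],
    #         "pos_tags": [...],
    #         "nlp_skipped": False,
    #         "nlp_content_extracted": False,
    #         "nlp_content_ratio": 1.0,
    #     }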

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )

    @abstractmethod
    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks based on strategy-specific rules.

        This method should:
        1. Implement the specific chunking logic for the strategy
        2. Return a list of text chunks
        3. Preserve the semantic meaning of the content

        Args:
            text: The text to split into chunks

        Returns:
            List of text chunks

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError("Chunking strategy must implement _split_text method")
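

# A minimal sketch of a concrete strategy, showing how a subclass is expected to
# fill in the two abstract methods: a fixed-size token-window splitter with the
# configured overlap that reuses the tokenizer set up in __init__ and the
# _create_chunk_document helper from the base class. FixedSizeChunkingStrategy is
# purely illustrative and not part of this module; the strategies shipped with
# the package are typically more structure-aware.
class FixedSizeChunkingStrategy(BaseChunkingStrategy):
    """Illustrative strategy that splits text into fixed-size token windows."""

    def _split_text(self, text: str) -> list[str]:
        step = self.chunk_size - self.chunk_overlap
        if self.encoding is None:
            # Character-based fallback, mirroring _count_tokens' fallback.
            return [
                text[i : i + self.chunk_size] for i in range(0, len(text), step)
            ] or [text]

        tokens = self.encoding.encode(text)
        chunks = [
            self.encoding.decode(tokens[start : start + self.chunk_size])
            for start in range(0, len(tokens), step)
        ]
        return chunks or [text]

    def chunk_document(self, document: Document) -> list[Document]:
        chunks = self._split_text(document.content)
        return [
            self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk,
                chunk_index=index,
                total_chunks=len(chunks),
            )
            for index, chunk in enumerate(chunks)
        ]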