Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 96% (165 statements)
coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""Base abstract class for chunking strategies."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6import tiktoken
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.text_processing.text_processor import TextProcessor
10from qdrant_loader.utils.logging import LoggingConfig
12if TYPE_CHECKING:
13 from qdrant_loader.config import Settings
15logger = LoggingConfig.get_logger(__name__)
18class BaseChunkingStrategy(ABC):
19 """Base abstract class for all chunking strategies.
21 This class defines the interface that all chunking strategies must implement.
22 Each strategy should provide its own implementation of how to split documents
23 into chunks while preserving their semantic meaning and structure.
24 """

    def __init__(
        self,
        settings: "Settings",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ):
        """Initialize the chunking strategy.

        Args:
            settings: Application settings containing configuration for the strategy
            chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value)
            chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value)
        """
        self.settings = settings
        self.logger = LoggingConfig.get_logger(self.__class__.__name__)

        # Initialize token-based chunking parameters
        self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size
        self.chunk_overlap = (
            chunk_overlap or settings.global_config.chunking.chunk_overlap
        )
        self.tokenizer = settings.global_config.embedding.tokenizer

        # Initialize tokenizer based on configuration
        if self.tokenizer == "none":
            self.encoding = None
        else:
            try:
                self.encoding = tiktoken.get_encoding(self.tokenizer)
            except Exception as e:
                logger.warning(
                    "Failed to initialize tokenizer, falling back to simple character counting",
                    error=str(e),
                    tokenizer=self.tokenizer,
                )
                self.encoding = None

        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

        # Master switch for NLP metadata extraction across all strategies.
        self._semantic_analysis_enabled = bool(
            getattr(settings.global_config.chunking, "enable_semantic_analysis", True)
        )

        # Initialize text processor only when NLP metadata extraction is enabled.
        self.text_processor = (
            TextProcessor(settings) if self._semantic_analysis_enabled else None
        )
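
    # For example, chunk_size=1500 with chunk_overlap=200 is a valid configuration
    # (the values are illustrative, not defaults); any chunk_overlap greater than
    # or equal to chunk_size raises the ValueError above.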

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string."""
        if self.encoding is None:
            # Fallback to character count if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))
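
    # For example, with a tiktoken encoding such as "cl100k_base",
    # _count_tokens("hello world") returns 2 tokens; with the character-count
    # fallback (self.encoding is None) it returns 11.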

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        if self.text_processor is None:
            return {"tokens": [], "entities": [], "pos_tags": [], "chunks": []}

        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py", ".pyx", ".pyi", ".java", ".js", ".jsx", ".mjs", ".ts", ".tsx",
            ".go", ".rs", ".cpp", ".cc", ".cxx", ".c", ".h", ".cs", ".php", ".rb",
            ".kt", ".scala", ".swift", ".dart", ".sh", ".bash", ".zsh", ".sql",
            ".r", ".m", ".pl", ".lua", ".vim", ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json", ".xml", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
            ".csv", ".tsv", ".log", ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf", ".docx", ".xls", ".xlsx", ".pptx", ".zip", ".tar", ".gz",
            ".bz2", ".7z", ".rar", ".jpg", ".jpeg", ".png", ".gif", ".bmp",
            ".svg", ".mp3", ".mp4", ".avi", ".mov", ".wav", ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True
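
    # For example, a short Markdown chunk from "README.md" returns True,
    # "main.py" returns False (code extension), and extensionless content that
    # starts with "{" returns False (treated as structured data). The file names
    # are illustrative inputs only.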

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content
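
    # For example, element_type="docstring" returns the content unchanged,
    # element_type="function" routes through _extract_comments_and_docstrings,
    # and plain prose (empty element_type) is returned as-is.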

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        nlp_applicable = self._should_apply_nlp(chunk_content, file_path, content_type)

        should_apply_nlp = (
            self._semantic_analysis_enabled
            and not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and nlp_applicable
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if not self._semantic_analysis_enabled:
                skip_reason = "semantic_analysis_disabled"
            elif len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not nlp_applicable:
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )
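
    # For example, a 12,000-character chunk (with semantic analysis enabled)
    # is returned with nlp_skipped=True and skip_reason="chunk_too_large",
    # while a short Markdown chunk typically gets entities and pos_tags
    # populated via _process_text.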

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )
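
# A minimal concrete strategy might look like the following sketch (hypothetical
# example, word-based rather than token-based for brevity):
#
#     class FixedSizeChunkingStrategy(BaseChunkingStrategy):
#         def chunk_document(self, document: Document) -> list[Document]:
#             words = document.content.split()
#             pieces = [
#                 " ".join(words[i : i + self.chunk_size])
#                 for i in range(0, len(words), self.chunk_size)
#             ]
#             return [
#                 self._create_chunk_document(document, piece, i, len(pieces))
#                 for i, piece in enumerate(pieces)
#             ]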