Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 96%

165 statements  

coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Base abstract class for chunking strategies.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6import tiktoken 

7 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.text_processing.text_processor import TextProcessor 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12if TYPE_CHECKING: 

13 from qdrant_loader.config import Settings 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

18class BaseChunkingStrategy(ABC): 

19 """Base abstract class for all chunking strategies. 

20 

21 This class defines the interface that all chunking strategies must implement. 

22 Each strategy should provide its own implementation of how to split documents 

23 into chunks while preserving their semantic meaning and structure. 

24 """ 

25 

26 def __init__( 

27 self, 

28 settings: "Settings", 

29 chunk_size: int | None = None, 

30 chunk_overlap: int | None = None, 

31 ): 

32 """Initialize the chunking strategy. 

33 

34 Args: 

35 settings: Application settings containing configuration for the strategy 

36 chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value) 

37 chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value) 

38 """ 

39 self.settings = settings 

40 self.logger = LoggingConfig.get_logger(self.__class__.__name__) 

41 

42 # Initialize token-based chunking parameters 

43 self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size 

44 self.chunk_overlap = ( 

45 chunk_overlap or settings.global_config.chunking.chunk_overlap 

46 ) 

47 self.tokenizer = settings.global_config.embedding.tokenizer 

48 

49 # Initialize tokenizer based on configuration 

50 if self.tokenizer == "none": 

51 self.encoding = None 

52 else: 

53 try: 

54 self.encoding = tiktoken.get_encoding(self.tokenizer) 

55 except Exception as e: 

56 logger.warning( 

57 "Failed to initialize tokenizer, falling back to simple character counting", 

58 error=str(e), 

59 tokenizer=self.tokenizer, 

60 ) 

61 self.encoding = None 

62 

63 if self.chunk_overlap >= self.chunk_size: 

64 raise ValueError("Chunk overlap must be less than chunk size") 

65 

66 # Master switch for NLP metadata extraction across all strategies. 

67 self._semantic_analysis_enabled = bool( 

68 getattr(settings.global_config.chunking, "enable_semantic_analysis", True) 

69 ) 

70 

71 # Initialize text processor only when NLP metadata extraction is enabled. 

72 self.text_processor = ( 

73 TextProcessor(settings) if self._semantic_analysis_enabled else None 

74 ) 

75 

76 def _count_tokens(self, text: str) -> int: 

77 """Count the number of tokens in a text string.""" 

78 if self.encoding is None: 

79 # Fallback to character count if no tokenizer is available 

80 return len(text) 

81 return len(self.encoding.encode(text)) 
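    # Illustrative note (not part of the original module): with a typical
    # tiktoken encoding such as "cl100k_base", _count_tokens("hello world")
    # returns 2, while a tokenizer setting of "none" (or a failed tiktoken
    # lookup) makes the same call fall back to len("hello world") == 11.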

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        if self.text_processor is None:
            return {"tokens": [], "entities": [], "pos_tags": [], "chunks": []}

        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py",
            ".pyx",
            ".pyi",
            ".java",
            ".js",
            ".jsx",
            ".mjs",
            ".ts",
            ".tsx",
            ".go",
            ".rs",
            ".cpp",
            ".cc",
            ".cxx",
            ".c",
            ".h",
            ".cs",
            ".php",
            ".rb",
            ".kt",
            ".scala",
            ".swift",
            ".dart",
            ".sh",
            ".bash",
            ".zsh",
            ".sql",
            ".r",
            ".m",
            ".pl",
            ".lua",
            ".vim",
            ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json",
            ".xml",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".cfg",
            ".conf",
            ".csv",
            ".tsv",
            ".log",
            ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf",
            ".docx",
            ".xls",
            ".xlsx",
            ".pptx",
            ".zip",
            ".tar",
            ".gz",
            ".bz2",
            ".7z",
            ".rar",
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".svg",
            ".mp3",
            ".mp4",
            ".avi",
            ".mov",
            ".wav",
            ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True
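    # Illustrative note (not part of the original module): under these rules
    # _should_apply_nlp(text, "README.md") returns True (documentation file),
    # _should_apply_nlp(source, "main.py") returns False (code extension), and
    # any content longer than 20,000 characters is skipped regardless of type.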

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        nlp_applicable = self._should_apply_nlp(chunk_content, file_path, content_type)

        should_apply_nlp = (
            self._semantic_analysis_enabled
            and not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and nlp_applicable
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if not self._semantic_analysis_enabled:
                skip_reason = "semantic_analysis_disabled"
            elif len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not nlp_applicable:
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )
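
For reference, below is a minimal sketch of how a concrete strategy might subclass BaseChunkingStrategy. It is illustrative only and not part of the module above: the class name ParagraphChunkingStrategy and its blank-line splitting rule are hypothetical, while the inherited helpers it relies on (_count_tokens and _create_chunk_document), the chunk_size attribute, and the abstract chunk_document signature all come from the base class shown in this file. Overlap handling and splitting of oversized paragraphs are omitted for brevity.

from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
from qdrant_loader.core.document import Document


class ParagraphChunkingStrategy(BaseChunkingStrategy):
    """Hypothetical example: chunk a document on blank-line (paragraph) boundaries."""

    def chunk_document(self, document: Document) -> list[Document]:
        # Greedily pack paragraphs into chunks whose token count stays within
        # self.chunk_size, using the token counter inherited from the base class.
        paragraphs = [p for p in document.content.split("\n\n") if p.strip()]
        chunks: list[str] = []
        current = ""
        for paragraph in paragraphs:
            candidate = f"{current}\n\n{paragraph}" if current else paragraph
            if current and self._count_tokens(candidate) > self.chunk_size:
                chunks.append(current)
                current = paragraph
            else:
                current = candidate
        if current:
            chunks.append(current)

        # Wrap each chunk in a Document; _create_chunk_document copies the original
        # metadata, adds chunk_index/total_chunks, and attaches NLP features when
        # semantic analysis is enabled and the content type qualifies.
        return [
            self._create_chunk_document(document, chunk, index, len(chunks))
            for index, chunk in enumerate(chunks)
        ]

A caller would construct the strategy with application settings, e.g. strategy = ParagraphChunkingStrategy(settings), and obtain the chunked Documents from strategy.chunk_document(document).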