Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 95%

165 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Base abstract class for chunking strategies.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6import tiktoken 

7 

8from qdrant_loader.config import Settings 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.text_processing.text_processor import TextProcessor 

11from qdrant_loader.utils.logging import LoggingConfig 

12 

13if TYPE_CHECKING: 

14 from qdrant_loader.config import Settings 

15 

16logger = LoggingConfig.get_logger(__name__) 

17 

18 

19class BaseChunkingStrategy(ABC): 

20 """Base abstract class for all chunking strategies. 

21 

22 This class defines the interface that all chunking strategies must implement. 

23 Each strategy should provide its own implementation of how to split documents 

24 into chunks while preserving their semantic meaning and structure. 

25 """ 

26 

27 def __init__( 

28 self, 

29 settings: "Settings", 

30 chunk_size: int | None = None, 

31 chunk_overlap: int | None = None, 

32 ): 

33 """Initialize the chunking strategy. 

34 

35 Args: 

36 settings: Application settings containing configuration for the strategy 

37 chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value) 

38 chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value) 

39 """ 

40 self.settings = settings 

41 self.logger = LoggingConfig.get_logger(self.__class__.__name__) 

42 

43 # Initialize token-based chunking parameters 

44 self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size 

45 self.chunk_overlap = ( 

46 chunk_overlap or settings.global_config.chunking.chunk_overlap 

47 ) 

48 self.tokenizer = settings.global_config.embedding.tokenizer 

49 

50 # Initialize tokenizer based on configuration 

51 if self.tokenizer == "none": 

52 self.encoding = None 

53 else: 

54 try: 

55 self.encoding = tiktoken.get_encoding(self.tokenizer) 

56 except Exception as e: 

57 logger.warning( 

58 "Failed to initialize tokenizer, falling back to simple character counting", 

59 error=str(e), 

60 tokenizer=self.tokenizer, 

61 ) 

62 self.encoding = None 

63 

64 if self.chunk_overlap >= self.chunk_size: 

65 raise ValueError("Chunk overlap must be less than chunk size") 

66 

67 # Initialize text processor 

68 self.text_processor = TextProcessor(settings) 

69 

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string."""
        if self.encoding is None:
            # Fall back to character counting if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py",
            ".pyx",
            ".pyi",
            ".java",
            ".js",
            ".jsx",
            ".mjs",
            ".ts",
            ".tsx",
            ".go",
            ".rs",
            ".cpp",
            ".cc",
            ".cxx",
            ".c",
            ".h",
            ".cs",
            ".php",
            ".rb",
            ".kt",
            ".scala",
            ".swift",
            ".dart",
            ".sh",
            ".bash",
            ".zsh",
            ".sql",
            ".r",
            ".m",
            ".pl",
            ".lua",
            ".vim",
            ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json",
            ".xml",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".cfg",
            ".conf",
            ".csv",
            ".tsv",
            ".log",
            ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf",
            ".doc",
            ".docx",
            ".xls",
            ".xlsx",
            ".ppt",
            ".pptx",
            ".zip",
            ".tar",
            ".gz",
            ".bz2",
            ".7z",
            ".rar",
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".svg",
            ".mp3",
            ".mp4",
            ".avi",
            ".mov",
            ".wav",
            ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        should_apply_nlp = (
            not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and self._should_apply_nlp(chunk_content, file_path, content_type)
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not self._should_apply_nlp(chunk_content, file_path, content_type):
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )

    @abstractmethod
    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks based on strategy-specific rules.

        This method should:
        1. Implement the specific chunking logic for the strategy
        2. Return a list of text chunks
        3. Preserve the semantic meaning of the content

        Args:
            text: The text to split into chunks

        Returns:
            List of text chunks

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError("Chunking strategy must implement _split_text method")
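
For reference, a minimal concrete strategy built on this base class might look like the sketch below. The FixedSizeChunkingStrategy name and its naive whitespace packing are illustrative assumptions, not part of qdrant-loader; only BaseChunkingStrategy, Document, _split_text, chunk_document, _count_tokens, and _create_chunk_document come from the module above.

# Illustrative only: a hypothetical concrete strategy showing how the abstract
# interface above is typically filled in. The class name and the naive
# whitespace-based splitting are assumptions for demonstration.

from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
from qdrant_loader.core.document import Document


class FixedSizeChunkingStrategy(BaseChunkingStrategy):
    """Split documents into token-bounded chunks (sketch, not production code)."""

    def _split_text(self, text: str) -> list[str]:
        # Greedily pack whitespace-separated words into chunks whose token count
        # (as measured by _count_tokens) stays within self.chunk_size.
        # Overlap handling is omitted for brevity.
        chunks: list[str] = []
        current: list[str] = []
        for word in text.split():
            candidate = " ".join(current + [word])
            if current and self._count_tokens(candidate) > self.chunk_size:
                chunks.append(" ".join(current))
                current = [word]
            else:
                current.append(word)
        if current:
            chunks.append(" ".join(current))
        return chunks

    def chunk_document(self, document: Document) -> list[Document]:
        # Delegate per-chunk Document creation to the base helper so that
        # chunk_index / total_chunks metadata and the NLP decision logic
        # stay consistent with the other strategies.
        chunks = self._split_text(document.content)
        return [
            self._create_chunk_document(document, chunk, i, len(chunks))
            for i, chunk in enumerate(chunks)
        ]

Reusing _create_chunk_document is the key design point: it keeps chunk metadata (chunk_index, total_chunks, NLP skip flags) consistent across all strategies derived from this base class.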