Coverage for src / qdrant_loader / core / chunking / strategy / base_strategy.py: 96%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:48 +0000

1"""Base abstract class for chunking strategies.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6import tiktoken 

7 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.text_processing.text_processor import TextProcessor 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12if TYPE_CHECKING: 

13 from qdrant_loader.config import Settings 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

class BaseChunkingStrategy(ABC):
    """Base abstract class for all chunking strategies.

    This class defines the interface that all chunking strategies must implement.
    Each strategy should provide its own implementation of how to split documents
    into chunks while preserving their semantic meaning and structure.
    """

    def __init__(
        self,
        settings: "Settings",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ):
        """Initialize the chunking strategy.

        Args:
            settings: Application settings containing configuration for the strategy
            chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value)
            chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value)

        Raises:
            ValueError: If the effective chunk overlap is not smaller than the
                effective chunk size.
        """
        self.settings = settings
        self.logger = LoggingConfig.get_logger(self.__class__.__name__)

        # Initialize token-based chunking parameters.
        # Use explicit ``is None`` checks instead of the previous truthiness
        # fallback (``value or default``): with truthiness, a caller passing
        # chunk_overlap=0 — a perfectly valid "no overlap" request — was
        # silently replaced by the settings default. An explicit 0 chunk_size
        # now reaches the validation below and fails loudly instead of being
        # silently ignored.
        self.chunk_size = (
            chunk_size
            if chunk_size is not None
            else settings.global_config.chunking.chunk_size
        )
        self.chunk_overlap = (
            chunk_overlap
            if chunk_overlap is not None
            else settings.global_config.chunking.chunk_overlap
        )
        self.tokenizer = settings.global_config.embedding.tokenizer

        # Initialize tokenizer based on configuration; "none" disables
        # token counting and falls back to character counting.
        if self.tokenizer == "none":
            self.encoding = None
        else:
            try:
                self.encoding = tiktoken.get_encoding(self.tokenizer)
            except Exception as e:
                logger.warning(
                    "Failed to initialize tokenizer, falling back to simple character counting",
                    error=str(e),
                    tokenizer=self.tokenizer,
                )
                self.encoding = None

        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

        # Initialize text processor used by _process_text
        self.text_processor = TextProcessor(settings)

69 def _count_tokens(self, text: str) -> int: 

70 """Count the number of tokens in a text string.""" 

71 if self.encoding is None: 

72 # Fallback to character count if no tokenizer is available 

73 return len(text) 

74 return len(self.encoding.encode(text)) 

75 

76 def _process_text(self, text: str) -> dict: 

77 """Process text using the text processor. 

78 

79 Args: 

80 text: Text to process 

81 

82 Returns: 

83 dict: Processed text features 

84 """ 

85 return self.text_processor.process_text(text) 

86 

87 def _should_apply_nlp( 

88 self, content: str, file_path: str = "", content_type: str = "" 

89 ) -> bool: 

90 """Determine if NLP processing should be applied to content. 

91 

92 Args: 

93 content: The content to analyze 

94 file_path: File path for extension-based detection 

95 content_type: Content type if available 

96 

97 Returns: 

98 bool: True if NLP processing would be valuable 

99 """ 

100 # Skip NLP for very large content (performance) 

101 if len(content) > 20000: # 20KB limit 

102 return False 

103 

104 # Get file extension 

105 ext = "" 

106 if file_path and "." in file_path: 

107 ext = f".{file_path.lower().split('.')[-1]}" 

108 

109 # Skip NLP for code files (except comments/docstrings) 

110 code_extensions = { 

111 ".py", 

112 ".pyx", 

113 ".pyi", 

114 ".java", 

115 ".js", 

116 ".jsx", 

117 ".mjs", 

118 ".ts", 

119 ".tsx", 

120 ".go", 

121 ".rs", 

122 ".cpp", 

123 ".cc", 

124 ".cxx", 

125 ".c", 

126 ".h", 

127 ".cs", 

128 ".php", 

129 ".rb", 

130 ".kt", 

131 ".scala", 

132 ".swift", 

133 ".dart", 

134 ".sh", 

135 ".bash", 

136 ".zsh", 

137 ".sql", 

138 ".r", 

139 ".m", 

140 ".pl", 

141 ".lua", 

142 ".vim", 

143 ".asm", 

144 } 

145 if ext in code_extensions: 

146 return False 

147 

148 # Skip NLP for structured data files 

149 structured_extensions = { 

150 ".json", 

151 ".xml", 

152 ".yaml", 

153 ".yml", 

154 ".toml", 

155 ".ini", 

156 ".cfg", 

157 ".conf", 

158 ".csv", 

159 ".tsv", 

160 ".log", 

161 ".properties", 

162 } 

163 if ext in structured_extensions: 

164 return False 

165 

166 # Skip NLP for binary/encoded content 

167 binary_extensions = { 

168 ".pdf", 

169 ".docx", 

170 ".xls", 

171 ".xlsx", 

172 ".pptx", 

173 ".zip", 

174 ".tar", 

175 ".gz", 

176 ".bz2", 

177 ".7z", 

178 ".rar", 

179 ".jpg", 

180 ".jpeg", 

181 ".png", 

182 ".gif", 

183 ".bmp", 

184 ".svg", 

185 ".mp3", 

186 ".mp4", 

187 ".avi", 

188 ".mov", 

189 ".wav", 

190 ".flac", 

191 } 

192 if ext in binary_extensions: 

193 return False 

194 

195 # Apply NLP for documentation and text files 

196 text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"} 

197 if ext in text_extensions: 

198 return True 

199 

200 # Apply NLP for HTML content (but be selective) 

201 if ext in {".html", ".htm"} or content_type == "html": 

202 return True 

203 

204 # For unknown extensions, check content characteristics 

205 if not ext: 

206 # Skip if content looks like code (high ratio of special characters) 

207 special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`") 

208 if len(content) > 0 and special_chars / len(content) > 0.15: 

209 return False 

210 

211 # Skip if content looks like structured data 

212 if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]: 

213 return False 

214 

215 # Default to applying NLP for text-like content 

216 return True 

217 

218 def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str: 

219 """Extract only the parts of content that are worth NLP processing. 

220 

221 For code files, this extracts comments and docstrings. 

222 For other files, returns the full content. 

223 

224 Args: 

225 content: The content to process 

226 element_type: Type of code element (if applicable) 

227 

228 Returns: 

229 str: Content suitable for NLP processing 

230 """ 

231 # For code elements, only process comments and docstrings 

232 if element_type in ["comment", "docstring"]: 

233 return content 

234 

235 # For other code elements, extract comments 

236 if element_type in ["function", "method", "class", "module"]: 

237 return self._extract_comments_and_docstrings(content) 

238 

239 # For non-code content, return as-is 

240 return content 

241 

242 def _extract_comments_and_docstrings(self, code_content: str) -> str: 

243 """Extract comments and docstrings from code content. 

244 

245 Args: 

246 code_content: Code content to extract from 

247 

248 Returns: 

249 str: Extracted comments and docstrings 

250 """ 

251 extracted_text = [] 

252 lines = code_content.split("\n") 

253 

254 in_multiline_comment = False 

255 in_docstring = False 

256 docstring_delimiter = None 

257 

258 for line in lines: 

259 stripped = line.strip() 

260 

261 # Python/Shell style comments 

262 if stripped.startswith("#"): 

263 comment = stripped[1:].strip() 

264 if comment: # Skip empty comments 

265 extracted_text.append(comment) 

266 

267 # C/Java/JS style single line comments 

268 elif "//" in stripped: 

269 comment_start = stripped.find("//") 

270 comment = stripped[comment_start + 2 :].strip() 

271 if comment: 

272 extracted_text.append(comment) 

273 

274 # C/Java/JS style multiline comments 

275 elif "/*" in stripped and not in_multiline_comment: 

276 in_multiline_comment = True 

277 comment_start = stripped.find("/*") 

278 comment = stripped[comment_start + 2 :] 

279 if "*/" in comment: 

280 comment = comment[: comment.find("*/")] 

281 in_multiline_comment = False 

282 comment = comment.strip() 

283 if comment: 

284 extracted_text.append(comment) 

285 

286 elif in_multiline_comment: 

287 if "*/" in stripped: 

288 comment = stripped[: stripped.find("*/")] 

289 in_multiline_comment = False 

290 else: 

291 comment = stripped 

292 comment = comment.strip("* \t") 

293 if comment: 

294 extracted_text.append(comment) 

295 

296 # Python docstrings 

297 elif ('"""' in stripped or "'''" in stripped) and not in_docstring: 

298 for delimiter in ['"""', "'''"]: 

299 if delimiter in stripped: 

300 in_docstring = True 

301 docstring_delimiter = delimiter 

302 start_idx = stripped.find(delimiter) 

303 docstring_content = stripped[start_idx + 3 :] 

304 

305 # Check if docstring ends on same line 

306 if delimiter in docstring_content: 

307 end_idx = docstring_content.find(delimiter) 

308 docstring_text = docstring_content[:end_idx].strip() 

309 if docstring_text: 

310 extracted_text.append(docstring_text) 

311 in_docstring = False 

312 docstring_delimiter = None 

313 else: 

314 if docstring_content.strip(): 

315 extracted_text.append(docstring_content.strip()) 

316 break 

317 

318 elif in_docstring and docstring_delimiter: 

319 if docstring_delimiter in stripped: 

320 end_idx = stripped.find(docstring_delimiter) 

321 docstring_text = stripped[:end_idx].strip() 

322 if docstring_text: 

323 extracted_text.append(docstring_text) 

324 in_docstring = False 

325 docstring_delimiter = None 

326 else: 

327 if stripped: 

328 extracted_text.append(stripped) 

329 

330 return "\n".join(extracted_text) 

331 

332 def _create_chunk_document( 

333 self, 

334 original_doc: Document, 

335 chunk_content: str, 

336 chunk_index: int, 

337 total_chunks: int, 

338 skip_nlp: bool = False, 

339 ) -> Document: 

340 """Create a new document for a chunk with enhanced metadata. 

341 

342 Args: 

343 original_doc: Original document 

344 chunk_content: Content of the chunk 

345 chunk_index: Index of the chunk 

346 total_chunks: Total number of chunks 

347 skip_nlp: Whether to skip expensive NLP processing 

348 

349 Returns: 

350 Document: New document instance for the chunk 

351 """ 

352 # Create enhanced metadata 

353 metadata = original_doc.metadata.copy() 

354 metadata.update( 

355 { 

356 "chunk_index": chunk_index, 

357 "total_chunks": total_chunks, 

358 } 

359 ) 

360 

361 # Smart NLP decision based on content type and characteristics 

362 file_path = original_doc.metadata.get("file_name", "") or original_doc.source 

363 content_type = original_doc.content_type or "" 

364 element_type = metadata.get("element_type", "") 

365 

366 # For converted files, use the converted content type instead of original file extension 

367 conversion_method = metadata.get("conversion_method") 

368 if conversion_method == "markitdown": 

369 # File was converted to markdown, so treat it as markdown for NLP purposes 

370 file_path = "converted.md" # Use .md extension for NLP decision 

371 content_type = "md" 

372 

373 should_apply_nlp = ( 

374 not skip_nlp 

375 and len(chunk_content) <= 10000 # Size limit 

376 and total_chunks <= 50 # Chunk count limit 

377 and self._should_apply_nlp(chunk_content, file_path, content_type) 

378 ) 

379 

380 if not should_apply_nlp: 

381 # Skip NLP processing 

382 skip_reason = "performance_optimization" 

383 if len(chunk_content) > 10000: 

384 skip_reason = "chunk_too_large" 

385 elif total_chunks > 50: 

386 skip_reason = "too_many_chunks" 

387 elif not self._should_apply_nlp(chunk_content, file_path, content_type): 

388 skip_reason = "content_type_inappropriate" 

389 

390 metadata.update( 

391 { 

392 "entities": [], 

393 "pos_tags": [], 

394 "nlp_skipped": True, 

395 "skip_reason": skip_reason, 

396 } 

397 ) 

398 else: 

399 try: 

400 # For code content, only process comments/docstrings 

401 nlp_content = self._extract_nlp_worthy_content( 

402 chunk_content, element_type 

403 ) 

404 

405 if nlp_content.strip(): 

406 # Process the NLP-worthy content 

407 processed = self._process_text(nlp_content) 

408 metadata.update( 

409 { 

410 "entities": processed["entities"], 

411 "pos_tags": processed["pos_tags"], 

412 "nlp_skipped": False, 

413 "nlp_content_extracted": len(nlp_content) 

414 < len(chunk_content), 

415 "nlp_content_ratio": ( 

416 len(nlp_content) / len(chunk_content) 

417 if chunk_content 

418 else 0 

419 ), 

420 } 

421 ) 

422 else: 

423 # No NLP-worthy content found 

424 metadata.update( 

425 { 

426 "entities": [], 

427 "pos_tags": [], 

428 "nlp_skipped": True, 

429 "skip_reason": "no_nlp_worthy_content", 

430 } 

431 ) 

432 except Exception as e: 

433 self.logger.warning( 

434 f"NLP processing failed for chunk {chunk_index}: {e}" 

435 ) 

436 metadata.update( 

437 { 

438 "entities": [], 

439 "pos_tags": [], 

440 "nlp_skipped": True, 

441 "skip_reason": "nlp_error", 

442 } 

443 ) 

444 

445 return Document( 

446 content=chunk_content, 

447 metadata=metadata, 

448 source=original_doc.source, 

449 source_type=original_doc.source_type, 

450 url=original_doc.url, 

451 title=original_doc.title, 

452 content_type=original_doc.content_type, 

453 ) 

454 

455 @abstractmethod 

456 def chunk_document(self, document: Document) -> list[Document]: 

457 """Split a document into chunks while preserving metadata. 

458 

459 This method should: 

460 1. Split the document content into appropriate chunks 

461 2. Preserve all metadata from the original document 

462 3. Add chunk-specific metadata (e.g., chunk index, total chunks) 

463 4. Return a list of new Document instances 

464 

465 Args: 

466 document: The document to chunk 

467 

468 Returns: 

469 List of chunked documents with preserved metadata 

470 

471 Raises: 

472 NotImplementedError: If the strategy doesn't implement this method 

473 """ 

474 raise NotImplementedError( 

475 "Chunking strategy must implement chunk_document method" 

476 )