Coverage for src/qdrant_loader/core/chunking/strategy/base_strategy.py: 96%

159 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Base abstract class for chunking strategies.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6import tiktoken 

7 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.text_processing.text_processor import TextProcessor 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12if TYPE_CHECKING: 

13 from qdrant_loader.config import Settings 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

18class BaseChunkingStrategy(ABC): 

19 """Base abstract class for all chunking strategies. 

20 

21 This class defines the interface that all chunking strategies must implement. 

22 Each strategy should provide its own implementation of how to split documents 

23 into chunks while preserving their semantic meaning and structure. 

24 """ 

25 

26 def __init__( 

27 self, 

28 settings: "Settings", 

29 chunk_size: int | None = None, 

30 chunk_overlap: int | None = None, 

31 ): 

32 """Initialize the chunking strategy. 

33 

34 Args: 

35 settings: Application settings containing configuration for the strategy 

36 chunk_size: Maximum number of tokens per chunk (optional, defaults to settings value) 

37 chunk_overlap: Number of tokens to overlap between chunks (optional, defaults to settings value) 

38 """ 

39 self.settings = settings 

40 self.logger = LoggingConfig.get_logger(self.__class__.__name__) 

41 

42 # Initialize token-based chunking parameters 

43 self.chunk_size = chunk_size or settings.global_config.chunking.chunk_size 

44 self.chunk_overlap = ( 

45 chunk_overlap or settings.global_config.chunking.chunk_overlap 

46 ) 

47 self.tokenizer = settings.global_config.embedding.tokenizer 

48 

49 # Initialize tokenizer based on configuration 

50 if self.tokenizer == "none": 

51 self.encoding = None 

52 else: 

53 try: 

54 self.encoding = tiktoken.get_encoding(self.tokenizer) 

55 except Exception as e: 

56 logger.warning( 

57 "Failed to initialize tokenizer, falling back to simple character counting", 

58 error=str(e), 

59 tokenizer=self.tokenizer, 

60 ) 

61 self.encoding = None 

62 

63 if self.chunk_overlap >= self.chunk_size: 

64 raise ValueError("Chunk overlap must be less than chunk size") 

65 

66 # Initialize text processor 

67 self.text_processor = TextProcessor(settings) 

68 

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string."""
        if self.encoding is None:
            # Fall back to character count if no tokenizer is available
            return len(text)
        return len(self.encoding.encode(text))

    def _process_text(self, text: str) -> dict:
        """Process text using the text processor.

        Args:
            text: Text to process

        Returns:
            dict: Processed text features
        """
        return self.text_processor.process_text(text)

    def _should_apply_nlp(
        self, content: str, file_path: str = "", content_type: str = ""
    ) -> bool:
        """Determine if NLP processing should be applied to content.

        Args:
            content: The content to analyze
            file_path: File path for extension-based detection
            content_type: Content type if available

        Returns:
            bool: True if NLP processing would be valuable
        """
        # Skip NLP for very large content (performance)
        if len(content) > 20000:  # 20KB limit
            return False

        # Get file extension
        ext = ""
        if file_path and "." in file_path:
            ext = f".{file_path.lower().split('.')[-1]}"

        # Skip NLP for code files (except comments/docstrings)
        code_extensions = {
            ".py",
            ".pyx",
            ".pyi",
            ".java",
            ".js",
            ".jsx",
            ".mjs",
            ".ts",
            ".tsx",
            ".go",
            ".rs",
            ".cpp",
            ".cc",
            ".cxx",
            ".c",
            ".h",
            ".cs",
            ".php",
            ".rb",
            ".kt",
            ".scala",
            ".swift",
            ".dart",
            ".sh",
            ".bash",
            ".zsh",
            ".sql",
            ".r",
            ".m",
            ".pl",
            ".lua",
            ".vim",
            ".asm",
        }
        if ext in code_extensions:
            return False

        # Skip NLP for structured data files
        structured_extensions = {
            ".json",
            ".xml",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".cfg",
            ".conf",
            ".csv",
            ".tsv",
            ".log",
            ".properties",
        }
        if ext in structured_extensions:
            return False

        # Skip NLP for binary/encoded content
        binary_extensions = {
            ".pdf",
            ".doc",
            ".docx",
            ".xls",
            ".xlsx",
            ".ppt",
            ".pptx",
            ".zip",
            ".tar",
            ".gz",
            ".bz2",
            ".7z",
            ".rar",
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".svg",
            ".mp3",
            ".mp4",
            ".avi",
            ".mov",
            ".wav",
            ".flac",
        }
        if ext in binary_extensions:
            return False

        # Apply NLP for documentation and text files
        text_extensions = {".md", ".txt", ".rst", ".adoc", ".tex", ".rtf"}
        if ext in text_extensions:
            return True

        # Apply NLP for HTML content (but be selective)
        if ext in {".html", ".htm"} or content_type == "html":
            return True

        # For unknown extensions, check content characteristics
        if not ext:
            # Skip if content looks like code (high ratio of special characters)
            special_chars = sum(1 for c in content if c in "{}[]();,=<>!&|+-*/%^~`")
            if len(content) > 0 and special_chars / len(content) > 0.15:
                return False

            # Skip if content looks like structured data
            if content.strip().startswith(("{", "[", "<")) or "=" in content[:100]:
                return False

        # Default to applying NLP for text-like content
        return True

    def _extract_nlp_worthy_content(self, content: str, element_type: str = "") -> str:
        """Extract only the parts of content that are worth NLP processing.

        For code files, this extracts comments and docstrings.
        For other files, returns the full content.

        Args:
            content: The content to process
            element_type: Type of code element (if applicable)

        Returns:
            str: Content suitable for NLP processing
        """
        # For code elements, only process comments and docstrings
        if element_type in ["comment", "docstring"]:
            return content

        # For other code elements, extract comments
        if element_type in ["function", "method", "class", "module"]:
            return self._extract_comments_and_docstrings(content)

        # For non-code content, return as-is
        return content

    def _extract_comments_and_docstrings(self, code_content: str) -> str:
        """Extract comments and docstrings from code content.

        Args:
            code_content: Code content to extract from

        Returns:
            str: Extracted comments and docstrings
        """
        extracted_text = []
        lines = code_content.split("\n")

        in_multiline_comment = False
        in_docstring = False
        docstring_delimiter = None

        for line in lines:
            stripped = line.strip()

            # Python/Shell style comments
            if stripped.startswith("#"):
                comment = stripped[1:].strip()
                if comment:  # Skip empty comments
                    extracted_text.append(comment)

            # C/Java/JS style single line comments
            elif "//" in stripped:
                comment_start = stripped.find("//")
                comment = stripped[comment_start + 2 :].strip()
                if comment:
                    extracted_text.append(comment)

            # C/Java/JS style multiline comments
            elif "/*" in stripped and not in_multiline_comment:
                in_multiline_comment = True
                comment_start = stripped.find("/*")
                comment = stripped[comment_start + 2 :]
                if "*/" in comment:
                    comment = comment[: comment.find("*/")]
                    in_multiline_comment = False
                comment = comment.strip()
                if comment:
                    extracted_text.append(comment)

            elif in_multiline_comment:
                if "*/" in stripped:
                    comment = stripped[: stripped.find("*/")]
                    in_multiline_comment = False
                else:
                    comment = stripped
                comment = comment.strip("* \t")
                if comment:
                    extracted_text.append(comment)

            # Python docstrings
            elif ('"""' in stripped or "'''" in stripped) and not in_docstring:
                for delimiter in ['"""', "'''"]:
                    if delimiter in stripped:
                        in_docstring = True
                        docstring_delimiter = delimiter
                        start_idx = stripped.find(delimiter)
                        docstring_content = stripped[start_idx + 3 :]

                        # Check if docstring ends on same line
                        if delimiter in docstring_content:
                            end_idx = docstring_content.find(delimiter)
                            docstring_text = docstring_content[:end_idx].strip()
                            if docstring_text:
                                extracted_text.append(docstring_text)
                            in_docstring = False
                            docstring_delimiter = None
                        else:
                            if docstring_content.strip():
                                extracted_text.append(docstring_content.strip())
                        break

            elif in_docstring and docstring_delimiter:
                if docstring_delimiter in stripped:
                    end_idx = stripped.find(docstring_delimiter)
                    docstring_text = stripped[:end_idx].strip()
                    if docstring_text:
                        extracted_text.append(docstring_text)
                    in_docstring = False
                    docstring_delimiter = None
                else:
                    if stripped:
                        extracted_text.append(stripped)

        return "\n".join(extracted_text)

    def _create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a new document for a chunk with enhanced metadata.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip expensive NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        # Smart NLP decision based on content type and characteristics
        file_path = original_doc.metadata.get("file_name", "") or original_doc.source
        content_type = original_doc.content_type or ""
        element_type = metadata.get("element_type", "")

        # For converted files, use the converted content type instead of original file extension
        conversion_method = metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # File was converted to markdown, so treat it as markdown for NLP purposes
            file_path = "converted.md"  # Use .md extension for NLP decision
            content_type = "md"

        should_apply_nlp = (
            not skip_nlp
            and len(chunk_content) <= 10000  # Size limit
            and total_chunks <= 50  # Chunk count limit
            and self._should_apply_nlp(chunk_content, file_path, content_type)
        )

        if not should_apply_nlp:
            # Skip NLP processing
            skip_reason = "performance_optimization"
            if len(chunk_content) > 10000:
                skip_reason = "chunk_too_large"
            elif total_chunks > 50:
                skip_reason = "too_many_chunks"
            elif not self._should_apply_nlp(chunk_content, file_path, content_type):
                skip_reason = "content_type_inappropriate"

            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": skip_reason,
                }
            )
        else:
            try:
                # For code content, only process comments/docstrings
                nlp_content = self._extract_nlp_worthy_content(
                    chunk_content, element_type
                )

                if nlp_content.strip():
                    # Process the NLP-worthy content
                    processed = self._process_text(nlp_content)
                    metadata.update(
                        {
                            "entities": processed["entities"],
                            "pos_tags": processed["pos_tags"],
                            "nlp_skipped": False,
                            "nlp_content_extracted": len(nlp_content)
                            < len(chunk_content),
                            "nlp_content_ratio": (
                                len(nlp_content) / len(chunk_content)
                                if chunk_content
                                else 0
                            ),
                        }
                    )
                else:
                    # No NLP-worthy content found
                    metadata.update(
                        {
                            "entities": [],
                            "pos_tags": [],
                            "nlp_skipped": True,
                            "skip_reason": "no_nlp_worthy_content",
                        }
                    )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    @abstractmethod
    def chunk_document(self, document: Document) -> list[Document]:
        """Split a document into chunks while preserving metadata.

        This method should:
        1. Split the document content into appropriate chunks
        2. Preserve all metadata from the original document
        3. Add chunk-specific metadata (e.g., chunk index, total chunks)
        4. Return a list of new Document instances

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents with preserved metadata

        Raises:
            NotImplementedError: If the strategy doesn't implement this method
        """
        raise NotImplementedError(
            "Chunking strategy must implement chunk_document method"
        )
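
To illustrate how this interface is meant to be extended, here is a minimal sketch of a concrete strategy that slices a document into fixed-size overlapping windows and delegates metadata and NLP handling to _create_chunk_document. It is not part of the module above: the class name SimpleWindowChunkingStrategy and the character-based windowing (a rough stand-in for token-based splitting) are assumptions made for the example.

# Hypothetical example subclass; not part of base_strategy.py.
class SimpleWindowChunkingStrategy(BaseChunkingStrategy):
    """Example strategy: fixed-size overlapping character windows."""

    def chunk_document(self, document: Document) -> list[Document]:
        # Step between window starts; the base class guarantees
        # chunk_overlap < chunk_size, so step is always positive.
        step = self.chunk_size - self.chunk_overlap
        text = document.content

        # Character windows approximate token windows here; a real strategy
        # would use self._count_tokens / self.encoding or structural cues.
        windows = [
            text[start : start + self.chunk_size]
            for start in range(0, len(text), step)
        ] or [""]

        return [
            self._create_chunk_document(
                original_doc=document,
                chunk_content=window,
                chunk_index=index,
                total_chunks=len(windows),
            )
            for index, window in enumerate(windows)
        ]

Because each chunk goes through _create_chunk_document, the subclass gets chunk indexing, NLP gating, and metadata propagation without reimplementing them.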