Coverage for src / qdrant_loader / core / chunking / strategy / markdown / section_splitter.py: 89%

226 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:48 +0000

1"""Section splitting strategies for markdown chunking.""" 

2 

import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional

import structlog

if TYPE_CHECKING:
    from qdrant_loader.config import Settings

# Re-export classes and local dependencies at top to satisfy E402
from .document_parser import DocumentParser, HierarchyBuilder  # noqa: F401
from .splitters.base import BaseSplitter  # re-export base class  # noqa: F401
from .splitters.excel import ExcelSplitter  # re-export  # noqa: F401
from .splitters.fallback import FallbackSplitter  # re-export  # noqa: F401
from .splitters.standard import StandardSplitter  # re-export  # noqa: F401

18 

19logger = structlog.get_logger(__name__) 

20 

21 

# Markdown configuration placeholder - can be imported from settings if needed
class MarkdownConfig:
    """Configuration for markdown processing."""

    # Typical adult reading speed; used to estimate section read time.
    words_per_minute_reading: int = 200


# Shared module-level instance consumed by the metadata builder below.
markdown_config = MarkdownConfig()

30 

31 

@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document.

    The per-level counts (h1..h6) and ``content_length`` are supplied by the
    caller; ``total_headers`` and ``avg_section_size`` are derived metrics
    recomputed by :meth:`__post_init__`.
    """

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Derive aggregate metrics from the per-level header counts."""
        level_counts = (self.h1, self.h2, self.h3, self.h4, self.h5, self.h6)
        self.total_headers = sum(level_counts)
        # Guard the division; with no headers the average stays untouched.
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers

51 

52 

@dataclass
class SectionMetadata:
    """Enhanced section metadata with hierarchical relationships.

    Captures a section's position in the document (``order``, heading
    ``level``), navigation links (previous/next/parent/siblings/subsections),
    a breadcrumb string, a URL-style anchor derived from the title, and a
    free-form content-analysis dictionary.
    """

    title: str
    level: int
    content: str
    order: int
    start_line: int
    end_line: int
    # Fixed typing defect: these fields default to None, so their annotations
    # must be Optional (they were previously annotated as plain str/list/dict).
    parent_section: Optional[str] = None
    breadcrumb: str = ""
    anchor: str = ""
    previous_section: Optional[str] = None
    next_section: Optional[str] = None
    sibling_sections: Optional[list[str]] = None
    subsections: Optional[list[str]] = None
    content_analysis: Optional[dict] = None

    def __post_init__(self):
        """Normalize optional collection fields and derive the anchor.

        Mutable defaults cannot be used directly as dataclass field defaults,
        so ``None`` acts as the "not provided" sentinel and is replaced here.
        """
        if self.sibling_sections is None:
            self.sibling_sections = []
        if self.subsections is None:
            self.subsections = []
        if self.content_analysis is None:
            self.content_analysis = {}
        # Only compute an anchor when the caller did not supply one.
        if not self.anchor:
            self.anchor = self._generate_anchor()

    def _generate_anchor(self) -> str:
        """Generate a URL anchor from the title.

        Lowercases the title, strips characters that are not word characters,
        whitespace, or hyphens, then collapses whitespace/hyphen runs into
        single hyphens (uses the module-level ``re`` import).
        """
        anchor = re.sub(r"[^\w\s-]", "", self.title.lower())
        anchor = re.sub(r"[-\s]+", "-", anchor)
        return anchor.strip("-")

91 

92 

class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies.

    Routes markdown content (including markdown converted from Excel files)
    through header-distribution analysis to choose which heading levels to
    split on, builds section dictionaries, and delegates oversized sections
    to the excel/standard sub-splitters.
    """

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        # Sub-splitters used to break oversized sections into chunks.
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        # NOTE(review): fallback_splitter is constructed but not referenced by
        # any method visible in this module — presumably used by callers.
        self.fallback_splitter = FallbackSplitter(settings)

    def _is_excel_document(self, document: Any) -> bool:
        """Check whether a document originated from an Excel file (.xls/.xlsx)."""
        if not document:
            return False

        # A document may expose no metadata (or metadata=None); treat that
        # as "not Excel".
        metadata = getattr(document, "metadata", None) or {}
        original_file_type = str(metadata.get("original_file_type", "")).lower()
        # Accept both ".xlsx" and "xlsx" spellings of the recorded file type.
        normalized_file_type = original_file_type.lstrip(".")

        return normalized_file_type in {"xls", "xlsx"}

    def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
        """Analyze header distribution to guide splitting decisions.

        Args:
            text: Document content to analyze

        Returns:
            HeaderAnalysis with distribution metrics
        """
        analysis = HeaderAnalysis()
        analysis.content_length = len(text)

        lines = text.split("\n")
        for line in lines:
            line = line.strip()
            # ATX-style headers: 1-6 leading '#' followed by whitespace + text.
            header_match = re.match(r"^(#{1,6})\s+(.+)", line)
            if header_match:
                level = len(header_match.group(1))
                if level == 1:
                    analysis.h1 += 1
                elif level == 2:
                    analysis.h2 += 1
                elif level == 3:
                    analysis.h3 += 1
                elif level == 4:
                    analysis.h4 += 1
                elif level == 5:
                    analysis.h5 += 1
                elif level == 6:
                    analysis.h6 += 1

        # Let __post_init__ calculate derived metrics — re-invoked manually
        # because the counts were mutated after construction.
        analysis.__post_init__()

        logger.debug(
            "Header distribution analysis",
            extra={
                "h1": analysis.h1,
                "h2": analysis.h2,
                "h3": analysis.h3,
                "total_headers": analysis.total_headers,
                "content_length": analysis.content_length,
                "avg_section_size": analysis.avg_section_size,
            },
        )

        return analysis

    def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
        """Intelligently determine optimal split levels based on document characteristics.

        Args:
            text: Document content
            document: Optional document for context

        Returns:
            Set of header levels to split on
        """
        header_analysis = self.analyze_header_distribution(text)

        # Check if this is a converted Excel file
        is_converted_excel = self._is_excel_document(document)

        if is_converted_excel:
            # Excel files: H1 (document) + H2 (sheets) + potentially H3 for large sheets
            if header_analysis.h3 > 10:
                return {1, 2, 3}
            else:
                return {1, 2}

        # Get configured thresholds.
        # NOTE: this local `markdown_config` intentionally shadows the
        # module-level placeholder of the same name within this method.
        markdown_config = self.settings.global_config.chunking.strategies.markdown
        h1_threshold = markdown_config.header_analysis_threshold_h1
        h3_threshold = markdown_config.header_analysis_threshold_h3

        # Regular markdown: Intelligent granularity based on structure
        if header_analysis.h1 <= 1 and header_analysis.h2 >= h1_threshold:
            # Single H1 with multiple H2s - the common case requiring granular splitting!
            logger.info(
                "Detected single H1 with multiple H2 sections - applying granular splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                },
            )
            # Split on H2 and H3 if there are many H3s
            if header_analysis.h3 >= h3_threshold:
                return {1, 2, 3}
            else:
                return {1, 2}
        elif header_analysis.h1 >= h1_threshold:
            # Multiple H1s - keep traditional splitting to avoid over-fragmentation
            logger.info(
                "Multiple H1 sections detected - using traditional H1-only splitting",
                extra={"h1_count": header_analysis.h1},
            )
            return {1}
        elif (
            header_analysis.h1 == 0
            and header_analysis.h2 == 0
            and header_analysis.h3 >= 1
        ):
            # 🔥 FIX: Converted documents often have only H3+ headers
            logger.info(
                "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                    "h4_count": header_analysis.h4,
                    "total_headers": header_analysis.total_headers,
                },
            )
            # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
            if header_analysis.h3 == 1 and header_analysis.h4 >= h1_threshold:
                # Single H3 with multiple H4s (common DOCX pattern) - split on both
                return {3, 4}
            elif header_analysis.h3 >= h1_threshold:
                # Multiple H3s - split on H3 primarily, H4 if many
                if header_analysis.h4 >= h3_threshold:
                    return {3, 4}
                else:
                    return {3}
            elif header_analysis.total_headers >= h3_threshold:
                # Many headers total - split on H3 and H4
                return {3, 4}
            else:
                # Default - split on H3 only
                return {3}
        elif header_analysis.total_headers <= 3:
            # Very small document - minimal splitting
            logger.info(
                "Small document detected - minimal splitting",
                extra={"total_headers": header_analysis.total_headers},
            )
            return {1, 2}
        else:
            # Default case - moderate granularity
            return {1, 2}

    def build_enhanced_section_metadata(
        self, sections: list[dict]
    ) -> list[SectionMetadata]:
        """Build enhanced section metadata with hierarchical relationships.

        Args:
            sections: Basic section data from split_sections

        Returns:
            List of enhanced SectionMetadata objects
        """
        enhanced_sections: list[SectionMetadata] = []

        for i, section in enumerate(sections):
            # Breadcrumb is the ancestor path plus this section's own title.
            breadcrumb_parts = section.get("path", [])
            if section.get("title"):
                breadcrumb_parts = breadcrumb_parts + [section["title"]]
            breadcrumb = " > ".join(breadcrumb_parts)

            # Parent is the nearest ancestor, i.e. the last path component.
            parent_section = None
            if section.get("path"):
                parent_section = section["path"][-1]

            current_level = section.get("level", 0)
            current_path = section.get("path", [])
            sibling_sections: list[str] = []

            # Siblings: sections at the same level under the same path.
            # O(n) scan per section — acceptable for typical section counts.
            for other_section in sections:
                if (
                    other_section != section
                    and other_section.get("level") == current_level
                    and other_section.get("path", []) == current_path
                ):
                    sibling_sections.append(other_section.get("title", ""))

            # Linear previous/next links in document order.
            previous_section = sections[i - 1].get("title") if i > 0 else None
            next_section = (
                sections[i + 1].get("title") if i < len(sections) - 1 else None
            )

            # Subsections: following sections whose path extends this
            # section's path + title; stop at the first section that is not
            # nested deeper (end of this section's subtree).
            subsections: list[str] = []
            current_title = section.get("title", "")
            for other_section in sections[i + 1 :]:
                other_path = other_section.get("path", [])
                if len(other_path) > len(current_path) and other_path[
                    :-1
                ] == current_path + [current_title]:
                    subsections.append(other_section.get("title", ""))
                elif len(other_path) <= len(current_path):
                    break

            # Lightweight content heuristics (regex-based, no markdown parse).
            content = section.get("content", "")
            content_analysis = {
                "has_code_blocks": bool(re.search(r"```", content)),
                "has_tables": bool(re.search(r"\|.*\|", content)),
                "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
                "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
                "word_count": len(content.split()),
                # Read time in minutes, floored but never below 1.
                "estimated_read_time": max(
                    1, len(content.split()) // markdown_config.words_per_minute_reading
                ),
                "char_count": len(content),
            }

            enhanced_section = SectionMetadata(
                title=section.get("title", "Untitled"),
                level=section.get("level", 0),
                content=content,
                order=i,
                # Line tracking is not available from split_sections output.
                start_line=0,
                end_line=0,
                parent_section=parent_section,
                breadcrumb=breadcrumb,
                previous_section=previous_section,
                next_section=next_section,
                sibling_sections=sibling_sections,
                subsections=subsections,
                content_analysis=content_analysis,
            )

            enhanced_sections.append(enhanced_section)

        return enhanced_sections

    def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
        """Split text into sections based on headers and document type.

        Args:
            text: Text to split
            document: Optional document for context

        Returns:
            List of section dictionaries
        """

        parser = DocumentParser()
        hierarchy_builder = HierarchyBuilder()

        structure = parser.parse_document_structure(text)
        sections: list[dict[str, Any]] = []
        # Accumulator state for the section currently being assembled.
        current_section = None
        current_level = None
        current_title = None
        current_path: list[str] = []

        split_levels = self.determine_optimal_split_levels(text, document)

        logger.debug(
            "Determined optimal split levels",
            extra={
                "split_levels": list(split_levels),
                "document_type": (
                    "excel" if self._is_excel_document(document) else "markdown"
                ),
            },
        )

        for item in structure:
            if item["type"] == "header":
                level = item["level"]

                # Start a new section on split-level headers (or on a level-0
                # item when no section exists yet).
                if level in split_levels or (level == 0 and not sections):
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                                # NOTE(review): tags the *flushed* section using
                                # the incoming header's `level`, not
                                # `current_level` — the final flush below uses
                                # current_level == 2; confirm this is intended.
                                "is_excel_sheet": self._is_excel_document(document)
                                and level == 2,
                            }
                        )
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = hierarchy_builder.get_section_path(item, structure)
                else:
                    # Non-split-level header: keep it inside the open section.
                    # NOTE(review): if no section is open yet, this header's
                    # text is dropped — confirm that is intentional.
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                # Non-header content is appended to the open section, or
                # starts an implicit preamble section if none is open.
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = (
                        "Preamble"
                        if not self._is_excel_document(document)
                        else "Sheet Data"
                    )
                    current_path = []

        # Flush the last open section.
        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                    "is_excel_sheet": self._is_excel_document(document)
                    and current_level == 2,
                }
            )

        chunk_size = self.settings.global_config.chunking.chunk_size
        final_sections: list[dict[str, Any]] = []

        # Second pass: break any section exceeding the configured chunk size
        # into sub-chunks via the appropriate sub-splitter.
        for section in sections:
            if len(section["content"]) > chunk_size:
                logger.debug(
                    f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
                    extra={
                        "section_title": section.get("title", "Unknown"),
                        "section_size": len(section["content"]),
                        "chunk_size_limit": chunk_size,
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    },
                )

                if section.get("is_excel_sheet", False):
                    sub_chunks = self.excel_splitter.split_content(
                        section["content"], chunk_size
                    )
                else:
                    sub_chunks = self.standard_splitter.split_content(
                        section["content"], chunk_size
                    )

                for i, sub_chunk in enumerate(sub_chunks):
                    sub_section = {
                        "content": sub_chunk,
                        "level": section["level"],
                        # Parts are 1-indexed in titles for readability.
                        "title": (
                            f"{section['title']} (Part {i+1})"
                            if section.get("title")
                            else f"Part {i+1}"
                        ),
                        "path": section["path"],
                        "parent_section": section.get("title", "Unknown"),
                        "sub_chunk_index": i,
                        "total_sub_chunks": len(sub_chunks),
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    }
                    final_sections.append(sub_section)
            else:
                final_sections.append(section)

        # Normalize: guarantee every section dict carries the expected keys.
        for section in final_sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = parser.extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []
            if "is_excel_sheet" not in section:
                section["is_excel_sheet"] = False

        return final_sections

    def merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        A section smaller than the configured minimum absorbs the following
        section when that section is nested deeper (higher header level).

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged: list[dict[str, Any]] = []
        # Copy so that absorbing content never mutates the caller's dicts.
        current_section = sections[0].copy()
        min_section_size = (
            self.settings.global_config.chunking.strategies.markdown.min_section_size
        )

        for i in range(1, len(sections)):
            next_section = sections[i]

            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                # Too small and the next section is a descendant: absorb it.
                current_section["content"] += "\n" + next_section["content"]
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        # Emit the final accumulated section.
        merged.append(current_section)
        return merged