Coverage for src/qdrant_loader/core/chunking/strategy/markdown/section_splitter.py: 89% (219 statements)

1"""Section splitting strategies for markdown chunking.""" 

2 

3import re 

4from dataclasses import dataclass 

5from typing import TYPE_CHECKING, Any 

6 

7import structlog 

8 

9if TYPE_CHECKING: 

10 from qdrant_loader.config import Settings 

11 

12# Re-export classes and local dependencies at top to satisfy E402 

13from .document_parser import DocumentParser, HierarchyBuilder # noqa: F401 

14from .splitters.base import BaseSplitter # re-export base class # noqa: F401 

15from .splitters.excel import ExcelSplitter # re-export # noqa: F401 

16from .splitters.fallback import FallbackSplitter # re-export # noqa: F401 

17from .splitters.standard import StandardSplitter # re-export # noqa: F401 

18 

19logger = structlog.get_logger(__name__) 


# Markdown configuration placeholder - can be imported from settings if needed
class MarkdownConfig:
    """Configuration for markdown processing."""

    words_per_minute_reading = 200


markdown_config = MarkdownConfig()
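# Example: with words_per_minute_reading = 200, a 1,000-word section gets an
# estimated read time of max(1, 1000 // 200) == 5 minutes (see content_analysis below).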


@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document."""

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Calculate derived metrics."""
        self.total_headers = self.h1 + self.h2 + self.h3 + self.h4 + self.h5 + self.h6
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers
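
# Example: HeaderAnalysis(h2=4, content_length=4000) ends up with
# total_headers == 4 and avg_section_size == 1000 once __post_init__ runs.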


@dataclass
class SectionMetadata:
    """Enhanced section metadata with hierarchical relationships."""

    title: str
    level: int
    content: str
    order: int
    start_line: int
    end_line: int
    parent_section: str | None = None
    breadcrumb: str = ""
    anchor: str = ""
    previous_section: str | None = None
    next_section: str | None = None
    sibling_sections: list[str] | None = None
    subsections: list[str] | None = None
    content_analysis: dict | None = None

    def __post_init__(self):
        """Initialize default values."""
        if self.sibling_sections is None:
            self.sibling_sections = []
        if self.subsections is None:
            self.subsections = []
        if self.content_analysis is None:
            self.content_analysis = {}
        if not self.anchor:
            self.anchor = self._generate_anchor()

    def _generate_anchor(self) -> str:
        """Generate URL anchor from title."""
        # Convert title to lowercase, replace spaces and special chars with hyphens
        anchor = re.sub(r"[^\w\s-]", "", self.title.lower())
        anchor = re.sub(r"[-\s]+", "-", anchor)
        return anchor.strip("-")
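
# Example: a title like "2. Setup & Configuration" yields the anchor
# "2-setup-configuration" (special characters dropped, whitespace/hyphen runs collapsed).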


class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        self.fallback_splitter = FallbackSplitter(settings)
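
    # Construction sketch (hypothetical, assumes a fully configured Settings object):
    #     splitter = SectionSplitter(settings)
    #     sections = splitter.split_sections(markdown_text, document)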

    def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
        """Analyze header distribution to guide splitting decisions.

        Args:
            text: Document content to analyze

        Returns:
            HeaderAnalysis with distribution metrics
        """
        analysis = HeaderAnalysis()
        analysis.content_length = len(text)

        lines = text.split("\n")
        for line in lines:
            line = line.strip()
            header_match = re.match(r"^(#{1,6})\s+(.+)", line)
            if header_match:
                level = len(header_match.group(1))
                if level == 1:
                    analysis.h1 += 1
                elif level == 2:
                    analysis.h2 += 1
                elif level == 3:
                    analysis.h3 += 1
                elif level == 4:
                    analysis.h4 += 1
                elif level == 5:
                    analysis.h5 += 1
                elif level == 6:
                    analysis.h6 += 1

        # Let __post_init__ calculate derived metrics
        analysis.__post_init__()

        logger.debug(
            "Header distribution analysis",
            extra={
                "h1": analysis.h1,
                "h2": analysis.h2,
                "h3": analysis.h3,
                "total_headers": analysis.total_headers,
                "content_length": analysis.content_length,
                "avg_section_size": analysis.avg_section_size,
            },
        )

        return analysis
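
    # Usage sketch (illustrative input): analyzing "# T\n## A\n## B\n### C"
    # yields h1 == 1, h2 == 2, h3 == 1, total_headers == 4.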

    def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
        """Intelligently determine optimal split levels based on document characteristics.

        Args:
            text: Document content
            document: Optional document for context

        Returns:
            Set of header levels to split on
        """
        header_analysis = self.analyze_header_distribution(text)

        # Check if this is a converted Excel file
        is_converted_excel = (
            document and document.metadata.get("original_file_type") == "xlsx"
        )

        if is_converted_excel:
            # Excel files: H1 (document) + H2 (sheets) + potentially H3 for large sheets
            if header_analysis.h3 > 10:
                return {1, 2, 3}
            else:
                return {1, 2}

        # Get configured thresholds (local name avoids shadowing the module-level markdown_config)
        md_config = self.settings.global_config.chunking.strategies.markdown
        h1_threshold = md_config.header_analysis_threshold_h1
        h3_threshold = md_config.header_analysis_threshold_h3

        # Regular markdown: Intelligent granularity based on structure
        if header_analysis.h1 <= 1 and header_analysis.h2 >= h1_threshold:
            # Single H1 with multiple H2s - the common case requiring granular splitting!
            logger.info(
                "Detected single H1 with multiple H2 sections - applying granular splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                },
            )
            # Split on H2, and also on H3 if there are many H3s
            if header_analysis.h3 >= h3_threshold:
                return {1, 2, 3}
            else:
                return {1, 2}
        elif header_analysis.h1 >= h1_threshold:
            # Multiple H1s - keep traditional splitting to avoid over-fragmentation
            logger.info(
                "Multiple H1 sections detected - using traditional H1-only splitting",
                extra={"h1_count": header_analysis.h1},
            )
            return {1}
        elif (
            header_analysis.h1 == 0
            and header_analysis.h2 == 0
            and header_analysis.h3 >= 1
        ):
            # 🔥 FIX: Converted documents often have only H3+ headers
            logger.info(
                "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                    "h4_count": header_analysis.h4,
                    "total_headers": header_analysis.total_headers,
                },
            )
            # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
            if header_analysis.h3 == 1 and header_analysis.h4 >= h1_threshold:
                # Single H3 with multiple H4s (common DOCX pattern) - split on both
                return {3, 4}
            elif header_analysis.h3 >= h1_threshold:
                # Multiple H3s - split on H3 primarily, and on H4 if there are many
                if header_analysis.h4 >= h3_threshold:
                    return {3, 4}
                else:
                    return {3}
            elif header_analysis.total_headers >= h3_threshold:
                # Many headers total - split on H3 and H4
                return {3, 4}
            else:
                # Default - split on H3 only
                return {3}
        elif header_analysis.total_headers <= 3:
            # Very small document - minimal splitting
            logger.info(
                "Small document detected - minimal splitting",
                extra={"total_headers": header_analysis.total_headers},
            )
            return {1, 2}
        else:
            # Default case - moderate granularity
            return {1, 2}
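
    # Decision sketch (thresholds come from the markdown strategy config):
    #     single H1 + many H2s      -> {1, 2} (plus 3 when H3s are plentiful)
    #     many H1s                  -> {1}
    #     H3+-only (e.g. DOCX)      -> {3} or {3, 4}
    #     converted Excel           -> {1, 2} (plus 3 for sheets with many H3s)
    #     <= 3 headers overall      -> {1, 2}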

    def build_enhanced_section_metadata(
        self, sections: list[dict]
    ) -> list[SectionMetadata]:
        """Build enhanced section metadata with hierarchical relationships.

        Args:
            sections: Basic section data from split_sections

        Returns:
            List of enhanced SectionMetadata objects
        """
        enhanced_sections: list[SectionMetadata] = []

        for i, section in enumerate(sections):
            breadcrumb_parts = section.get("path", [])
            if section.get("title"):
                breadcrumb_parts = breadcrumb_parts + [section["title"]]
            breadcrumb = " > ".join(breadcrumb_parts)

            parent_section = None
            if section.get("path"):
                parent_section = section["path"][-1]

            current_level = section.get("level", 0)
            current_path = section.get("path", [])
            sibling_sections: list[str] = []

            for other_section in sections:
                if (
                    other_section != section
                    and other_section.get("level") == current_level
                    and other_section.get("path", []) == current_path
                ):
                    sibling_sections.append(other_section.get("title", ""))

            previous_section = sections[i - 1].get("title") if i > 0 else None
            next_section = (
                sections[i + 1].get("title") if i < len(sections) - 1 else None
            )

            subsections: list[str] = []
            current_title = section.get("title", "")
            for other_section in sections[i + 1 :]:
                other_path = other_section.get("path", [])
                if (
                    len(other_path) > len(current_path)
                    and other_path[:-1] == current_path + [current_title]
                ):
                    subsections.append(other_section.get("title", ""))
                elif len(other_path) <= len(current_path):
                    break

            content = section.get("content", "")
            content_analysis = {
                "has_code_blocks": bool(re.search(r"```", content)),
                "has_tables": bool(re.search(r"\|.*\|", content)),
                "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
                "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
                "word_count": len(content.split()),
                "estimated_read_time": max(
                    1, len(content.split()) // markdown_config.words_per_minute_reading
                ),
                "char_count": len(content),
            }

            enhanced_section = SectionMetadata(
                title=section.get("title", "Untitled"),
                level=section.get("level", 0),
                content=content,
                order=i,
                start_line=0,
                end_line=0,
                parent_section=parent_section,
                breadcrumb=breadcrumb,
                previous_section=previous_section,
                next_section=next_section,
                sibling_sections=sibling_sections,
                subsections=subsections,
                content_analysis=content_analysis,
            )

            enhanced_sections.append(enhanced_section)

        return enhanced_sections
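
    # Usage sketch: for a section {"title": "Intro", "level": 2, "path": ["Guide"], ...}
    # the resulting SectionMetadata gets breadcrumb "Guide > Intro", parent_section
    # "Guide", and sibling/subsection titles resolved from the surrounding list.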

    def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
        """Split text into sections based on headers and document type.

        Args:
            text: Text to split
            document: Optional document for context

        Returns:
            List of section dictionaries
        """

        parser = DocumentParser()
        hierarchy_builder = HierarchyBuilder()

        structure = parser.parse_document_structure(text)
        sections: list[dict[str, Any]] = []
        current_section = None
        current_level = None
        current_title = None
        current_path: list[str] = []

        split_levels = self.determine_optimal_split_levels(text, document)

        logger.debug(
            "Determined optimal split levels",
            extra={
                "split_levels": list(split_levels),
                "document_type": (
                    "excel"
                    if document
                    and document.metadata.get("original_file_type") == "xlsx"
                    else "markdown"
                ),
            },
        )

        for item in structure:
            if item["type"] == "header":
                level = item["level"]

                if level in split_levels or (level == 0 and not sections):
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                                "is_excel_sheet": document
                                and document.metadata.get("original_file_type")
                                == "xlsx"
                                and level == 2,
                            }
                        )
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = hierarchy_builder.get_section_path(item, structure)
                else:
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = (
                        "Preamble"
                        if not (
                            document
                            and document.metadata.get("original_file_type") == "xlsx"
                        )
                        else "Sheet Data"
                    )
                    current_path = []

        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                    "is_excel_sheet": document
                    and document.metadata.get("original_file_type") == "xlsx"
                    and current_level == 2,
                }
            )

        chunk_size = self.settings.global_config.chunking.chunk_size
        final_sections: list[dict[str, Any]] = []

        for section in sections:
            if len(section["content"]) > chunk_size:
                logger.debug(
                    f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
                    extra={
                        "section_title": section.get("title", "Unknown"),
                        "section_size": len(section["content"]),
                        "chunk_size_limit": chunk_size,
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    },
                )

                if section.get("is_excel_sheet", False):
                    sub_chunks = self.excel_splitter.split_content(
                        section["content"], chunk_size
                    )
                else:
                    sub_chunks = self.standard_splitter.split_content(
                        section["content"], chunk_size
                    )

                for i, sub_chunk in enumerate(sub_chunks):
                    sub_section = {
                        "content": sub_chunk,
                        "level": section["level"],
                        "title": (
                            f"{section['title']} (Part {i+1})"
                            if section.get("title")
                            else f"Part {i+1}"
                        ),
                        "path": section["path"],
                        "parent_section": section.get("title", "Unknown"),
                        "sub_chunk_index": i,
                        "total_sub_chunks": len(sub_chunks),
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    }
                    final_sections.append(sub_section)
            else:
                final_sections.append(section)

        for section in final_sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = parser.extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []
            if "is_excel_sheet" not in section:
                section["is_excel_sheet"] = False

        return final_sections
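
    # Usage sketch: a section whose content exceeds chunk_size comes back as several
    # entries titled "<title> (Part 1)", "<title> (Part 2)", ..., each carrying
    # sub_chunk_index / total_sub_chunks; smaller sections pass through unchanged.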

    def merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged: list[dict[str, Any]] = []
        current_section = sections[0].copy()
        min_section_size = (
            self.settings.global_config.chunking.strategies.markdown.min_section_size
        )

        for i in range(1, len(sections)):
            next_section = sections[i]

            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        merged.append(current_section)
        return merged
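
    # Example: with min_section_size = 500, a 120-char level-2 section followed by a
    # level-3 subsection is merged into one entry; a subsequent level-2 section starts
    # a new entry because its level is not greater than the current section's.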