Coverage for src/qdrant_loader/core/chunking/strategy/markdown/section_splitter.py: 75%

353 statements  


1"""Section splitting strategies for markdown chunking.""" 

2 

3import re 

4from abc import ABC, abstractmethod 

5from dataclasses import dataclass 

6from typing import TYPE_CHECKING, Any 

7 

8import structlog 

9 

10if TYPE_CHECKING: 

11 from qdrant_loader.config import Settings 

12 

13logger = structlog.get_logger(__name__) 

14 

15 

@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document."""

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Calculate derived metrics."""
        self.total_headers = self.h1 + self.h2 + self.h3 + self.h4 + self.h5 + self.h6
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers
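
# Usage sketch (illustrative, not part of the module): __post_init__ derives
# the totals, so a populated instance describes itself:
#
#     analysis = HeaderAnalysis(h1=1, h2=4, content_length=10_000)
#     analysis.total_headers     # 5
#     analysis.avg_section_size  # 10_000 // 5 == 2_000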

@dataclass
class SectionMetadata:
    """Enhanced section metadata with hierarchical relationships."""

    title: str
    level: int
    content: str
    order: int
    start_line: int
    end_line: int
    parent_section: str | None = None
    breadcrumb: str = ""
    anchor: str = ""
    previous_section: str | None = None
    next_section: str | None = None
    sibling_sections: list[str] | None = None
    subsections: list[str] | None = None
    content_analysis: dict | None = None

    def __post_init__(self):
        """Initialize default values."""
        if self.sibling_sections is None:
            self.sibling_sections = []
        if self.subsections is None:
            self.subsections = []
        if self.content_analysis is None:
            self.content_analysis = {}
        if not self.anchor:
            self.anchor = self._generate_anchor()

    def _generate_anchor(self) -> str:
        """Generate a URL anchor from the title."""
        # Lowercase the title, strip special characters, then collapse
        # whitespace and hyphen runs into single hyphens.
        anchor = re.sub(r"[^\w\s-]", "", self.title.lower())
        anchor = re.sub(r"[-\s]+", "-", anchor)
        return anchor.strip("-")
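
# Anchor sketch (illustrative): titles map to GitHub-style anchors, e.g.
#
#     SectionMetadata(title="API & Usage Notes", level=2, content="",
#                     order=0, start_line=0, end_line=0).anchor
#     # -> "api-usage-notes"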

class BaseSplitter(ABC):
    """Base class for section splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.chunk_size = settings.global_config.chunking.chunk_size
        self.chunk_overlap = settings.global_config.chunking.chunk_overlap

    @abstractmethod
    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split content into chunks.

        Args:
            content: Content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        pass
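
# Extension sketch (illustrative): concrete strategies only need to implement
# split_content. A hypothetical minimal fixed-width strategy:
#
#     class FixedWidthSplitter(BaseSplitter):
#         def split_content(self, content: str, max_size: int) -> list[str]:
#             return [content[i:i + max_size]
#                     for i in range(0, len(content), max_size)]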

class StandardSplitter(BaseSplitter):
    """Standard markdown text splitter that preserves structure."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split a large section into smaller chunks while preserving markdown structure.

        Args:
            content: Section content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Calculate a dynamic safety limit based on configuration:
        # allow up to 50% of max_chunks_per_document for a single section.
        max_chunks_per_section = min(
            self.settings.global_config.chunking.max_chunks_per_document // 2,
            1000,  # Absolute maximum to prevent runaway chunking
        )

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)

        # Flatten paragraphs into manageable text units
        text_units = []
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # If a paragraph is too large, split it by sentences
            if len(para) > max_size:
                sentences = re.split(r"(?<=[.!?])\s+", para)
                text_units.extend([s.strip() for s in sentences if s.strip()])
            else:
                text_units.append(para)

        # Build chunks with overlap
        i = 0
        while i < len(text_units) and len(chunks) < max_chunks_per_section:
            current_chunk = ""
            units_in_chunk = 0

            # Build the current chunk
            j = i
            while j < len(text_units):
                unit = text_units[j]

                # Stop if adding this unit would exceed max_size
                if current_chunk and len(current_chunk) + len(unit) + 2 > max_size:
                    break

                # Add the unit to the chunk
                if current_chunk:
                    current_chunk += "\n\n" + unit
                else:
                    current_chunk = unit

                units_in_chunk += 1
                j += 1

            # Add the chunk if it has content
            if current_chunk.strip():
                chunks.append(current_chunk.strip())

            # Calculate overlap and advance the position
            if units_in_chunk > 0:
                # When overlap is 0, advance past all units to avoid any overlap
                if self.chunk_overlap == 0:
                    advance = units_in_chunk
                else:
                    # How many characters of overlap we want (at most 25% of the chunk)
                    overlap_chars = min(self.chunk_overlap, len(current_chunk) // 4)

                    # Find a good overlap point by counting back from the end
                    if overlap_chars > 0 and len(current_chunk) > overlap_chars:
                        # Count how many text units fit inside the overlap budget
                        overlap_units = 0
                        overlap_size = 0
                        for k in range(j - 1, i - 1, -1):  # Walk backwards
                            unit_size = len(text_units[k])
                            if overlap_size + unit_size <= overlap_chars:
                                overlap_size += unit_size
                                overlap_units += 1
                            else:
                                break

                        # Advance by total units minus overlap units, ensuring progress
                        advance = max(1, units_in_chunk - overlap_units)
                    else:
                        # No overlap possible; advance past all units
                        advance = max(1, units_in_chunk)

                i += advance
            else:
                # Safety: ensure we make progress even if no units were added
                i += 1

        # Warn about truncated content if we hit the chunk limit
        if i < len(text_units) and len(chunks) >= max_chunks_per_section:
            logger.warning(
                f"Section reached maximum chunks limit ({max_chunks_per_section}), truncating remaining content",
                extra={
                    "remaining_units": len(text_units) - i,
                    "max_chunks_per_section": max_chunks_per_section,
                },
            )

        return chunks
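
# Usage sketch (illustrative; assumes a configured Settings whose
# global_config.chunking exposes chunk_size, chunk_overlap, and
# max_chunks_per_document):
#
#     splitter = StandardSplitter(settings)
#     chunks = splitter.split_content(long_section_text, max_size=1500)
#     # With chunk_overlap > 0, consecutive chunks share up to ~25% of their text.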

class ExcelSplitter(BaseSplitter):
    """Excel-specific splitter that preserves table structure."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split Excel sheet content into chunks, preserving table structure where possible.

        Args:
            content: Excel sheet content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Calculate a dynamic safety limit
        max_chunks_per_section = min(
            self.settings.global_config.chunking.max_chunks_per_document // 2,
            1000,
        )

        # Split content into logical units: headers, tables, and text blocks
        logical_units = []
        lines = content.split("\n")
        current_unit = []
        in_table = False

        for line in lines:
            line = line.strip()

            # Detect table boundaries
            is_table_line = bool(re.match(r"^\|.*\|$", line)) or bool(
                re.match(r"^[|\-\s:]+$", line)
            )

            if is_table_line and not in_table:
                # Starting a new table
                if current_unit:
                    logical_units.append("\n".join(current_unit))
                    current_unit = []
                in_table = True
                current_unit.append(line)
            elif not is_table_line and in_table:
                # Ending a table
                if current_unit:
                    logical_units.append("\n".join(current_unit))
                    current_unit = []
                in_table = False
                if line:  # Don't add empty lines
                    current_unit.append(line)
            else:
                # Continue the current unit
                if line or current_unit:  # Don't start a unit with empty lines
                    current_unit.append(line)

        # Add the final unit
        if current_unit:
            logical_units.append("\n".join(current_unit))

        # Split large logical units that exceed max_size
        split_logical_units = []
        for unit in logical_units:
            if len(unit) > max_size:
                # Split the large unit line by line to preserve table structure
                unit_lines = unit.split("\n")
                current_sub_unit = []

                for line in unit_lines:
                    # Check if adding this line would exceed max_size
                    test_unit = "\n".join(current_sub_unit + [line])
                    if current_sub_unit and len(test_unit) > max_size:
                        # Save the current sub-unit and start a new one
                        split_logical_units.append("\n".join(current_sub_unit))
                        current_sub_unit = [line]
                    else:
                        current_sub_unit.append(line)

                # Add the last sub-unit
                if current_sub_unit:
                    split_logical_units.append("\n".join(current_sub_unit))
            else:
                split_logical_units.append(unit)

        # Use the split logical units
        logical_units = split_logical_units

        # Group logical units into chunks
        i = 0
        while i < len(logical_units) and len(chunks) < max_chunks_per_section:
            current_chunk = ""
            units_in_chunk = 0

            # Build the current chunk
            j = i
            while j < len(logical_units):
                unit = logical_units[j]

                # Stop if adding this unit would exceed max_size
                if current_chunk and len(current_chunk) + len(unit) + 2 > max_size:
                    break

                # Add the unit to the chunk
                if current_chunk:
                    current_chunk += "\n\n" + unit
                else:
                    current_chunk = unit

                units_in_chunk += 1
                j += 1

            # Add the chunk if it has content
            if current_chunk.strip():
                chunks.append(current_chunk.strip())

            # Calculate overlap and advance the position
            if units_in_chunk > 0:
                if self.chunk_overlap == 0:
                    advance = units_in_chunk
                else:
                    # For Excel content, use a more conservative overlap to
                    # preserve table boundaries: at most one unit of overlap.
                    overlap_units = min(1, units_in_chunk // 2)
                    advance = max(1, units_in_chunk - overlap_units)

                i += advance
            else:
                i += 1

        # Warn about truncated content if we hit the chunk limit
        if i < len(logical_units) and len(chunks) >= max_chunks_per_section:
            logger.warning(
                f"Excel sheet reached maximum chunks limit ({max_chunks_per_section}), truncating remaining content",
                extra={
                    "remaining_units": len(logical_units) - i,
                    "max_chunks_per_section": max_chunks_per_section,
                },
            )

        return chunks
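
# Detection sketch (illustrative): the two regexes above classify pipe-table
# rows and separator rows so each table becomes a single logical unit:
#
#     re.match(r"^\|.*\|$", "| Name | Qty |")   # row line -> match
#     re.match(r"^[|\-\s:]+$", "|---|:---:|")   # separator line -> match
#     re.match(r"^\|.*\|$", "Totals below")     # plain text -> None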

class FallbackSplitter(BaseSplitter):
    """Simple fallback splitter for when other strategies fail."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Simple chunking implementation based on fixed size.

        Args:
            content: Content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) <= max_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks
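
# Behavior sketch (illustrative): a paragraph longer than max_size is emitted
# as a single oversized chunk; the fallback trades strict limits for simplicity:
#
#     FallbackSplitter(settings).split_content("A" * 5000, max_size=1500)
#     # -> one 5000-character chunk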

class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        self.fallback_splitter = FallbackSplitter(settings)

    def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
        """Analyze header distribution to guide splitting decisions.

        Args:
            text: Document content to analyze

        Returns:
            HeaderAnalysis with distribution metrics
        """
        analysis = HeaderAnalysis()
        analysis.content_length = len(text)

        lines = text.split("\n")
        for line in lines:
            line = line.strip()
            header_match = re.match(r"^(#{1,6})\s+(.+)", line)
            if header_match:
                level = len(header_match.group(1))
                if level == 1:
                    analysis.h1 += 1
                elif level == 2:
                    analysis.h2 += 1
                elif level == 3:
                    analysis.h3 += 1
                elif level == 4:
                    analysis.h4 += 1
                elif level == 5:
                    analysis.h5 += 1
                elif level == 6:
                    analysis.h6 += 1

        # Let __post_init__ recalculate the derived metrics
        analysis.__post_init__()

        logger.debug(
            "Header distribution analysis",
            extra={
                "h1": analysis.h1,
                "h2": analysis.h2,
                "h3": analysis.h3,
                "total_headers": analysis.total_headers,
                "content_length": analysis.content_length,
                "avg_section_size": analysis.avg_section_size,
            },
        )

        return analysis
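
    # Usage sketch (illustrative):
    #
    #     analysis = splitter.analyze_header_distribution("# T\n\n## A\n\n## B\n")
    #     # -> h1=1, h2=2, total_headers=3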

    def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
        """Intelligently determine optimal split levels based on document characteristics.

        Args:
            text: Document content
            document: Optional document for context

        Returns:
            Set of header levels to split on
        """
        header_analysis = self.analyze_header_distribution(text)

        # Check if this is a converted Excel file
        is_converted_excel = (
            document is not None
            and document.metadata.get("original_file_type") == "xlsx"
        )

        if is_converted_excel:
            # Excel files: H1 (document) + H2 (sheets), plus H3 for large sheets
            if header_analysis.h3 > 10:
                return {1, 2, 3}
            else:
                return {1, 2}

        # Regular markdown: intelligent granularity based on structure
        if header_analysis.h1 <= 1 and header_analysis.h2 >= 3:
            # Single H1 with multiple H2s - the common case requiring granular splitting
            logger.info(
                "Detected single H1 with multiple H2 sections - applying granular splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                },
            )
            # Split on H2, and also on H3 if there are many H3s
            if header_analysis.h3 >= 8:
                return {1, 2, 3}
            else:
                return {1, 2}
        elif header_analysis.h1 >= 3:
            # Multiple H1s - keep traditional splitting to avoid over-fragmentation
            logger.info(
                "Multiple H1 sections detected - using traditional H1-only splitting",
                extra={"h1_count": header_analysis.h1},
            )
            return {1}
        elif header_analysis.h1 == 0 and header_analysis.h2 == 0 and header_analysis.h3 >= 1:
            # 🔥 FIX: Converted documents often have only H3+ headers
            logger.info(
                "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                    "h4_count": header_analysis.h4,
                    "total_headers": header_analysis.total_headers,
                },
            )
            # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
            if header_analysis.h3 == 1 and header_analysis.h4 >= 3:
                # Single H3 with multiple H4s (common DOCX pattern) - split on both
                return {3, 4}
            elif header_analysis.h3 >= 3:
                # Multiple H3s - split on H3 primarily, and on H4 if there are many
                if header_analysis.h4 >= 8:
                    return {3, 4}
                else:
                    return {3}
            elif header_analysis.total_headers >= 8:
                # Many headers in total - split on H3 and H4
                return {3, 4}
            else:
                # Default - split on H3 only
                return {3}
        elif header_analysis.total_headers <= 3:
            # Very small document - minimal splitting
            logger.info(
                "Small document detected - minimal splitting",
                extra={"total_headers": header_analysis.total_headers},
            )
            return {1, 2}
        else:
            # Default case - moderate granularity
            return {1, 2}
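
    # Decision sketch (illustrative) of the heuristics above:
    #
    #     1 H1 + 5 H2 + 2 H3 (markdown)        -> {1, 2}
    #     1 H1 + 5 H2 + 9 H3 (markdown)        -> {1, 2, 3}
    #     4 H1 (markdown)                      -> {1}
    #     0 H1/H2, 1 H3 + 4 H4 (converted doc) -> {3, 4}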

    def build_enhanced_section_metadata(self, sections: list[dict]) -> list[SectionMetadata]:
        """Build enhanced section metadata with hierarchical relationships.

        Args:
            sections: Basic section data from split_sections

        Returns:
            List of enhanced SectionMetadata objects
        """
        enhanced_sections = []

        for i, section in enumerate(sections):
            # Build breadcrumb from the hierarchy path
            breadcrumb_parts = section.get("path", [])
            if section.get("title"):
                breadcrumb_parts = breadcrumb_parts + [section["title"]]
            breadcrumb = " > ".join(breadcrumb_parts)

            # Find the parent section
            parent_section = None
            if section.get("path"):
                parent_section = section["path"][-1]

            # Find siblings (sections at the same level with the same parent)
            current_level = section.get("level", 0)
            current_path = section.get("path", [])
            sibling_sections = []

            for other_section in sections:
                if (
                    other_section != section
                    and other_section.get("level") == current_level
                    and other_section.get("path", []) == current_path
                ):
                    sibling_sections.append(other_section.get("title", ""))

            # Find previous and next sections
            previous_section = sections[i - 1].get("title") if i > 0 else None
            next_section = sections[i + 1].get("title") if i < len(sections) - 1 else None

            # Find subsections (direct children)
            subsections = []
            current_title = section.get("title", "")
            for other_section in sections[i + 1:]:
                other_path = other_section.get("path", [])
                if (
                    len(other_path) > len(current_path)
                    and other_path[:-1] == current_path + [current_title]
                ):
                    subsections.append(other_section.get("title", ""))
                elif len(other_path) <= len(current_path):
                    # We've moved to a different branch
                    break

            # Analyze content characteristics
            content = section.get("content", "")
            content_analysis = {
                "has_code_blocks": bool(re.search(r"```", content)),
                "has_tables": bool(re.search(r"\|.*\|", content)),
                "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
                "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
                "word_count": len(content.split()),
                "estimated_read_time": max(1, len(content.split()) // 200),  # minutes
                "char_count": len(content),
            }

            enhanced_section = SectionMetadata(
                title=section.get("title", "Untitled"),
                level=section.get("level", 0),
                content=content,
                order=i,
                start_line=0,  # Could be enhanced to track actual line numbers
                end_line=0,
                parent_section=parent_section,
                breadcrumb=breadcrumb,
                previous_section=previous_section,
                next_section=next_section,
                sibling_sections=sibling_sections,
                subsections=subsections,
                content_analysis=content_analysis,
            )

            enhanced_sections.append(enhanced_section)

        return enhanced_sections
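
    # Metadata sketch (illustrative): a section with path ["Guide", "Install"]
    # and title "Linux" yields breadcrumb "Guide > Install > Linux", anchor
    # "linux", and parent_section "Install".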

    def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
        """Split text into sections based on headers and document type.

        Args:
            text: Text to split
            document: Optional document for context

        Returns:
            List of section dictionaries
        """
        from .document_parser import DocumentParser, HierarchyBuilder

        parser = DocumentParser()
        hierarchy_builder = HierarchyBuilder()

        structure = parser.parse_document_structure(text)
        sections = []
        current_section = None
        current_level = None
        current_title = None
        current_path = []

        is_excel_document = bool(
            document and document.metadata.get("original_file_type") == "xlsx"
        )

        # 🔥 ENHANCED: Use intelligent split level determination
        split_levels = self.determine_optimal_split_levels(text, document)

        logger.debug(
            "Determined optimal split levels",
            extra={
                "split_levels": list(split_levels),
                "document_type": "excel" if is_excel_document else "markdown",
            },
        )

        for item in structure:
            if item["type"] == "header":
                level = item["level"]

                # Create a new section for split levels or the first header (level 0)
                if level in split_levels or (level == 0 and not sections):
                    # Save the previous section if one exists
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                                "is_excel_sheet": is_excel_document and current_level == 2,
                            }
                        )
                    # Start a new section
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = hierarchy_builder.get_section_path(item, structure)
                else:
                    # For deeper headers, just add to the current section
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    # If no section has started yet, treat the text as a preamble
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = "Sheet Data" if is_excel_document else "Preamble"
                    current_path = []

        # Add the last section
        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                    "is_excel_sheet": is_excel_document and current_level == 2,
                }
            )

        # Check whether sections are too large and split them
        chunk_size = self.settings.global_config.chunking.chunk_size
        final_sections = []

        for section in sections:
            if len(section["content"]) > chunk_size:
                # Split the large section into smaller chunks
                logger.debug(
                    f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
                    extra={
                        "section_title": section.get("title", "Unknown"),
                        "section_size": len(section["content"]),
                        "chunk_size_limit": chunk_size,
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    },
                )

                # Choose the appropriate splitter
                if section.get("is_excel_sheet", False):
                    sub_chunks = self.excel_splitter.split_content(section["content"], chunk_size)
                else:
                    sub_chunks = self.standard_splitter.split_content(section["content"], chunk_size)

                # Create metadata for each sub-chunk
                for i, sub_chunk in enumerate(sub_chunks):
                    sub_section = {
                        "content": sub_chunk,
                        "level": section["level"],
                        "title": f"{section['title']} (Part {i+1})" if section.get("title") else f"Part {i+1}",
                        "path": section["path"],
                        "parent_section": section.get("title", "Unknown"),
                        "sub_chunk_index": i,
                        "total_sub_chunks": len(sub_chunks),
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    }
                    final_sections.append(sub_section)
            else:
                # Section is already small enough
                final_sections.append(section)

        # Ensure each section has the expected metadata
        for section in final_sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = parser.extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []
            if "is_excel_sheet" not in section:
                section["is_excel_sheet"] = False

        return final_sections
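
    # Splitting sketch (illustrative): an oversized "Usage" section comes back
    # as "Usage (Part 1)", "Usage (Part 2)", ..., each carrying parent_section,
    # sub_chunk_index, and total_sub_chunks metadata.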

    def merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged = []
        current_section = sections[0].copy()
        min_section_size = 500  # Minimum characters for a standalone section

        for i in range(1, len(sections)):
            next_section = sections[i]

            # If the current section is small and the next one is a subsection, merge them
            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
                # Keep the other metadata from the parent section
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        # Add the last section
        merged.append(current_section)
        return merged
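
# Merge sketch (illustrative): a 120-character level-1 section followed by a
# level-2 subsection is folded into one entry; sections of 500+ characters
# always stand alone.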