1"""Section splitting strategies for markdown chunking."""
3import re
4from abc import ABC, abstractmethod
5from dataclasses import dataclass
6from typing import TYPE_CHECKING, Any
8import structlog
10if TYPE_CHECKING:
11 from qdrant_loader.config import Settings
13logger = structlog.get_logger(__name__)

@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document."""

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Calculate derived metrics."""
        self.total_headers = self.h1 + self.h2 + self.h3 + self.h4 + self.h5 + self.h6
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers

@dataclass
class SectionMetadata:
    """Enhanced section metadata with hierarchical relationships."""

    title: str
    level: int
    content: str
    order: int
    start_line: int
    end_line: int
    parent_section: str | None = None
    breadcrumb: str = ""
    anchor: str = ""
    previous_section: str | None = None
    next_section: str | None = None
    sibling_sections: list[str] | None = None
    subsections: list[str] | None = None
    content_analysis: dict | None = None

    def __post_init__(self):
        """Initialize default values."""
        if self.sibling_sections is None:
            self.sibling_sections = []
        if self.subsections is None:
            self.subsections = []
        if self.content_analysis is None:
            self.content_analysis = {}
        if not self.anchor:
            self.anchor = self._generate_anchor()

    def _generate_anchor(self) -> str:
        """Generate URL anchor from title."""
        # Convert title to lowercase, replace spaces and special chars with hyphens
        anchor = re.sub(r'[^\w\s-]', '', self.title.lower())
        anchor = re.sub(r'[-\s]+', '-', anchor)
        return anchor.strip('-')
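
    # Illustrative sketch (hypothetical input, not part of the class): a title of
    # "2. Setup & Install" loses its punctuation and hyphenates to "2-setup-install".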

class BaseSplitter(ABC):
    """Base class for section splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.chunk_size = settings.global_config.chunking.chunk_size
        self.chunk_overlap = settings.global_config.chunking.chunk_overlap

    @abstractmethod
    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split content into chunks.

        Args:
            content: Content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        pass

class StandardSplitter(BaseSplitter):
    """Standard markdown text splitter that preserves structure."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split a large section into smaller chunks while preserving markdown structure.

        Args:
            content: Section content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Calculate dynamic safety limit based on configuration:
        # allow up to 50% of max_chunks_per_document for a single section
        max_chunks_per_section = min(
            self.settings.global_config.chunking.max_chunks_per_document // 2,
            1000,  # Absolute maximum to prevent runaway chunking
        )

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)

        # Flatten paragraphs into manageable text units
        text_units = []
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # If a paragraph is too large, split it by sentences
            if len(para) > max_size:
                sentences = re.split(r"(?<=[.!?])\s+", para)
                text_units.extend([s.strip() for s in sentences if s.strip()])
            else:
                text_units.append(para)

        # Build chunks with overlap
        i = 0
        while i < len(text_units) and len(chunks) < max_chunks_per_section:
            current_chunk = ""
            units_in_chunk = 0

            # Build the current chunk
            j = i
            while j < len(text_units):
                unit = text_units[j]

                # Check if adding this unit would exceed max_size
                if current_chunk and len(current_chunk) + len(unit) + 2 > max_size:
                    break

                # Add unit to chunk
                if current_chunk:
                    current_chunk += "\n\n" + unit
                else:
                    current_chunk = unit

                units_in_chunk += 1
                j += 1

            # Add chunk if it has content
            if current_chunk.strip():
                chunks.append(current_chunk.strip())

            # Calculate overlap and advance position
            if units_in_chunk > 0:
                # When overlap is 0, advance past all units to avoid any overlap
                if self.chunk_overlap == 0:
                    advance = units_in_chunk
                else:
                    # Calculate how many characters of overlap we want
                    overlap_chars = min(self.chunk_overlap, len(current_chunk) // 4)  # Max 25% overlap

                    # Find a good overlap point by counting back from the end
                    if overlap_chars > 0 and len(current_chunk) > overlap_chars:
                        # Count how many text units should be included in the overlap
                        overlap_units = 0
                        overlap_size = 0
                        for k in range(j - 1, i - 1, -1):  # Go backwards
                            unit_size = len(text_units[k])
                            if overlap_size + unit_size <= overlap_chars:
                                overlap_size += unit_size
                                overlap_units += 1
                            else:
                                break

                        # Advance by total units minus overlap units, ensuring progress
                        advance = max(1, units_in_chunk - overlap_units)
                    else:
                        # No overlap possible, advance past all units
                        advance = max(1, units_in_chunk)

                i += advance
            else:
                # Safety: ensure we make progress even if no units were added
                i += 1

        # Handle remaining units if we hit the chunk limit
        if i < len(text_units) and len(chunks) >= max_chunks_per_section:
            logger.warning(
                f"Section reached maximum chunks limit ({max_chunks_per_section}), truncating remaining content",
                extra={
                    "remaining_units": len(text_units) - i,
                    "max_chunks_per_section": max_chunks_per_section,
                },
            )

        return chunks
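
# A minimal usage sketch for StandardSplitter (illustrative only; the duck-typed
# SimpleNamespace stands in for the real Settings object):
#
#     from types import SimpleNamespace
#     _cfg = SimpleNamespace(chunk_size=100, chunk_overlap=0, max_chunks_per_document=50)
#     _settings = SimpleNamespace(global_config=SimpleNamespace(chunking=_cfg))
#     StandardSplitter(_settings).split_content("Para one.\n\nPara two.\n\nPara three.", 15)
#     # -> ["Para one.", "Para two.", "Para three."]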

class ExcelSplitter(BaseSplitter):
    """Excel-specific splitter that preserves table structure."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split Excel sheet content into chunks, preserving table structure where possible.

        Args:
            content: Excel sheet content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Calculate dynamic safety limit
        max_chunks_per_section = min(
            self.settings.global_config.chunking.max_chunks_per_document // 2,
            1000,
        )

        # Split content into logical units: headers, tables, and text blocks
        logical_units = []
        lines = content.split("\n")
        current_unit = []
        in_table = False

        for line in lines:
            line = line.strip()

            # Detect table boundaries
            is_table_line = bool(re.match(r"^\|.*\|$", line)) or bool(
                re.match(r"^[|\-\s:]+$", line)
            )

            if is_table_line and not in_table:
                # Starting a new table
                if current_unit:
                    logical_units.append("\n".join(current_unit))
                    current_unit = []
                in_table = True
                current_unit.append(line)
            elif not is_table_line and in_table:
                # Ending a table
                if current_unit:
                    logical_units.append("\n".join(current_unit))
                    current_unit = []
                in_table = False
                if line:  # Don't add empty lines
                    current_unit.append(line)
            else:
                # Continue the current unit
                if line or current_unit:  # Don't start a unit with empty lines
                    current_unit.append(line)

        # Add the final unit
        if current_unit:
            logical_units.append("\n".join(current_unit))

        # Split large logical units that exceed max_size
        split_logical_units = []
        for unit in logical_units:
            if len(unit) > max_size:
                # Split the large unit line by line to preserve table structure
                lines = unit.split('\n')
                current_sub_unit = []

                for line in lines:
                    # Check if adding this line would exceed max_size
                    test_unit = '\n'.join(current_sub_unit + [line])
                    if current_sub_unit and len(test_unit) > max_size:
                        # Save the current sub-unit and start a new one
                        split_logical_units.append('\n'.join(current_sub_unit))
                        current_sub_unit = [line]
                    else:
                        current_sub_unit.append(line)

                # Add the last sub-unit
                if current_sub_unit:
                    split_logical_units.append('\n'.join(current_sub_unit))
            else:
                split_logical_units.append(unit)

        # Use the split logical units
        logical_units = split_logical_units

        # Group logical units into chunks
        i = 0
        while i < len(logical_units) and len(chunks) < max_chunks_per_section:
            current_chunk = ""
            units_in_chunk = 0

            # Build the current chunk
            j = i
            while j < len(logical_units):
                unit = logical_units[j]

                # Check if adding this unit would exceed max_size
                if current_chunk and len(current_chunk) + len(unit) + 2 > max_size:
                    break

                # Add unit to chunk
                if current_chunk:
                    current_chunk += "\n\n" + unit
                else:
                    current_chunk = unit

                units_in_chunk += 1
                j += 1

            # Add chunk if it has content
            if current_chunk.strip():
                chunks.append(current_chunk.strip())

            # Calculate overlap and advance position
            if units_in_chunk > 0:
                if self.chunk_overlap == 0:
                    advance = units_in_chunk
                else:
                    # For Excel content, use a more conservative overlap that
                    # preserves table boundaries: at most 1 unit
                    overlap_units = min(1, units_in_chunk // 2)
                    advance = max(1, units_in_chunk - overlap_units)

                i += advance
            else:
                i += 1

        # Handle remaining units if we hit the chunk limit
        if i < len(logical_units) and len(chunks) >= max_chunks_per_section:
            logger.warning(
                f"Excel sheet reached maximum chunks limit ({max_chunks_per_section}), truncating remaining content",
                extra={
                    "remaining_units": len(logical_units) - i,
                    "max_chunks_per_section": max_chunks_per_section,
                },
            )

        return chunks
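
# Sketch of the logical-unit detection above (illustrative input): given sheet
# content like
#
#     Quarterly totals
#     | Q1 | Q2 |
#     | -- | -- |
#     | 10 | 20 |
#     Notes follow here.
#
# the loop yields three units - the heading line, the whole table, and the
# trailing note - so a table is only broken up when it alone exceeds max_size,
# and then only row by row.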

class FallbackSplitter(BaseSplitter):
    """Simple fallback splitter for when other strategies fail."""

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Simple chunking implementation based on fixed size.

        Args:
            content: Content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) <= max_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks
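
# Fallback sketch (illustrative, reusing the SimpleNamespace stub shown above):
#
#     FallbackSplitter(_settings).split_content("A.\n\nB.\n\nC.", 4)
#     # -> ["A.", "B.", "C."]
#
# Note that a single paragraph longer than max_size still comes through as one
# oversized chunk; this splitter trades precision for robustness.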

class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        self.fallback_splitter = FallbackSplitter(settings)

    def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
        """Analyze header distribution to guide splitting decisions.

        Args:
            text: Document content to analyze

        Returns:
            HeaderAnalysis with distribution metrics
        """
        analysis = HeaderAnalysis()
        analysis.content_length = len(text)

        lines = text.split('\n')
        for line in lines:
            line = line.strip()
            header_match = re.match(r'^(#{1,6})\s+(.+)', line)
            if header_match:
                level = len(header_match.group(1))
                if level == 1:
                    analysis.h1 += 1
                elif level == 2:
                    analysis.h2 += 1
                elif level == 3:
                    analysis.h3 += 1
                elif level == 4:
                    analysis.h4 += 1
                elif level == 5:
                    analysis.h5 += 1
                elif level == 6:
                    analysis.h6 += 1

        # Let __post_init__ recalculate the derived metrics from the counters
        analysis.__post_init__()

        logger.debug(
            "Header distribution analysis",
            extra={
                "h1": analysis.h1,
                "h2": analysis.h2,
                "h3": analysis.h3,
                "total_headers": analysis.total_headers,
                "content_length": analysis.content_length,
                "avg_section_size": analysis.avg_section_size,
            },
        )

        return analysis
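
    # Illustrative example: for the text "# T\n## A\n## B\n### C\n" the analysis
    # reports h1=1, h2=2, h3=1, total_headers=4, content_length=20, and an
    # avg_section_size of 20 // 4 == 5.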

    def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
        """Intelligently determine optimal split levels based on document characteristics.

        Args:
            text: Document content
            document: Optional document for context

        Returns:
            Set of header levels to split on
        """
        header_analysis = self.analyze_header_distribution(text)

        # Check if this is a converted Excel file
        is_converted_excel = (
            document and document.metadata.get("original_file_type") == "xlsx"
        )

        if is_converted_excel:
            # Excel files: H1 (document) + H2 (sheets) + potentially H3 for large sheets
            if header_analysis.h3 > 10:
                return {1, 2, 3}
            else:
                return {1, 2}

        # Regular markdown: intelligent granularity based on structure
        if header_analysis.h1 <= 1 and header_analysis.h2 >= 3:
            # Single H1 with multiple H2s - the common case requiring granular splitting
            logger.info(
                "Detected single H1 with multiple H2 sections - applying granular splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                },
            )
            # Split on H2, and also on H3 if there are many H3s
            if header_analysis.h3 >= 8:
                return {1, 2, 3}
            else:
                return {1, 2}
        elif header_analysis.h1 >= 3:
            # Multiple H1s - keep traditional splitting to avoid over-fragmentation
            logger.info(
                "Multiple H1 sections detected - using traditional H1-only splitting",
                extra={"h1_count": header_analysis.h1},
            )
            return {1}
        elif header_analysis.h1 == 0 and header_analysis.h2 == 0 and header_analysis.h3 >= 1:
            # 🔥 FIX: Converted documents often have only H3+ headers
            logger.info(
                "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                    "h4_count": header_analysis.h4,
                    "total_headers": header_analysis.total_headers,
                },
            )
            # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
            if header_analysis.h3 == 1 and header_analysis.h4 >= 3:
                # Single H3 with multiple H4s (common DOCX pattern) - split on both
                return {3, 4}
            elif header_analysis.h3 >= 3:
                # Multiple H3s - split primarily on H3, and on H4 if there are many
                if header_analysis.h4 >= 8:
                    return {3, 4}
                else:
                    return {3}
            elif header_analysis.total_headers >= 8:
                # Many headers in total - split on H3 and H4
                return {3, 4}
            else:
                # Default - split on H3 only
                return {3}
        elif header_analysis.total_headers <= 3:
            # Very small document - minimal splitting
            logger.info(
                "Small document detected - minimal splitting",
                extra={"total_headers": header_analysis.total_headers},
            )
            return {1, 2}
        else:
            # Default case - moderate granularity
            return {1, 2}
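
    # Worked examples of the heuristics above (illustrative counts):
    #   one H1, four H2s, two H3s     -> {1, 2}  (granular H2 splitting)
    #   four H1s                      -> {1}     (traditional splitting)
    #   no H1/H2, one H3, three H4s   -> {3, 4}  (converted-DOCX pattern)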

    def build_enhanced_section_metadata(self, sections: list[dict]) -> list[SectionMetadata]:
        """Build enhanced section metadata with hierarchical relationships.

        Args:
            sections: Basic section data from split_sections

        Returns:
            List of enhanced SectionMetadata objects
        """
        enhanced_sections = []

        for i, section in enumerate(sections):
            # Build breadcrumb from the hierarchy path
            breadcrumb_parts = section.get("path", [])
            if section.get("title"):
                breadcrumb_parts = breadcrumb_parts + [section["title"]]
            breadcrumb = " > ".join(breadcrumb_parts)

            # Find the parent section
            parent_section = None
            if section.get("path"):
                parent_section = section["path"][-1]

            # Find siblings (sections at the same level with the same parent)
            current_level = section.get("level", 0)
            current_path = section.get("path", [])
            sibling_sections = []

            for other_section in sections:
                if (other_section != section and
                        other_section.get("level") == current_level and
                        other_section.get("path", []) == current_path):
                    sibling_sections.append(other_section.get("title", ""))

            # Find the previous and next sections
            previous_section = sections[i - 1].get("title") if i > 0 else None
            next_section = sections[i + 1].get("title") if i < len(sections) - 1 else None

            # Find subsections (direct children)
            subsections = []
            current_title = section.get("title", "")
            for other_section in sections[i + 1:]:
                other_path = other_section.get("path", [])
                if (len(other_path) > len(current_path) and
                        other_path[:-1] == current_path + [current_title]):
                    subsections.append(other_section.get("title", ""))
                elif len(other_path) <= len(current_path):
                    # We've moved to a different branch
                    break

            # Analyze content characteristics
            content = section.get("content", "")
            content_analysis = {
                "has_code_blocks": bool(re.search(r"```", content)),
                "has_tables": bool(re.search(r"\|.*\|", content)),
                "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
                "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
                "word_count": len(content.split()),
                "estimated_read_time": max(1, len(content.split()) // 200),  # minutes
                "char_count": len(content),
            }

            enhanced_section = SectionMetadata(
                title=section.get("title", "Untitled"),
                level=section.get("level", 0),
                content=content,
                order=i,
                start_line=0,  # Could be enhanced to track actual line numbers
                end_line=0,
                parent_section=parent_section,
                breadcrumb=breadcrumb,
                previous_section=previous_section,
                next_section=next_section,
                sibling_sections=sibling_sections,
                subsections=subsections,
                content_analysis=content_analysis,
            )

            enhanced_sections.append(enhanced_section)

        return enhanced_sections
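
    # Illustrative example: for sections
    #   [{"title": "Guide", "level": 1, "path": [], "content": ""},
    #    {"title": "Install", "level": 2, "path": ["Guide"], "content": ""}]
    # the second entry gets parent_section "Guide", breadcrumb "Guide > Install",
    # and a generated anchor of "install".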

    def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
        """Split text into sections based on headers and document type.

        Args:
            text: Text to split
            document: Optional document for context

        Returns:
            List of section dictionaries
        """
        from .document_parser import DocumentParser, HierarchyBuilder

        parser = DocumentParser()
        hierarchy_builder = HierarchyBuilder()

        structure = parser.parse_document_structure(text)
        sections = []
        current_section = None
        current_level = None
        current_title = None
        current_path = []

        # 🔥 ENHANCED: Use intelligent split level determination
        split_levels = self.determine_optimal_split_levels(text, document)

        logger.debug(
            "Determined optimal split levels",
            extra={
                "split_levels": list(split_levels),
                "document_type": (
                    "excel"
                    if document and document.metadata.get("original_file_type") == "xlsx"
                    else "markdown"
                ),
            },
        )

        for item in structure:
            if item["type"] == "header":
                level = item["level"]

                # Create a new section for split levels or the first header (level 0)
                if level in split_levels or (level == 0 and not sections):
                    # Save the previous section if one exists
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                                "is_excel_sheet": bool(
                                    document
                                    and document.metadata.get("original_file_type") == "xlsx"
                                    and current_level == 2
                                ),
                            }
                        )
                    # Start a new section
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = hierarchy_builder.get_section_path(item, structure)
                else:
                    # For deeper headers, just add to the current section
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    # If no section has started yet, treat the text as preamble
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = (
                        "Sheet Data"
                        if document and document.metadata.get("original_file_type") == "xlsx"
                        else "Preamble"
                    )
                    current_path = []

        # Add the last section
        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                    "is_excel_sheet": bool(
                        document
                        and document.metadata.get("original_file_type") == "xlsx"
                        and current_level == 2
                    ),
                }
            )

        # Check if sections are too large and split them
        chunk_size = self.settings.global_config.chunking.chunk_size
        final_sections = []

        for section in sections:
            if len(section["content"]) > chunk_size:
                # Split the large section into smaller chunks
                logger.debug(
                    f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
                    extra={
                        "section_title": section.get("title", "Unknown"),
                        "section_size": len(section["content"]),
                        "chunk_size_limit": chunk_size,
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    },
                )

                # Choose the appropriate splitter
                if section.get("is_excel_sheet", False):
                    sub_chunks = self.excel_splitter.split_content(section["content"], chunk_size)
                else:
                    sub_chunks = self.standard_splitter.split_content(section["content"], chunk_size)

                # Create metadata for each sub-chunk
                for i, sub_chunk in enumerate(sub_chunks):
                    sub_section = {
                        "content": sub_chunk,
                        "level": section["level"],
                        "title": (
                            f"{section['title']} (Part {i + 1})"
                            if section.get("title")
                            else f"Part {i + 1}"
                        ),
                        "path": section["path"],
                        "parent_section": section.get("title", "Unknown"),
                        "sub_chunk_index": i,
                        "total_sub_chunks": len(sub_chunks),
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    }
                    final_sections.append(sub_section)
            else:
                # Section is already small enough
                final_sections.append(section)

        # Ensure each section has proper metadata
        for section in final_sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = parser.extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []
            if "is_excel_sheet" not in section:
                section["is_excel_sheet"] = False

        return final_sections
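
    # End-to-end sketch (illustrative): for a document with one H1 and three H2s,
    # split_levels resolves to {1, 2}, so each H2 becomes its own section, and any
    # section longer than chunk_size is further divided into "Title (Part N)"
    # sub-sections by whichever splitter matches its type.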

    def merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged = []
        current_section = sections[0].copy()
        min_section_size = 500  # Minimum characters for a standalone section

        for i in range(1, len(sections)):
            next_section = sections[i]

            # If the current section is small and the next one is a subsection, merge them
            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
                # Keep the remaining metadata from the parent section
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        # Add the last section
        merged.append(current_section)
        return merged
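

if __name__ == "__main__":
    # Smoke-test sketch, not part of the library API: a duck-typed stand-in for
    # the real Settings object (qdrant_loader.config) is enough to exercise the
    # header analysis and the plain-text splitter without a full configuration.
    from types import SimpleNamespace

    _chunking = SimpleNamespace(
        chunk_size=120, chunk_overlap=0, max_chunks_per_document=100
    )
    _settings = SimpleNamespace(global_config=SimpleNamespace(chunking=_chunking))

    _text = "# Guide\n\n## Install\n\nRun it.\n\n## Usage\n\nCall it.\n\n## FAQ\n\nRead it.\n"
    _splitter = SectionSplitter(_settings)  # type: ignore[arg-type]
    _analysis = _splitter.analyze_header_distribution(_text)
    print("total headers:", _analysis.total_headers)
    print("split levels:", _splitter.determine_optimal_split_levels(_text))

    for _chunk in _splitter.standard_splitter.split_content(_text, 40):
        print(repr(_chunk))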