Coverage for src / qdrant_loader / core / chunking / strategy / markdown / section_splitter.py: 89%
226 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:48 +0000
1"""Section splitting strategies for markdown chunking."""
3import re
4from dataclasses import dataclass
5from typing import TYPE_CHECKING, Any
7import structlog
9if TYPE_CHECKING:
10 from qdrant_loader.config import Settings
12# Re-export classes and local dependencies at top to satisfy E402
13from .document_parser import DocumentParser, HierarchyBuilder # noqa: F401
14from .splitters.base import BaseSplitter # re-export base class # noqa: F401
15from .splitters.excel import ExcelSplitter # re-export # noqa: F401
16from .splitters.fallback import FallbackSplitter # re-export # noqa: F401
17from .splitters.standard import StandardSplitter # re-export # noqa: F401
19logger = structlog.get_logger(__name__)
# Markdown configuration placeholder - can be imported from settings if needed
class MarkdownConfig:
    """Configuration for markdown processing."""

    # Average adult reading speed, used to derive estimated read time.
    words_per_minute_reading = 200


# Module-level singleton consumed by the metadata builders below.
markdown_config = MarkdownConfig()
@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document."""

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Calculate derived metrics from the per-level header counts."""
        per_level = (self.h1, self.h2, self.h3, self.h4, self.h5, self.h6)
        self.total_headers = sum(per_level)
        # Guard against division by zero for header-less documents.
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers
53@dataclass
54class SectionMetadata:
55 """Enhanced section metadata with hierarchical relationships."""
57 title: str
58 level: int
59 content: str
60 order: int
61 start_line: int
62 end_line: int
63 parent_section: str = None
64 breadcrumb: str = ""
65 anchor: str = ""
66 previous_section: str = None
67 next_section: str = None
68 sibling_sections: list[str] = None
69 subsections: list[str] = None
70 content_analysis: dict = None
72 def __post_init__(self):
73 """Initialize default values."""
74 if self.sibling_sections is None:
75 self.sibling_sections = []
76 if self.subsections is None:
77 self.subsections = []
78 if self.content_analysis is None:
79 self.content_analysis = {}
80 if not self.anchor:
81 self.anchor = self._generate_anchor()
83 def _generate_anchor(self) -> str:
84 """Generate URL anchor from title."""
85 import re
87 # Convert title to lowercase, replace spaces and special chars with hyphens
88 anchor = re.sub(r"[^\w\s-]", "", self.title.lower())
89 anchor = re.sub(r"[-\s]+", "-", anchor)
90 return anchor.strip("-")
class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        # One instance of each concrete strategy; split_sections picks
        # between the excel and standard splitters per section.
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        self.fallback_splitter = FallbackSplitter(settings)
107 def _is_excel_document(self, document: Any) -> bool:
108 """Check whether a document originated from an Excel file (.xls/.xlsx)."""
109 if not document:
110 return False
112 metadata = getattr(document, "metadata", None) or {}
113 original_file_type = str(metadata.get("original_file_type", "")).lower()
114 normalized_file_type = original_file_type.lstrip(".")
116 return normalized_file_type in {"xls", "xlsx"}
118 def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
119 """Analyze header distribution to guide splitting decisions.
121 Args:
122 text: Document content to analyze
124 Returns:
125 HeaderAnalysis with distribution metrics
126 """
127 analysis = HeaderAnalysis()
128 analysis.content_length = len(text)
130 lines = text.split("\n")
131 for line in lines:
132 line = line.strip()
133 header_match = re.match(r"^(#{1,6})\s+(.+)", line)
134 if header_match:
135 level = len(header_match.group(1))
136 if level == 1:
137 analysis.h1 += 1
138 elif level == 2:
139 analysis.h2 += 1
140 elif level == 3:
141 analysis.h3 += 1
142 elif level == 4:
143 analysis.h4 += 1
144 elif level == 5:
145 analysis.h5 += 1
146 elif level == 6:
147 analysis.h6 += 1
149 # Let __post_init__ calculate derived metrics
150 analysis.__post_init__()
152 logger.debug(
153 "Header distribution analysis",
154 extra={
155 "h1": analysis.h1,
156 "h2": analysis.h2,
157 "h3": analysis.h3,
158 "total_headers": analysis.total_headers,
159 "content_length": analysis.content_length,
160 "avg_section_size": analysis.avg_section_size,
161 },
162 )
164 return analysis
166 def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
167 """Intelligently determine optimal split levels based on document characteristics.
169 Args:
170 text: Document content
171 document: Optional document for context
173 Returns:
174 Set of header levels to split on
175 """
176 header_analysis = self.analyze_header_distribution(text)
178 # Check if this is a converted Excel file
179 is_converted_excel = self._is_excel_document(document)
181 if is_converted_excel:
182 # Excel files: H1 (document) + H2 (sheets) + potentially H3 for large sheets
183 if header_analysis.h3 > 10:
184 return {1, 2, 3}
185 else:
186 return {1, 2}
188 # Get configured thresholds
189 markdown_config = self.settings.global_config.chunking.strategies.markdown
190 h1_threshold = markdown_config.header_analysis_threshold_h1
191 h3_threshold = markdown_config.header_analysis_threshold_h3
193 # Regular markdown: Intelligent granularity based on structure
194 if header_analysis.h1 <= 1 and header_analysis.h2 >= h1_threshold:
195 # Single H1 with multiple H2s - the common case requiring granular splitting!
196 logger.info(
197 "Detected single H1 with multiple H2 sections - applying granular splitting",
198 extra={
199 "h1_count": header_analysis.h1,
200 "h2_count": header_analysis.h2,
201 "h3_count": header_analysis.h3,
202 },
203 )
204 # Split on H2 and H3 if there are many H3s
205 if header_analysis.h3 >= h3_threshold:
206 return {1, 2, 3}
207 else:
208 return {1, 2}
209 elif header_analysis.h1 >= h1_threshold:
210 # Multiple H1s - keep traditional splitting to avoid over-fragmentation
211 logger.info(
212 "Multiple H1 sections detected - using traditional H1-only splitting",
213 extra={"h1_count": header_analysis.h1},
214 )
215 return {1}
216 elif (
217 header_analysis.h1 == 0
218 and header_analysis.h2 == 0
219 and header_analysis.h3 >= 1
220 ):
221 # 🔥 FIX: Converted documents often have only H3+ headers
222 logger.info(
223 "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
224 extra={
225 "h1_count": header_analysis.h1,
226 "h2_count": header_analysis.h2,
227 "h3_count": header_analysis.h3,
228 "h4_count": header_analysis.h4,
229 "total_headers": header_analysis.total_headers,
230 },
231 )
232 # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
233 if header_analysis.h3 == 1 and header_analysis.h4 >= h1_threshold:
234 # Single H3 with multiple H4s (common DOCX pattern) - split on both
235 return {3, 4}
236 elif header_analysis.h3 >= h1_threshold:
237 # Multiple H3s - split on H3 primarily, H4 if many
238 if header_analysis.h4 >= h3_threshold:
239 return {3, 4}
240 else:
241 return {3}
242 elif header_analysis.total_headers >= h3_threshold:
243 # Many headers total - split on H3 and H4
244 return {3, 4}
245 else:
246 # Default - split on H3 only
247 return {3}
248 elif header_analysis.total_headers <= 3:
249 # Very small document - minimal splitting
250 logger.info(
251 "Small document detected - minimal splitting",
252 extra={"total_headers": header_analysis.total_headers},
253 )
254 return {1, 2}
255 else:
256 # Default case - moderate granularity
257 return {1, 2}
259 def build_enhanced_section_metadata(
260 self, sections: list[dict]
261 ) -> list[SectionMetadata]:
262 """Build enhanced section metadata with hierarchical relationships.
264 Args:
265 sections: Basic section data from split_sections
267 Returns:
268 List of enhanced SectionMetadata objects
269 """
270 enhanced_sections: list[SectionMetadata] = []
272 for i, section in enumerate(sections):
273 breadcrumb_parts = section.get("path", [])
274 if section.get("title"):
275 breadcrumb_parts = breadcrumb_parts + [section["title"]]
276 breadcrumb = " > ".join(breadcrumb_parts)
278 parent_section = None
279 if section.get("path"):
280 parent_section = section["path"][-1]
282 current_level = section.get("level", 0)
283 current_path = section.get("path", [])
284 sibling_sections: list[str] = []
286 for other_section in sections:
287 if (
288 other_section != section
289 and other_section.get("level") == current_level
290 and other_section.get("path", []) == current_path
291 ):
292 sibling_sections.append(other_section.get("title", ""))
294 previous_section = sections[i - 1].get("title") if i > 0 else None
295 next_section = (
296 sections[i + 1].get("title") if i < len(sections) - 1 else None
297 )
299 subsections: list[str] = []
300 current_title = section.get("title", "")
301 for other_section in sections[i + 1 :]:
302 other_path = other_section.get("path", [])
303 if len(other_path) > len(current_path) and other_path[
304 :-1
305 ] == current_path + [current_title]:
306 subsections.append(other_section.get("title", ""))
307 elif len(other_path) <= len(current_path):
308 break
310 content = section.get("content", "")
311 content_analysis = {
312 "has_code_blocks": bool(re.search(r"```", content)),
313 "has_tables": bool(re.search(r"\|.*\|", content)),
314 "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
315 "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
316 "word_count": len(content.split()),
317 "estimated_read_time": max(
318 1, len(content.split()) // markdown_config.words_per_minute_reading
319 ),
320 "char_count": len(content),
321 }
323 enhanced_section = SectionMetadata(
324 title=section.get("title", "Untitled"),
325 level=section.get("level", 0),
326 content=content,
327 order=i,
328 start_line=0,
329 end_line=0,
330 parent_section=parent_section,
331 breadcrumb=breadcrumb,
332 previous_section=previous_section,
333 next_section=next_section,
334 sibling_sections=sibling_sections,
335 subsections=subsections,
336 content_analysis=content_analysis,
337 )
339 enhanced_sections.append(enhanced_section)
341 return enhanced_sections
343 def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
344 """Split text into sections based on headers and document type.
346 Args:
347 text: Text to split
348 document: Optional document for context
350 Returns:
351 List of section dictionaries
352 """
354 parser = DocumentParser()
355 hierarchy_builder = HierarchyBuilder()
357 structure = parser.parse_document_structure(text)
358 sections: list[dict[str, Any]] = []
359 current_section = None
360 current_level = None
361 current_title = None
362 current_path: list[str] = []
364 split_levels = self.determine_optimal_split_levels(text, document)
366 logger.debug(
367 "Determined optimal split levels",
368 extra={
369 "split_levels": list(split_levels),
370 "document_type": (
371 "excel" if self._is_excel_document(document) else "markdown"
372 ),
373 },
374 )
376 for item in structure:
377 if item["type"] == "header":
378 level = item["level"]
380 if level in split_levels or (level == 0 and not sections):
381 if current_section is not None:
382 sections.append(
383 {
384 "content": current_section,
385 "level": current_level,
386 "title": current_title,
387 "path": list(current_path),
388 "is_excel_sheet": self._is_excel_document(document)
389 and level == 2,
390 }
391 )
392 current_section = item["text"] + "\n"
393 current_level = level
394 current_title = item["title"]
395 current_path = hierarchy_builder.get_section_path(item, structure)
396 else:
397 if current_section is not None:
398 current_section += item["text"] + "\n"
399 else:
400 if current_section is not None:
401 current_section += item["text"] + "\n"
402 else:
403 current_section = item["text"] + "\n"
404 current_level = 0
405 current_title = (
406 "Preamble"
407 if not self._is_excel_document(document)
408 else "Sheet Data"
409 )
410 current_path = []
412 if current_section is not None:
413 sections.append(
414 {
415 "content": current_section,
416 "level": current_level,
417 "title": current_title,
418 "path": list(current_path),
419 "is_excel_sheet": self._is_excel_document(document)
420 and current_level == 2,
421 }
422 )
424 chunk_size = self.settings.global_config.chunking.chunk_size
425 final_sections: list[dict[str, Any]] = []
427 for section in sections:
428 if len(section["content"]) > chunk_size:
429 logger.debug(
430 f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
431 extra={
432 "section_title": section.get("title", "Unknown"),
433 "section_size": len(section["content"]),
434 "chunk_size_limit": chunk_size,
435 "is_excel_sheet": section.get("is_excel_sheet", False),
436 },
437 )
439 if section.get("is_excel_sheet", False):
440 sub_chunks = self.excel_splitter.split_content(
441 section["content"], chunk_size
442 )
443 else:
444 sub_chunks = self.standard_splitter.split_content(
445 section["content"], chunk_size
446 )
448 for i, sub_chunk in enumerate(sub_chunks):
449 sub_section = {
450 "content": sub_chunk,
451 "level": section["level"],
452 "title": (
453 f"{section['title']} (Part {i+1})"
454 if section.get("title")
455 else f"Part {i+1}"
456 ),
457 "path": section["path"],
458 "parent_section": section.get("title", "Unknown"),
459 "sub_chunk_index": i,
460 "total_sub_chunks": len(sub_chunks),
461 "is_excel_sheet": section.get("is_excel_sheet", False),
462 }
463 final_sections.append(sub_section)
464 else:
465 final_sections.append(section)
467 for section in final_sections:
468 if "level" not in section:
469 section["level"] = 0
470 if "title" not in section:
471 section["title"] = parser.extract_section_title(section["content"])
472 if "path" not in section:
473 section["path"] = []
474 if "is_excel_sheet" not in section:
475 section["is_excel_sheet"] = False
477 return final_sections
479 def merge_related_sections(
480 self, sections: list[dict[str, Any]]
481 ) -> list[dict[str, Any]]:
482 """Merge small related sections to maintain context.
484 Args:
485 sections: List of section dictionaries
487 Returns:
488 List of merged section dictionaries
489 """
490 if not sections:
491 return []
493 merged: list[dict[str, Any]] = []
494 current_section = sections[0].copy()
495 min_section_size = (
496 self.settings.global_config.chunking.strategies.markdown.min_section_size
497 )
499 for i in range(1, len(sections)):
500 next_section = sections[i]
502 if (
503 len(current_section["content"]) < min_section_size
504 and next_section["level"] > current_section["level"]
505 ):
506 current_section["content"] += "\n" + next_section["content"]
507 else:
508 merged.append(current_section)
509 current_section = next_section.copy()
511 merged.append(current_section)
512 return merged