Coverage for src/qdrant_loader/core/chunking/strategy/markdown/section_splitter.py: 89%
219 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Section splitting strategies for markdown chunking."""
3import re
4from dataclasses import dataclass
5from typing import TYPE_CHECKING, Any
7import structlog
9if TYPE_CHECKING:
10 from qdrant_loader.config import Settings
12# Re-export classes and local dependencies at top to satisfy E402
13from .document_parser import DocumentParser, HierarchyBuilder # noqa: F401
14from .splitters.base import BaseSplitter # re-export base class # noqa: F401
15from .splitters.excel import ExcelSplitter # re-export # noqa: F401
16from .splitters.fallback import FallbackSplitter # re-export # noqa: F401
17from .splitters.standard import StandardSplitter # re-export # noqa: F401
19logger = structlog.get_logger(__name__)


# Markdown configuration placeholder - can be imported from settings if needed
class MarkdownConfig:
    """Configuration for markdown processing."""

    words_per_minute_reading = 200


markdown_config = MarkdownConfig()


@dataclass
class HeaderAnalysis:
    """Analysis of header distribution in a document."""

    h1: int = 0
    h2: int = 0
    h3: int = 0
    h4: int = 0
    h5: int = 0
    h6: int = 0
    total_headers: int = 0
    content_length: int = 0
    avg_section_size: int = 0

    def __post_init__(self):
        """Calculate derived metrics."""
        self.total_headers = self.h1 + self.h2 + self.h3 + self.h4 + self.h5 + self.h6
        if self.total_headers > 0:
            self.avg_section_size = self.content_length // self.total_headers


@dataclass
class SectionMetadata:
    """Enhanced section metadata with hierarchical relationships."""

    title: str
    level: int
    content: str
    order: int
    start_line: int
    end_line: int
    parent_section: str | None = None
    breadcrumb: str = ""
    anchor: str = ""
    previous_section: str | None = None
    next_section: str | None = None
    sibling_sections: list[str] | None = None
    subsections: list[str] | None = None
    content_analysis: dict | None = None

    def __post_init__(self):
        """Initialize default values."""
        if self.sibling_sections is None:
            self.sibling_sections = []
        if self.subsections is None:
            self.subsections = []
        if self.content_analysis is None:
            self.content_analysis = {}
        if not self.anchor:
            self.anchor = self._generate_anchor()

    def _generate_anchor(self) -> str:
        """Generate URL anchor from title."""
        # Convert title to lowercase, replace spaces and special chars with hyphens
        anchor = re.sub(r"[^\w\s-]", "", self.title.lower())
        anchor = re.sub(r"[-\s]+", "-", anchor)
        return anchor.strip("-")
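
# Illustrative example of the anchor normalization above (not part of the original
# module): a title like "Getting Started: Part 1!" becomes "getting-started-part-1" --
# punctuation is dropped, whitespace collapses to single hyphens, and any leading or
# trailing hyphens are stripped.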


class SectionSplitter:
    """Main section splitter that coordinates different splitting strategies."""

    def __init__(self, settings: "Settings"):
        """Initialize the section splitter.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.standard_splitter = StandardSplitter(settings)
        self.excel_splitter = ExcelSplitter(settings)
        self.fallback_splitter = FallbackSplitter(settings)

    def analyze_header_distribution(self, text: str) -> HeaderAnalysis:
        """Analyze header distribution to guide splitting decisions.

        Args:
            text: Document content to analyze

        Returns:
            HeaderAnalysis with distribution metrics
        """
        analysis = HeaderAnalysis()
        analysis.content_length = len(text)

        lines = text.split("\n")
        for line in lines:
            line = line.strip()
            header_match = re.match(r"^(#{1,6})\s+(.+)", line)
            if header_match:
                level = len(header_match.group(1))
                if level == 1:
                    analysis.h1 += 1
                elif level == 2:
                    analysis.h2 += 1
                elif level == 3:
                    analysis.h3 += 1
                elif level == 4:
                    analysis.h4 += 1
                elif level == 5:
                    analysis.h5 += 1
                elif level == 6:
                    analysis.h6 += 1

        # Let __post_init__ calculate derived metrics
        analysis.__post_init__()

        logger.debug(
            "Header distribution analysis",
            extra={
                "h1": analysis.h1,
                "h2": analysis.h2,
                "h3": analysis.h3,
                "total_headers": analysis.total_headers,
                "content_length": analysis.content_length,
                "avg_section_size": analysis.avg_section_size,
            },
        )

        return analysis
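
    # Illustrative sketch (not part of the original module): for a document such as
    # "# Title\n\n## Setup\n\n## Usage\n", analyze_header_distribution() returns a
    # HeaderAnalysis with h1=1, h2=2, total_headers=3, and
    # avg_section_size == content_length // 3.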

    def determine_optimal_split_levels(self, text: str, document=None) -> set[int]:
        """Intelligently determine optimal split levels based on document characteristics.

        Args:
            text: Document content
            document: Optional document for context

        Returns:
            Set of header levels to split on
        """
        header_analysis = self.analyze_header_distribution(text)

        # Check if this is a converted Excel file
        is_converted_excel = (
            document and document.metadata.get("original_file_type") == "xlsx"
        )

        if is_converted_excel:
            # Excel files: H1 (document) + H2 (sheets) + potentially H3 for large sheets
            if header_analysis.h3 > 10:
                return {1, 2, 3}
            else:
                return {1, 2}

        # Get configured thresholds
        markdown_config = self.settings.global_config.chunking.strategies.markdown
        h1_threshold = markdown_config.header_analysis_threshold_h1
        h3_threshold = markdown_config.header_analysis_threshold_h3

        # Regular markdown: Intelligent granularity based on structure
        if header_analysis.h1 <= 1 and header_analysis.h2 >= h1_threshold:
            # Single H1 with multiple H2s - the common case requiring granular splitting!
            logger.info(
                "Detected single H1 with multiple H2 sections - applying granular splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                },
            )
            # Split on H2 and H3 if there are many H3s
            if header_analysis.h3 >= h3_threshold:
                return {1, 2, 3}
            else:
                return {1, 2}
        elif header_analysis.h1 >= h1_threshold:
            # Multiple H1s - keep traditional splitting to avoid over-fragmentation
            logger.info(
                "Multiple H1 sections detected - using traditional H1-only splitting",
                extra={"h1_count": header_analysis.h1},
            )
            return {1}
        elif (
            header_analysis.h1 == 0
            and header_analysis.h2 == 0
            and header_analysis.h3 >= 1
        ):
            # 🔥 FIX: Converted documents often have only H3+ headers
            logger.info(
                "Detected document with H3+ headers only (likely converted DOCX) - applying H3+ splitting",
                extra={
                    "h1_count": header_analysis.h1,
                    "h2_count": header_analysis.h2,
                    "h3_count": header_analysis.h3,
                    "h4_count": header_analysis.h4,
                    "total_headers": header_analysis.total_headers,
                },
            )
            # 🔥 ENHANCED: Intelligent H3/H4 splitting based on document structure
            if header_analysis.h3 == 1 and header_analysis.h4 >= h1_threshold:
                # Single H3 with multiple H4s (common DOCX pattern) - split on both
                return {3, 4}
            elif header_analysis.h3 >= h1_threshold:
                # Multiple H3s - split on H3 primarily, H4 if many
                if header_analysis.h4 >= h3_threshold:
                    return {3, 4}
                else:
                    return {3}
            elif header_analysis.total_headers >= h3_threshold:
                # Many headers total - split on H3 and H4
                return {3, 4}
            else:
                # Default - split on H3 only
                return {3}
        elif header_analysis.total_headers <= 3:
            # Very small document - minimal splitting
            logger.info(
                "Small document detected - minimal splitting",
                extra={"total_headers": header_analysis.total_headers},
            )
            return {1, 2}
        else:
            # Default case - moderate granularity
            return {1, 2}
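
    # Illustrative sketch (not part of the original module), assuming hypothetical
    # configured thresholds header_analysis_threshold_h1=3 and
    # header_analysis_threshold_h3=5: a document with one H1, five H2s and two H3s
    # yields split levels {1, 2}; a converted DOCX with no H1/H2/H4 headers and four
    # H3s yields {3}.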

    def build_enhanced_section_metadata(
        self, sections: list[dict]
    ) -> list[SectionMetadata]:
        """Build enhanced section metadata with hierarchical relationships.

        Args:
            sections: Basic section data from split_sections

        Returns:
            List of enhanced SectionMetadata objects
        """
        enhanced_sections: list[SectionMetadata] = []

        for i, section in enumerate(sections):
            breadcrumb_parts = section.get("path", [])
            if section.get("title"):
                breadcrumb_parts = breadcrumb_parts + [section["title"]]
            breadcrumb = " > ".join(breadcrumb_parts)

            parent_section = None
            if section.get("path"):
                parent_section = section["path"][-1]

            current_level = section.get("level", 0)
            current_path = section.get("path", [])
            sibling_sections: list[str] = []

            for other_section in sections:
                if (
                    other_section != section
                    and other_section.get("level") == current_level
                    and other_section.get("path", []) == current_path
                ):
                    sibling_sections.append(other_section.get("title", ""))

            previous_section = sections[i - 1].get("title") if i > 0 else None
            next_section = (
                sections[i + 1].get("title") if i < len(sections) - 1 else None
            )

            subsections: list[str] = []
            current_title = section.get("title", "")
            for other_section in sections[i + 1 :]:
                other_path = other_section.get("path", [])
                if len(other_path) > len(current_path) and other_path[
                    :-1
                ] == current_path + [current_title]:
                    subsections.append(other_section.get("title", ""))
                elif len(other_path) <= len(current_path):
                    break

            content = section.get("content", "")
            content_analysis = {
                "has_code_blocks": bool(re.search(r"```", content)),
                "has_tables": bool(re.search(r"\|.*\|", content)),
                "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
                "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
                "word_count": len(content.split()),
                "estimated_read_time": max(
                    1, len(content.split()) // markdown_config.words_per_minute_reading
                ),
                "char_count": len(content),
            }

            enhanced_section = SectionMetadata(
                title=section.get("title", "Untitled"),
                level=section.get("level", 0),
                content=content,
                order=i,
                start_line=0,
                end_line=0,
                parent_section=parent_section,
                breadcrumb=breadcrumb,
                previous_section=previous_section,
                next_section=next_section,
                sibling_sections=sibling_sections,
                subsections=subsections,
                content_analysis=content_analysis,
            )

            enhanced_sections.append(enhanced_section)

        return enhanced_sections
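
    # Illustrative sketch (not part of the original module): a section dict such as
    # {"title": "Linux", "level": 3, "path": ["Guide", "Install"], "content": "..."}
    # produces a SectionMetadata with breadcrumb "Guide > Install > Linux",
    # parent_section "Install" and anchor "linux".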

    def split_sections(self, text: str, document=None) -> list[dict[str, Any]]:
        """Split text into sections based on headers and document type.

        Args:
            text: Text to split
            document: Optional document for context

        Returns:
            List of section dictionaries
        """
        parser = DocumentParser()
        hierarchy_builder = HierarchyBuilder()

        structure = parser.parse_document_structure(text)
        sections: list[dict[str, Any]] = []
        current_section = None
        current_level = None
        current_title = None
        current_path: list[str] = []

        split_levels = self.determine_optimal_split_levels(text, document)

        logger.debug(
            "Determined optimal split levels",
            extra={
                "split_levels": list(split_levels),
                "document_type": (
                    "excel"
                    if document
                    and document.metadata.get("original_file_type") == "xlsx"
                    else "markdown"
                ),
            },
        )

        for item in structure:
            if item["type"] == "header":
                level = item["level"]

                if level in split_levels or (level == 0 and not sections):
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                                # Flag H2 sheet sections from converted Excel workbooks
                                "is_excel_sheet": document
                                and document.metadata.get("original_file_type")
                                == "xlsx"
                                and current_level == 2,
                            }
                        )
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = hierarchy_builder.get_section_path(item, structure)
                else:
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = (
                        "Preamble"
                        if not (
                            document
                            and document.metadata.get("original_file_type") == "xlsx"
                        )
                        else "Sheet Data"
                    )
                    current_path = []

        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                    "is_excel_sheet": document
                    and document.metadata.get("original_file_type") == "xlsx"
                    and current_level == 2,
                }
            )

        chunk_size = self.settings.global_config.chunking.chunk_size
        final_sections: list[dict[str, Any]] = []

        for section in sections:
            if len(section["content"]) > chunk_size:
                logger.debug(
                    f"Section too large ({len(section['content'])} chars), splitting into smaller chunks",
                    extra={
                        "section_title": section.get("title", "Unknown"),
                        "section_size": len(section["content"]),
                        "chunk_size_limit": chunk_size,
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    },
                )

                if section.get("is_excel_sheet", False):
                    sub_chunks = self.excel_splitter.split_content(
                        section["content"], chunk_size
                    )
                else:
                    sub_chunks = self.standard_splitter.split_content(
                        section["content"], chunk_size
                    )

                for i, sub_chunk in enumerate(sub_chunks):
                    sub_section = {
                        "content": sub_chunk,
                        "level": section["level"],
                        "title": (
                            f"{section['title']} (Part {i+1})"
                            if section.get("title")
                            else f"Part {i+1}"
                        ),
                        "path": section["path"],
                        "parent_section": section.get("title", "Unknown"),
                        "sub_chunk_index": i,
                        "total_sub_chunks": len(sub_chunks),
                        "is_excel_sheet": section.get("is_excel_sheet", False),
                    }
                    final_sections.append(sub_section)
            else:
                final_sections.append(section)

        for section in final_sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = parser.extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []
            if "is_excel_sheet" not in section:
                section["is_excel_sheet"] = False

        return final_sections
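
    # Illustrative sketch (not part of the original module), assuming a hypothetical
    # configured chunk_size of 1500: a 9000-character "Overview" section would be
    # re-split by the standard splitter into sub-sections titled "Overview (Part 1)",
    # "Overview (Part 2)", ..., each carrying sub_chunk_index and total_sub_chunks.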

    def merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged: list[dict[str, Any]] = []
        current_section = sections[0].copy()
        min_section_size = (
            self.settings.global_config.chunking.strategies.markdown.min_section_size
        )

        for i in range(1, len(sections)):
            next_section = sections[i]

            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        merged.append(current_section)
        return merged
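

# Minimal usage sketch (not part of the original module; assumes a fully configured
# Settings instance, here called "settings", built elsewhere from application config):
#
#     splitter = SectionSplitter(settings)
#     sections = splitter.split_sections(markdown_text)
#     sections = splitter.merge_related_sections(sections)
#     metadata = splitter.build_enhanced_section_metadata(sections)
#
# Each section dict carries "content", "level", "title", "path" and "is_excel_sheet";
# oversized sections are emitted as "<title> (Part N)" sub-chunks.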