Coverage for src/qdrant_loader/core/chunking/strategy/html/html_section_splitter.py: 64% (216 statements)
1"""HTML-specific section splitter for semantic boundary-based chunking."""
3import re
4from typing import Any
6from bs4 import BeautifulSoup, Tag
8from qdrant_loader.config import Settings
9from qdrant_loader.core.chunking.strategy.base.section_splitter import (
10 BaseSectionSplitter,
11)
12from qdrant_loader.core.document import Document
14from .html_document_parser import HTMLDocumentParser, SectionType
17class HTMLSectionSplitter(BaseSectionSplitter):
18 """Section splitter for HTML documents with semantic boundary detection."""
20 def __init__(self, settings: Settings):
21 super().__init__(settings)
23 # Get strategy-specific configuration
24 self.html_config = settings.global_config.chunking.strategies.html
25 self.simple_parsing_threshold = self.html_config.simple_parsing_threshold
26 self.max_html_size_for_parsing = self.html_config.max_html_size_for_parsing
27 self.preserve_semantic_structure = self.html_config.preserve_semantic_structure
29 # Initialize HTML document parser for semantic analysis
30 self.document_parser = HTMLDocumentParser()
32 # Performance limits
33 self.max_sections_to_process = 200
34 self.max_recursion_depth = 10

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split HTML content into semantic sections."""
        if not content.strip():
            return []

        # Performance check: use simple parsing for very large files
        if len(content) > self.max_html_size_for_parsing:
            return self._simple_html_split(content)

        try:
            # Use semantic parsing for manageable files
            if (
                len(content) <= self.simple_parsing_threshold
                and self.preserve_semantic_structure
            ):
                sections = self._semantic_html_split(content)
            else:
                sections = self._simple_html_split(content)

            if not sections:
                return self._fallback_split(content)

            # Merge small sections and split large ones
            merged_sections = self._merge_small_sections(sections)
            final_sections = self._split_large_sections(merged_sections)

            return final_sections[: self.max_chunks_per_document]

        except Exception:
            # Fallback to simple text-based splitting
            return self._fallback_split(content)

    def _semantic_html_split(self, content: str) -> list[dict[str, Any]]:
        """Split HTML using semantic structure analysis."""
        try:
            soup = BeautifulSoup(content, "html.parser")

            # Remove script and style elements for cleaner processing
            for script in soup(["script", "style"]):
                script.decompose()

            sections = []
            section_count = 0
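
            # Recursive helper: walks the parse tree, records metadata for each
            # meaningful element, and passes its DOM path down to its children.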
            def process_element(element, level=0, parent_path=""):
                nonlocal section_count

                # Performance limits
                if section_count >= self.max_sections_to_process:
                    return
                if level > self.max_recursion_depth:
                    return

                if isinstance(element, Tag):
                    tag_name = element.name.lower()

                    # Check if this is a meaningful semantic element
                    if self._is_meaningful_element(element, tag_name):
                        text_content = element.get_text(strip=True)

                        # Skip empty or very small sections
                        if len(text_content) < 10:
                            return

                        # Build DOM path for context
                        current_path = (
                            f"{parent_path}/{tag_name}" if parent_path else tag_name
                        )
                        if element.get("id"):
                            current_path += f"#{element.get('id')}"
                        elif element.get("class"):
                            classes = " ".join(element.get("class", []))
                            current_path += f".{classes.replace(' ', '.')}"

                        # Extract section metadata
                        section_metadata = (
                            self.document_parser.extract_section_metadata(element)
                        )

                        # Add HTML-specific context
                        section_metadata.update(
                            {
                                "content": str(element),
                                "dom_path": current_path,
                                "depth_level": level,
                                "parent_path": parent_path,
                                "text_content": text_content,
                                "element_position": section_count,
                            }
                        )

                        sections.append(section_metadata)
                        section_count += 1

                        # Don't process children of certain container elements to avoid duplication
                        if tag_name in self.document_parser.section_elements:
                            return

                # Process children with depth limit
                if hasattr(element, "children") and level < self.max_recursion_depth:
                    for child in element.children:
                        process_element(
                            child,
                            level + 1,
                            current_path if isinstance(element, Tag) else parent_path,
                        )

            # Start processing from body or root
            body = soup.find("body")
            if body:
                process_element(body)
            else:
                process_element(soup)

            return sections

        except Exception:
            # Fallback to simple parsing
            return self._simple_html_split(content)

    def _simple_html_split(self, content: str) -> list[dict[str, Any]]:
        """Simple HTML splitting for large files or when semantic parsing fails."""
        try:
            soup = BeautifulSoup(content, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get clean text
            text = soup.get_text(separator="\n", strip=True)

            # Split into chunks by size
            sections = []
            chunks = self._split_text_by_size(text, self.chunk_size)

            for i, chunk in enumerate(chunks):
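                # Each chunk becomes a synthetic <div> section with positional metadata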
                section = {
                    "content": chunk,
                    "text_content": chunk,
                    "tag_name": "div",
                    "section_type": SectionType.DIV.value,
                    "level": 0,
                    "attributes": {},
                    "dom_path": f"body/div[{i}]",
                    "depth_level": 1,
                    "parent_path": "body",
                    "element_position": i,
                    "word_count": len(chunk.split()),
                    "char_count": len(chunk),
                    "parsing_method": "simple",
                }
                sections.append(section)

            return sections

        except Exception:
            return self._fallback_split(content)

    def _fallback_split(self, content: str) -> list[dict[str, Any]]:
        """Ultimate fallback: treat as plain text."""
        chunks = self._split_text_by_size(content, self.chunk_size)

        sections = []
        for i, chunk in enumerate(chunks):
            section = {
                "content": chunk,
                "text_content": chunk,
                "tag_name": "div",
                "section_type": SectionType.DIV.value,
                "level": 0,
                "attributes": {},
                "dom_path": f"fallback/div[{i}]",
                "depth_level": 0,
                "parent_path": "",
                "element_position": i,
                "word_count": len(chunk.split()),
                "char_count": len(chunk),
                "parsing_method": "fallback",
            }
            sections.append(section)

        return sections

    def _is_meaningful_element(self, element: Tag, tag_name: str) -> bool:
        """Check if an HTML element is meaningful for chunking."""
        # Always include semantic HTML5 elements
        if tag_name in self.document_parser.section_elements:
            return True

        # Include headings
        if tag_name in self.document_parser.heading_elements:
            return True

        # Include block-level content elements
        if tag_name in self.document_parser.block_elements:
            return True

        # Include elements with meaningful content
        text_content = element.get_text(strip=True)
        if len(text_content) >= 50:  # Minimum meaningful content
            return True

        # Include elements with specific roles or IDs
        if element.get("role") or element.get("id"):
            return True

        return False

    def _merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small adjacent sections for better chunk utilization."""
        if not sections:
            return []

        merged = []
        current_group = []
        current_size = 0
        min_section_size = 100  # Minimum size for standalone sections

        for section in sections:
            section_size = len(section.get("text_content", ""))

            # Large sections or important semantic elements should stand alone
            if (
                section_size >= min_section_size
                or section.get("tag_name") in self.document_parser.section_elements
                or section.get("tag_name") in self.document_parser.heading_elements
            ):
                # First, process any accumulated small sections
                if current_group:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

                # Add the large/important section
                merged.append(section)
            else:
                # Accumulate small sections
                current_group.append(section)
                current_size += section_size

                # If accumulated size is sufficient, create a merged section
                if current_size >= min_section_size:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

        # Handle remaining small sections
        if current_group:
            merged_section = self._create_merged_section(current_group)
            merged.append(merged_section)

        return merged

    def _create_merged_section(self, sections: list[dict[str, Any]]) -> dict[str, Any]:
        """Create a merged section from multiple small sections."""
        if not sections:
            return {}

        if len(sections) == 1:
            return sections[0]

        # Merge content and metadata
        merged_content = "\n\n".join(section.get("content", "") for section in sections)
        merged_text = "\n\n".join(
            section.get("text_content", "") for section in sections
        )

        # Build combined DOM path
        paths = [section.get("dom_path", "") for section in sections]
        merged_path = f"merged[{','.join(paths[:3])}{'...' if len(paths) > 3 else ''}]"

        # Use the first section as base and update
        merged_section = sections[0].copy()
        merged_section.update(
            {
                "content": merged_content,
                "text_content": merged_text,
                "tag_name": "div",  # Generic container
                "section_type": SectionType.DIV.value,
                "dom_path": merged_path,
                "word_count": len(merged_text.split()),
                "char_count": len(merged_text),
                "merged_sections_count": len(sections),
                "is_merged": True,
            }
        )

        return merged_section

    def _split_large_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Split sections that are too large into smaller parts."""
        final_sections = []

        for section in sections:
            content_size = len(section.get("content", ""))

            if content_size > self.chunk_size:
                # Split large sections
                split_parts = self._split_large_content(
                    section.get("content", ""), self.chunk_size
                )

                for i, part in enumerate(split_parts):
                    split_section = section.copy()
                    split_section.update(
                        {
                            "content": part,
                            "text_content": self._extract_text_from_html(part),
                            "dom_path": f"{section.get('dom_path', 'unknown')}[part-{i+1}]",
                            "word_count": len(part.split()),
                            "char_count": len(part),
                            "is_split": True,
                            "split_part": i + 1,
                            "total_split_parts": len(split_parts),
                        }
                    )
                    final_sections.append(split_section)
            else:
                final_sections.append(section)

        return final_sections

    def _split_large_content(self, content: str, max_size: int) -> list[str]:
        """Split large HTML content while preserving structure where possible."""
        if len(content) <= max_size:
            return [content]

        try:
            # Try to split by HTML structure first
            soup = BeautifulSoup(content, "html.parser")
            parts = []
            current_part = ""

            # Process top-level elements
            for element in soup.children:
                element_str = str(element)

                if len(current_part) + len(element_str) <= max_size:
                    current_part += element_str
                else:
                    if current_part:
                        parts.append(current_part)
                    current_part = element_str

                    # If single element is too large, split it by text
                    if len(current_part) > max_size:
                        text_parts = self._split_text_by_size(current_part, max_size)
                        parts.extend(text_parts[:-1])  # Add all but last
                        current_part = text_parts[-1] if text_parts else ""

                # Limit number of parts
                if len(parts) >= 10:
                    break

            if current_part:
                parts.append(current_part)

            return parts

        except Exception:
            # Fallback to simple text splitting
            return self._split_text_by_size(content, max_size)

    def _split_text_by_size(self, text: str, max_size: int) -> list[str]:
        """Split text by size with word boundaries."""
        if len(text) <= max_size:
            return [text]

        parts = []
        current_part = ""

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", text)

        for para in paragraphs:
            if len(current_part) + len(para) + 2 <= max_size:  # +2 for \n\n
                current_part += para + "\n\n"
            else:
                if current_part:
                    parts.append(current_part.strip())

                # If single paragraph is too large, split by sentences
                if len(para) > max_size:
                    sentences = re.split(r"(?<=[.!?])\s+", para)
                    current_part = ""

                    for sentence in sentences:
                        if len(current_part) + len(sentence) + 1 <= max_size:
                            current_part += sentence + " "
                        else:
                            if current_part:
                                parts.append(current_part.strip())
                            current_part = sentence + " "
                else:
                    current_part = para + "\n\n"

            # Limit number of parts
            if len(parts) >= 20:
                break

        if current_part:
            parts.append(current_part.strip())

        return parts

    def _extract_text_from_html(self, html_content: str) -> str:
        """Extract clean text from HTML content."""
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.get_text(separator=" ", strip=True)
        except Exception:
            # Fallback: remove HTML tags with regex
            text = re.sub(r"<[^>]+>", "", html_content)
            return re.sub(r"\s+", " ", text).strip()
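

# --- Usage sketch (illustrative only, not part of the module) ---
# A minimal example of how the splitter might be driven, assuming a fully
# configured `Settings` instance is already available (its construction is
# project-specific and omitted here); the sample HTML string is hypothetical.
#
#     splitter = HTMLSectionSplitter(settings)
#     html = "<html><body><h1>Title</h1><p>Some paragraph text...</p></body></html>"
#     for section in splitter.split_sections(html):
#         print(section.get("dom_path"), len(section.get("text_content", "")))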