Coverage for src/qdrant_loader/core/chunking/strategy/html/html_section_splitter.py: 64%

216 statements  

coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""HTML-specific section splitter for semantic boundary-based chunking.""" 

2 

3import re 

4from typing import Any 

5 

6from bs4 import BeautifulSoup, Tag 

7 

8from qdrant_loader.config import Settings 

9from qdrant_loader.core.chunking.strategy.base.section_splitter import ( 

10 BaseSectionSplitter, 

11) 

12from qdrant_loader.core.document import Document 

13 

14from .html_document_parser import HTMLDocumentParser, SectionType 

15 

16 

17class HTMLSectionSplitter(BaseSectionSplitter): 

18 """Section splitter for HTML documents with semantic boundary detection.""" 

19 

20 def __init__(self, settings: Settings): 

21 super().__init__(settings) 

22 

23 # Get strategy-specific configuration 

24 self.html_config = settings.global_config.chunking.strategies.html 

25 self.simple_parsing_threshold = self.html_config.simple_parsing_threshold 

26 self.max_html_size_for_parsing = self.html_config.max_html_size_for_parsing 

27 self.preserve_semantic_structure = self.html_config.preserve_semantic_structure 

28 

29 # Initialize HTML document parser for semantic analysis 

30 self.document_parser = HTMLDocumentParser() 

31 

32 # Performance limits 

33 self.max_sections_to_process = 200 

34 self.max_recursion_depth = 10 

35 

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split HTML content into semantic sections."""
        if not content.strip():
            return []

        # Performance check: use simple parsing for very large files
        if len(content) > self.max_html_size_for_parsing:
            return self._simple_html_split(content)

        try:
            # Use semantic parsing for manageable files
            if (
                len(content) <= self.simple_parsing_threshold
                and self.preserve_semantic_structure
            ):
                sections = self._semantic_html_split(content)
            else:
                sections = self._simple_html_split(content)

            if not sections:
                return self._fallback_split(content)

            # Merge small sections and split large ones
            merged_sections = self._merge_small_sections(sections)
            final_sections = self._split_large_sections(merged_sections)

            return final_sections[: self.max_chunks_per_document]

        except Exception:
            # Fallback to simple text-based splitting
            return self._fallback_split(content)
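
    # Routing summary (hypothetical sizes, for illustration only): with
    # simple_parsing_threshold=100_000, max_html_size_for_parsing=1_000_000,
    # and preserve_semantic_structure enabled, a 50 KB page takes
    # _semantic_html_split, a 500 KB page takes _simple_html_split, and a
    # 2 MB page goes straight to _simple_html_split before the try block.
    # Real thresholds come from the html strategy config, not these numbers.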

    def _semantic_html_split(self, content: str) -> list[dict[str, Any]]:
        """Split HTML using semantic structure analysis."""
        try:
            soup = BeautifulSoup(content, "html.parser")

            # Remove script and style elements for cleaner processing
            for script in soup(["script", "style"]):
                script.decompose()

            sections = []
            section_count = 0

            def process_element(element, level=0, parent_path=""):
                nonlocal section_count

                # Performance limits
                if section_count >= self.max_sections_to_process:
                    return
                if level > self.max_recursion_depth:
                    return

                if isinstance(element, Tag):
                    tag_name = element.name.lower()

                    # Check if this is a meaningful semantic element
                    if self._is_meaningful_element(element, tag_name):
                        text_content = element.get_text(strip=True)

                        # Skip empty or very small sections
                        if len(text_content) < 10:
                            return

                        # Build DOM path for context
                        current_path = (
                            f"{parent_path}/{tag_name}" if parent_path else tag_name
                        )
                        if element.get("id"):
                            current_path += f"#{element.get('id')}"
                        elif element.get("class"):
                            classes = " ".join(element.get("class", []))
                            current_path += f".{classes.replace(' ', '.')}"

                        # Extract section metadata
                        section_metadata = (
                            self.document_parser.extract_section_metadata(element)
                        )

                        # Add HTML-specific context
                        section_metadata.update(
                            {
                                "content": str(element),
                                "dom_path": current_path,
                                "depth_level": level,
                                "parent_path": parent_path,
                                "text_content": text_content,
                                "element_position": section_count,
                            }
                        )

                        sections.append(section_metadata)
                        section_count += 1

                        # Don't process children of certain container elements to avoid duplication
                        if tag_name in self.document_parser.section_elements:
                            return

                # Process children with depth limit
                if hasattr(element, "children") and level < self.max_recursion_depth:
                    for child in element.children:
                        process_element(
                            child,
                            level + 1,
                            current_path if isinstance(element, Tag) else parent_path,
                        )

            # Start processing from body or root
            body = soup.find("body")
            if body:
                process_element(body)
            else:
                process_element(soup)

            return sections

        except Exception:
            # Fallback to simple parsing
            return self._simple_html_split(content)
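
    # DOM path illustration (hypothetical markup): a Tag visited with
    # parent_path "body/main", e.g. <section id="pricing" class="tier basic">,
    # gets dom_path "body/main/section#pricing"; with no id the class names
    # are used instead, giving "body/main/section.tier.basic".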

    def _simple_html_split(self, content: str) -> list[dict[str, Any]]:
        """Simple HTML splitting for large files or when semantic parsing fails."""
        try:
            soup = BeautifulSoup(content, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get clean text
            text = soup.get_text(separator="\n", strip=True)

            # Split into chunks by size
            sections = []
            chunks = self._split_text_by_size(text, self.chunk_size)

            for i, chunk in enumerate(chunks):
                section = {
                    "content": chunk,
                    "text_content": chunk,
                    "tag_name": "div",
                    "section_type": SectionType.DIV.value,
                    "level": 0,
                    "attributes": {},
                    "dom_path": f"body/div[{i}]",
                    "depth_level": 1,
                    "parent_path": "body",
                    "element_position": i,
                    "word_count": len(chunk.split()),
                    "char_count": len(chunk),
                    "parsing_method": "simple",
                }
                sections.append(section)

            return sections

        except Exception:
            return self._fallback_split(content)

    def _fallback_split(self, content: str) -> list[dict[str, Any]]:
        """Ultimate fallback: treat as plain text."""
        chunks = self._split_text_by_size(content, self.chunk_size)

        sections = []
        for i, chunk in enumerate(chunks):
            section = {
                "content": chunk,
                "text_content": chunk,
                "tag_name": "div",
                "section_type": SectionType.DIV.value,
                "level": 0,
                "attributes": {},
                "dom_path": f"fallback/div[{i}]",
                "depth_level": 0,
                "parent_path": "",
                "element_position": i,
                "word_count": len(chunk.split()),
                "char_count": len(chunk),
                "parsing_method": "fallback",
            }
            sections.append(section)

        return sections

    def _is_meaningful_element(self, element: Tag, tag_name: str) -> bool:
        """Check if an HTML element is meaningful for chunking."""
        # Always include semantic HTML5 elements
        if tag_name in self.document_parser.section_elements:
            return True

        # Include headings
        if tag_name in self.document_parser.heading_elements:
            return True

        # Include block-level content elements
        if tag_name in self.document_parser.block_elements:
            return True

        # Include elements with meaningful content
        text_content = element.get_text(strip=True)
        if len(text_content) >= 50:  # Minimum meaningful content
            return True

        # Include elements with specific roles or IDs
        if element.get("role") or element.get("id"):
            return True

        return False

    def _merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small adjacent sections for better chunk utilization."""
        if not sections:
            return []

        merged = []
        current_group = []
        current_size = 0
        min_section_size = 100  # Minimum size for standalone sections

        for section in sections:
            section_size = len(section.get("text_content", ""))

            # Large sections or important semantic elements should stand alone
            if (
                section_size >= min_section_size
                or section.get("tag_name") in self.document_parser.section_elements
                or section.get("tag_name") in self.document_parser.heading_elements
            ):
                # First, process any accumulated small sections
                if current_group:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

                # Add the large/important section
                merged.append(section)
            else:
                # Accumulate small sections
                current_group.append(section)
                current_size += section_size

                # If accumulated size is sufficient, create a merged section
                if current_size >= min_section_size:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

        # Handle remaining small sections
        if current_group:
            merged_section = self._create_merged_section(current_group)
            merged.append(merged_section)

        return merged

    def _create_merged_section(self, sections: list[dict[str, Any]]) -> dict[str, Any]:
        """Create a merged section from multiple small sections."""
        if not sections:
            return {}

        if len(sections) == 1:
            return sections[0]

        # Merge content and metadata
        merged_content = "\n\n".join(section.get("content", "") for section in sections)
        merged_text = "\n\n".join(
            section.get("text_content", "") for section in sections
        )

        # Build combined DOM path
        paths = [section.get("dom_path", "") for section in sections]
        merged_path = f"merged[{','.join(paths[:3])}{'...' if len(paths) > 3 else ''}]"

        # Use the first section as base and update
        merged_section = sections[0].copy()
        merged_section.update(
            {
                "content": merged_content,
                "text_content": merged_text,
                "tag_name": "div",  # Generic container
                "section_type": SectionType.DIV.value,
                "dom_path": merged_path,
                "word_count": len(merged_text.split()),
                "char_count": len(merged_text),
                "merged_sections_count": len(sections),
                "is_merged": True,
            }
        )

        return merged_section

    def _split_large_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Split sections that are too large into smaller parts."""
        final_sections = []

        for section in sections:
            content_size = len(section.get("content", ""))

            if content_size > self.chunk_size:
                # Split large sections
                split_parts = self._split_large_content(
                    section.get("content", ""), self.chunk_size
                )

                for i, part in enumerate(split_parts):
                    split_section = section.copy()
                    split_section.update(
                        {
                            "content": part,
                            "text_content": self._extract_text_from_html(part),
                            "dom_path": f"{section.get('dom_path', 'unknown')}[part-{i+1}]",
                            "word_count": len(part.split()),
                            "char_count": len(part),
                            "is_split": True,
                            "split_part": i + 1,
                            "total_split_parts": len(split_parts),
                        }
                    )
                    final_sections.append(split_section)
            else:
                final_sections.append(section)

        return final_sections

    def _split_large_content(self, content: str, max_size: int) -> list[str]:
        """Split large HTML content while preserving structure where possible."""
        if len(content) <= max_size:
            return [content]

        try:
            # Try to split by HTML structure first
            soup = BeautifulSoup(content, "html.parser")
            parts = []
            current_part = ""

            # Process top-level elements
            for element in soup.children:
                element_str = str(element)

                if len(current_part) + len(element_str) <= max_size:
                    current_part += element_str
                else:
                    if current_part:
                        parts.append(current_part)
                    current_part = element_str

                    # If single element is too large, split it by text
                    if len(current_part) > max_size:
                        text_parts = self._split_text_by_size(current_part, max_size)
                        parts.extend(text_parts[:-1])  # Add all but last
                        current_part = text_parts[-1] if text_parts else ""

                # Limit number of parts
                if len(parts) >= 10:
                    break

            if current_part:
                parts.append(current_part)

            return parts

        except Exception:
            # Fallback to simple text splitting
            return self._split_text_by_size(content, max_size)

    def _split_text_by_size(self, text: str, max_size: int) -> list[str]:
        """Split text by size with word boundaries."""
        if len(text) <= max_size:
            return [text]

        parts = []
        current_part = ""

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", text)

        for para in paragraphs:
            if len(current_part) + len(para) + 2 <= max_size:  # +2 for \n\n
                current_part += para + "\n\n"
            else:
                if current_part:
                    parts.append(current_part.strip())

                # If single paragraph is too large, split by sentences
                if len(para) > max_size:
                    sentences = re.split(r"(?<=[.!?])\s+", para)
                    current_part = ""

                    for sentence in sentences:
                        if len(current_part) + len(sentence) + 1 <= max_size:
                            current_part += sentence + " "
                        else:
                            if current_part:
                                parts.append(current_part.strip())
                            current_part = sentence + " "
                else:
                    current_part = para + "\n\n"

            # Limit number of parts
            if len(parts) >= 20:
                break

        if current_part:
            parts.append(current_part.strip())

        return parts
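
    # Worked illustration of _split_text_by_size above (values chosen for the
    # example only):
    #   _split_text_by_size("One sentence. Another one here.", max_size=16)
    #   -> ["One sentence.", "Another one here."]
    # The text exceeds max_size and has no blank-line paragraph break, so it is
    # split on sentence boundaries; the trailing sentence is kept whole even
    # though it is slightly longer than max_size.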

    def _extract_text_from_html(self, html_content: str) -> str:
        """Extract clean text from HTML content."""
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.get_text(separator=" ", strip=True)
        except Exception:
            # Fallback: remove HTML tags with regex
            text = re.sub(r"<[^>]+>", "", html_content)
            return re.sub(r"\s+", " ", text).strip()
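
For orientation, a minimal usage sketch of the class covered above. It assumes a settings object already loaded by qdrant-loader's own configuration layer (providing the fields read in __init__); the HTML snippet, variable names, and printed fields are illustrative and not part of this module.

# Illustrative sketch; `settings` is assumed to come from qdrant-loader's config loading.
from qdrant_loader.core.chunking.strategy.html.html_section_splitter import (
    HTMLSectionSplitter,
)

html = """
<body>
  <article id="intro">
    <h1>Getting started</h1>
    <p>Install the package and point it at your Qdrant instance.</p>
  </article>
  <section class="faq">
    <h2>FAQ</h2>
    <p>Common questions about configuration and indexing.</p>
  </section>
</body>
"""

splitter = HTMLSectionSplitter(settings)  # settings: assumed pre-loaded Settings
sections = splitter.split_sections(html)
for s in sections:
    # Each section dict carries the HTML fragment, its plain text, and DOM context.
    print(s["dom_path"], s.get("section_type"), s.get("word_count"))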