Coverage for src/qdrant_loader/core/chunking/strategy/html_strategy.py: 97% (351 statements)

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""HTML-specific chunking strategy.""" 

2 

3import re 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import Any, Optional 

7 

8import structlog 

9from bs4 import BeautifulSoup, Tag 

10 

11from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

12from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

13from qdrant_loader.core.document import Document 

14from qdrant_loader.config import Settings 

15 

16logger = structlog.get_logger(__name__) 

17 

18# Performance constants to prevent timeouts 

19MAX_HTML_SIZE_FOR_PARSING = 500_000 # 500KB limit for complex HTML parsing 

20MAX_SECTIONS_TO_PROCESS = 200 # Limit number of sections to prevent timeouts 

21MAX_CHUNK_SIZE_FOR_NLP = 20_000 # 20KB limit for NLP processing on chunks 

22SIMPLE_PARSING_THRESHOLD = 100_000 # Use simple parsing for files larger than 100KB 

23 

24 

class SectionType(Enum):
    """Types of sections in an HTML document."""

    HEADER = "header"
    ARTICLE = "article"
    SECTION = "section"
    NAV = "nav"
    ASIDE = "aside"
    MAIN = "main"
    PARAGRAPH = "paragraph"
    LIST = "list"
    TABLE = "table"
    CODE_BLOCK = "code_block"
    BLOCKQUOTE = "blockquote"
    DIV = "div"


@dataclass
class HTMLSection:
    """Represents a section in an HTML document."""

    content: str
    tag_name: str
    level: int = 0
    type: SectionType = SectionType.DIV
    parent: Optional["HTMLSection"] = None
    children: list["HTMLSection"] = field(default_factory=list)
    attributes: dict[str, str] = field(default_factory=dict)
    text_content: str = ""

    def add_child(self, child: "HTMLSection"):
        """Add a child section."""
        self.children.append(child)
        child.parent = self
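
As a quick illustration of how add_child wires up the parent/child links, here is a minimal sketch (the section values are made up for the example):

    # Illustrative only: hand-built sections, not real parser output
    root = HTMLSection(content="<section>...</section>", tag_name="section",
                       type=SectionType.SECTION, text_content="Intro")
    para = HTMLSection(content="<p>Hello</p>", tag_name="p",
                       type=SectionType.PARAGRAPH, text_content="Hello")
    root.add_child(para)
    assert para.parent is root and root.children[0] is para
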

class HTMLChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking HTML documents based on semantic structure.

    This strategy splits HTML documents into chunks based on semantic HTML elements,
    preserving the document structure and hierarchy. Each chunk includes:
    - The semantic element and its content
    - Parent element context for hierarchy
    - Element-specific metadata (tag, attributes, etc.)
    - Semantic analysis results
    """

    def __init__(self, settings: Settings):
        """Initialize the HTML chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Note: Semantic analyzer is now handled intelligently in base class
        # on a per-chunk basis based on content type and size

        # Cache for processed chunks
        self._processed_chunks = {}

        # Thread pool executor for parallel processing
        self._executor = None

        # Define semantic HTML elements that should be treated as section boundaries
        self.section_elements = {
            "article",
            "section",
            "main",
            "header",
            "footer",
            "nav",
            "aside",
        }

        # Define heading elements for hierarchy
        self.heading_elements = {"h1", "h2", "h3", "h4", "h5", "h6"}

        # Define block-level elements that can form chunks
        self.block_elements = {
            "div",
            "p",
            "blockquote",
            "pre",
            "ul",
            "ol",
            "li",
            "table",
            "figure",
        }

    def _identify_section_type(self, tag: Tag) -> SectionType:
        """Identify the type of section based on the HTML tag.

        Args:
            tag: The BeautifulSoup tag to analyze

        Returns:
            SectionType enum indicating the type of section
        """
        tag_name = tag.name.lower()

        if tag_name in self.heading_elements:
            return SectionType.HEADER
        elif tag_name == "article":
            return SectionType.ARTICLE
        elif tag_name == "section":
            return SectionType.SECTION
        elif tag_name == "nav":
            return SectionType.NAV
        elif tag_name == "aside":
            return SectionType.ASIDE
        elif tag_name == "main":
            return SectionType.MAIN
        elif tag_name in ["ul", "ol", "li"]:
            return SectionType.LIST
        elif tag_name == "table":
            return SectionType.TABLE
        elif tag_name in ["pre", "code"]:
            return SectionType.CODE_BLOCK
        elif tag_name == "blockquote":
            return SectionType.BLOCKQUOTE
        elif tag_name == "p":
            return SectionType.PARAGRAPH
        else:
            return SectionType.DIV
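
A standalone sketch of the same dispatch, driven by BeautifulSoup on a made-up fragment; the classify helper here is illustrative, not part of the class:

    from bs4 import BeautifulSoup, Tag

    def classify(tag: Tag) -> SectionType:
        name = tag.name.lower()
        if name in {"h1", "h2", "h3", "h4", "h5", "h6"}:
            return SectionType.HEADER
        if name in {"ul", "ol", "li"}:
            return SectionType.LIST
        if name in {"pre", "code"}:
            return SectionType.CODE_BLOCK
        return {"article": SectionType.ARTICLE, "section": SectionType.SECTION,
                "nav": SectionType.NAV, "aside": SectionType.ASIDE,
                "main": SectionType.MAIN, "table": SectionType.TABLE,
                "blockquote": SectionType.BLOCKQUOTE,
                "p": SectionType.PARAGRAPH}.get(name, SectionType.DIV)

    soup = BeautifulSoup("<article><h2>Title</h2><ul><li>item</li></ul></article>", "html.parser")
    print([(t.name, classify(t).value) for t in soup.find_all(True)])
    # [('article', 'article'), ('h2', 'header'), ('ul', 'list'), ('li', 'list')]
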

    def _get_heading_level(self, tag: Tag) -> int:
        """Get the heading level from an HTML heading tag.

        Args:
            tag: The heading tag

        Returns:
            Heading level (1-6), or 0 for non-heading tags
        """
        if tag.name.lower() in self.heading_elements:
            return int(tag.name[1])  # Extract number from h1, h2, etc.
        return 0

    def _extract_section_metadata(self, section: HTMLSection) -> dict[str, Any]:
        """Extract metadata from an HTML section.

        Args:
            section: The section to analyze

        Returns:
            Dictionary containing section metadata
        """
        metadata = {
            "type": section.type.value,
            "tag_name": section.tag_name,
            "level": section.level,
            "attributes": section.attributes,
            "word_count": len(section.text_content.split()),
            "char_count": len(section.text_content),
            "has_code": section.type == SectionType.CODE_BLOCK,
            "has_links": bool(re.search(r"<a\s+[^>]*href", section.content)),
            "has_images": bool(re.search(r"<img\s+[^>]*src", section.content)),
            "is_semantic": section.tag_name in self.section_elements,
            "is_heading": section.tag_name in self.heading_elements,
        }

        # Add parent section info if available
        if section.parent:
            metadata["parent_tag"] = section.parent.tag_name
            metadata["parent_type"] = section.parent.type.value
            metadata["parent_level"] = section.parent.level

        # Add breadcrumb path for hierarchical context (simplified)
        breadcrumb = self._build_section_breadcrumb(section)
        if breadcrumb:
            metadata["breadcrumb"] = breadcrumb

        return metadata
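
The has_links and has_images flags come from the two regexes above; a small check on a made-up fragment shows what they match:

    import re

    sample = '<p>See <a href="https://example.com">docs</a> and <img src="logo.png"></p>'
    print(bool(re.search(r"<a\s+[^>]*href", sample)))   # True  -> has_links
    print(bool(re.search(r"<img\s+[^>]*src", sample)))  # True  -> has_images
    print(bool(re.search(r"<a\s+[^>]*href", "<p>plain</p>")))  # False
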

    def _build_section_breadcrumb(self, section: HTMLSection) -> str:
        """Build a breadcrumb path of section titles to capture hierarchy.

        Args:
            section: The section to build breadcrumb for

        Returns:
            String representing the hierarchical path
        """
        breadcrumb_parts = []
        current = section.parent
        depth = 0

        # Limit breadcrumb depth to prevent performance issues
        while current and depth < 5:
            if (
                current.tag_name in self.heading_elements
                or current.tag_name in self.section_elements
            ):
                title = self._extract_title_from_content(current.text_content)
                if title and title != "Untitled Section":
                    breadcrumb_parts.append(title)
            current = current.parent
            depth += 1

        return " > ".join(reversed(breadcrumb_parts))
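
To see the walk concretely, here is a self-contained sketch that mirrors the loop above on a hand-built parent chain (element sets inlined for brevity; the section contents are invented):

    h1 = HTMLSection(content="", tag_name="h1", type=SectionType.HEADER, text_content="Guide")
    sec = HTMLSection(content="", tag_name="section", type=SectionType.SECTION, text_content="Install")
    leaf = HTMLSection(content="", tag_name="p", type=SectionType.PARAGRAPH, text_content="Run the installer.")
    h1.add_child(sec)
    sec.add_child(leaf)

    parts, current, depth = [], leaf.parent, 0
    while current and depth < 5:
        if current.tag_name in {"h1", "h2", "h3", "h4", "h5", "h6",
                                "article", "section", "main", "header", "footer", "nav", "aside"}:
            title = current.text_content.strip().split("\n")[0][:100]
            if title:
                parts.append(title)
        current = current.parent
        depth += 1
    print(" > ".join(reversed(parts)))  # "Guide > Install"
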

    def _extract_title_from_content(self, content: str) -> str:
        """Extract a title from content text.

        Args:
            content: Text content to extract title from

        Returns:
            Extracted title or "Untitled Section"
        """
        if not content:
            return "Untitled Section"

        # Take the first line, truncated to 100 characters
        lines = content.strip().split("\n")
        first_line = lines[0].strip() if lines else ""

        if first_line:
            # Limit title length for performance
            return first_line[:100] if len(first_line) > 100 else first_line

        return "Untitled Section"

    def _parse_html_structure(self, html: str) -> list[dict[str, Any]]:
        """Parse HTML structure into semantic sections with performance optimizations.

        Args:
            html: HTML content to parse

        Returns:
            List of section dictionaries
        """
        # Performance check: use simple parsing for very large files
        if len(html) > MAX_HTML_SIZE_FOR_PARSING:
            self.logger.info(
                f"HTML too large for complex parsing ({len(html)} bytes), using simple parsing"
            )
            return self._simple_html_parse(html)

        try:
            soup = BeautifulSoup(html, "html.parser")

            # Remove script and style elements for cleaner processing
            for script in soup(["script", "style"]):
                script.decompose()

            sections = []
            section_count = 0

            def process_element(element, level=0):
                nonlocal section_count

                # Performance check: limit total sections
                if section_count >= MAX_SECTIONS_TO_PROCESS:
                    return

                # Performance check: limit recursion depth
                if level > 10:
                    return

                if isinstance(element, Tag):
                    tag_name = element.name.lower()

                    # Only process meaningful elements
                    if (
                        tag_name in self.section_elements
                        or tag_name in self.heading_elements
                        or tag_name in self.block_elements
                    ):
                        text_content = element.get_text(strip=True)

                        # Skip empty or very small sections
                        if len(text_content) < 10:
                            return

                        section_type = self._identify_section_type(element)

                        # Get attributes (limited for performance)
                        attributes = {}
                        if element.attrs:
                            # Only keep essential attributes
                            for attr in ["id", "class", "role"]:
                                if attr in element.attrs:
                                    attributes[attr] = element.attrs[attr]

                        section = {
                            "content": str(element),
                            "text_content": text_content,
                            "tag_name": tag_name,
                            "level": level,
                            "section_type": section_type,
                            "attributes": attributes,
                            "title": self._extract_title_from_content(text_content),
                        }

                        sections.append(section)
                        section_count += 1

                    # Process children (limited depth)
                    if hasattr(element, "children") and level < 8:
                        for child in element.children:
                            process_element(child, level + 1)

            # Start processing from body or root
            body = soup.find("body")
            if body:
                process_element(body)
            else:
                process_element(soup)

            return sections[:MAX_SECTIONS_TO_PROCESS]  # Ensure we don't exceed limit

        except Exception as e:
            self.logger.warning(f"HTML parsing failed: {e}")
            return self._simple_html_parse(html)
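
A stripped-down, runnable sketch of the same depth-limited traversal on a toy document; the element set and the 10-character floor are kept, everything else is simplified:

    from bs4 import BeautifulSoup, Tag

    html = "<body><article><h1>Title here is long enough</h1><p>First paragraph with some text.</p></article></body>"
    soup = BeautifulSoup(html, "html.parser")

    sections = []
    def walk(el, level=0):
        if isinstance(el, Tag):
            text = el.get_text(strip=True)
            if el.name in {"article", "h1", "p"} and len(text) >= 10:
                sections.append({"tag_name": el.name, "level": level, "text_content": text})
            for child in el.children:
                walk(child, level + 1)

    walk(soup.find("body"))
    print([(s["tag_name"], s["level"]) for s in sections])
    # [('article', 1), ('h1', 2), ('p', 2)]
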

    def _simple_html_parse(self, html: str) -> list[dict[str, Any]]:
        """Simple HTML parsing for large files or when complex parsing fails.

        Args:
            html: HTML content to parse

        Returns:
            List of simple section dictionaries
        """
        try:
            soup = BeautifulSoup(html, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get clean text
            text = soup.get_text(separator="\n", strip=True)

            # Simple chunking by size
            chunk_size = self.chunk_size
            chunks = []

            # Split by paragraphs first
            paragraphs = re.split(r"\n\s*\n", text)
            current_chunk = ""

            for para in paragraphs:
                if len(current_chunk) + len(para) <= chunk_size:
                    current_chunk += para + "\n\n"
                else:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = para + "\n\n"

                    # Limit total chunks
                    if len(chunks) >= MAX_SECTIONS_TO_PROCESS:
                        break

            # Add the last chunk if not empty
            if current_chunk and len(chunks) < MAX_SECTIONS_TO_PROCESS:
                chunks.append(current_chunk.strip())

            # Convert to section format
            sections = []
            for chunk in chunks:
                section = {
                    "content": chunk,
                    "text_content": chunk,
                    "tag_name": "div",
                    "level": 0,
                    "section_type": SectionType.DIV,
                    "attributes": {},
                    "title": self._extract_title_from_content(chunk),
                }
                sections.append(section)

            return sections

        except Exception as e:
            self.logger.error(f"Simple HTML parsing failed: {e}")
            # Ultimate fallback: return the entire content as one section
            return [
                {
                    "content": html,
                    "text_content": html,
                    "tag_name": "div",
                    "level": 0,
                    "section_type": SectionType.DIV,
                    "attributes": {},
                    "title": "HTML Document",
                }
            ]
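
The paragraph-accumulation loop above is easy to see on toy input; this sketch replays it with a made-up text and a tiny chunk_size of 30:

    import re

    text = "Para one.\n\nPara two is a bit longer.\n\nPara three."
    chunk_size = 30
    chunks, current = [], ""
    for para in re.split(r"\n\s*\n", text):
        if len(current) + len(para) <= chunk_size:
            current += para + "\n\n"
        else:
            if current:
                chunks.append(current.strip())
            current = para + "\n\n"
    if current:
        chunks.append(current.strip())
    print(chunks)
    # ['Para one.', 'Para two is a bit longer.', 'Para three.']
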

    def _merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small sections to create more meaningful chunks.

        Args:
            sections: List of sections to merge

        Returns:
            List of merged sections
        """
        if not sections:
            return []

        merged = []
        current_group = []
        current_size = 0
        min_size = 200  # Minimum size for standalone sections

        for section in sections:
            section_size = len(section.get("text_content", ""))

            # If section is large enough or is a significant element, keep it separate
            if (
                section_size >= min_size
                or section.get("tag_name") in self.section_elements
                or section.get("tag_name") in self.heading_elements
            ):
                # First, add any accumulated small sections
                if current_group:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

                # Add the large section
                merged.append(section)
            else:
                # Accumulate small sections
                current_group.append(section)
                current_size += section_size

                # If accumulated size is large enough, create a merged section
                if current_size >= min_size:
                    merged_section = self._create_merged_section(current_group)
                    merged.append(merged_section)
                    current_group = []
                    current_size = 0

        # Handle remaining small sections
        if current_group:
            merged_section = self._create_merged_section(current_group)
            merged.append(merged_section)

        return merged
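
A stripped-down replica of the accumulation policy (ignoring the semantic/heading exceptions), run on made-up section sizes of 150, 80, and 300 characters against the 200-character threshold:

    min_size = 200  # same threshold as above
    sections = [{"text_content": "a" * 150}, {"text_content": "b" * 80}, {"text_content": "c" * 300}]

    merged_groups, group, size = [], [], 0
    for s in sections:
        if len(s["text_content"]) >= min_size:
            if group:
                merged_groups.append(group)
                group, size = [], 0
            merged_groups.append([s])
        else:
            group.append(s)
            size += len(s["text_content"])
            if size >= min_size:
                merged_groups.append(group)
                group, size = [], 0
    if group:
        merged_groups.append(group)

    print([len(g) for g in merged_groups])  # [2, 1]: the two small sections merge, the large one stands alone
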

    def _create_merged_section(self, sections: list[dict[str, Any]]) -> dict[str, Any]:
        """Create a merged section from a list of small sections.

        Args:
            sections: List of sections to merge

        Returns:
            Merged section dictionary
        """
        if not sections:
            return {}

        if len(sections) == 1:
            return sections[0]

        # Merge content
        merged_content = "\n\n".join(section.get("content", "") for section in sections)
        merged_text = "\n\n".join(
            section.get("text_content", "") for section in sections
        )

        # Use the first section's metadata as base
        merged_section = sections[0].copy()
        merged_section.update(
            {
                "content": merged_content,
                "text_content": merged_text,
                "title": f"Merged Section ({len(sections)} parts)",
                "tag_name": "div",  # Generic container
                "section_type": SectionType.DIV,
            }
        )

        return merged_section

    def _split_text(self, html: str) -> list[dict[str, Any]]:
        """Split HTML text into chunks based on semantic structure.

        Args:
            html: The HTML content to split

        Returns:
            List of dictionaries with chunk content and metadata
        """
        # Performance check: use simple parsing for large files
        if len(html) > SIMPLE_PARSING_THRESHOLD:
            self.logger.info(
                f"Using simple parsing for large HTML file ({len(html)} bytes)"
            )
            return self._simple_html_parse(html)

        # Parse HTML structure
        sections = self._parse_html_structure(html)

        if not sections:
            return self._simple_html_parse(html)

        # Merge small sections
        merged_sections = self._merge_small_sections(sections)

        # Split large sections if needed
        final_sections = []
        for section in merged_sections:
            content_size = len(section.get("content", ""))
            if content_size > self.chunk_size:
                # Split large sections
                split_parts = self._split_large_section(
                    section.get("content", ""), self.chunk_size
                )
                for i, part in enumerate(split_parts):
                    split_section = section.copy()
                    split_section.update(
                        {
                            "content": part,
                            "text_content": part,
                            "title": f"{section.get('title', 'Section')} (Part {i+1})",
                        }
                    )
                    final_sections.append(split_section)
            else:
                final_sections.append(section)

        return final_sections[:MAX_SECTIONS_TO_PROCESS]  # Ensure we don't exceed limit

    def _split_large_section(self, content: str, max_size: int) -> list[str]:
        """Split a large section into smaller parts.

        Args:
            content: Content to split
            max_size: Maximum size per part

        Returns:
            List of content parts
        """
        if len(content) <= max_size:
            return [content]

        # Simple splitting by size with word boundaries
        parts = []
        current_part = ""
        words = content.split()

        for word in words:
            if len(current_part) + len(word) + 1 <= max_size:
                current_part += word + " "
            else:
                if current_part:
                    parts.append(current_part.strip())
                current_part = word + " "

                # Limit number of parts
                if len(parts) >= 10:
                    break

        if current_part:
            parts.append(current_part.strip())

        return parts
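
The word-boundary split can also be replayed on toy input; a sketch with an invented sentence and max_size of 10:

    content = "one two three four five six"
    max_size = 10
    parts, current = [], ""
    for word in content.split():
        if len(current) + len(word) + 1 <= max_size:
            current += word + " "
        else:
            if current:
                parts.append(current.strip())
            current = word + " "
    if current:
        parts.append(current.strip())
    print(parts)  # ['one two', 'three', 'four five', 'six']
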

    def _extract_section_title(self, chunk: str) -> str:
        """Extract a title from a chunk of HTML content.

        Args:
            chunk: HTML chunk content

        Returns:
            Extracted title
        """
        try:
            soup = BeautifulSoup(chunk, "html.parser")

            # Try to find title in various elements
            for tag in ["h1", "h2", "h3", "h4", "h5", "h6", "title"]:
                element = soup.find(tag)
                if element:
                    title = element.get_text(strip=True)
                    if title:
                        return title[:100]  # Limit title length

            # Try to find text in semantic elements
            for tag in ["article", "section", "main"]:
                element = soup.find(tag)
                if element:
                    text = element.get_text(strip=True)
                    if text:
                        return self._extract_title_from_content(text)

            # Fallback to first text content
            text = soup.get_text(strip=True)
            if text:
                return self._extract_title_from_content(text)

            return "Untitled Section"

        except Exception:
            return "Untitled Section"

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk an HTML document using semantic boundaries.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Check for very large files that should use fallback chunking
            if len(document.content) > MAX_HTML_SIZE_FOR_PARSING:
                self.logger.info(
                    f"HTML file too large ({len(document.content)} bytes), using fallback chunking"
                )
                self.progress_tracker.log_fallback(
                    document.id, f"Large HTML file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Parse HTML and extract semantic sections
            self.logger.debug("Parsing HTML structure")
            sections = self._split_text(document.content)

            if not sections:
                self.progress_tracker.finish_chunking(document.id, 0, "html")
                return []

            # Create chunk documents
            chunked_docs = []
            for i, section in enumerate(sections):
                chunk_content = section["content"]
                self.logger.debug(
                    f"Processing HTML section {i+1}/{len(sections)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": section.get("section_type", "unknown"),
                        "tag_name": section.get("tag_name", "unknown"),
                    },
                )

                # Create chunk document with enhanced metadata
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(sections),
                    skip_nlp=False,
                )

                # Add HTML-specific metadata
                chunk_doc.metadata.update(section)
                chunk_doc.metadata["chunking_strategy"] = "html"
                chunk_doc.metadata["parent_document_id"] = document.id

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "html"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"HTML parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)
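
A hedged usage sketch showing how a caller might drive chunk_document end to end. The Settings construction is an assumption (its fields live in qdrant_loader.config and would normally come from loaded configuration); the Document constructor mirrors the one used in _fallback_chunking below, and the source_type value is invented for the example:

    from qdrant_loader.config import Settings
    from qdrant_loader.core.document import Document

    settings = Settings()  # assumption: real code loads this from project configuration
    strategy = HTMLChunkingStrategy(settings)

    doc = Document(
        content="<article><h1>Release notes</h1><p>Details about the release.</p></article>",
        metadata={"file_name": "notes.html"},
        source="example",
        source_type="localfile",  # hypothetical source type for illustration
        url="https://example.com/notes.html",
        title="Release notes",
        content_type="html",
    )
    chunks = strategy.chunk_document(doc)
    print(len(chunks), [c.metadata.get("tag_name") for c in chunks])
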

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.info("Using fallback chunking strategy for HTML document")

        try:
            # Clean HTML and convert to text for simple chunking
            soup = BeautifulSoup(document.content, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            text = soup.get_text(separator="\n", strip=True)

            # Simple chunking implementation based on fixed size
            chunk_size = self.chunk_size

            chunks = []
            # Split by paragraphs first
            paragraphs = re.split(r"\n\s*\n", text)
            current_chunk = ""

            for para in paragraphs:
                if len(current_chunk) + len(para) <= chunk_size:
                    current_chunk += para + "\n\n"
                else:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = para + "\n\n"

                    # Limit total chunks
                    if len(chunks) >= MAX_SECTIONS_TO_PROCESS:
                        break

            # Add the last chunk if not empty
            if current_chunk and len(chunks) < MAX_SECTIONS_TO_PROCESS:
                chunks.append(current_chunk.strip())

            # Create chunked documents
            chunked_docs = []
            valid_chunk_index = 0
            for i, chunk_content in enumerate(chunks):
                # Validate chunk content
                if not chunk_content or not chunk_content.strip():
                    self.logger.warning(f"Skipping empty fallback chunk {i+1}")
                    continue

                # Use base class chunk creation
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=valid_chunk_index,
                    total_chunks=len(chunks),  # Will be updated at the end
                    skip_nlp=False,  # Let base class decide
                )

                # Generate unique chunk ID
                chunk_doc.id = Document.generate_chunk_id(
                    document.id, valid_chunk_index
                )
                chunk_doc.metadata["parent_document_id"] = document.id
                chunk_doc.metadata["chunking_method"] = "fallback_html"

                chunked_docs.append(chunk_doc)
                valid_chunk_index += 1

            # Update total_chunks in all chunk metadata to reflect actual count
            for chunk_doc in chunked_docs:
                chunk_doc.metadata["total_chunks"] = len(chunked_docs)

            return chunked_docs

        except Exception as e:
            self.logger.error(f"Fallback chunking failed: {e}")
            # Ultimate fallback: return original document as single chunk
            chunk_doc = Document(
                content=document.content,
                metadata=document.metadata.copy(),
                source=document.source,
                source_type=document.source_type,
                url=document.url,
                title=document.title,
                content_type=document.content_type,
            )
            chunk_doc.id = Document.generate_chunk_id(document.id, 0)
            chunk_doc.metadata.update(
                {
                    "chunk_index": 0,
                    "total_chunks": 1,
                    "parent_document_id": document.id,
                    "chunking_method": "fallback_single",
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": "fallback_error",
                }
            )
            return [chunk_doc]

    def __del__(self):
        """Cleanup method."""
        # Call shutdown to clean up resources
        self.shutdown()

    def shutdown(self):
        """Shutdown the strategy and clean up resources."""
        # Shutdown thread pool executor if it exists
        if hasattr(self, "_executor") and self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        # Clean up any cached data
        if hasattr(self, "_processed_chunks"):
            self._processed_chunks.clear()

        # Note: semantic_analyzer is now handled in base class
        # No additional cleanup needed for HTML strategy
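
Since shutdown() releases the executor and clears the chunk cache, a long-running caller can pair the strategy with try/finally; a sketch, with settings and doc assumed as in the usage example above:

    strategy = HTMLChunkingStrategy(settings)  # settings as in the earlier sketch
    try:
        chunks = strategy.chunk_document(doc)
    finally:
        strategy.shutdown()  # safe to call explicitly; __del__ also invokes it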