Coverage for src/qdrant_loader/core/chunking/strategy/markdown_strategy.py: 85% (333 statements)


"""Markdown-specific chunking strategy."""

import concurrent.futures
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional

import structlog

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
from qdrant_loader.core.document import Document
from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer

if TYPE_CHECKING:
    from qdrant_loader.config import Settings

logger = structlog.get_logger(__name__)


class SectionType(Enum):
    """Types of sections in a markdown document."""

    HEADER = "header"
    CODE_BLOCK = "code_block"
    LIST = "list"
    TABLE = "table"
    QUOTE = "quote"
    PARAGRAPH = "paragraph"


@dataclass
class Section:
    """Represents a section in a markdown document."""

    content: str
    level: int = 0
    type: SectionType = SectionType.PARAGRAPH
    parent: Optional["Section"] = None
    children: list["Section"] = field(default_factory=list)

    def add_child(self, child: "Section"):
        """Add a child section."""
        self.children.append(child)
        child.parent = self
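
# Illustrative sketch (not part of the original module): how a small Section
# hierarchy is assembled; add_child appends the child and sets its parent link.
#
#   guide = Section(content="# Guide", level=1, type=SectionType.HEADER)
#   install = Section(content="## Install", level=2, type=SectionType.HEADER)
#   guide.add_child(install)
#   assert install.parent is guide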


class MarkdownChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking markdown documents based on sections.

    This strategy splits markdown documents into chunks based on section headers,
    preserving the document structure and hierarchy. Each chunk includes:
    - The section header and its content
    - Parent section headers for context
    - Section-specific metadata
    - Semantic analysis results
    """

    def __init__(self, settings: Settings):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize semantic analyzer
        self.semantic_analyzer = SemanticAnalyzer(
            spacy_model="en_core_web_sm",
            num_topics=settings.global_config.semantic_analysis.num_topics,
            passes=settings.global_config.semantic_analysis.lda_passes,
        )

        # Cache for processed chunks to avoid recomputation
        self._processed_chunks: dict[str, dict[str, Any]] = {}

        # Initialize thread pool for parallel processing
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def _identify_section_type(self, content: str) -> SectionType:
        """Identify the type of section based on its content.

        Args:
            content: The section content to analyze

        Returns:
            SectionType enum indicating the type of section
        """
        if re.match(r"^#{1,6}\s+", content):
            return SectionType.HEADER
        elif re.match(r"^```", content):
            return SectionType.CODE_BLOCK
        elif re.match(r"^[*-]\s+", content):
            return SectionType.LIST
        elif re.match(r"^\|", content):
            return SectionType.TABLE
        elif re.match(r"^>", content):
            return SectionType.QUOTE
        return SectionType.PARAGRAPH
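
    # Illustrative examples for _identify_section_type (assumed inputs, not from
    # the original module); each call returns the first pattern that matches:
    #   _identify_section_type("## Overview")   -> SectionType.HEADER
    #   _identify_section_type("```python")     -> SectionType.CODE_BLOCK
    #   _identify_section_type("- first item")  -> SectionType.LIST
    #   _identify_section_type("| a | b |")     -> SectionType.TABLE
    #   _identify_section_type("> quoted text") -> SectionType.QUOTE
    #   _identify_section_type("Plain prose.")  -> SectionType.PARAGRAPH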


    def _extract_section_metadata(self, section: Section) -> dict[str, Any]:
        """Extract metadata from a section.

        Args:
            section: The section to analyze

        Returns:
            Dictionary containing section metadata
        """
        metadata = {
            "type": section.type.value,
            "level": section.level,
            "word_count": len(section.content.split()),
            "char_count": len(section.content),
            "has_code": bool(re.search(r"```", section.content)),
            "has_links": bool(re.search(r"\[.*?\]\(.*?\)", section.content)),
            "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", section.content)),
            "is_top_level": section.level <= 2,  # Mark top-level sections
        }

        # Add parent section info if available
        if section.parent:
            header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", section.parent.content)
            if header_match:
                parent_title = header_match.group(2).strip()
                metadata["parent_title"] = parent_title
                metadata["parent_level"] = section.parent.level

        # Add breadcrumb path for hierarchical context
        breadcrumb = self._build_section_breadcrumb(section)
        if breadcrumb:
            metadata["breadcrumb"] = breadcrumb

        return metadata

    def _build_section_breadcrumb(self, section: Section) -> str:
        """Build a breadcrumb path of section titles to capture hierarchy.

        Args:
            section: The section to build breadcrumb for

        Returns:
            String representing the hierarchical path
        """
        breadcrumb_parts = []
        current = section

        # Walk up the parent chain to build the breadcrumb
        while current.parent:
            header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", current.parent.content)
            if header_match:
                parent_title = header_match.group(2).strip()
                breadcrumb_parts.insert(0, parent_title)
            current = current.parent

        # Add current section
        header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", section.content)
        if header_match:
            title = header_match.group(2).strip()
            breadcrumb_parts.append(title)

        return " > ".join(breadcrumb_parts)
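
    # Illustrative example for _build_section_breadcrumb (assumed data, not from
    # the original module): for a "### Requirements" section whose parent is
    # "## Installation" and whose grandparent is "# Guide", the returned
    # breadcrumb is "Guide > Installation > Requirements".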


    def _parse_document_structure(self, text: str) -> list[dict[str, Any]]:
        """Parse document into a structured representation.

        Args:
            text: The document text

        Returns:
            List of dictionaries representing document elements
        """
        elements = []
        lines = text.split("\n")
        current_block = []
        in_code_block = False

        for line in lines:
            # Check for code block markers
            if line.startswith("```"):
                in_code_block = not in_code_block
                current_block.append(line)
                continue

            # Inside code block, just accumulate lines
            if in_code_block:
                current_block.append(line)
                continue

            # Check for headers
            header_match = re.match(r"^(#{1,6})\s+(.*?)$", line)
            if header_match and not in_code_block:
                # If we have a current block, save it
                if current_block:
                    elements.append(
                        {
                            "type": "content",
                            "text": "\n".join(current_block),
                            "level": 0,
                        }
                    )
                    current_block = []

                # Save the header
                level = len(header_match.group(1))
                elements.append(
                    {
                        "type": "header",
                        "text": line,
                        "level": level,
                        "title": header_match.group(2).strip(),
                    }
                )
            else:
                current_block.append(line)

        # Save the last block if not empty
        if current_block:
            elements.append(
                {"type": "content", "text": "\n".join(current_block), "level": 0}
            )

        return elements
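
    # Illustrative example for _parse_document_structure (assumed input, not from
    # the original module). For the text "# Title\nIntro text.\n## Usage" the
    # returned structure is:
    #   [
    #       {"type": "header", "text": "# Title", "level": 1, "title": "Title"},
    #       {"type": "content", "text": "Intro text.", "level": 0},
    #       {"type": "header", "text": "## Usage", "level": 2, "title": "Usage"},
    #   ]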


    def _get_section_path(
        self, header_item: dict[str, Any], structure: list[dict[str, Any]]
    ) -> list[str]:
        """Get the path of parent headers for a section.

        Args:
            header_item: The header item
            structure: The document structure

        Returns:
            List of parent section titles
        """
        path = []
        current_level = header_item["level"]

        # Go backward through structure to find parent headers
        for item in reversed(structure[: structure.index(header_item)]):
            if item["type"] == "header" and item["level"] < current_level:
                path.insert(0, item["title"])
                current_level = item["level"]

        return path

    def _merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged = []
        current_section = sections[0].copy()
        min_section_size = 500  # Minimum characters for a standalone section

        for i in range(1, len(sections)):
            next_section = sections[i]

            # If current section is small and next section is a subsection, merge them
            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
                # Keep other metadata from the parent section
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        # Add the last section
        merged.append(current_section)
        return merged

    def _split_text(self, text: str) -> list[dict[str, Any]]:
        """Split text into chunks at level 0 (title) and level 1 headers only.

        Returns list of dictionaries with chunk text and metadata.
        """
        structure = self._parse_document_structure(text)
        sections = []
        current_section = None
        current_level = None
        current_title = None
        current_path = []

        for item in structure:
            if item["type"] == "header":
                level = item["level"]
                if level == 1 or (level == 0 and not sections):
                    # Save previous section if exists
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                            }
                        )
                    # Start new section
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = self._get_section_path(item, structure)
                else:
                    # For deeper headers, just add to current section
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    # If no section started yet, treat as preamble
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = "Preamble"
                    current_path = []

        # Add the last section
        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                }
            )

        # Ensure each section has proper metadata
        for section in sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = self._extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []

        return sections
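
    # Illustrative example for _split_text (assumed input, not from the original
    # module): a document such as
    #   "# A\nintro\n## A.1\ndetails\n# B\nmore"
    # yields one section per H1 header, with deeper headers and body text folded
    # into their parent section:
    #   [{"content": "# A\nintro\n## A.1\ndetails\n", "level": 1, "title": "A", "path": []},
    #    {"content": "# B\nmore\n", "level": 1, "title": "B", "path": []}]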


    def _split_large_section(self, content: str, max_size: int) -> list[str]:
        """Split a large section into smaller chunks while preserving markdown structure.

        Args:
            content: Section content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []
        current_chunk = ""

        # Safety limit to prevent infinite loops
        MAX_CHUNKS_PER_SECTION = 100

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)

        for para in paragraphs:
            # Safety check
            if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                logger.warning(
                    f"Reached maximum chunks per section limit ({MAX_CHUNKS_PER_SECTION}). "
                    f"Section may be truncated."
                )
                break

            # If adding this paragraph would exceed max_size
            if len(current_chunk) + len(para) + 2 > max_size:  # +2 for newlines
                # If current chunk is not empty, save it
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())

                # If paragraph itself is too large, split by sentences
                if len(para) > max_size:
                    sentences = re.split(r"(?<=[.!?])\s+", para)
                    current_chunk = ""

                    for sentence in sentences:
                        # Safety check
                        if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                            break

                        # If sentence itself is too large, split by words
                        if len(sentence) > max_size:
                            words = sentence.split()
                            for word in words:
                                # Safety check
                                if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                                    break

                                # Handle extremely long words by truncating them
                                if len(word) > max_size:
                                    logger.warning(
                                        f"Word longer than max_size ({len(word)} > {max_size}), truncating: {word[:50]}..."
                                    )
                                    word = (
                                        word[: max_size - 10] + "..."
                                    )  # Truncate with ellipsis

                                if len(current_chunk) + len(word) + 1 > max_size:
                                    if current_chunk.strip():  # Only add non-empty chunks
                                        chunks.append(current_chunk.strip())
                                    current_chunk = word + " "
                                else:
                                    current_chunk += word + " "
                        # Normal sentence handling
                        elif len(current_chunk) + len(sentence) + 1 > max_size:
                            if current_chunk.strip():  # Only add non-empty chunks
                                chunks.append(current_chunk.strip())
                            current_chunk = sentence + " "
                        else:
                            current_chunk += sentence + " "
                else:
                    current_chunk = para + "\n\n"
            else:
                current_chunk += para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # Final safety check
        if len(chunks) > MAX_CHUNKS_PER_SECTION:
            logger.warning(
                f"Generated {len(chunks)} chunks for section, limiting to {MAX_CHUNKS_PER_SECTION}"
            )
            chunks = chunks[:MAX_CHUNKS_PER_SECTION]

        return chunks


    def _process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk in parallel.

        Args:
            chunk: The chunk to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary containing processing results
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Check cache first
        if chunk in self._processed_chunks:
            return self._processed_chunks[chunk]

        # Perform semantic analysis
        logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index)
        analysis_result = self.semantic_analyzer.analyze_text(
            chunk, doc_id=f"chunk_{chunk_index}"
        )

        # Cache results
        results = {
            "entities": analysis_result.entities,
            "pos_tags": analysis_result.pos_tags,
            "dependencies": analysis_result.dependencies,
            "topics": analysis_result.topics,
            "key_phrases": analysis_result.key_phrases,
            "document_similarity": analysis_result.document_similarity,
        }
        self._processed_chunks[chunk] = results

        logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index)
        return results

    def _extract_section_title(self, chunk: str) -> str:
        """Extract section title from a chunk.

        Args:
            chunk: The text chunk

        Returns:
            Section title or default title
        """
        # Try to find header at the beginning of the chunk
        header_match = re.match(r"^(#{1,6})\s+(.*?)(?:\n|$)", chunk)
        if header_match:
            return header_match.group(2).strip()

        # Try to find the first sentence if no header
        first_sentence_match = re.match(r"^([^\.!?]+[\.!?])", chunk)
        if first_sentence_match:
            title = first_sentence_match.group(1).strip()
            # Truncate if too long
            if len(title) > 50:
                title = title[:50] + "..."
            return title

        return "Untitled Section"


    def shutdown(self):
        """Shutdown the thread pool executor."""
        if hasattr(self, "_executor") and self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Split text into semantic chunks
            logger.debug("Parsing document structure")
            chunks_metadata = self._split_text(document.content)

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Safety limit to prevent excessive chunking
            MAX_CHUNKS_PER_DOCUMENT = 500
            if len(chunks_metadata) > MAX_CHUNKS_PER_DOCUMENT:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {MAX_CHUNKS_PER_DOCUMENT}. "
                    f"Document may be truncated. Document: {document.title}"
                )
                chunks_metadata = chunks_metadata[:MAX_CHUNKS_PER_DOCUMENT]

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i+1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Create chunk document with enhanced metadata
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    skip_nlp=False,
                )

                # Add markdown-specific metadata
                chunk_doc.metadata.update(chunk_meta)
                chunk_doc.metadata["chunking_strategy"] = "markdown"
                chunk_doc.metadata["parent_document_id"] = document.id

                # Add additional metadata fields expected by tests
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self._extract_section_title(chunk_content)
                chunk_doc.metadata["section_title"] = section_title
                chunk_doc.metadata["cross_references"] = self._extract_cross_references(
                    chunk_content
                )
                chunk_doc.metadata["hierarchy"] = self._map_hierarchical_relationships(
                    chunk_content
                )
                chunk_doc.metadata["entities"] = self._extract_entities(chunk_content)

                # Add topic analysis
                topic_analysis = self._analyze_topic(chunk_content)
                chunk_doc.metadata["topic_analysis"] = topic_analysis

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)


    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Simple chunking implementation based on fixed size
        chunk_size = self.settings.global_config.chunking.chunk_size

        text = document.content
        chunks = []

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", text)
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) <= chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _extract_cross_references(self, text: str) -> list[dict[str, str]]:
        """Extract cross-references from text.

        Args:
            text: Text to analyze

        Returns:
            List of cross-references
        """
        # Simple implementation - extract markdown links
        references = []
        lines = text.split("\n")
        for line in lines:
            if "[" in line and "](" in line:
                # Extract link text and URL
                parts = line.split("](")
                if len(parts) == 2:
                    link_text = parts[0].split("[")[1]
                    url = parts[1].split(")")[0]
                    references.append({"text": link_text, "url": url})
        return references
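
    # Illustrative example for _extract_cross_references (assumed input, not from
    # the original module): the line "See the [docs](https://example.com) page."
    # yields [{"text": "docs", "url": "https://example.com"}].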


    def _extract_entities(self, text: str) -> list[dict[str, str]]:
        """Extract named entities from text.

        Args:
            text: Text to analyze

        Returns:
            List of entities
        """
        # Simple implementation - extract capitalized phrases
        entities = []
        words = text.split()
        current_entity = []

        for word in words:
            if word[0].isupper():
                current_entity.append(word)
            elif current_entity:
                entities.append(
                    {
                        "text": " ".join(current_entity),
                        "type": "UNKNOWN",  # Could be enhanced with NER
                    }
                )
                current_entity = []

        if current_entity:
            entities.append({"text": " ".join(current_entity), "type": "UNKNOWN"})

        return entities

    def _map_hierarchical_relationships(self, text: str) -> dict[str, Any]:
        """Map hierarchical relationships in text.

        Args:
            text: Text to analyze

        Returns:
            Dictionary of hierarchical relationships
        """
        hierarchy = {}
        current_path = []

        lines = text.split("\n")
        for line in lines:
            if line.startswith("#"):
                level = len(line.split()[0])
                title = line.lstrip("#").strip()

                # Update current path
                while len(current_path) >= level:
                    current_path.pop()
                current_path.append(title)

                # Add to hierarchy
                current = hierarchy
                for part in current_path[:-1]:
                    if part not in current:
                        current[part] = {}
                    current = current[part]
                current[current_path[-1]] = {}

        return hierarchy
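
    # Illustrative example for _map_hierarchical_relationships (assumed input, not
    # from the original module): headers "# Guide", "## Install", "## Usage"
    # produce {"Guide": {"Install": {}, "Usage": {}}}.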


    def _analyze_topic(self, text: str) -> dict[str, Any]:
        """Analyze topic of text.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with topic analysis results
        """
        # Simple implementation - return basic topic info
        return {
            "topics": ["general"],  # Could be enhanced with LDA
            "coherence": 0.5,  # Could be enhanced with topic coherence metrics
        }

    def __del__(self):
        self.shutdown()
        if hasattr(self, "semantic_analyzer"):
            self.semantic_analyzer.clear_cache()
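

# Minimal usage sketch (illustrative, not part of the original module). It assumes
# a fully populated Settings object and a markdown Document instance are already
# available; their constructors live elsewhere in qdrant_loader and are not shown.
#
#   strategy = MarkdownChunkingStrategy(settings)
#   try:
#       chunks = strategy.chunk_document(document)
#       for chunk in chunks:
#           print(chunk.metadata["section_title"], chunk.metadata["level"])
#   finally:
#       strategy.shutdown()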