Coverage for src/qdrant_loader/core/chunking/strategy/markdown/markdown_strategy.py: 94%
110 statements
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Refactored Markdown-specific chunking strategy using modular architecture."""
3from typing import TYPE_CHECKING
5import structlog
7from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
9from qdrant_loader.core.document import Document
11from .chunk_processor import ChunkProcessor
12from .document_parser import DocumentParser
13from .metadata_extractor import MetadataExtractor
14from .section_splitter import SectionSplitter
16if TYPE_CHECKING:
17 from qdrant_loader.config import Settings
19logger = structlog.get_logger(__name__)


class MarkdownChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking markdown documents based on sections.

    This strategy splits markdown documents into chunks based on section headers,
    preserving the document structure and hierarchy. Each chunk includes:
    - The section header and its content
    - Parent section headers for context
    - Section-specific metadata
    - Semantic analysis results

    The strategy uses a modular architecture with focused components:
    - DocumentParser: Handles document structure analysis
    - SectionSplitter: Manages different splitting strategies
    - MetadataExtractor: Enriches chunks with metadata
    - ChunkProcessor: Coordinates parallel processing and semantic analysis
    """

    def __init__(self, settings: "Settings"):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = DocumentParser()
        self.section_splitter = SectionSplitter(settings)
        self.metadata_extractor = MetadataExtractor()
        self.chunk_processor = ChunkProcessor(settings)

        # Apply any chunk overlap that was set before components were initialized
        if hasattr(self, '_chunk_overlap'):
            self.chunk_overlap = self._chunk_overlap

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        # Provide user guidance on the expected chunk count
        estimated_chunks = self.chunk_processor.estimate_chunk_count(document.content)
        logger.info(
            f"Processing document: {document.title} ({len(document.content):,} chars)",
            extra={
                "estimated_chunks": estimated_chunks,
                "chunk_size": self.settings.global_config.chunking.chunk_size,
                "max_chunks_allowed": self.settings.global_config.chunking.max_chunks_per_document,
            },
        )

        try:
            # Split text into semantic chunks using the section splitter
            logger.debug("Splitting document into sections")
            chunks_metadata = self.section_splitter.split_sections(document.content, document)

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Apply configuration-driven safety limit
            max_chunks = self.settings.global_config.chunking.max_chunks_per_document
            if len(chunks_metadata) > max_chunks:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {max_chunks} per config. "
                    f"Consider increasing max_chunks_per_document in config or using a larger chunk_size. "
                    f"Document: {document.title}"
                )
                chunks_metadata = chunks_metadata[:max_chunks]

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i + 1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Extract the section title
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self.document_parser.extract_section_title(chunk_content)
                chunk_meta["section_title"] = section_title

                # ENHANCED: use hierarchical metadata extraction
                enriched_metadata = self.metadata_extractor.extract_hierarchical_metadata(
                    chunk_content, chunk_meta, document
                )

                # Create the chunk document using the chunk processor.
                # FIX: skip NLP for chunks that are too short or too simple,
                # which could otherwise destabilize LDA topic modeling.
                skip_nlp = (
                    len(chunk_content) < 100  # Too short
                    or len(chunk_content.split()) < 20  # Too few words
                    or chunk_content.count('\n') < 3  # Too simple a structure
                )
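                # Example: a stub section like "# Notes\nTBD" (well under
                # 100 chars, only 3 whitespace-delimited words, 1 newline)
                # trips all three guards and is indexed without semantic
                # analysis.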
                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    chunk_metadata=enriched_metadata,
                    skip_nlp=skip_nlp,
                )

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fall back to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking used when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Use the fallback splitter from the section splitter
        chunks = self.section_splitter.fallback_splitter.split_content(
            document.content,
            self.settings.global_config.chunking.chunk_size,
        )

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self.chunk_processor.create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                chunk_metadata={"chunking_strategy": "fallback"},
                skip_nlp=True,  # Skip NLP in fallback mode
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _split_text(self, text: str) -> list[dict]:
        """Split text into chunks based on markdown structure.

        Args:
            text: The text to split into chunks

        Returns:
            List of section metadata dictionaries
        """
        # Split the text into sections using the section splitter
        sections_metadata = self.section_splitter.split_sections(text)

        # Return the section metadata (kept for backward compatibility with tests)
        return sections_metadata
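
    # Note: each dict returned by _split_text mirrors what SectionSplitter
    # produces; keys consumed elsewhere in this module include "content",
    # "title", "section_type", and "level".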

    # Proxy methods for backward compatibility with tests
    @property
    def semantic_analyzer(self):
        """Access the semantic analyzer for compatibility."""
        return self.chunk_processor.semantic_analyzer

    def _identify_section_type(self, content: str):
        """Identify the section type - delegates to the section identifier."""
        return self.document_parser.section_identifier.identify_section_type(content)

    def _extract_section_metadata(self, section):
        """Extract section metadata - delegates to the document parser."""
        return self.document_parser.extract_section_metadata(section)

    def _build_section_breadcrumb(self, section):
        """Build the section breadcrumb - delegates to the hierarchy builder."""
        return self.document_parser.hierarchy_builder.build_section_breadcrumb(section)

    def _parse_document_structure(self, text: str):
        """Parse the document structure - delegates to the document parser."""
        return self.document_parser.parse_document_structure(text)

    def _split_large_section(self, content: str, max_size: int):
        """Split a large section - delegates to the section splitter."""
        return self.section_splitter.standard_splitter.split_content(content, max_size)

    def _process_chunk(self, chunk: str, chunk_index: int, total_chunks: int):
        """Process a chunk - delegates to the chunk processor."""
        return self.chunk_processor.process_chunk(chunk, chunk_index, total_chunks)

    def _extract_section_title(self, chunk: str):
        """Extract the section title - delegates to the document parser."""
        return self.document_parser.extract_section_title(chunk)

    def _extract_cross_references(self, text: str):
        """Extract cross-references - delegates to the metadata extractor."""
        return self.metadata_extractor.cross_reference_extractor.extract_cross_references(text)

    def _extract_entities(self, text: str):
        """Extract entities - delegates to the metadata extractor."""
        return self.metadata_extractor.entity_extractor.extract_entities(text)

    def _map_hierarchical_relationships(self, text: str):
        """Map hierarchical relationships - delegates to the metadata extractor."""
        return self.metadata_extractor.hierarchy_extractor.map_hierarchical_relationships(text)

    def _analyze_topic(self, text: str):
        """Analyze the topic - delegates to the metadata extractor."""
        return self.metadata_extractor.topic_analyzer.analyze_topic(text)

    @property
    def chunk_overlap(self):
        """Get the chunk overlap setting."""
        if hasattr(self, 'section_splitter'):
            return self.section_splitter.standard_splitter.chunk_overlap
        return getattr(self, '_chunk_overlap', 200)

    @chunk_overlap.setter
    def chunk_overlap(self, value):
        """Set the chunk overlap setting."""
        # Store the value for when the components are initialized
        self._chunk_overlap = value

        if hasattr(self, 'section_splitter'):
            self.section_splitter.standard_splitter.chunk_overlap = value
            self.section_splitter.excel_splitter.chunk_overlap = value
            self.section_splitter.fallback_splitter.chunk_overlap = value
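
    # Illustrative note on the deferred-overlap behavior above: if
    # chunk_overlap is assigned before __init__ has created the splitters
    # (e.g. by the base class), the value is stashed in _chunk_overlap and
    # re-applied once section_splitter exists. Afterwards, assignment
    # propagates directly:
    #
    #   strategy = MarkdownChunkingStrategy(settings)
    #   strategy.chunk_overlap = 100  # updates standard, excel, and fallback splitters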

    def shutdown(self):
        """Shutdown all components and clean up resources."""
        if hasattr(self, "chunk_processor"):
            self.chunk_processor.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
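

# --- Usage sketch (illustrative only; not part of the module) ---------------
# A minimal sketch of how this strategy is typically driven, assuming
# `settings` is a fully-populated qdrant_loader Settings instance and
# `document` is an already-constructed qdrant_loader Document:
#
#   strategy = MarkdownChunkingStrategy(settings)
#   try:
#       chunks = strategy.chunk_document(document)  # -> list[Document]
#   finally:
#       strategy.shutdown()  # release ChunkProcessor resources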