Coverage for src/qdrant_loader/core/chunking/strategy/markdown/markdown_strategy.py: 94%
81 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Refactored Markdown-specific chunking strategy using modular architecture."""
3from typing import TYPE_CHECKING
5import structlog
7from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
9from qdrant_loader.core.document import Document
11from .chunk_processor import ChunkProcessor
12from .document_parser import DocumentParser
13from .metadata_extractor import MetadataExtractor
14from .section_splitter import SectionSplitter
16if TYPE_CHECKING:
17 from qdrant_loader.config import Settings
19logger = structlog.get_logger(__name__)
class MarkdownChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking markdown documents based on sections.

    This strategy splits markdown documents into chunks based on section headers,
    preserving the document structure and hierarchy. Each chunk includes:
    - The section header and its content
    - Parent section headers for context
    - Section-specific metadata
    - Semantic analysis results

    The strategy uses a modular architecture with focused components:
    - DocumentParser: Handles document structure analysis
    - SectionSplitter: Manages different splitting strategies
    - MetadataExtractor: Enriches chunks with metadata
    - ChunkProcessor: Coordinates parallel processing and semantic analysis
    """

    def __init__(self, settings: "Settings"):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = DocumentParser()
        self.section_splitter = SectionSplitter(settings)
        self.metadata_extractor = MetadataExtractor(settings)
        self.chunk_processor = ChunkProcessor(settings)

        # Apply any chunk overlap that was set before components were
        # initialized (the `chunk_overlap` setter stashes the value in
        # `_chunk_overlap` when `section_splitter` does not exist yet).
        if hasattr(self, "_chunk_overlap"):
            self.chunk_overlap = self._chunk_overlap

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents (empty when no sections are found).
            On any parsing error, falls back to simple size-based chunking.
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        # Provide user guidance on expected chunk count
        estimated_chunks = self.chunk_processor.estimate_chunk_count(document.content)
        logger.info(
            f"Processing document: {document.title} ({len(document.content):,} chars)",
            extra={
                "estimated_chunks": estimated_chunks,
                "chunk_size": self.settings.global_config.chunking.chunk_size,
                "max_chunks_allowed": self.settings.global_config.chunking.max_chunks_per_document,
            },
        )

        try:
            # Split text into semantic chunks using the section splitter
            logger.debug("Splitting document into sections")
            chunks_metadata = self.section_splitter.split_sections(
                document.content, document
            )

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Apply configuration-driven safety limit
            max_chunks = self.settings.global_config.chunking.max_chunks_per_document
            if len(chunks_metadata) > max_chunks:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {max_chunks} per config. "
                    f"Consider increasing max_chunks_per_document in config or using larger chunk_size. "
                    f"Document: {document.title}"
                )
                chunks_metadata = chunks_metadata[:max_chunks]

            # Loop-invariant config lookup hoisted out of the per-chunk loop
            # (was previously re-read on every iteration).
            markdown_config = self.settings.global_config.chunking.strategies.markdown

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i+1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Extract section title, deriving one from the content when
                # the splitter did not provide it.
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self.document_parser.extract_section_title(
                        chunk_content
                    )
                chunk_meta["section_title"] = section_title

                # 🔥 ENHANCED: Use hierarchical metadata extraction
                enriched_metadata = (
                    self.metadata_extractor.extract_hierarchical_metadata(
                        chunk_content, chunk_meta, document
                    )
                )

                # 🔥 FIX: Skip NLP for small documents or documents that might cause LDA issues
                skip_nlp = self._should_skip_nlp(chunk_content, markdown_config)

                # Create chunk document using the chunk processor
                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    chunk_metadata=enriched_metadata,
                    skip_nlp=skip_nlp,
                )

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            # Boundary handler: any failure in markdown parsing degrades to
            # the simple fallback splitter rather than losing the document.
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    @staticmethod
    def _should_skip_nlp(chunk_content: str, markdown_config) -> bool:
        """Return True when the chunk is too small for meaningful NLP analysis.

        Thresholds come from the markdown strategy config; very short chunks
        can destabilize downstream analysis (e.g. LDA topic modeling).
        """
        return (
            len(chunk_content) < markdown_config.min_content_length_for_nlp
            or len(chunk_content.split()) < markdown_config.min_word_count_for_nlp
            or chunk_content.count("\n") < markdown_config.min_line_count_for_nlp
        )

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Use the fallback splitter from section splitter
        chunks = self.section_splitter.fallback_splitter.split_content(
            document.content, self.settings.global_config.chunking.chunk_size
        )

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self.chunk_processor.create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                chunk_metadata={"chunking_strategy": "fallback"},
                skip_nlp=True,  # Skip NLP for fallback mode
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs

    @property
    def chunk_overlap(self):
        """Get chunk overlap setting.

        Delegates to the standard splitter when components exist; before
        initialization, returns any stashed value (default 200).
        """
        if hasattr(self, "section_splitter"):
            return self.section_splitter.standard_splitter.chunk_overlap
        return getattr(self, "_chunk_overlap", 200)

    @chunk_overlap.setter
    def chunk_overlap(self, value):
        """Set chunk overlap setting.

        Safe to call before __init__ completes: the value is stored and
        re-applied once the splitter components are created.
        """
        # Store the value for when components are initialized
        self._chunk_overlap = value

        if hasattr(self, "section_splitter"):
            self.section_splitter.standard_splitter.chunk_overlap = value
            self.section_splitter.excel_splitter.chunk_overlap = value
            self.section_splitter.fallback_splitter.chunk_overlap = value

    def shutdown(self):
        """Shutdown all components and clean up resources."""
        if hasattr(self, "chunk_processor"):
            self.chunk_processor.shutdown()

    def __del__(self):
        """Cleanup on deletion (best effort).

        Finalizers must never propagate exceptions: during interpreter
        teardown, component modules may already be partially unloaded, so
        any failure here is deliberately swallowed.
        """
        try:
            self.shutdown()
        except Exception:
            pass