Coverage for src/qdrant_loader/core/chunking/strategy/markdown/markdown_strategy.py: 94%

81 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Refactored Markdown-specific chunking strategy using modular architecture.""" 

2 

3from typing import TYPE_CHECKING 

4 

5import structlog 

6 

7from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

9from qdrant_loader.core.document import Document 

10 

11from .chunk_processor import ChunkProcessor 

12from .document_parser import DocumentParser 

13from .metadata_extractor import MetadataExtractor 

14from .section_splitter import SectionSplitter 

15 

16if TYPE_CHECKING: 

17 from qdrant_loader.config import Settings 

18 

19logger = structlog.get_logger(__name__) 

20 

21 

22class MarkdownChunkingStrategy(BaseChunkingStrategy): 

23 """Strategy for chunking markdown documents based on sections. 

24 

25 This strategy splits markdown documents into chunks based on section headers, 

26 preserving the document structure and hierarchy. Each chunk includes: 

27 - The section header and its content 

28 - Parent section headers for context 

29 - Section-specific metadata 

30 - Semantic analysis results 

31 

32 The strategy uses a modular architecture with focused components: 

33 - DocumentParser: Handles document structure analysis 

34 - SectionSplitter: Manages different splitting strategies 

35 - MetadataExtractor: Enriches chunks with metadata 

36 - ChunkProcessor: Coordinates parallel processing and semantic analysis 

37 """ 


    def __init__(self, settings: "Settings"):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = DocumentParser()
        self.section_splitter = SectionSplitter(settings)
        self.metadata_extractor = MetadataExtractor(settings)
        self.chunk_processor = ChunkProcessor(settings)

        # Apply any chunk overlap that was set before components were initialized
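        # (the chunk_overlap setter below stashes values in _chunk_overlap while
        # the splitters do not yet exist; re-assigning here fans the value out
        # to them)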

        if hasattr(self, "_chunk_overlap"):
            self.chunk_overlap = self._chunk_overlap


    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        # Provide user guidance on the expected chunk count
        estimated_chunks = self.chunk_processor.estimate_chunk_count(document.content)
        logger.info(
            f"Processing document: {document.title} ({len(document.content):,} chars)",
            extra={
                "estimated_chunks": estimated_chunks,
                "chunk_size": self.settings.global_config.chunking.chunk_size,
                "max_chunks_allowed": self.settings.global_config.chunking.max_chunks_per_document,
            },
        )

        try:
            # Split text into semantic chunks using the section splitter
            logger.debug("Splitting document into sections")
            chunks_metadata = self.section_splitter.split_sections(
                document.content, document
            )

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Apply the configuration-driven safety limit
            max_chunks = self.settings.global_config.chunking.max_chunks_per_document
            if len(chunks_metadata) > max_chunks:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {max_chunks} per config. "
                    f"Consider increasing max_chunks_per_document in config or using larger chunk_size. "
                    f"Document: {document.title}"
                )
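                # Keep only the first max_chunks sections; trailing sections
                # are dropped, and the warning above explains how to raise
                # the limit.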

                chunks_metadata = chunks_metadata[:max_chunks]

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i+1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Extract the section title, deriving one from the content if absent
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self.document_parser.extract_section_title(
                        chunk_content
                    )
                chunk_meta["section_title"] = section_title

                # Enrich the chunk with hierarchical metadata
                enriched_metadata = (
                    self.metadata_extractor.extract_hierarchical_metadata(
                        chunk_content, chunk_meta, document
                    )
                )

                # Skip NLP for small chunks or chunks that might cause LDA issues
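                # (skip_nlp becomes True when the chunk fails any one of the
                # three config thresholds below, so semantic analysis only runs
                # on chunks with enough content to analyze meaningfully)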

                markdown_config = (
                    self.settings.global_config.chunking.strategies.markdown
                )
                skip_nlp = (
                    len(chunk_content) < markdown_config.min_content_length_for_nlp
                    or len(chunk_content.split())
                    < markdown_config.min_word_count_for_nlp
                    or chunk_content.count("\n")
                    < markdown_config.min_line_count_for_nlp
                )

                # Create the chunk document using the chunk processor
                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    chunk_metadata=enriched_metadata,
                    skip_nlp=skip_nlp,
                )

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fall back to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking used when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Use the fallback splitter from the section splitter
        chunks = self.section_splitter.fallback_splitter.split_content(
            document.content, self.settings.global_config.chunking.chunk_size
        )

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self.chunk_processor.create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                chunk_metadata={"chunking_strategy": "fallback"},
                skip_nlp=True,  # Skip NLP in fallback mode
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs
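
    # The overlap value actually lives on the splitters; this property proxies
    # to them once they exist and falls back to the stashed value (default 200)
    # before __init__ has created them.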

    @property
    def chunk_overlap(self):
        """Get the chunk overlap setting."""
        if hasattr(self, "section_splitter"):
            return self.section_splitter.standard_splitter.chunk_overlap
        return getattr(self, "_chunk_overlap", 200)

    @chunk_overlap.setter
    def chunk_overlap(self, value):
        """Set the chunk overlap setting."""
        # Store the value for when the components are initialized
        self._chunk_overlap = value
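
        # Keep all splitters in sync so standard, Excel, and fallback chunking
        # use the same overlap.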

        if hasattr(self, "section_splitter"):
            self.section_splitter.standard_splitter.chunk_overlap = value
            self.section_splitter.excel_splitter.chunk_overlap = value
            self.section_splitter.fallback_splitter.chunk_overlap = value

    def shutdown(self):
        """Shut down all components and clean up resources."""
        if hasattr(self, "chunk_processor"):
            self.chunk_processor.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
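
# Note: relying on __del__ is best-effort, since finalizer timing is not
# guaranteed; callers that need deterministic cleanup should call shutdown()
# explicitly.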