Coverage for src/qdrant_loader/core/chunking/strategy/markdown/markdown_strategy.py: 94%

110 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Refactored Markdown-specific chunking strategy using modular architecture.""" 

2 

3from typing import TYPE_CHECKING 

4 

5import structlog 

6 

7from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

8from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

9from qdrant_loader.core.document import Document 

10 

11from .chunk_processor import ChunkProcessor 

12from .document_parser import DocumentParser 

13from .metadata_extractor import MetadataExtractor 

14from .section_splitter import SectionSplitter 

15 

16if TYPE_CHECKING: 

17 from qdrant_loader.config import Settings 

18 

19logger = structlog.get_logger(__name__) 

20 

21 

22class MarkdownChunkingStrategy(BaseChunkingStrategy): 

23 """Strategy for chunking markdown documents based on sections. 

24 

25 This strategy splits markdown documents into chunks based on section headers, 

26 preserving the document structure and hierarchy. Each chunk includes: 

27 - The section header and its content 

28 - Parent section headers for context 

29 - Section-specific metadata 

30 - Semantic analysis results 

31 

32 The strategy uses a modular architecture with focused components: 

33 - DocumentParser: Handles document structure analysis 

34 - SectionSplitter: Manages different splitting strategies 

35 - MetadataExtractor: Enriches chunks with metadata 

36 - ChunkProcessor: Coordinates parallel processing and semantic analysis 

37 """ 


    def __init__(self, settings: "Settings"):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = DocumentParser()
        self.section_splitter = SectionSplitter(settings)
        self.metadata_extractor = MetadataExtractor()
        self.chunk_processor = ChunkProcessor(settings)

        # Apply any chunk overlap that was set before components were initialized
        if hasattr(self, '_chunk_overlap'):
            self.chunk_overlap = self._chunk_overlap


    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        # Provide user guidance on expected chunk count
        estimated_chunks = self.chunk_processor.estimate_chunk_count(document.content)
        logger.info(
            f"Processing document: {document.title} ({len(document.content):,} chars)",
            extra={
                "estimated_chunks": estimated_chunks,
                "chunk_size": self.settings.global_config.chunking.chunk_size,
                "max_chunks_allowed": self.settings.global_config.chunking.max_chunks_per_document,
            }
        )

        try:
            # Split text into semantic chunks using the section splitter
            logger.debug("Splitting document into sections")
            chunks_metadata = self.section_splitter.split_sections(document.content, document)

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Apply configuration-driven safety limit
            max_chunks = self.settings.global_config.chunking.max_chunks_per_document
            if len(chunks_metadata) > max_chunks:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {max_chunks} per config. "
                    f"Consider increasing max_chunks_per_document in config or using larger chunk_size. "
                    f"Document: {document.title}"
                )
                chunks_metadata = chunks_metadata[:max_chunks]

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i+1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Extract section title
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self.document_parser.extract_section_title(chunk_content)
                    chunk_meta["section_title"] = section_title

                # 🔥 ENHANCED: Use hierarchical metadata extraction
                enriched_metadata = self.metadata_extractor.extract_hierarchical_metadata(
                    chunk_content, chunk_meta, document
                )

                # Create chunk document using the chunk processor
                # 🔥 FIX: Skip NLP for small documents or documents that might cause LDA issues
                skip_nlp = (
                    len(chunk_content) < 100 or  # Too short
                    len(chunk_content.split()) < 20 or  # Too few words
                    chunk_content.count('\n') < 3  # Too simple structure
                )
                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    chunk_metadata=enriched_metadata,
                    skip_nlp=skip_nlp,
                )

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)


    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Use the fallback splitter from section splitter
        chunks = self.section_splitter.fallback_splitter.split_content(
            document.content,
            self.settings.global_config.chunking.chunk_size
        )

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self.chunk_processor.create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                chunk_metadata={"chunking_strategy": "fallback"},
                skip_nlp=True,  # Skip NLP for fallback mode
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs


    def _split_text(self, text: str) -> list[dict]:
        """Split text into chunks based on markdown structure.

        Args:
            text: The text to split into chunks

        Returns:
            List of section metadata dictionaries
        """
        # Split text into sections using the section splitter
        sections_metadata = self.section_splitter.split_sections(text)

        # Return the sections metadata (for backward compatibility with tests)
        return sections_metadata


    # Proxy methods for backward compatibility with tests
    @property
    def semantic_analyzer(self):
        """Access semantic analyzer for compatibility."""
        return self.chunk_processor.semantic_analyzer

    def _identify_section_type(self, content: str):
        """Identify section type - delegates to section identifier."""
        return self.document_parser.section_identifier.identify_section_type(content)

    def _extract_section_metadata(self, section):
        """Extract section metadata - delegates to document parser."""
        return self.document_parser.extract_section_metadata(section)

    def _build_section_breadcrumb(self, section):
        """Build section breadcrumb - delegates to hierarchy builder."""
        return self.document_parser.hierarchy_builder.build_section_breadcrumb(section)

    def _parse_document_structure(self, text: str):
        """Parse document structure - delegates to document parser."""
        return self.document_parser.parse_document_structure(text)

    def _split_large_section(self, content: str, max_size: int):
        """Split large section - delegates to section splitter."""
        return self.section_splitter.standard_splitter.split_content(content, max_size)

    def _process_chunk(self, chunk: str, chunk_index: int, total_chunks: int):
        """Process chunk - delegates to chunk processor."""
        return self.chunk_processor.process_chunk(chunk, chunk_index, total_chunks)

    def _extract_section_title(self, chunk: str):
        """Extract section title - delegates to document parser."""
        return self.document_parser.extract_section_title(chunk)

    def _extract_cross_references(self, text: str):
        """Extract cross references - delegates to metadata extractor."""
        return self.metadata_extractor.cross_reference_extractor.extract_cross_references(text)

    def _extract_entities(self, text: str):
        """Extract entities - delegates to metadata extractor."""
        return self.metadata_extractor.entity_extractor.extract_entities(text)

    def _map_hierarchical_relationships(self, text: str):
        """Map hierarchical relationships - delegates to metadata extractor."""
        return self.metadata_extractor.hierarchy_extractor.map_hierarchical_relationships(text)

    def _analyze_topic(self, text: str):
        """Analyze topic - delegates to metadata extractor."""
        return self.metadata_extractor.topic_analyzer.analyze_topic(text)


    @property
    def chunk_overlap(self):
        """Get chunk overlap setting."""
        if hasattr(self, 'section_splitter'):
            return self.section_splitter.standard_splitter.chunk_overlap
        return getattr(self, '_chunk_overlap', 200)

    @chunk_overlap.setter
    def chunk_overlap(self, value):
        """Set chunk overlap setting."""
        # Store the value for when components are initialized
        self._chunk_overlap = value

        if hasattr(self, 'section_splitter'):
            self.section_splitter.standard_splitter.chunk_overlap = value
            self.section_splitter.excel_splitter.chunk_overlap = value
            self.section_splitter.fallback_splitter.chunk_overlap = value

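    # Illustrative note: an overlap assigned before the splitter components exist is
    # stashed in ``_chunk_overlap`` and re-applied at the end of ``__init__``;
    # afterwards a single assignment fans out to all three splitters, e.g.:
    #
    #     strategy.chunk_overlap = 150
    #     strategy.section_splitter.standard_splitter.chunk_overlap  # -> 150
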

    def shutdown(self):
        """Shutdown all components and clean up resources."""
        if hasattr(self, "chunk_processor"):
            self.chunk_processor.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
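
A minimal usage sketch of the strategy above, for reference. It is illustrative only: it assumes a loaded Settings object and a Document already populated with the fields this module reads (content, title, source, source_type, metadata); how those objects are built is defined elsewhere in qdrant-loader.

    from qdrant_loader.core.chunking.strategy.markdown.markdown_strategy import (
        MarkdownChunkingStrategy,
    )


    def chunk_markdown(settings, document):
        """Chunk one markdown Document and always release processor resources."""
        strategy = MarkdownChunkingStrategy(settings)
        try:
            # One chunk Document per markdown section, capped by
            # max_chunks_per_document from the chunking config.
            return strategy.chunk_document(document)
        finally:
            strategy.shutdown()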