Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 76%

128 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""JSON-specific chunking strategy for structured data using modular architecture.""" 

2 

3import json 

4 

5import structlog 

6 

7from qdrant_loader.config import Settings 

8from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

9from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

10from qdrant_loader.core.chunking.strategy.json.json_chunk_processor import ( 

11 JSONChunkProcessor, 

12) 

13from qdrant_loader.core.chunking.strategy.json.json_document_parser import ( 

14 JSONDocumentParser, 

15) 

16from qdrant_loader.core.chunking.strategy.json.json_metadata_extractor import ( 

17 JSONMetadataExtractor, 

18) 

19from qdrant_loader.core.chunking.strategy.json.json_section_splitter import ( 

20 JSONSectionSplitter, 

21) 

22from qdrant_loader.core.document import Document 

23 

24logger = structlog.get_logger(__name__) 

25 

26 

class JSONChunkingStrategy(BaseChunkingStrategy):
    """Modern JSON chunking strategy using modular architecture.

    This strategy parses JSON structure and creates chunks based on:
    - Schema-aware structural boundaries
    - Intelligent element grouping and splitting
    - Enhanced metadata extraction with schema inference
    - JSON-specific optimization for NLP processing
    """

    def __init__(self, settings: Settings):
        """Initialize the JSON chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = JSONDocumentParser(settings)
        self.section_splitter = JSONSectionSplitter(settings)
        self.metadata_extractor = JSONMetadataExtractor(settings)
        self.chunk_processor = JSONChunkProcessor(settings)

        # JSON-specific configuration
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        self.simple_chunking_threshold = (
            500_000  # Use simple chunking for files larger than 500KB
        )
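For orientation, a minimal instantiation sketch. How a Settings instance is actually constructed is application-specific; the bare Settings() call below is an assumption for illustration, not the library's confirmed loading path.

from qdrant_loader.config import Settings

settings = Settings()  # assumption: real config loading may need arguments or files
strategy = JSONChunkingStrategy(settings)
print(strategy.get_strategy_name())  # "json_modular"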

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a JSON document using the modern modular approach.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Performance check: for very large files, use simple chunking
            if len(document.content) > self.simple_chunking_threshold:
                self.progress_tracker.log_fallback(
                    document.id, f"Large JSON file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Step 1: Parse document structure using JSONDocumentParser
            document_structure = self.document_parser.parse_document_structure(
                document.content
            )

            if not document_structure.get("valid_json", False):
                self.progress_tracker.log_fallback(
                    document.id, "Invalid JSON structure"
                )
                return self._fallback_chunking(document)

            # Step 2: Parse JSON into element tree
            root_element = self.document_parser.parse_json_structure(document.content)

            if not root_element:
                self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
                return self._fallback_chunking(document)

            # Step 3: Get elements to chunk
            if root_element.children:
                # Use top-level children as chunks
                elements_to_chunk = root_element.children
            else:
                # Use root element if no children
                elements_to_chunk = [root_element]

            # Step 4: Apply section splitter for grouping and splitting
            final_elements = self.section_splitter.split_json_elements(
                elements_to_chunk
            )

            if not final_elements:
                self.progress_tracker.finish_chunking(document.id, 0, "json")
                return []

            # Step 5: Create chunked documents using chunk processor
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": element.size,
                    },
                )

                # Extract element-specific metadata
                element_metadata = (
                    self.metadata_extractor.extract_json_element_metadata(element)
                )

                # Extract hierarchical metadata from content
                hierarchical_metadata = (
                    self.metadata_extractor.extract_hierarchical_metadata(
                        element.content, element_metadata, document
                    )
                )

                # Create chunk document using processor
                chunk_doc = self.chunk_processor.create_json_element_chunk_document(
                    original_doc=document,
                    element=element,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    element_metadata=hierarchical_metadata,
                )

                chunked_docs.append(chunk_doc)

            # Log completion
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "json"
            )
            self.logger.info(
                f"Successfully chunked JSON document into {len(chunked_docs)} chunks using modular architecture",
                extra={
                    "document_id": document.id,
                    "original_size": len(document.content),
                    "chunks_created": len(chunked_docs),
                    "schema_inference_enabled": self.json_config.enable_schema_inference,
                },
            )

            return chunked_docs

        except Exception as e:
            self.logger.error(
                f"Error chunking JSON document: {e}",
                extra={"document_id": document.id, "error": str(e)},
                exc_info=True,
            )
            self.progress_tracker.log_fallback(document.id, f"Error: {e}")
            return self._fallback_chunking(document)
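A hedged end-to-end sketch of driving chunk_document, reusing the strategy from the sketch above. The Document keyword arguments are assumptions for illustration; the real qdrant_loader.core.document.Document constructor may differ.

payload = json.dumps({"users": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]})

doc = Document(  # hypothetical field names, for illustration only
    content=payload,
    source="users.json",
    source_type="localfile",
    title="users.json",
    metadata={"content_type": "application/json"},
)

for chunk in strategy.chunk_document(doc):
    print(chunk.metadata.get("chunk_index"), len(chunk.content))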

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking for problematic JSON.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents using simple strategy
        """
        try:
            # Use text-based chunking as fallback
            content = document.content
            chunks = []

            chunk_size = self.settings.global_config.chunking.chunk_size
            overlap = self.settings.global_config.chunking.chunk_overlap

            # Simple chunking by lines to preserve some JSON structure
            lines = content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 0

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > chunk_size and current_chunk_lines:
                    # Create chunk from current lines
                    chunk_content = "\n".join(current_chunk_lines)

                    # Create basic metadata for fallback chunk
                    fallback_metadata = {
                        "chunk_index": chunk_index,
                        "chunk_size": len(chunk_content),
                        "content_type": "json_fallback",
                        "processing_mode": "fallback",
                        "chunking_strategy": "json_fallback",
                    }

                    chunk_doc = self.chunk_processor.create_chunk_document(
                        original_doc=document,
                        chunk_content=chunk_content,
                        chunk_index=chunk_index,
                        total_chunks=-1,  # Unknown at this point
                        chunk_metadata=fallback_metadata,
                        skip_nlp=True,  # Skip NLP for fallback chunks
                    )

                    chunks.append(chunk_doc)

                    # Seed the next chunk with trailing lines as overlap. The
                    # character-based overlap setting is converted to a line
                    # count at roughly 50 characters per line; floor division
                    # of the negated value keeps at least one line whenever
                    # overlap is positive.
                    overlap_lines = (
                        current_chunk_lines[-overlap // 50 :] if overlap > 0 else []
                    )
                    current_chunk_lines = overlap_lines + [line]
                    current_size = sum(
                        len(line_item) + 1 for line_item in current_chunk_lines
                    )
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add final chunk
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                fallback_metadata = {
                    "chunk_index": chunk_index,
                    "chunk_size": len(chunk_content),
                    "content_type": "json_fallback",
                    "processing_mode": "fallback",
                    "chunking_strategy": "json_fallback",
                }

                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=chunk_index,
                    total_chunks=chunk_index + 1,
                    chunk_metadata=fallback_metadata,
                    skip_nlp=True,
                )
                chunks.append(chunk_doc)

            # Update total_chunks in all chunk metadata
            for chunk in chunks:
                chunk.metadata["total_chunks"] = len(chunks)

            self.logger.warning(
                f"Used fallback chunking for JSON document, created {len(chunks)} chunks",
                extra={"document_id": document.id, "chunks_created": len(chunks)},
            )

            return chunks

        except Exception as e:
            self.logger.error(
                f"Fallback chunking failed: {e}",
                extra={"document_id": document.id, "error": str(e)},
                exc_info=True,
            )
            # Ultimate fallback: return original document as single chunk
            return [document]
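The fallback's line-accumulation logic in isolation; a minimal, self-contained sketch in which plain strings stand in for the chunk documents created above.

def split_lines_with_overlap(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Greedy line-based splitter mirroring the fallback heuristic above."""
    chunks, current, size = [], [], 0
    for line in text.split("\n"):
        line_size = len(line) + 1  # +1 for the newline
        if size + line_size > chunk_size and current:
            chunks.append("\n".join(current))
            # -overlap // 50 floors toward negative infinity, so any positive
            # overlap carries at least the last line into the next chunk.
            current = (current[-overlap // 50 :] if overlap > 0 else []) + [line]
            size = sum(len(item) + 1 for item in current)
        else:
            current.append(line)
            size += line_size
    if current:
        chunks.append("\n".join(current))
    return chunks

print(split_lines_with_overlap('{\n  "a": 1,\n  "b": 2,\n  "c": 3\n}', 12, 50))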

    def get_strategy_name(self) -> str:
        """Get the name of this chunking strategy.

        Returns:
            Strategy name
        """
        return "json_modular"

    def supports_document_type(self, document: Document) -> bool:
        """Check if this strategy supports the given document type.

        Args:
            document: Document to check

        Returns:
            True if this strategy can handle the document
        """
        # Check file extension
        if hasattr(document, "source") and document.source:
            if document.source.lower().endswith(".json"):
                return True

        # Check content type metadata
        content_type = document.metadata.get("content_type", "").lower()
        if "json" in content_type:
            return True

        # Try to parse as JSON. Note: only the first 1KB is tested, so a valid
        # document longer than 1KB that was not matched by extension or
        # content type above will fail this check with a JSONDecodeError.
        try:
            json.loads(document.content[:1000])
            return True
        except (json.JSONDecodeError, AttributeError):
            return False
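To illustrate the three checks, a small sketch using a stand-in object; the real code receives a Document, and SimpleNamespace merely duck-types the attributes used here.

from types import SimpleNamespace

doc = SimpleNamespace(
    source="export.json",                           # check 1: .json extension
    metadata={"content_type": "application/json"},  # check 2: content type
    content='{"ok": true}',                         # check 3: first 1KB parses
)
# strategy.supports_document_type(doc) would return True at the first check;
# each later check only runs if the earlier ones are inconclusive.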

    def estimate_chunk_count(self, document: Document) -> int:
        """Estimate the number of chunks this strategy will create.

        Args:
            document: Document to estimate for

        Returns:
            Estimated number of chunks
        """
        try:
            # Quick structure analysis for estimation
            structure = self.document_parser.parse_document_structure(document.content)

            if structure.get("valid_json", False):
                total_elements = structure.get("total_elements", 1)
                complexity_score = structure.get("complexity_score", 1.0)

                # Estimate based on elements and complexity
                estimated_chunks = max(1, int(total_elements * complexity_score / 10))

                # Apply limits
                max_chunks = self.json_config.max_objects_to_process
                return min(estimated_chunks, max_chunks)
            else:
                # Fallback estimation
                return max(
                    1,
                    len(document.content)
                    // self.settings.global_config.chunking.chunk_size,
                )

        except Exception:
            # Ultimate fallback
            return max(
                1,
                len(document.content)
                // self.settings.global_config.chunking.chunk_size,
            )
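A worked example of the estimate with illustrative numbers; the structure values and config cap below are assumptions chosen only to make the arithmetic concrete.

total_elements = 40       # from parse_document_structure, illustrative
complexity_score = 2.5    # likewise illustrative
estimated = max(1, int(total_elements * complexity_score / 10))  # -> 10
capped = min(estimated, 8)  # with max_objects_to_process = 8 -> 8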

    def shutdown(self):
        """Clean up resources used by the strategy."""
        # Clean up any cached data
        if hasattr(self, "_processed_chunks"):
            self._processed_chunks.clear()

        # Log shutdown
        self.logger.debug("JSON chunking strategy (modular) shutdown completed")

    def __str__(self) -> str:
        """String representation of the strategy."""
        return f"JSONChunkingStrategy(modular, schema_inference={self.json_config.enable_schema_inference})"

    def __repr__(self) -> str:
        """Detailed string representation of the strategy."""
        return (
            f"JSONChunkingStrategy("
            f"modular=True, "
            f"max_objects={self.json_config.max_objects_to_process}, "
            f"max_chunk_size_for_nlp={self.json_config.max_chunk_size_for_nlp}, "
            f"schema_inference={self.json_config.enable_schema_inference}"
            f")"
        )