Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 76%
128 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""JSON-specific chunking strategy for structured data using modular architecture."""
3import json
5import structlog
7from qdrant_loader.config import Settings
8from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
9from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
10from qdrant_loader.core.chunking.strategy.json.json_chunk_processor import (
11 JSONChunkProcessor,
12)
13from qdrant_loader.core.chunking.strategy.json.json_document_parser import (
14 JSONDocumentParser,
15)
16from qdrant_loader.core.chunking.strategy.json.json_metadata_extractor import (
17 JSONMetadataExtractor,
18)
19from qdrant_loader.core.chunking.strategy.json.json_section_splitter import (
20 JSONSectionSplitter,
21)
22from qdrant_loader.core.document import Document
24logger = structlog.get_logger(__name__)
class JSONChunkingStrategy(BaseChunkingStrategy):
    """Modern JSON chunking strategy using modular architecture.

    This strategy parses JSON structure and creates chunks based on:
    - Schema-aware structural boundaries
    - Intelligent element grouping and splitting
    - Enhanced metadata extraction with schema inference
    - JSON-specific optimization for NLP processing
    """

    def __init__(self, settings: Settings):
        """Initialize the JSON chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize modular components
        self.document_parser = JSONDocumentParser(settings)
        self.section_splitter = JSONSectionSplitter(settings)
        self.metadata_extractor = JSONMetadataExtractor(settings)
        self.chunk_processor = JSONChunkProcessor(settings)

        # JSON-specific configuration
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        self.simple_chunking_threshold = (
            500_000  # Use simple chunking for files larger than 500KB
        )

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a JSON document using modern modular approach.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Performance check: for very large files, use simple chunking
            if len(document.content) > self.simple_chunking_threshold:
                self.progress_tracker.log_fallback(
                    document.id, f"Large JSON file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Step 1: Parse document structure using JSONDocumentParser
            document_structure = self.document_parser.parse_document_structure(
                document.content
            )

            if not document_structure.get("valid_json", False):
                self.progress_tracker.log_fallback(
                    document.id, "Invalid JSON structure"
                )
                return self._fallback_chunking(document)

            # Step 2: Parse JSON into element tree
            root_element = self.document_parser.parse_json_structure(document.content)

            if not root_element:
                self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
                return self._fallback_chunking(document)

            # Step 3: Get elements to chunk
            elements_to_chunk = []
            if root_element.children:
                # Use top-level children as chunks
                elements_to_chunk = root_element.children
            else:
                # Use root element if no children
                elements_to_chunk = [root_element]

            # Step 4: Apply section splitter for grouping and splitting
            final_elements = self.section_splitter.split_json_elements(
                elements_to_chunk
            )

            if not final_elements:
                self.progress_tracker.finish_chunking(document.id, 0, "json")
                return []

            # Step 5: Create chunked documents using chunk processor
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": element.size,
                    },
                )

                # Extract element-specific metadata
                element_metadata = (
                    self.metadata_extractor.extract_json_element_metadata(element)
                )

                # Extract hierarchical metadata from content
                hierarchical_metadata = (
                    self.metadata_extractor.extract_hierarchical_metadata(
                        element.content, element_metadata, document
                    )
                )

                # Create chunk document using processor
                chunk_doc = self.chunk_processor.create_json_element_chunk_document(
                    original_doc=document,
                    element=element,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    element_metadata=hierarchical_metadata,
                )

                chunked_docs.append(chunk_doc)

            # Log completion
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "json"
            )
            self.logger.info(
                f"Successfully chunked JSON document into {len(chunked_docs)} chunks using modular architecture",
                extra={
                    "document_id": document.id,
                    "original_size": len(document.content),
                    "chunks_created": len(chunked_docs),
                    "schema_inference_enabled": self.json_config.enable_schema_inference,
                },
            )

            return chunked_docs

        except Exception as e:
            self.logger.error(
                f"Error chunking JSON document: {e}",
                extra={"document_id": document.id, "error": str(e)},
                exc_info=True,
            )
            self.progress_tracker.log_fallback(document.id, f"Error: {e}")
            return self._fallback_chunking(document)

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking for problematic JSON.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents using simple strategy
        """
        try:
            # Use text-based chunking as fallback
            content = document.content
            chunks = []

            chunk_size = self.settings.global_config.chunking.chunk_size
            overlap = self.settings.global_config.chunking.chunk_overlap

            # Simple chunking by lines to preserve some JSON structure
            lines = content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 0

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > chunk_size and current_chunk_lines:
                    # Create chunk from current lines
                    chunk_content = "\n".join(current_chunk_lines)

                    # Create basic metadata for fallback chunk
                    fallback_metadata = {
                        "chunk_index": chunk_index,
                        "chunk_size": len(chunk_content),
                        "content_type": "json_fallback",
                        "processing_mode": "fallback",
                        "chunking_strategy": "json_fallback",
                    }

                    chunk_doc = self.chunk_processor.create_chunk_document(
                        original_doc=document,
                        chunk_content=chunk_content,
                        chunk_index=chunk_index,
                        total_chunks=-1,  # Unknown at this point
                        chunk_metadata=fallback_metadata,
                        skip_nlp=True,  # Skip NLP for fallback chunks
                    )

                    chunks.append(chunk_doc)

                    # Setup for next chunk with overlap
                    # Heuristic: treat the character overlap as roughly 50 characters
                    # per line and carry over that many trailing lines (for
                    # 0 < overlap < 50 the slice index is 0, which keeps all lines)
                    overlap_lines = (
                        current_chunk_lines[-overlap // 50 :] if overlap > 0 else []
                    )
                    current_chunk_lines = overlap_lines + [line]
                    current_size = sum(
                        len(line_item) + 1 for line_item in current_chunk_lines
                    )
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add final chunk
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                fallback_metadata = {
                    "chunk_index": chunk_index,
                    "chunk_size": len(chunk_content),
                    "content_type": "json_fallback",
                    "processing_mode": "fallback",
                    "chunking_strategy": "json_fallback",
                }

                chunk_doc = self.chunk_processor.create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=chunk_index,
                    total_chunks=chunk_index + 1,
                    chunk_metadata=fallback_metadata,
                    skip_nlp=True,
                )
                chunks.append(chunk_doc)

            # Update total_chunks in all chunk metadata
            for chunk in chunks:
                chunk.metadata["total_chunks"] = len(chunks)

            self.logger.warning(
                f"Used fallback chunking for JSON document, created {len(chunks)} chunks",
                extra={"document_id": document.id, "chunks_created": len(chunks)},
            )

            return chunks

        except Exception as e:
            self.logger.error(
                f"Fallback chunking failed: {e}",
                extra={"document_id": document.id, "error": str(e)},
                exc_info=True,
            )
            # Ultimate fallback: return original document as single chunk
            return [document]

    def get_strategy_name(self) -> str:
        """Get the name of this chunking strategy.

        Returns:
            Strategy name
        """
        return "json_modular"

    def supports_document_type(self, document: Document) -> bool:
        """Check if this strategy supports the given document type.

        Args:
            document: Document to check

        Returns:
            True if this strategy can handle the document
        """
        # Check file extension
        if hasattr(document, "source") and document.source:
            if document.source.lower().endswith(".json"):
                return True

        # Check content type metadata
        content_type = document.metadata.get("content_type", "").lower()
        if "json" in content_type:
            return True

        # Try to parse as JSON
        try:
            json.loads(document.content[:1000])  # Test first 1KB
            return True
        except (json.JSONDecodeError, AttributeError):
            return False

    def estimate_chunk_count(self, document: Document) -> int:
        """Estimate the number of chunks this strategy will create.

        Args:
            document: Document to estimate for

        Returns:
            Estimated number of chunks
        """
        try:
            # Quick structure analysis for estimation
            structure = self.document_parser.parse_document_structure(document.content)

            if structure.get("valid_json", False):
                total_elements = structure.get("total_elements", 1)
                complexity_score = structure.get("complexity_score", 1.0)

                # Estimate based on elements and complexity
                estimated_chunks = max(1, int(total_elements * complexity_score / 10))

                # Apply limits
                max_chunks = self.json_config.max_objects_to_process
                return min(estimated_chunks, max_chunks)
            else:
                # Fallback estimation
                return max(
                    1,
                    len(document.content)
                    // self.settings.global_config.chunking.chunk_size,
                )

        except Exception:
            # Ultimate fallback
            return max(
                1,
                len(document.content)
                // self.settings.global_config.chunking.chunk_size,
            )

    def shutdown(self):
        """Clean up resources used by the strategy."""
        # Clean up any cached data
        if hasattr(self, "_processed_chunks"):
            self._processed_chunks.clear()

        # Log shutdown
        self.logger.debug("JSON chunking strategy (modular) shutdown completed")

    def __str__(self) -> str:
        """String representation of the strategy."""
        return f"JSONChunkingStrategy(modular, schema_inference={self.json_config.enable_schema_inference})"

    def __repr__(self) -> str:
        """Detailed string representation of the strategy."""
        return (
            f"JSONChunkingStrategy("
            f"modular=True, "
            f"max_objects={self.json_config.max_objects_to_process}, "
            f"max_chunk_size_for_nlp={self.json_config.max_chunk_size_for_nlp}, "
            f"schema_inference={self.json_config.enable_schema_inference}"
            f")"
        )
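
For orientation, the minimal sketch below shows how this strategy could be driven end to end: construct it with the application's Settings, check supports_document_type, then call chunk_document. It is illustrative only and not part of the covered module; the Settings() and Document(...) constructor calls are assumptions (real configuration normally comes from qdrant-loader's own config loading), while the method calls on the strategy match the listing above.

# Hedged usage sketch (not part of json_strategy.py); Settings() and Document(...)
# construction below are assumptions for illustration only.
from qdrant_loader.config import Settings
from qdrant_loader.core.document import Document
from qdrant_loader.core.chunking.strategy.json_strategy import JSONChunkingStrategy

settings = Settings()  # assumption: real code loads this from project configuration
strategy = JSONChunkingStrategy(settings)

doc = Document(  # assumption: field names mirror the attributes the strategy reads
    content='{"users": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]}',
    source="users.json",
    source_type="localfile",
    title="users.json",
    metadata={"content_type": "application/json", "file_name": "users.json"},
)

if strategy.supports_document_type(doc):
    print("estimated chunks:", strategy.estimate_chunk_count(doc))
    chunks = strategy.chunk_document(doc)
    print(f"{strategy.get_strategy_name()} produced {len(chunks)} chunks")
strategy.shutdown()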