Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 98% (316 statements)
1"""JSON-specific chunking strategy for structured data."""
3import json
4from dataclasses import dataclass, field
5from enum import Enum
6from typing import TYPE_CHECKING, Any, Optional
8import structlog
10from qdrant_loader.config import Settings
11from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
12from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
13from qdrant_loader.core.document import Document
15if TYPE_CHECKING:
16 pass
18logger = structlog.get_logger(__name__)
20# Performance constants to prevent timeouts
21MAX_JSON_SIZE_FOR_PARSING = 1_000_000 # 1MB limit for JSON parsing
22MAX_OBJECTS_TO_PROCESS = 200 # Reduced limit for objects to prevent timeouts
23MAX_CHUNK_SIZE_FOR_NLP = 20_000 # 20KB limit for NLP processing
24MAX_RECURSION_DEPTH = 5 # Limit recursion depth for nested structures
25MAX_ARRAY_ITEMS_TO_PROCESS = 50 # Limit array items to process
26MAX_OBJECT_KEYS_TO_PROCESS = 100 # Limit object keys to process
27SIMPLE_CHUNKING_THRESHOLD = 500_000 # Use simple chunking for files larger than 500KB
30class JSONElementType(Enum):
31 """Types of JSON elements."""
33 OBJECT = "object"
34 ARRAY = "array"
35 ARRAY_ITEM = "array_item"
36 PROPERTY = "property"
37 VALUE = "value"
38 ROOT = "root"
41@dataclass
42class JSONElement:
43 """Represents a JSON element with its metadata."""
45 name: str
46 element_type: JSONElementType
47 content: str
48 value: Any
49 path: str # JSON path like "root.users[0].name"
50 level: int = 0
51 parent: Optional["JSONElement"] = None
52 children: list["JSONElement"] = field(default_factory=list)
53 size: int = 0 # Size in characters
54 item_count: int = 0 # Number of items for arrays/objects
56 def add_child(self, child: "JSONElement"):
57 """Add a child element."""
58 self.children.append(child)
59 child.parent = self
62class JSONChunkingStrategy(BaseChunkingStrategy):
63 """Strategy for chunking JSON documents based on structure.
65 This strategy parses JSON structure and creates chunks based on:
66 - Top-level objects and arrays
67 - Large nested objects
68 - Array items (grouped when small)
69 - Preserving JSON structure and hierarchy
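
    Example:
        Illustrative usage sketch; ``settings`` is assumed to be an already
        loaded ``Settings`` instance and ``document`` a ``Document`` whose
        content is a JSON string:

            strategy = JSONChunkingStrategy(settings)
            chunks = strategy.chunk_document(document)
            for chunk in chunks:
                print(chunk.metadata["path"], chunk.metadata["element_type"])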
70 """
72 def __init__(self, settings: Settings):
73 """Initialize the JSON chunking strategy.
75 Args:
76 settings: Configuration settings
77 """
78 super().__init__(settings)
79 self.logger = logger
80 self.progress_tracker = ChunkingProgressTracker(logger)
82 # Cache for processed chunks
83 self._processed_chunks = {}
85 # Minimum size for standalone chunks
86 self.min_chunk_size = 200
88 # Maximum items to group together in arrays
89 self.max_array_items_per_chunk = 50
91 def _parse_json_structure(self, content: str) -> JSONElement | None:
92 """Parse JSON content into structured elements.
94 Args:
95 content: JSON content to parse
97 Returns:
98 Root JSON element or None if parsing fails
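
        Example:
            Hedged sketch of the resulting tree for a small payload:

                root = self._parse_json_structure('{"users": [{"name": "Ada"}]}')
                # root.element_type is JSONElementType.ROOT, and its children
                # carry paths such as "root.users" and "root.users[0]"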
99 """
100 # Performance check: skip parsing for very large files
101 if len(content) > MAX_JSON_SIZE_FOR_PARSING:
102 self.logger.info(
103 f"JSON too large for structured parsing ({len(content)} bytes)"
104 )
105 return None
107 try:
108 data = json.loads(content)
109 root_element = self._create_json_element(
110 "root", data, JSONElementType.ROOT, "root"
111 )
112 # Initialize processed count for this document
113 processed_count = [0]
114 self._extract_json_elements(root_element, data, "root", 0, processed_count)
115 self.logger.debug(
116 f"Processed {processed_count[0]} JSON elements (limit: {MAX_OBJECTS_TO_PROCESS})"
117 )
118 return root_element
120 except json.JSONDecodeError as e:
121 self.logger.warning(f"Failed to parse JSON: {e}")
122 return None
123 except Exception as e:
124 self.logger.warning(f"Error parsing JSON structure: {e}")
125 return None
127 def _create_json_element(
128 self,
129 name: str,
130 value: Any,
131 element_type: JSONElementType,
132 path: str,
133 level: int = 0,
134 ) -> JSONElement:
135 """Create a JSON element from a value.
137 Args:
138 name: Element name
139 value: JSON value
140 element_type: Type of JSON element
141 path: JSON path
142 level: Nesting level
144 Returns:
145 JSONElement instance
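
        Example:
            Hedged sketch of a single element:

                element = self._create_json_element(
                    "user", {"name": "Ada"}, JSONElementType.OBJECT, "root.user"
                )
                # element.content holds the pretty-printed JSON, element.size
                # its length in characters, and element.item_count == 1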
146 """
147 # Convert value to JSON string for content
148 try:
149 content = json.dumps(value, indent=2, ensure_ascii=False)
150 except (TypeError, ValueError):
151 content = str(value)
153 # Calculate size and item count
154 size = len(content)
155 item_count = 0
157 if isinstance(value, dict):
158 item_count = len(value)
159 elif isinstance(value, list):
160 item_count = len(value)
162 return JSONElement(
163 name=name,
164 element_type=element_type,
165 content=content,
166 value=value,
167 path=path,
168 level=level,
169 size=size,
170 item_count=item_count,
171 )
173 def _extract_json_elements(
174 self,
175 parent_element: JSONElement,
176 data: Any,
177 path: str,
178 level: int = 0,
179 processed_count: list[int] | None = None,
180 ):
181 """Recursively extract JSON elements.
183 Args:
184 parent_element: Parent JSON element
185 data: JSON data to process
186 path: Current JSON path
187 level: Current nesting level
188 processed_count: Mutable list to track total processed objects
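
        Example:
            Hedged sketch; paths follow the "root.key[index]" scheme:

                data = {"users": [{"name": "Ada"}]}
                root = self._create_json_element(
                    "root", data, JSONElementType.ROOT, "root"
                )
                self._extract_json_elements(root, data, "root")
                # The walk yields elements at "root.users", "root.users[0]"
                # and "root.users[0].name", bounded by the MAX_* constants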
189 """
190 if processed_count is None:
191 processed_count = [0]
193 # Performance checks
194 if level > MAX_RECURSION_DEPTH: # Limit recursion depth
195 return
196 if processed_count[0] >= MAX_OBJECTS_TO_PROCESS: # Global limit
197 return
198 if len(parent_element.children) >= MAX_ARRAY_ITEMS_TO_PROCESS: # Local limit
199 return
201 if isinstance(data, dict):
202 for i, (key, value) in enumerate(data.items()):
203 if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
204 break
205 if i >= MAX_OBJECT_KEYS_TO_PROCESS: # Limit keys per object
206 break
208 processed_count[0] += 1
209 child_path = f"{path}.{key}"
211 if isinstance(value, dict | list):
212 # Create element for complex values
213 element_type = (
214 JSONElementType.OBJECT
215 if isinstance(value, dict)
216 else JSONElementType.ARRAY
217 )
218 child_element = self._create_json_element(
219 key, value, element_type, child_path, level + 1
220 )
221 parent_element.add_child(child_element)
223 # Recursively process if not too large
224 if child_element.size < self.chunk_size:
225 self._extract_json_elements(
226 child_element, value, child_path, level + 1, processed_count
227 )
228 else:
229 # Create element for simple values
230 child_element = self._create_json_element(
231 key, value, JSONElementType.PROPERTY, child_path, level + 1
232 )
233 parent_element.add_child(child_element)
235 elif isinstance(data, list):
236 for i, item in enumerate(data):
237 if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
238 break
239 if i >= MAX_ARRAY_ITEMS_TO_PROCESS: # Limit array items
240 break
242 processed_count[0] += 1
243 child_path = f"{path}[{i}]"
245 if isinstance(item, dict | list):
246 # Create element for complex array items
247 element_type = (
248 JSONElementType.OBJECT
249 if isinstance(item, dict)
250 else JSONElementType.ARRAY
251 )
252 child_element = self._create_json_element(
253 f"item_{i}", item, element_type, child_path, level + 1
254 )
255 parent_element.add_child(child_element)
257 # Recursively process if not too large
258 if child_element.size < self.chunk_size:
259 self._extract_json_elements(
260 child_element, item, child_path, level + 1, processed_count
261 )
262 else:
263 # Create element for simple array items
264 child_element = self._create_json_element(
265 f"item_{i}",
266 item,
267 JSONElementType.ARRAY_ITEM,
268 child_path,
269 level + 1,
270 )
271 parent_element.add_child(child_element)
273 def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
274 """Group small JSON elements into larger chunks.
276 Args:
277 elements: List of JSON elements
279 Returns:
280 List of grouped elements
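
        Example:
            Hedged sketch (``small_properties`` is a hypothetical list of
            PROPERTY elements below ``min_chunk_size``):

                grouped = self._group_small_elements(small_properties)
                # Small PROPERTY/ARRAY_ITEM elements are merged together,
                # while OBJECT and ARRAY elements stay standalone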
281 """
282 if not elements:
283 return []
285 grouped = []
286 current_group = []
287 current_size = 0
289 for element in elements:
290 # If element is large enough or is a significant structure, keep it separate
291 if (
292 element.size >= self.min_chunk_size
293 or element.element_type
294 in [JSONElementType.OBJECT, JSONElementType.ARRAY]
295 or element.item_count > MAX_OBJECT_KEYS_TO_PROCESS
296 ):
298 # First, add any accumulated small elements
299 if current_group:
300 grouped_element = self._create_grouped_element(current_group)
301 grouped.append(grouped_element)
302 current_group = []
303 current_size = 0
305 # Add the large element
306 grouped.append(element)
307 else:
308 # Accumulate small elements
309 current_group.append(element)
310 current_size += element.size
312 # If accumulated size is large enough, create a grouped element
313 if (
314 current_size >= self.min_chunk_size
315 or len(current_group) >= self.max_array_items_per_chunk
316 ):
317 grouped_element = self._create_grouped_element(current_group)
318 grouped.append(grouped_element)
319 current_group = []
320 current_size = 0
322 # Handle remaining small elements
323 if current_group:
324 grouped_element = self._create_grouped_element(current_group)
325 grouped.append(grouped_element)
327 return grouped
329 def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
330 """Create a grouped element from multiple small elements.
332 Args:
333 elements: List of elements to group
335 Returns:
336 Grouped JSON element
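
        Example:
            Hedged sketch of the two grouping shapes:

                merged = self._create_grouped_element(elements)
                # All ARRAY_ITEM inputs -> a new ARRAY named
                # "grouped_items_<n>"; any other mix -> an OBJECT named
                # "grouped_elements_<n>"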
337 """
338 if not elements:
339 raise ValueError("Cannot group empty list of elements")
341 if len(elements) == 1:
342 return elements[0]
344 # Create grouped content
345 if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
346 # Group array items into an array
347 grouped_value = [elem.value for elem in elements]
348 grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
349 element_type = JSONElementType.ARRAY
350 name = f"grouped_items_{len(elements)}"
351 else:
352 # Group mixed elements into an object
353 grouped_value = {}
354 for elem in elements:
355 key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
356 grouped_value[key] = elem.value
357 grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
358 element_type = JSONElementType.OBJECT
359 name = f"grouped_elements_{len(elements)}"
361 # Use the first element's path as base
362 base_path = elements[0].path
363 parent_path = (
364 ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
365 )
366 grouped_path = f"{parent_path}.{name}"
368 grouped_element = JSONElement(
369 name=name,
370 element_type=element_type,
371 content=grouped_content,
372 value=grouped_value,
373 path=grouped_path,
374 level=min(elem.level for elem in elements),
375 size=len(grouped_content),
376 item_count=len(elements),
377 )
379 return grouped_element
381 def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
382 """Split a large JSON element into smaller chunks.
384 Args:
385 element: Large JSON element to split
387 Returns:
388 List of smaller elements
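
        Example:
            Hedged sketch (``big_array`` is a hypothetical ARRAY element
            wrapping 120 items; ``max_array_items_per_chunk`` defaults to 50):

                parts = self._split_large_element(big_array)
                # Yields three sub-arrays named "<name>_chunk_1..3"; OBJECT
                # elements are instead split by grouping properties up to
                # self.chunk_size characters per chunk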
389 """
390 if element.size <= self.chunk_size:
391 return [element]
393 chunks = []
395 if element.element_type == JSONElementType.ARRAY and isinstance(
396 element.value, list
397 ):
398 # Split array into smaller arrays
399 items = element.value
400 chunk_size = self.max_array_items_per_chunk
402 for i in range(0, len(items), chunk_size):
403 chunk_items = items[i : i + chunk_size]
404 chunk_content = json.dumps(chunk_items, indent=2, ensure_ascii=False)
406 chunk_element = JSONElement(
407 name=f"{element.name}_chunk_{i//chunk_size + 1}",
408 element_type=JSONElementType.ARRAY,
409 content=chunk_content,
410 value=chunk_items,
411 path=f"{element.path}_chunk_{i//chunk_size + 1}",
412 level=element.level,
413 size=len(chunk_content),
414 item_count=len(chunk_items),
415 )
416 chunks.append(chunk_element)
418 elif element.element_type == JSONElementType.OBJECT and isinstance(
419 element.value, dict
420 ):
421 # Split object by grouping properties
422 items = list(element.value.items())
423 current_chunk = {}
424 current_size = 0
425 chunk_index = 1
427 for key, value in items:
428 item_content = json.dumps({key: value}, indent=2, ensure_ascii=False)
429 item_size = len(item_content)
431 if current_size + item_size > self.chunk_size and current_chunk:
432 # Create chunk from current items
433 chunk_content = json.dumps(
434 current_chunk, indent=2, ensure_ascii=False
435 )
436 chunk_element = JSONElement(
437 name=f"{element.name}_chunk_{chunk_index}",
438 element_type=JSONElementType.OBJECT,
439 content=chunk_content,
440 value=current_chunk.copy(),
441 path=f"{element.path}_chunk_{chunk_index}",
442 level=element.level,
443 size=len(chunk_content),
444 item_count=len(current_chunk),
445 )
446 chunks.append(chunk_element)
448 # Start new chunk
449 current_chunk = {key: value}
450 current_size = item_size
451 chunk_index += 1
452 else:
453 current_chunk[key] = value
454 current_size += item_size
456 # Add remaining items
457 if current_chunk:
458 chunk_content = json.dumps(current_chunk, indent=2, ensure_ascii=False)
459 chunk_element = JSONElement(
460 name=f"{element.name}_chunk_{chunk_index}",
461 element_type=JSONElementType.OBJECT,
462 content=chunk_content,
463 value=current_chunk,
464 path=f"{element.path}_chunk_{chunk_index}",
465 level=element.level,
466 size=len(chunk_content),
467 item_count=len(current_chunk),
468 )
469 chunks.append(chunk_element)
470 else:
471 # For other types, split by lines as fallback
472 lines = element.content.split("\n")
473 current_chunk_lines = []
474 current_size = 0
475 chunk_index = 1
477 for line in lines:
478 line_size = len(line) + 1 # +1 for newline
480 if current_size + line_size > self.chunk_size and current_chunk_lines:
481 chunk_content = "\n".join(current_chunk_lines)
482 chunk_element = JSONElement(
483 name=f"{element.name}_chunk_{chunk_index}",
484 element_type=element.element_type,
485 content=chunk_content,
486 value=chunk_content, # Use content as value for text chunks
487 path=f"{element.path}_chunk_{chunk_index}",
488 level=element.level,
489 size=len(chunk_content),
490 item_count=len(current_chunk_lines),
491 )
492 chunks.append(chunk_element)
494 current_chunk_lines = [line]
495 current_size = line_size
496 chunk_index += 1
497 else:
498 current_chunk_lines.append(line)
499 current_size += line_size
501 # Add remaining lines
502 if current_chunk_lines:
503 chunk_content = "\n".join(current_chunk_lines)
504 chunk_element = JSONElement(
505 name=f"{element.name}_chunk_{chunk_index}",
506 element_type=element.element_type,
507 content=chunk_content,
508 value=chunk_content,
509 path=f"{element.path}_chunk_{chunk_index}",
510 level=element.level,
511 size=len(chunk_content),
512 item_count=len(current_chunk_lines),
513 )
514 chunks.append(chunk_element)
516 return chunks if chunks else [element]
518 def _extract_json_metadata(self, element: JSONElement) -> dict[str, Any]:
519 """Extract metadata from a JSON element.
521 Args:
522 element: JSON element to analyze
524 Returns:
525 Dictionary containing element metadata
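
        Example:
            Hedged sketch of the returned mapping:

                metadata = self._extract_json_metadata(element)
                # Includes "element_type", "path", "data_types",
                # "has_nested_objects" and, when a parent is attached,
                # "parent_name" / "parent_path"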
526 """
527 metadata = {
528 "element_type": element.element_type.value,
529 "name": element.name,
530 "path": element.path,
531 "level": element.level,
532 "size": element.size,
533 "item_count": element.item_count,
534 "has_nested_objects": False,
535 "has_arrays": False,
536 "data_types": [],
537 }
539 # Analyze value types
540 if isinstance(element.value, dict):
541 metadata["data_types"] = list(
542 {type(v).__name__ for v in element.value.values()}
543 )
544 metadata["has_nested_objects"] = any(
545 isinstance(v, dict) for v in element.value.values()
546 )
547 metadata["has_arrays"] = any(
548 isinstance(v, list) for v in element.value.values()
549 )
550 elif isinstance(element.value, list) and element.value:
551 metadata["data_types"] = list({type(v).__name__ for v in element.value})
552 metadata["has_nested_objects"] = any(
553 isinstance(v, dict) for v in element.value
554 )
555 metadata["has_arrays"] = any(isinstance(v, list) for v in element.value)
556 else:
557 metadata["data_types"] = [type(element.value).__name__]
559 # Add parent context
560 if element.parent:
561 metadata.update(
562 {
563 "parent_name": element.parent.name,
564 "parent_type": element.parent.element_type.value,
565 "parent_path": element.parent.path,
566 }
567 )
569 return metadata
571 def chunk_document(self, document: Document) -> list[Document]:
572 """Chunk a JSON document using structural boundaries.
574 Args:
575 document: Document to chunk
577 Returns:
578 List of chunked documents
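
        Example:
            Hedged sketch of the end-to-end flow:

                chunks = strategy.chunk_document(document)
                # Each chunk carries JSON-specific metadata, e.g.
                # metadata["chunking_strategy"] == "json" and
                # metadata["parent_document_id"] == document.id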
579 """
580 file_name = (
581 document.metadata.get("file_name")
582 or document.metadata.get("original_filename")
583 or document.title
584 or f"{document.source_type}:{document.source}"
585 )
587 # Start progress tracking
588 self.progress_tracker.start_chunking(
589 document.id,
590 document.source,
591 document.source_type,
592 len(document.content),
593 file_name,
594 )
596 try:
597 # Performance check: for very large files, use simple chunking
598 if len(document.content) > SIMPLE_CHUNKING_THRESHOLD:
599 self.progress_tracker.log_fallback(
600 document.id, f"Large JSON file ({len(document.content)} bytes)"
601 )
602 return self._fallback_chunking(document)
604 # Parse JSON structure
605 root_element = self._parse_json_structure(document.content)
607 if not root_element:
608 self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
609 return self._fallback_chunking(document)
611 # Get all elements to chunk
612 elements_to_chunk = []
614 if root_element.children:
615 # Use top-level children as chunks
616 elements_to_chunk = root_element.children
617 else:
618 # Use root element if no children
619 elements_to_chunk = [root_element]
621 # Group small elements and split large ones
622 grouped_elements = self._group_small_elements(elements_to_chunk)
623 final_elements = []
625 for element in grouped_elements:
626 if element.size > self.chunk_size:
627 # Split large elements
628 split_elements = self._split_large_element(element)
629 final_elements.extend(split_elements)
630 else:
631 final_elements.append(element)
633 # Limit total elements
634 final_elements = final_elements[:MAX_OBJECTS_TO_PROCESS]
636 if not final_elements:
637 self.progress_tracker.finish_chunking(document.id, 0, "json")
638 return []
640 # Create chunked documents
641 chunked_docs = []
642 for i, element in enumerate(final_elements):
643 self.logger.debug(
644 f"Processing element {i+1}/{len(final_elements)}",
645 extra={
646 "element_name": element.name,
647 "element_type": element.element_type.value,
648 "content_size": element.size,
649 },
650 )
652 # Create chunk document with optimized metadata processing
653 skip_nlp = element.size > MAX_CHUNK_SIZE_FOR_NLP
655 if skip_nlp:
656 # Create chunk without expensive NLP processing
657 chunk_doc = self._create_optimized_chunk_document(
658 original_doc=document,
659 chunk_content=element.content,
660 chunk_index=i,
661 total_chunks=len(final_elements),
662 skip_nlp=True,
663 )
664 else:
665 # Use normal processing for smaller chunks
666 chunk_doc = self._create_chunk_document(
667 original_doc=document,
668 chunk_content=element.content,
669 chunk_index=i,
670 total_chunks=len(final_elements),
671 )
673 # Add JSON-specific metadata
674 json_metadata = self._extract_json_metadata(element)
675 json_metadata["chunking_strategy"] = "json"
676 json_metadata["chunking_method"] = "structured_json"
677 json_metadata["parent_document_id"] = document.id
678 chunk_doc.metadata.update(json_metadata)
680 chunked_docs.append(chunk_doc)
682 # Finish progress tracking
683 self.progress_tracker.finish_chunking(
684 document.id, len(chunked_docs), "json"
685 )
686 return chunked_docs
688 except Exception as e:
689 self.progress_tracker.log_error(document.id, str(e))
690 # Fallback to default chunking
691 self.progress_tracker.log_fallback(
692 document.id, f"JSON processing failed: {str(e)}"
693 )
694 return self._fallback_chunking(document)
696 def _create_optimized_chunk_document(
697 self,
698 original_doc: Document,
699 chunk_content: str,
700 chunk_index: int,
701 total_chunks: int,
702 skip_nlp: bool = False,
703 ) -> Document:
704 """Create a chunk document with optimized processing.
706 Args:
707 original_doc: Original document
708 chunk_content: Content of the chunk
709 chunk_index: Index of the chunk
710 total_chunks: Total number of chunks
711 skip_nlp: Whether to skip NLP processing
713 Returns:
714 Document: New document instance for the chunk
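
        Example:
            Hedged sketch of the NLP-skipping path:

                chunk_doc = self._create_optimized_chunk_document(
                    original_doc=document,
                    chunk_content='{"a": 1}',
                    chunk_index=0,
                    total_chunks=1,
                    skip_nlp=True,
                )
                # chunk_doc.metadata["nlp_skipped"] is True with
                # skip_reason == "chunk_too_large"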
715 """
716 # Create enhanced metadata
717 metadata = original_doc.metadata.copy()
718 metadata.update(
719 {
720 "chunk_index": chunk_index,
721 "total_chunks": total_chunks,
722 }
723 )
725 if skip_nlp:
726 # Skip expensive NLP processing for large chunks
727 metadata.update(
728 {
729 "entities": [],
730 "pos_tags": [],
731 "nlp_skipped": True,
732 "skip_reason": "chunk_too_large",
733 }
734 )
735 else:
736 try:
737 # Process the chunk text to get additional features
738 processed = self._process_text(chunk_content)
739 metadata.update(
740 {
741 "entities": processed["entities"],
742 "pos_tags": processed["pos_tags"],
743 "nlp_skipped": False,
744 }
745 )
746 except Exception as e:
747 self.logger.warning(
748 f"NLP processing failed for chunk {chunk_index}: {e}"
749 )
750 metadata.update(
751 {
752 "entities": [],
753 "pos_tags": [],
754 "nlp_skipped": True,
755 "skip_reason": "nlp_error",
756 }
757 )
759 return Document(
760 content=chunk_content,
761 metadata=metadata,
762 source=original_doc.source,
763 source_type=original_doc.source_type,
764 url=original_doc.url,
765 title=original_doc.title,
766 content_type=original_doc.content_type,
767 )
769 def _fallback_chunking(self, document: Document) -> list[Document]:
770 """Fallback to simple text-based chunking when JSON parsing fails.
772 Args:
773 document: Document to chunk
775 Returns:
776 List of chunked documents
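
        Example:
            Hedged sketch of the fallback output:

                chunks = self._fallback_chunking(document)
                # Chunks are plain line-based splits, each tagged with
                # metadata["chunking_method"] == "fallback_text"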
777 """
778 self.logger.warning("Falling back to simple text chunking for JSON document")
780 # Use simple line-based splitting for JSON
781 lines = document.content.split("\n")
782 chunks = []
783 current_chunk = []
784 current_size = 0
786 for line in lines:
787 line_size = len(line) + 1 # +1 for newline
789 if current_size + line_size > self.chunk_size and current_chunk:
790 chunks.append("\n".join(current_chunk))
791 current_chunk = [line]
792 current_size = line_size
793 else:
794 current_chunk.append(line)
795 current_size += line_size
797 # Add remaining lines
798 if current_chunk:
799 chunks.append("\n".join(current_chunk))
801 # Create chunk documents (limited)
802 chunked_docs = []
803 for i, chunk_content in enumerate(chunks[:MAX_OBJECTS_TO_PROCESS]):
804 chunk_doc = self._create_optimized_chunk_document(
805 original_doc=document,
806 chunk_content=chunk_content,
807 chunk_index=i,
808 total_chunks=len(chunks),
809 skip_nlp=len(chunk_content) > MAX_CHUNK_SIZE_FOR_NLP,
810 )
812 chunk_doc.id = Document.generate_chunk_id(document.id, i)
813 chunk_doc.metadata["parent_document_id"] = document.id
814 chunk_doc.metadata["chunking_method"] = "fallback_text"
816 chunked_docs.append(chunk_doc)
818 return chunked_docs
820 def _split_text(self, text: str) -> list[str]:
821 """Split text into chunks (required by base class).
823 Args:
824 text: Text to split
826 Returns:
827 List of text chunks
828 """
829 # This method is required by the base class but not used in our implementation
830 # We override chunk_document instead
831 return [text]