Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 98% (318 statements)
"""JSON-specific chunking strategy for structured data."""

import json
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union

import structlog

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
from qdrant_loader.core.document import Document
from qdrant_loader.utils.logging import LoggingConfig

if TYPE_CHECKING:
    pass

logger = structlog.get_logger(__name__)

# Performance constants to prevent timeouts
MAX_JSON_SIZE_FOR_PARSING = 1_000_000  # 1MB limit for JSON parsing
MAX_OBJECTS_TO_PROCESS = 200  # Reduced limit for objects to prevent timeouts
MAX_CHUNK_SIZE_FOR_NLP = 20_000  # 20KB limit for NLP processing
MAX_RECURSION_DEPTH = 5  # Limit recursion depth for nested structures
MAX_ARRAY_ITEMS_TO_PROCESS = 50  # Limit array items to process
MAX_OBJECT_KEYS_TO_PROCESS = 100  # Limit object keys to process
SIMPLE_CHUNKING_THRESHOLD = 500_000  # Use simple chunking for files larger than 500KB

class JSONElementType(Enum):
    """Types of JSON elements."""

    OBJECT = "object"
    ARRAY = "array"
    ARRAY_ITEM = "array_item"
    PROPERTY = "property"
    VALUE = "value"
    ROOT = "root"


@dataclass
class JSONElement:
    """Represents a JSON element with its metadata."""

    name: str
    element_type: JSONElementType
    content: str
    value: Any
    path: str  # JSON path like "root.users[0].name"
    level: int = 0
    parent: Optional["JSONElement"] = None
    children: list["JSONElement"] = field(default_factory=list)
    size: int = 0  # Size in characters
    item_count: int = 0  # Number of items for arrays/objects

    def add_child(self, child: "JSONElement"):
        """Add a child element."""
        self.children.append(child)
        child.parent = self

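As a minimal, hypothetical sketch of how the fields above fit together (not part of the module itself, and assuming the module is importable under the package path shown in the report header), a tiny JSONElement tree for {"users": [...]} could be built like this:

import json

from qdrant_loader.core.chunking.strategy.json_strategy import (
    JSONElement,
    JSONElementType,
)

users = [{"name": "Ada"}, {"name": "Grace"}]
users_json = json.dumps(users, indent=2, ensure_ascii=False)

# Root element wrapping the whole document.
root = JSONElement(
    name="root",
    element_type=JSONElementType.ROOT,
    content=json.dumps({"users": users}, indent=2, ensure_ascii=False),
    value={"users": users},
    path="root",
)

# Child element for the "users" array, one level below the root.
users_element = JSONElement(
    name="users",
    element_type=JSONElementType.ARRAY,
    content=users_json,
    value=users,
    path="root.users",
    level=1,
    size=len(users_json),
    item_count=len(users),
)

root.add_child(users_element)
assert users_element.parent is root
assert users_element.path == "root.users"
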
class JSONChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking JSON documents based on structure.

    This strategy parses JSON structure and creates chunks based on:
    - Top-level objects and arrays
    - Large nested objects
    - Array items (grouped when small)
    - Preserving JSON structure and hierarchy
    """

    def __init__(self, settings: Settings):
        """Initialize the JSON chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Cache for processed chunks
        self._processed_chunks = {}

        # Minimum size for standalone chunks
        self.min_chunk_size = 200

        # Maximum items to group together in arrays
        self.max_array_items_per_chunk = 50

    def _parse_json_structure(self, content: str) -> JSONElement | None:
        """Parse JSON content into structured elements.

        Args:
            content: JSON content to parse

        Returns:
            Root JSON element or None if parsing fails
        """
        # Performance check: skip parsing for very large files
        if len(content) > MAX_JSON_SIZE_FOR_PARSING:
            self.logger.info(
                f"JSON too large for structured parsing ({len(content)} bytes)"
            )
            return None

        try:
            data = json.loads(content)
            root_element = self._create_json_element(
                "root", data, JSONElementType.ROOT, "root"
            )
            # Initialize processed count for this document
            processed_count = [0]
            self._extract_json_elements(root_element, data, "root", 0, processed_count)
            self.logger.debug(
                f"Processed {processed_count[0]} JSON elements (limit: {MAX_OBJECTS_TO_PROCESS})"
            )
            return root_element

        except json.JSONDecodeError as e:
            self.logger.warning(f"Failed to parse JSON: {e}")
            return None
        except Exception as e:
            self.logger.warning(f"Error parsing JSON structure: {e}")
            return None

    def _create_json_element(
        self,
        name: str,
        value: Any,
        element_type: JSONElementType,
        path: str,
        level: int = 0,
    ) -> JSONElement:
        """Create a JSON element from a value.

        Args:
            name: Element name
            value: JSON value
            element_type: Type of JSON element
            path: JSON path
            level: Nesting level

        Returns:
            JSONElement instance
        """
        # Convert value to JSON string for content
        try:
            content = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            content = str(value)

        # Calculate size and item count
        size = len(content)
        item_count = 0

        if isinstance(value, dict):
            item_count = len(value)
        elif isinstance(value, list):
            item_count = len(value)

        return JSONElement(
            name=name,
            element_type=element_type,
            content=content,
            value=value,
            path=path,
            level=level,
            size=size,
            item_count=item_count,
        )

    def _extract_json_elements(
        self,
        parent_element: JSONElement,
        data: Any,
        path: str,
        level: int = 0,
        processed_count: list[int] | None = None,
    ):
        """Recursively extract JSON elements.

        Args:
            parent_element: Parent JSON element
            data: JSON data to process
            path: Current JSON path
            level: Current nesting level
            processed_count: Mutable list to track total processed objects
        """
        if processed_count is None:
            processed_count = [0]

        # Performance checks
        if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
            return
        if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:  # Global limit
            return
        if len(parent_element.children) >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Local limit
            return

        if isinstance(data, dict):
            for i, (key, value) in enumerate(data.items()):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_OBJECT_KEYS_TO_PROCESS:  # Limit keys per object
                    break

                processed_count[0] += 1
                child_path = f"{path}.{key}"

                if isinstance(value, dict | list):
                    # Create element for complex values
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(value, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        key, value, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, value, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple values
                    child_element = self._create_json_element(
                        key, value, JSONElementType.PROPERTY, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

        elif isinstance(data, list):
            for i, item in enumerate(data):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Limit array items
                    break

                processed_count[0] += 1
                child_path = f"{path}[{i}]"

                if isinstance(item, dict | list):
                    # Create element for complex array items
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(item, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        f"item_{i}", item, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, item, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple array items
                    child_element = self._create_json_element(
                        f"item_{i}",
                        item,
                        JSONElementType.ARRAY_ITEM,
                        child_path,
                        level + 1,
                    )
                    parent_element.add_child(child_element)

    def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Group small JSON elements into larger chunks.

        Args:
            elements: List of JSON elements

        Returns:
            List of grouped elements
        """
        if not elements:
            return []

        grouped = []
        current_group = []
        current_size = 0

        for element in elements:
            # If element is large enough or is a significant structure, keep it separate
            if (
                element.size >= self.min_chunk_size
                or element.element_type
                in [JSONElementType.OBJECT, JSONElementType.ARRAY]
                or element.item_count > MAX_OBJECT_KEYS_TO_PROCESS
            ):
                # First, add any accumulated small elements
                if current_group:
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                grouped.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element.size

                # If accumulated size is large enough, create a grouped element
                if (
                    current_size >= self.min_chunk_size
                    or len(current_group) >= self.max_array_items_per_chunk
                ):
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            grouped_element = self._create_grouped_element(current_group)
            grouped.append(grouped_element)

        return grouped

    def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
        """Create a grouped element from multiple small elements.

        Args:
            elements: List of elements to group

        Returns:
            Grouped JSON element
        """
        if not elements:
            raise ValueError("Cannot group empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create grouped content
        if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
            # Group array items into an array
            grouped_value = [elem.value for elem in elements]
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.ARRAY
            name = f"grouped_items_{len(elements)}"
        else:
            # Group mixed elements into an object
            grouped_value = {}
            for elem in elements:
                key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
                grouped_value[key] = elem.value
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.OBJECT
            name = f"grouped_elements_{len(elements)}"

        # Use the first element's path as base
        base_path = elements[0].path
        parent_path = (
            ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
        )
        grouped_path = f"{parent_path}.{name}"

        grouped_element = JSONElement(
            name=name,
            element_type=element_type,
            content=grouped_content,
            value=grouped_value,
            path=grouped_path,
            level=min(elem.level for elem in elements),
            size=len(grouped_content),
            item_count=len(elements),
        )

        return grouped_element

    def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
        """Split a large JSON element into smaller chunks.

        Args:
            element: Large JSON element to split

        Returns:
            List of smaller elements
        """
        if element.size <= self.chunk_size:
            return [element]

        chunks = []

        if element.element_type == JSONElementType.ARRAY and isinstance(
            element.value, list
        ):
            # Split array into smaller arrays
            items = element.value
            chunk_size = self.max_array_items_per_chunk

            for i in range(0, len(items), chunk_size):
                chunk_items = items[i : i + chunk_size]
                chunk_content = json.dumps(chunk_items, indent=2, ensure_ascii=False)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{i//chunk_size + 1}",
                    element_type=JSONElementType.ARRAY,
                    content=chunk_content,
                    value=chunk_items,
                    path=f"{element.path}_chunk_{i//chunk_size + 1}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(chunk_items),
                )
                chunks.append(chunk_element)

        elif element.element_type == JSONElementType.OBJECT and isinstance(
            element.value, dict
        ):
            # Split object by grouping properties
            items = list(element.value.items())
            current_chunk = {}
            current_size = 0
            chunk_index = 1

            for key, value in items:
                item_content = json.dumps({key: value}, indent=2, ensure_ascii=False)
                item_size = len(item_content)

                if current_size + item_size > self.chunk_size and current_chunk:
                    # Create chunk from current items
                    chunk_content = json.dumps(
                        current_chunk, indent=2, ensure_ascii=False
                    )
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=JSONElementType.OBJECT,
                        content=chunk_content,
                        value=current_chunk.copy(),
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk),
                    )
                    chunks.append(chunk_element)

                    # Start new chunk
                    current_chunk = {key: value}
                    current_size = item_size
                    chunk_index += 1
                else:
                    current_chunk[key] = value
                    current_size += item_size

            # Add remaining items
            if current_chunk:
                chunk_content = json.dumps(current_chunk, indent=2, ensure_ascii=False)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=JSONElementType.OBJECT,
                    content=chunk_content,
                    value=current_chunk,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk),
                )
                chunks.append(chunk_element)
        else:
            # For other types, split by lines as fallback
            lines = element.content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 1

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > self.chunk_size and current_chunk_lines:
                    chunk_content = "\n".join(current_chunk_lines)
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=element.element_type,
                        content=chunk_content,
                        value=chunk_content,  # Use content as value for text chunks
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk_lines),
                    )
                    chunks.append(chunk_element)

                    current_chunk_lines = [line]
                    current_size = line_size
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add remaining lines
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=element.element_type,
                    content=chunk_content,
                    value=chunk_content,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk_lines),
                )
                chunks.append(chunk_element)

        return chunks if chunks else [element]

    def _extract_json_metadata(self, element: JSONElement) -> dict[str, Any]:
        """Extract metadata from a JSON element.

        Args:
            element: JSON element to analyze

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "path": element.path,
            "level": element.level,
            "size": element.size,
            "item_count": element.item_count,
            "has_nested_objects": False,
            "has_arrays": False,
            "data_types": [],
        }

        # Analyze value types
        if isinstance(element.value, dict):
            metadata["data_types"] = list(
                {type(v).__name__ for v in element.value.values()}
            )
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value.values()
            )
            metadata["has_arrays"] = any(
                isinstance(v, list) for v in element.value.values()
            )
        elif isinstance(element.value, list) and element.value:
            metadata["data_types"] = list({type(v).__name__ for v in element.value})
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value
            )
            metadata["has_arrays"] = any(isinstance(v, list) for v in element.value)
        else:
            metadata["data_types"] = [type(element.value).__name__]

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_path": element.parent.path,
                }
            )

        return metadata

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a JSON document using structural boundaries.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Performance check: for very large files, use simple chunking
            if len(document.content) > SIMPLE_CHUNKING_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id, f"Large JSON file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Parse JSON structure
            root_element = self._parse_json_structure(document.content)

            if not root_element:
                self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
                return self._fallback_chunking(document)

            # Get all elements to chunk
            elements_to_chunk = []

            if root_element.children:
                # Use top-level children as chunks
                elements_to_chunk = root_element.children
            else:
                # Use root element if no children
                elements_to_chunk = [root_element]

            # Group small elements and split large ones
            grouped_elements = self._group_small_elements(elements_to_chunk)
            final_elements = []

            for element in grouped_elements:
                if element.size > self.chunk_size:
                    # Split large elements
                    split_elements = self._split_large_element(element)
                    final_elements.extend(split_elements)
                else:
                    final_elements.append(element)

            # Limit total elements
            final_elements = final_elements[:MAX_OBJECTS_TO_PROCESS]

            if not final_elements:
                self.progress_tracker.finish_chunking(document.id, 0, "json")
                return []

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": element.size,
                    },
                )

                # Create chunk document with optimized metadata processing
                skip_nlp = element.size > MAX_CHUNK_SIZE_FOR_NLP

                if skip_nlp:
                    # Create chunk without expensive NLP processing
                    chunk_doc = self._create_optimized_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                        skip_nlp=True,
                    )
                else:
                    # Use normal processing for smaller chunks
                    chunk_doc = self._create_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                    )

                # Add JSON-specific metadata
                json_metadata = self._extract_json_metadata(element)
                json_metadata["chunking_strategy"] = "json"
                json_metadata["chunking_method"] = "structured_json"
                json_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(json_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "json"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"JSON processing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _create_optimized_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with optimized processing.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        if skip_nlp:
            # Skip expensive NLP processing for large chunks
            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": "chunk_too_large",
                }
            )
        else:
            try:
                # Process the chunk text to get additional features
                processed = self._process_text(chunk_content)
                metadata.update(
                    {
                        "entities": processed["entities"],
                        "pos_tags": processed["pos_tags"],
                        "nlp_skipped": False,
                    }
                )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when JSON parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for JSON document")

        # Use simple line-based splitting for JSON
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:MAX_OBJECTS_TO_PROCESS]):
            chunk_doc = self._create_optimized_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                skip_nlp=len(chunk_content) > MAX_CHUNK_SIZE_FOR_NLP,
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks (required by base class).

        Args:
            text: Text to split

        Returns:
            List of text chunks
        """
        # This method is required by the base class but not used in our implementation
        # We override chunk_document instead
        return [text]
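To close the listing, here is a minimal usage sketch of the strategy. It is not taken from the repository's own calling code: it assumes Settings can be constructed with defaults (in practice it would be loaded from the qdrant-loader configuration) and reuses the Document keyword fields seen in _create_optimized_chunk_document above; all field values are placeholders.

import json

from qdrant_loader.config import Settings
from qdrant_loader.core.document import Document
from qdrant_loader.core.chunking.strategy.json_strategy import JSONChunkingStrategy

# Assumption: default construction works here; real usage would load the
# project configuration instead of building Settings inline.
settings = Settings()

strategy = JSONChunkingStrategy(settings)

document = Document(
    content=json.dumps({"users": [{"name": "Ada"}, {"name": "Grace"}]}, indent=2),
    metadata={"file_name": "users.json"},
    source="local-example",          # placeholder source identifier
    source_type="file",              # placeholder source type
    url="file:///tmp/users.json",    # placeholder URL
    title="users.json",
    content_type="json",             # placeholder content type label
)

chunks = strategy.chunk_document(document)
for chunk in chunks:
    # Each chunk carries the JSON-specific metadata added in chunk_document().
    print(chunk.metadata.get("path"), chunk.metadata.get("element_type"))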