Coverage for src/qdrant_loader/core/chunking/strategy/json/json_document_parser.py: 87%
183 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""JSON document parser for structure analysis and element extraction."""
3import json
4from dataclasses import dataclass, field
5from enum import Enum
6from typing import Any
8import structlog
10from qdrant_loader.config import Settings
11from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser
13logger = structlog.get_logger(__name__)
class JSONElementType(Enum):
    """Structural roles a node can play in a parsed JSON document."""

    OBJECT = "object"  # a JSON object ({...})
    ARRAY = "array"  # a JSON array ([...])
    ARRAY_ITEM = "array_item"  # a scalar entry inside an array
    PROPERTY = "property"  # a scalar value attached to an object key
    VALUE = "value"  # a bare scalar at the document root
    ROOT = "root"  # reserved for root wrappers (unused in this module's visible code)
@dataclass
class JSONElement:
    """A node in the parsed JSON structure tree.

    Bundles one JSON value with positional metadata (JSONPath-style
    location, nesting level, serialized size) and nested child nodes.
    """

    name: str  # key name, "root", or synthetic "item_<i>" for array entries
    element_type: JSONElementType  # structural role of this node
    content: str  # serialized JSON text for this value
    value: Any  # the parsed Python value itself
    path: str  # JSONPath-style location, e.g. "$.items[0]"
    level: int = 0  # nesting depth; the root sits at 0
    size: int = 0  # length of ``content`` in characters
    item_count: int = 0  # direct member count for containers, 0 for scalars
    children: list["JSONElement"] = field(default_factory=list)  # nested elements

    def add_child(self, child: "JSONElement"):
        """Append *child* to this element's children."""
        self.children.append(child)
class JSONDocumentParser(BaseDocumentParser):
    """Parser for JSON document structure analysis.

    Produces structural summaries of raw JSON text (nesting depth, element
    counts, schema sketches, complexity scores) and builds bounded
    :class:`JSONElement` trees for downstream chunking. Tree traversal is
    capped by the limits configured on the ``json_strategy`` settings so
    pathological documents stay cheap to process.
    """

    def __init__(self, settings: Settings):
        """Initialize JSON document parser.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        # Strategy-specific traversal limits (recursion depth, object/array caps).
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        # Target chunk size; serialized children at or above it are not descended into.
        self.chunk_size = settings.global_config.chunking.chunk_size

    def parse_document_structure(self, content: str) -> dict[str, Any]:
        """Parse JSON document structure and analyze composition.

        Args:
            content: JSON content to analyze

        Returns:
            Dictionary containing structure analysis. On invalid JSON the
            result carries ``valid_json=False``, the decode error, and a
            rough size-based element estimate instead of the full analysis.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            return {
                "valid_json": False,
                "error": str(e),
                "total_size": len(content),
                "estimated_elements": max(1, len(content) // 100),  # Rough estimate
            }

        return {
            "valid_json": True,
            "root_type": type(data).__name__,
            "total_size": len(content),
            "nesting_depth": self._calculate_nesting_depth(data),
            "total_elements": self._count_elements(data),
            "schema_summary": self._infer_basic_schema(data),
            "complexity_score": self._calculate_complexity_score(data),
            "data_types": self._analyze_data_types(data),
            # Key patterns only make sense for a top-level object.
            "key_patterns": (
                self._analyze_key_patterns(data) if isinstance(data, dict) else []
            ),
            "array_stats": (
                self._analyze_arrays(data) if isinstance(data, list | dict) else {}
            ),
        }

    def extract_section_metadata(self, section: JSONElement) -> dict[str, Any]:
        """Extract metadata from a JSON section/element.

        Args:
            section: JSON element to analyze

        Returns:
            Dictionary containing section metadata
        """
        metadata: dict[str, Any] = {
            "element_name": section.name,
            "element_type": section.element_type.value,
            "json_path": section.path,
            "nesting_level": section.level,
            "content_size": section.size,
            "item_count": section.item_count,
            "child_count": len(section.children),
            # True when any direct child is itself an object or array.
            "has_nested_structures": any(
                child.element_type in (JSONElementType.OBJECT, JSONElementType.ARRAY)
                for child in section.children
            ),
        }

        # Enrich with value-derived info when a concrete value is attached.
        if getattr(section, "value", None) is not None:
            metadata.update(
                {
                    "value_type": type(section.value).__name__,
                    "schema_info": self._infer_basic_schema(section.value),
                    "complexity_score": self._calculate_complexity_score(
                        section.value
                    ),
                }
            )

        return metadata

    def parse_json_structure(self, content: str) -> JSONElement | None:
        """Parse JSON content into a structured element tree.

        Args:
            content: JSON content to parse

        Returns:
            Root JSONElement or None if parsing fails
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            logger.warning("Failed to parse JSON content")
            return None

        # Classify the root: object, array, or bare scalar value.
        if isinstance(data, dict):
            root_type = JSONElementType.OBJECT
        elif isinstance(data, list):
            root_type = JSONElementType.ARRAY
        else:
            root_type = JSONElementType.VALUE

        root_element = self._create_json_element("root", data, root_type, "$", 0)

        # Only walk the tree for documents small enough to parse cheaply.
        if root_element.size < self.json_config.max_json_size_for_parsing:
            self._extract_json_elements(root_element, data, "$")

        return root_element

    def _create_json_element(
        self,
        name: str,
        value: Any,
        element_type: JSONElementType,
        path: str,
        level: int = 0,
    ) -> JSONElement:
        """Create a JSON element from a value.

        Args:
            name: Element name
            value: JSON value
            element_type: Type of JSON element
            path: JSON path
            level: Nesting level

        Returns:
            JSONElement instance
        """
        # Serialize for content; fall back to str() for non-serializable values.
        try:
            content = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            content = str(value)

        # Containers report their direct member count; scalars report 0.
        item_count = len(value) if isinstance(value, dict | list) else 0

        return JSONElement(
            name=name,
            element_type=element_type,
            content=content,
            value=value,
            path=path,
            level=level,
            size=len(content),
            item_count=item_count,
        )

    def _extract_json_elements(
        self,
        parent_element: JSONElement,
        data: Any,
        path: str,
        level: int = 0,
        processed_count: list[int] | None = None,
    ):
        """Recursively extract JSON elements.

        Args:
            parent_element: Parent JSON element
            data: JSON data to process
            path: Current JSON path
            level: Current nesting level
            processed_count: Mutable single-item list tracking the total
                number of processed values across the whole traversal
                (shared by reference between recursive calls)
        """
        if processed_count is None:
            processed_count = [0]

        # Performance guards: stop once any configured limit is reached.
        if level > self.json_config.max_recursion_depth:
            return
        if processed_count[0] >= self.json_config.max_objects_to_process:
            return
        if len(parent_element.children) >= self.json_config.max_array_items_per_chunk:
            return

        if isinstance(data, dict):
            self._extract_dict_children(
                parent_element, data, path, level, processed_count
            )
        elif isinstance(data, list):
            self._extract_list_children(
                parent_element, data, path, level, processed_count
            )

    def _extract_dict_children(
        self,
        parent_element: JSONElement,
        data: dict,
        path: str,
        level: int,
        processed_count: list[int],
    ) -> None:
        """Create one child element per object key, honoring configured limits."""
        for i, (key, value) in enumerate(data.items()):
            if processed_count[0] >= self.json_config.max_objects_to_process:
                break
            if i >= self.json_config.max_object_keys_to_process:
                break

            processed_count[0] += 1
            child_path = f"{path}.{key}"

            if isinstance(value, dict | list):
                # Complex values become OBJECT/ARRAY elements.
                element_type = (
                    JSONElementType.OBJECT
                    if isinstance(value, dict)
                    else JSONElementType.ARRAY
                )
                child_element = self._create_json_element(
                    key, value, element_type, child_path, level + 1
                )
                parent_element.add_child(child_element)

                # Recurse only while the serialized child fits in one chunk.
                if child_element.size < self.chunk_size:
                    self._extract_json_elements(
                        child_element, value, child_path, level + 1, processed_count
                    )
            else:
                # Scalar values become leaf PROPERTY elements.
                parent_element.add_child(
                    self._create_json_element(
                        key, value, JSONElementType.PROPERTY, child_path, level + 1
                    )
                )

    def _extract_list_children(
        self,
        parent_element: JSONElement,
        data: list,
        path: str,
        level: int,
        processed_count: list[int],
    ) -> None:
        """Create one child element per array item, honoring configured limits."""
        for i, item in enumerate(data):
            if processed_count[0] >= self.json_config.max_objects_to_process:
                break
            if i >= self.json_config.max_array_items_per_chunk:
                break

            processed_count[0] += 1
            child_path = f"{path}[{i}]"

            if isinstance(item, dict | list):
                # Complex items become OBJECT/ARRAY elements named by position.
                element_type = (
                    JSONElementType.OBJECT
                    if isinstance(item, dict)
                    else JSONElementType.ARRAY
                )
                child_element = self._create_json_element(
                    f"item_{i}", item, element_type, child_path, level + 1
                )
                parent_element.add_child(child_element)

                # Recurse only while the serialized child fits in one chunk.
                if child_element.size < self.chunk_size:
                    self._extract_json_elements(
                        child_element, item, child_path, level + 1, processed_count
                    )
            else:
                # Scalar items become leaf ARRAY_ITEM elements.
                parent_element.add_child(
                    self._create_json_element(
                        f"item_{i}",
                        item,
                        JSONElementType.ARRAY_ITEM,
                        child_path,
                        level + 1,
                    )
                )

    def _calculate_nesting_depth(self, data: Any, current_depth: int = 0) -> int:
        """Calculate maximum nesting depth of JSON structure.

        NOTE(review): unbounded recursion — extremely deep documents could
        hit Python's recursion limit, as with json.loads itself.
        """
        if not isinstance(data, dict | list):
            return current_depth

        members = data.values() if isinstance(data, dict) else data
        return max(
            (self._calculate_nesting_depth(m, current_depth + 1) for m in members),
            default=current_depth,  # empty containers count at their own depth
        )

    def _count_elements(self, data: Any) -> int:
        """Count total number of elements (containers plus scalars) in the structure."""
        if isinstance(data, dict):
            return 1 + sum(self._count_elements(value) for value in data.values())
        if isinstance(data, list):
            return 1 + sum(self._count_elements(item) for item in data)
        return 1

    def _infer_basic_schema(self, data: Any) -> dict[str, Any]:
        """Infer basic schema information from JSON data.

        Returns:
            A JSON-Schema-like sketch: objects list per-property schemas,
            arrays report length plus the first item's schema, scalars
            report their Python type name.
        """
        if isinstance(data, dict):
            return {
                "type": "object",
                "properties": {
                    key: self._infer_basic_schema(value) for key, value in data.items()
                },
                "property_count": len(data),
            }
        if isinstance(data, list):
            if not data:
                return {"type": "array", "length": 0}
            # Sample the first few items; use the first item's schema as representative.
            sample_items = data[:5]
            item_types = [self._infer_basic_schema(item) for item in sample_items]
            return {
                "type": "array",
                "length": len(data),
                "item_schema": item_types[0] if item_types else {"type": "unknown"},
            }
        return {"type": type(data).__name__}

    def _calculate_complexity_score(self, data: Any) -> float:
        """Calculate complexity score for JSON data.

        Containers score 1.0 plus a weighted sum of their members' scores
        (0.5 weight for object values, 0.3 for array items); scalars score 0.1.
        """
        if isinstance(data, dict):
            child_total = sum(
                self._calculate_complexity_score(value) for value in data.values()
            )
            return 1.0 + child_total * 0.5
        if isinstance(data, list):
            child_total = sum(self._calculate_complexity_score(item) for item in data)
            return 1.0 + child_total * 0.3
        return 0.1

    def _analyze_data_types(self, data: Any) -> dict[str, int]:
        """Analyze distribution of Python type names across the JSON structure."""
        type_counts: dict[str, int] = {}

        def count_types(obj: Any) -> None:
            obj_type = type(obj).__name__
            type_counts[obj_type] = type_counts.get(obj_type, 0) + 1

            if isinstance(obj, dict):
                for value in obj.values():
                    count_types(value)
            elif isinstance(obj, list):
                for item in obj:
                    count_types(item)

        count_types(data)
        return type_counts

    def _analyze_key_patterns(self, data: Any) -> list[str]:
        """Analyze naming patterns in top-level JSON object keys.

        Returns:
            Pattern labels: ``private_keys`` (leading underscore),
            ``uppercase_keys``, ``snake_case``, ``camel_case`` and
            ``pascal_case``. Empty when *data* is not an object.
        """
        if not isinstance(data, dict):
            return []

        keys = list(data.keys())
        patterns = []

        # Check for common patterns
        if any(key.startswith("_") for key in keys):
            patterns.append("private_keys")
        if any(key.isupper() for key in keys):
            patterns.append("uppercase_keys")
        if any("_" in key for key in keys):
            patterns.append("snake_case")
        # camelCase: lowercase first character with an uppercase letter later.
        # (The previous check flagged any capitalized key, which is PascalCase,
        # and never matched genuine camelCase keys like "userId".)
        if any(key[:1].islower() and key != key.lower() for key in keys):
            patterns.append("camel_case")
        # PascalCase: capitalized first character (formerly mislabeled camel_case).
        if any(key[:1].isupper() for key in keys):
            patterns.append("pascal_case")

        return patterns

    def _analyze_arrays(self, data: Any) -> dict[str, Any]:
        """Analyze array statistics in JSON structure.

        Returns:
            Totals plus per-array details (path, length, sampled item types)
            for up to the first 10 arrays discovered.
        """
        arrays_found: list[dict[str, Any]] = []

        def find_arrays(obj: Any, path: str = "$") -> None:
            if isinstance(obj, list):
                arrays_found.append(
                    {
                        "path": path,
                        "length": len(obj),
                        # Sample first 5 item types only.
                        "item_types": [type(item).__name__ for item in obj[:5]],
                    }
                )
                for i, item in enumerate(obj):
                    find_arrays(item, f"{path}[{i}]")
            elif isinstance(obj, dict):
                for key, value in obj.items():
                    find_arrays(value, f"{path}.{key}")

        find_arrays(data)

        return {
            "total_arrays": len(arrays_found),
            "array_details": arrays_found[:10],  # Limit to first 10 for performance
            "max_array_length": max((arr["length"] for arr in arrays_found), default=0),
        }