Coverage for src/qdrant_loader/core/chunking/strategy/json/json_document_parser.py: 87%

183 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""JSON document parser for structure analysis and element extraction.""" 

2 

3import json 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import Any 

7 

8import structlog 

9 

10from qdrant_loader.config import Settings 

11from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser 

12 

13logger = structlog.get_logger(__name__) 

14 

15 

class JSONElementType(Enum):
    """Kinds of node that can appear in a parsed JSON element tree.

    Each member's value is the lowercase string form of its name.
    """

    # Container nodes (JSON objects and arrays).
    OBJECT = "object"
    ARRAY = "array"

    # Non-container members of an array or an object, respectively.
    ARRAY_ITEM = "array_item"
    PROPERTY = "property"

    # A bare scalar, e.g. a document whose top-level value is not a container.
    VALUE = "value"

    # NOTE(review): not referenced by the parser code in this module;
    # presumably kept for external callers -- confirm before removing.
    ROOT = "root"

25 

26 

@dataclass
class JSONElement:
    """One node of a JSON element tree, together with its bookkeeping data.

    Attributes:
        name: Label of the node (for example an object key).
        element_type: Kind of node, as a ``JSONElementType`` member.
        content: Textual rendering of ``value``.
        value: The underlying Python value of the node.
        path: Location of the node inside the document (JSON-path style).
        level: Nesting level of the node; defaults to 0.
        size: Size associated with the node; defaults to 0.
        item_count: Number of direct items the node holds; defaults to 0.
        children: Child nodes, in insertion order. Every instance gets its
            own fresh list.
    """

    # Required fields -- their order defines the positional constructor order.
    name: str
    element_type: JSONElementType
    content: str
    value: Any
    path: str

    # Optional bookkeeping fields.
    level: int = 0
    size: int = 0
    item_count: int = 0
    children: list["JSONElement"] = field(default_factory=list)

    def add_child(self, child: "JSONElement") -> None:
        """Append ``child`` to the end of this element's ``children`` list.

        Args:
            child: Element to attach beneath this one
        """
        self.children.append(child)

44 

45 

class JSONDocumentParser(BaseDocumentParser):
    """Parser for JSON document structure analysis.

    Two views of a JSON document are offered:

    * ``parse_document_structure`` returns a summary dictionary (depth,
      element count, inferred schema, type histogram, ...).
    * ``parse_json_structure`` returns a tree of ``JSONElement`` nodes whose
      extent is bounded by the limits read from the JSON strategy config.
    """

    def __init__(self, settings: Settings):
        """Initialize JSON document parser.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        self.chunk_size = settings.global_config.chunking.chunk_size

    def parse_document_structure(self, content: str) -> dict[str, Any]:
        """Parse JSON document structure and analyze composition.

        Args:
            content: JSON content to analyze

        Returns:
            Dictionary containing structure analysis. When ``content`` is not
            valid JSON the dictionary holds ``valid_json=False``, the decoder
            error message, the raw size and a rough element estimate instead.
        """
        # Only the decode step can raise JSONDecodeError, so keep the try
        # body limited to it.
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            return {
                "valid_json": False,
                "error": str(e),
                "total_size": len(content),
                "estimated_elements": max(1, len(content) // 100),  # Rough estimate
            }

        return {
            "valid_json": True,
            "root_type": type(data).__name__,
            "total_size": len(content),
            "nesting_depth": self._calculate_nesting_depth(data),
            "total_elements": self._count_elements(data),
            "schema_summary": self._infer_basic_schema(data),
            "complexity_score": self._calculate_complexity_score(data),
            "data_types": self._analyze_data_types(data),
            "key_patterns": (
                self._analyze_key_patterns(data) if isinstance(data, dict) else []
            ),
            "array_stats": (
                self._analyze_arrays(data) if isinstance(data, list | dict) else {}
            ),
        }

    def extract_section_metadata(self, section: JSONElement) -> dict[str, Any]:
        """Extract metadata from a JSON section/element.

        Args:
            section: JSON element to analyze

        Returns:
            Dictionary containing section metadata. The ``value_type``,
            ``schema_info`` and ``complexity_score`` keys are only present
            when the element carries a non-``None`` value.
        """
        container_types = (JSONElementType.OBJECT, JSONElementType.ARRAY)
        metadata = {
            "element_name": section.name,
            "element_type": section.element_type.value,
            "json_path": section.path,
            "nesting_level": section.level,
            "content_size": section.size,
            "item_count": section.item_count,
            "child_count": len(section.children),
            "has_nested_structures": any(
                child.element_type in container_types for child in section.children
            ),
        }

        # Analyze the actual value if available (a missing attribute is
        # treated the same as a None value).
        value = getattr(section, "value", None)
        if value is not None:
            metadata.update(
                {
                    "value_type": type(value).__name__,
                    "schema_info": self._infer_basic_schema(value),
                    "complexity_score": self._calculate_complexity_score(value),
                }
            )

        return metadata

    def parse_json_structure(self, content: str) -> JSONElement | None:
        """Parse JSON content into a structured element tree.

        Args:
            content: JSON content to parse

        Returns:
            Root JSONElement or None if parsing fails. Children are only
            extracted when the serialized root is smaller than
            ``max_json_size_for_parsing``.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            logger.warning("Failed to parse JSON content")
            return None

        # Classify the root: object, array, or a bare scalar value.
        if isinstance(data, dict):
            root_type = JSONElementType.OBJECT
        elif isinstance(data, list):
            root_type = JSONElementType.ARRAY
        else:
            root_type = JSONElementType.VALUE

        root_element = self._create_json_element("root", data, root_type, "$", 0)

        # Skip child extraction for documents above the configured size.
        if root_element.size < self.json_config.max_json_size_for_parsing:
            self._extract_json_elements(root_element, data, "$")

        return root_element

    def _create_json_element(
        self,
        name: str,
        value: Any,
        element_type: JSONElementType,
        path: str,
        level: int = 0,
    ) -> JSONElement:
        """Create a JSON element from a value.

        Args:
            name: Element name
            value: JSON value
            element_type: Type of JSON element
            path: JSON path
            level: Nesting level

        Returns:
            JSONElement instance whose ``content`` is the pretty-printed JSON
            text of ``value`` (or ``str(value)`` if it cannot be serialized),
            ``size`` is the length of that text, and ``item_count`` is the
            number of direct entries for objects/arrays (0 otherwise).
        """
        # Convert value to JSON string for content
        try:
            content = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            content = str(value)

        return JSONElement(
            name=name,
            element_type=element_type,
            content=content,
            value=value,
            path=path,
            level=level,
            size=len(content),
            item_count=len(value) if isinstance(value, dict | list) else 0,
        )

    def _extract_json_elements(
        self,
        parent_element: JSONElement,
        data: Any,
        path: str,
        level: int = 0,
        processed_count: list[int] | None = None,
    ):
        """Recursively extract JSON elements.

        Every entry of ``data`` (object member or array item) becomes a child
        of ``parent_element``. Nested objects/arrays are descended into only
        while their serialized size is below ``self.chunk_size``.

        Args:
            parent_element: Parent JSON element
            data: JSON data to process
            path: Current JSON path
            level: Current nesting level
            processed_count: Mutable list to track total processed objects
        """
        if processed_count is None:
            processed_count = [0]

        config = self.json_config

        # Performance checks
        if level > config.max_recursion_depth:
            return
        if processed_count[0] >= config.max_objects_to_process:
            return
        if len(parent_element.children) >= config.max_array_items_per_chunk:
            return

        # Normalise objects and arrays into one stream of
        # (child name, child path, child value) so a single loop serves both.
        if isinstance(data, dict):
            entries = ((key, f"{path}.{key}", value) for key, value in data.items())
            max_entries = config.max_object_keys_to_process
            leaf_type = JSONElementType.PROPERTY
        elif isinstance(data, list):
            entries = (
                (f"item_{i}", f"{path}[{i}]", item) for i, item in enumerate(data)
            )
            max_entries = config.max_array_items_per_chunk
            leaf_type = JSONElementType.ARRAY_ITEM
        else:
            # Scalars have no children.
            return

        for index, (name, child_path, value) in enumerate(entries):
            # Stop on either the global budget or the per-container limit.
            if processed_count[0] >= config.max_objects_to_process:
                break
            if index >= max_entries:
                break

            processed_count[0] += 1

            is_container = isinstance(value, dict | list)
            if isinstance(value, dict):
                element_type = JSONElementType.OBJECT
            elif is_container:
                element_type = JSONElementType.ARRAY
            else:
                element_type = leaf_type

            child_element = self._create_json_element(
                name, value, element_type, child_path, level + 1
            )
            parent_element.add_child(child_element)

            # Recursively process if not too large
            if is_container and child_element.size < self.chunk_size:
                self._extract_json_elements(
                    child_element, value, child_path, level + 1, processed_count
                )

    def _calculate_nesting_depth(self, data: Any, current_depth: int = 0) -> int:
        """Calculate maximum nesting depth of JSON structure.

        Args:
            data: JSON value to measure
            current_depth: Depth already accumulated above ``data``

        Returns:
            Deepest level reached. Scalars and empty containers contribute
            nothing beyond ``current_depth``.
        """
        if isinstance(data, dict):
            children = data.values()
        elif isinstance(data, list):
            children = data
        else:
            return current_depth

        # A plain loop (rather than max() over a generator) keeps one Python
        # frame per nesting level.
        max_depth = current_depth
        for child in children:
            depth = self._calculate_nesting_depth(child, current_depth + 1)
            max_depth = max(max_depth, depth)
        return max_depth

    def _count_elements(self, data: Any) -> int:
        """Count total number of elements in JSON structure.

        Args:
            data: JSON value to count

        Returns:
            Number of nodes, counting each container itself as one element
            in addition to everything it contains.
        """
        if isinstance(data, dict):
            return 1 + sum(self._count_elements(value) for value in data.values())
        if isinstance(data, list):
            return 1 + sum(self._count_elements(item) for item in data)
        return 1

    def _infer_basic_schema(self, data: Any) -> dict[str, Any]:
        """Infer basic schema information from JSON data.

        Args:
            data: JSON value to describe

        Returns:
            For objects: ``type``, per-key ``properties`` and
            ``property_count``. For arrays: ``type``, ``length`` and, when
            non-empty, ``item_schema`` taken from the first item. For
            anything else: the Python type name under ``type``.
        """
        if isinstance(data, dict):
            return {
                "type": "object",
                "properties": {
                    key: self._infer_basic_schema(value) for key, value in data.items()
                },
                "property_count": len(data),
            }

        if isinstance(data, list):
            if not data:
                return {"type": "array", "length": 0}
            # Only the first item's schema is reported, so only that item is
            # inspected; sampling further items would multiply the work at
            # every nested array level without changing the result.
            return {
                "type": "array",
                "length": len(data),
                "item_schema": self._infer_basic_schema(data[0]),
            }

        return {"type": type(data).__name__}

    def _calculate_complexity_score(self, data: Any) -> float:
        """Calculate complexity score for JSON data.

        Args:
            data: JSON value to score

        Returns:
            0.1 for a scalar; for a container, 1.0 plus the summed scores of
            its members weighted by 0.5 (objects) or 0.3 (arrays).
        """
        if isinstance(data, dict):
            return (
                1.0
                + sum(
                    self._calculate_complexity_score(value) for value in data.values()
                )
                * 0.5
            )
        if isinstance(data, list):
            return (
                1.0 + sum(self._calculate_complexity_score(item) for item in data) * 0.3
            )
        return 0.1

    def _analyze_data_types(self, data: Any) -> dict[str, int]:
        """Analyze distribution of data types in JSON structure.

        Args:
            data: JSON value to inspect

        Returns:
            Mapping of Python type name to the number of nodes of that type,
            containers included.
        """
        type_counts: dict[str, int] = {}

        def count_types(obj: Any) -> None:
            # Tally this node, then descend into containers.
            obj_type = type(obj).__name__
            type_counts[obj_type] = type_counts.get(obj_type, 0) + 1

            if isinstance(obj, dict):
                for value in obj.values():
                    count_types(value)
            elif isinstance(obj, list):
                for item in obj:
                    count_types(item)

        count_types(data)
        return type_counts

    def _analyze_key_patterns(self, data: Any) -> list[str]:
        """Analyze patterns in JSON keys.

        Only the top-level keys of ``data`` are examined. Each label is a
        coarse heuristic that is reported as soon as a single key matches.

        Args:
            data: JSON value whose keys are inspected

        Returns:
            List of matched pattern labels; empty when ``data`` is not a dict.
        """
        if not isinstance(data, dict):
            return []

        keys = list(data)
        patterns: list[str] = []

        # Check for common patterns
        if any(key.startswith("_") for key in keys):
            patterns.append("private_keys")
        if any(key.isupper() for key in keys):
            patterns.append("uppercase_keys")
        if any("_" in key for key in keys):
            patterns.append("snake_case")
        # The "camel_case" label is triggered by a leading uppercase letter;
        # empty keys are skipped so key[0] cannot fail.
        if any(key[0].isupper() for key in keys if key):
            patterns.append("camel_case")

        return patterns

    def _analyze_arrays(self, data: Any) -> dict[str, Any]:
        """Analyze array statistics in JSON structure.

        Args:
            data: JSON value to search for arrays

        Returns:
            Dictionary with the number of arrays found, details (path, length
            and the type names of the first five items) for at most the first
            ten arrays, and the largest array length (0 when none exist).
        """
        arrays_found: list[dict[str, Any]] = []

        def find_arrays(obj: Any, path: str = "$") -> None:
            if isinstance(obj, list):
                arrays_found.append(
                    {
                        "path": path,
                        "length": len(obj),
                        "item_types": [
                            type(item).__name__ for item in obj[:5]
                        ],  # Sample first 5
                    }
                )
                for i, item in enumerate(obj):
                    find_arrays(item, f"{path}[{i}]")
            elif isinstance(obj, dict):
                for key, value in obj.items():
                    find_arrays(value, f"{path}.{key}")

        find_arrays(data)

        return {
            "total_arrays": len(arrays_found),
            "array_details": arrays_found[:10],  # Limit to first 10 for performance
            "max_array_length": max((arr["length"] for arr in arrays_found), default=0),
        }

448 }