Coverage for src/qdrant_loader/core/chunking/strategy/json/json_section_splitter.py: 87%

167 statements  

coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""JSON section splitter for intelligent element grouping and splitting.""" 

2 

3import json 

4from typing import Any 

5 

6import structlog 

7 

8from qdrant_loader.config import Settings 

9from qdrant_loader.core.chunking.strategy.base.section_splitter import ( 

10 BaseSectionSplitter, 

11) 

12from qdrant_loader.core.chunking.strategy.json.json_document_parser import ( 

13 JSONElement, 

14 JSONElementType, 

15) 

16from qdrant_loader.core.document import Document 

17 

18logger = structlog.get_logger(__name__) 

19 

20 

class JSONSectionSplitter(BaseSectionSplitter):
    """Section splitter for JSON documents."""

    def __init__(self, settings: Settings):
        """Initialize the JSON section splitter.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        self.min_chunk_size = 200  # Minimum size for a standalone chunk

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split JSON content into logical sections.

        Args:
            content: JSON content to split
            document: Source document (optional)

        Returns:
            List of section dictionaries with content and metadata
        """
        # Kept for base-class compatibility; the real JSON splitting
        # happens in split_json_elements.
        return [{"content": content, "metadata": {}}]

    def split_json_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Split JSON elements into optimally sized chunks.

        Args:
            elements: List of JSON elements to process

        Returns:
            List of optimally grouped/split elements
        """
        if not elements:
            return []

        # Step 1: Group small elements
        grouped_elements = self._group_small_elements(elements)

        # Step 2: Split large elements
        final_elements = []
        for element in grouped_elements:
            if element.size > self.chunk_size:
                split_elements = self._split_large_element(element)
                final_elements.extend(split_elements)
            else:
                final_elements.append(element)

        # Step 3: Apply limits
        final_elements = final_elements[: self.json_config.max_objects_to_process]

        return final_elements

    def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Group small JSON elements into larger chunks.

        Args:
            elements: List of JSON elements

        Returns:
            List of grouped elements
        """
        if not elements:
            return []

        grouped = []
        current_group = []
        current_size = 0

        for element in elements:
            # If the element is large enough or is a significant structure, keep it separate
            if (
                element.size >= self.min_chunk_size
                or element.element_type
                in [JSONElementType.OBJECT, JSONElementType.ARRAY]
                or element.item_count > self.json_config.max_object_keys_to_process
            ):
                # First, add any accumulated small elements
                if current_group:
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                grouped.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element.size

                # If the accumulated size is large enough, create a grouped element
                if (
                    current_size >= self.min_chunk_size
                    or len(current_group) >= self.json_config.max_array_items_per_chunk
                ):
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            grouped_element = self._create_grouped_element(current_group)
            grouped.append(grouped_element)

        return grouped

    def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
        """Create a grouped element from multiple small elements.

        Args:
            elements: List of elements to group

        Returns:
            Grouped JSON element
        """
        if not elements:
            raise ValueError("Cannot group empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create grouped content
        if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
            # Group array items into an array
            grouped_value = [elem.value for elem in elements]
            try:
                grouped_content = json.dumps(
                    grouped_value, indent=2, ensure_ascii=False
                )
            except (TypeError, ValueError):
                grouped_content = str(grouped_value)
            element_type = JSONElementType.ARRAY
            name = f"grouped_items_{len(elements)}"
        else:
            # Group mixed elements into an object
            grouped_value = {}
            for elem in elements:
                key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
                grouped_value[key] = elem.value
            try:
                grouped_content = json.dumps(
                    grouped_value, indent=2, ensure_ascii=False
                )
            except (TypeError, ValueError):
                grouped_content = str(grouped_value)
            element_type = JSONElementType.OBJECT
            name = f"grouped_elements_{len(elements)}"

        # Use the first element's path as the base
        base_path = elements[0].path
        parent_path = (
            ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
        )
        grouped_path = f"{parent_path}.{name}"

        grouped_element = JSONElement(
            name=name,
            element_type=element_type,
            content=grouped_content,
            value=grouped_value,
            path=grouped_path,
            level=min(elem.level for elem in elements),
            size=len(grouped_content),
            item_count=len(elements),
        )

        return grouped_element

    def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
        """Split a large JSON element into smaller chunks.

        Args:
            element: Large JSON element to split

        Returns:
            List of smaller elements
        """
        if element.size <= self.chunk_size:
            return [element]

        chunks = []

        if element.element_type == JSONElementType.ARRAY and isinstance(
            element.value, list
        ):
            # Split array into smaller arrays
            items = element.value
            chunk_size = self.json_config.max_array_items_per_chunk

            for i in range(0, len(items), chunk_size):
                chunk_items = items[i : i + chunk_size]
                try:
                    chunk_content = json.dumps(
                        chunk_items, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    chunk_content = str(chunk_items)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{i // chunk_size + 1}",
                    element_type=JSONElementType.ARRAY,
                    content=chunk_content,
                    value=chunk_items,
                    path=f"{element.path}_chunk_{i // chunk_size + 1}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(chunk_items),
                )
                chunks.append(chunk_element)

        elif element.element_type == JSONElementType.OBJECT and isinstance(
            element.value, dict
        ):
            # Split object by grouping properties
            items = list(element.value.items())
            current_chunk = {}
            current_size = 0
            chunk_index = 1

            for key, value in items:
                try:
                    item_content = json.dumps(
                        {key: value}, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    item_content = f'"{key}": {str(value)}'
                item_size = len(item_content)

                if current_size + item_size > self.chunk_size and current_chunk:
                    # Create chunk from current items
                    try:
                        chunk_content = json.dumps(
                            current_chunk, indent=2, ensure_ascii=False
                        )
                    except (TypeError, ValueError):
                        chunk_content = str(current_chunk)

                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=JSONElementType.OBJECT,
                        content=chunk_content,
                        value=current_chunk.copy(),
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk),
                    )
                    chunks.append(chunk_element)

                    # Start new chunk
                    current_chunk = {key: value}
                    current_size = item_size
                    chunk_index += 1
                else:
                    current_chunk[key] = value
                    current_size += item_size

            # Add remaining items
            if current_chunk:
                try:
                    chunk_content = json.dumps(
                        current_chunk, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    chunk_content = str(current_chunk)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=JSONElementType.OBJECT,
                    content=chunk_content,
                    value=current_chunk,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk),
                )
                chunks.append(chunk_element)
        else:
            # For other types, split by lines as a fallback
            lines = element.content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 1

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > self.chunk_size and current_chunk_lines:
                    chunk_content = "\n".join(current_chunk_lines)
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=element.element_type,
                        content=chunk_content,
                        value=chunk_content,  # Use content as value for text chunks
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk_lines),
                    )
                    chunks.append(chunk_element)

                    current_chunk_lines = [line]
                    current_size = line_size
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add remaining lines
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=element.element_type,
                    content=chunk_content,
                    value=chunk_content,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk_lines),
                )
                chunks.append(chunk_element)

        return chunks if chunks else [element]

    def merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small JSON sections to optimize chunk sizes.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged sections
        """
        if not sections:
            return []

        merged = []
        current_merged = None
        current_size = 0

        for section in sections:
            section_size = len(section.get("content", ""))

            # If the section is large enough or the merged section is full, finalize the current one
            if (
                section_size >= self.min_chunk_size
                or current_size + section_size > self.chunk_size
            ) and current_merged:
                merged.append(current_merged)
                current_merged = None
                current_size = 0

            # Start a new merged section if needed
            if current_merged is None:
                current_merged = section.copy()
                current_size = section_size
            else:
                # Merge into the existing section
                current_merged["content"] += "\n" + section.get("content", "")
                # Merge metadata
                if "metadata" in section:
                    current_merged.setdefault("metadata", {}).update(
                        section["metadata"]
                    )
                current_size += section_size + 1  # +1 for newline

        # Add the final merged section
        if current_merged:
            merged.append(current_merged)

        return merged
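
Usage note: a minimal sketch of driving the splitter directly, assuming a fully configured Settings instance (the settings object below is not constructed here) and building JSONElement inputs by hand with the same keyword arguments the splitter itself uses. make_object_element is a hypothetical helper, not part of this module:

import json

from qdrant_loader.core.chunking.strategy.json.json_document_parser import (
    JSONElement,
    JSONElementType,
)
from qdrant_loader.core.chunking.strategy.json.json_section_splitter import (
    JSONSectionSplitter,
)


def make_object_element(name: str, value: dict, path: str) -> JSONElement:
    # Hypothetical helper: builds a JSONElement the same way the splitter
    # does when it constructs chunk elements.
    content = json.dumps(value, indent=2, ensure_ascii=False)
    return JSONElement(
        name=name,
        element_type=JSONElementType.OBJECT,
        content=content,
        value=value,
        path=path,
        level=1,
        size=len(content),
        item_count=len(value),
    )


# Assumes `settings` is a configured Settings instance whose
# chunking.strategies.json_strategy block is populated.
splitter = JSONSectionSplitter(settings)
elements = [
    make_object_element("user", {"id": 1, "name": "Ada"}, "root.user"),
    make_object_element("meta", {"version": "2.0"}, "root.meta"),
]
for chunk in splitter.split_json_elements(elements):
    print(chunk.path, chunk.size, chunk.item_count)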