Coverage for src/qdrant_loader/core/chunking/strategy/json/json_section_splitter.py: 87%
167 statements
coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""JSON section splitter for intelligent element grouping and splitting."""
3import json
4from typing import Any
6import structlog
8from qdrant_loader.config import Settings
9from qdrant_loader.core.chunking.strategy.base.section_splitter import (
10 BaseSectionSplitter,
11)
12from qdrant_loader.core.chunking.strategy.json.json_document_parser import (
13 JSONElement,
14 JSONElementType,
15)
16from qdrant_loader.core.document import Document
18logger = structlog.get_logger(__name__)
class JSONSectionSplitter(BaseSectionSplitter):
    """Section splitter for JSON documents."""

    def __init__(self, settings: Settings):
        """Initialize JSON section splitter.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.json_config = settings.global_config.chunking.strategies.json_strategy
        self.min_chunk_size = 200  # Minimum size for standalone chunks

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split JSON content into logical sections.

        Args:
            content: JSON content to split
            document: Source document (optional)

        Returns:
            List of section dictionaries with content and metadata
        """
        # This method is kept for base class compatibility only;
        # the real JSON splitting happens in split_json_elements.
        return [{"content": content, "metadata": {}}]
    def split_json_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Split JSON elements into optimally sized chunks.

        Args:
            elements: List of JSON elements to process

        Returns:
            List of optimally grouped/split elements
        """
        if not elements:
            return []

        # Step 1: Group small elements
        grouped_elements = self._group_small_elements(elements)

        # Step 2: Split large elements
        final_elements = []
        for element in grouped_elements:
            if element.size > self.chunk_size:
                split_elements = self._split_large_element(element)
                final_elements.extend(split_elements)
            else:
                final_elements.append(element)

        # Step 3: Cap the output at the configured processing limit
        final_elements = final_elements[: self.json_config.max_objects_to_process]

        return final_elements
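    # A rough sketch of how elements flow through the three steps above,
    # assuming chunk_size=1000 and hypothetical element sizes (the sizes and
    # limits here are made up for illustration, not taken from the real
    # config):
    #
    #   input:    [e1(size=50), e2(size=80), e3(size=2500)]
    #   step 1 -> [grouped(e1+e2), e3]          small elements coalesce
    #   step 2 -> [grouped, e3_chunk_1, ...]    e3 > chunk_size, so it splits
    #   step 3 -> list truncated to json_config.max_objects_to_process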
    def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Group small JSON elements into larger chunks.

        Args:
            elements: List of JSON elements

        Returns:
            List of grouped elements
        """
        if not elements:
            return []

        grouped = []
        current_group = []
        current_size = 0

        for element in elements:
            # If element is large enough or is a significant structure, keep it separate
            if (
                element.size >= self.min_chunk_size
                or element.element_type
                in [JSONElementType.OBJECT, JSONElementType.ARRAY]
                or element.item_count > self.json_config.max_object_keys_to_process
            ):
                # First, add any accumulated small elements
                if current_group:
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                grouped.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element.size

                # If accumulated size is large enough, create a grouped element
                if (
                    current_size >= self.min_chunk_size
                    or len(current_group) >= self.json_config.max_array_items_per_chunk
                ):
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            grouped_element = self._create_grouped_element(current_group)
            grouped.append(grouped_element)

        return grouped
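    # Illustrative trace of the grouping policy above, assuming the small
    # elements are array items, min_chunk_size=200, and
    # max_array_items_per_chunk=3 (made-up values for the example):
    #
    #   sizes [60, 70, 80, 500] -> the first three accumulate to 210, which
    #   crosses min_chunk_size, so they flush as one grouped element; the
    #   500-byte element exceeds min_chunk_size and is kept separate.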
    def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
        """Create a grouped element from multiple small elements.

        Args:
            elements: List of elements to group

        Returns:
            Grouped JSON element
        """
        if not elements:
            raise ValueError("Cannot group empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create grouped content
        if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
            # Group array items into an array
            grouped_value = [elem.value for elem in elements]
            try:
                grouped_content = json.dumps(
                    grouped_value, indent=2, ensure_ascii=False
                )
            except (TypeError, ValueError):
                grouped_content = str(grouped_value)
            element_type = JSONElementType.ARRAY
            name = f"grouped_items_{len(elements)}"
        else:
            # Group mixed elements into an object
            grouped_value = {}
            for elem in elements:
                key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
                grouped_value[key] = elem.value
            try:
                grouped_content = json.dumps(
                    grouped_value, indent=2, ensure_ascii=False
                )
            except (TypeError, ValueError):
                grouped_content = str(grouped_value)
            element_type = JSONElementType.OBJECT
            name = f"grouped_elements_{len(elements)}"

        # Use the first element's path as the base
        base_path = elements[0].path
        parent_path = (
            ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
        )
        grouped_path = f"{parent_path}.{name}"

        grouped_element = JSONElement(
            name=name,
            element_type=element_type,
            content=grouped_content,
            value=grouped_value,
            path=grouped_path,
            level=min(elem.level for elem in elements),
            size=len(grouped_content),
            item_count=len(elements),
        )

        return grouped_element
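    # For reference, the two grouping shapes produced above (the values shown
    # are hypothetical):
    #
    #   array items  -> grouped as a JSON array:
    #       [{"id": 1}, {"id": 2}]            name="grouped_items_2"
    #   mixed elements -> grouped as a JSON object, with "root"-named
    #   elements renamed to positional keys:
    #       {"item_0": ..., "config": ...}    name="grouped_elements_2"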
    def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
        """Split a large JSON element into smaller chunks.

        Args:
            element: Large JSON element to split

        Returns:
            List of smaller elements
        """
        if element.size <= self.chunk_size:
            return [element]

        chunks = []

        if element.element_type == JSONElementType.ARRAY and isinstance(
            element.value, list
        ):
            # Split array into smaller arrays
            items = element.value
            # Note: unlike self.chunk_size (characters), this limit is an
            # item count
            chunk_size = self.json_config.max_array_items_per_chunk

            for i in range(0, len(items), chunk_size):
                chunk_items = items[i : i + chunk_size]
                try:
                    chunk_content = json.dumps(
                        chunk_items, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    chunk_content = str(chunk_items)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{i // chunk_size + 1}",
                    element_type=JSONElementType.ARRAY,
                    content=chunk_content,
                    value=chunk_items,
                    path=f"{element.path}_chunk_{i // chunk_size + 1}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(chunk_items),
                )
                chunks.append(chunk_element)
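        # Worked example of the slicing above: with 250 array items and an
        # assumed max_array_items_per_chunk of 100, the loop yields chunks of
        # 100, 100, and 50 items, named "<name>_chunk_1" through
        # "<name>_chunk_3".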
        elif element.element_type == JSONElementType.OBJECT and isinstance(
            element.value, dict
        ):
            # Split object by grouping properties
            items = list(element.value.items())
            current_chunk = {}
            current_size = 0
            chunk_index = 1

            for key, value in items:
                try:
                    item_content = json.dumps(
                        {key: value}, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    item_content = f'"{key}": {str(value)}'
                item_size = len(item_content)

                if current_size + item_size > self.chunk_size and current_chunk:
                    # Create chunk from current items
                    try:
                        chunk_content = json.dumps(
                            current_chunk, indent=2, ensure_ascii=False
                        )
                    except (TypeError, ValueError):
                        chunk_content = str(current_chunk)

                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=JSONElementType.OBJECT,
                        content=chunk_content,
                        value=current_chunk.copy(),
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk),
                    )
                    chunks.append(chunk_element)

                    # Start new chunk
                    current_chunk = {key: value}
                    current_size = item_size
                    chunk_index += 1
                else:
                    current_chunk[key] = value
                    current_size += item_size

            # Add remaining items
            if current_chunk:
                try:
                    chunk_content = json.dumps(
                        current_chunk, indent=2, ensure_ascii=False
                    )
                except (TypeError, ValueError):
                    chunk_content = str(current_chunk)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=JSONElementType.OBJECT,
                    content=chunk_content,
                    value=current_chunk,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk),
                )
                chunks.append(chunk_element)
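        # Worked example of the greedy packing above, assuming chunk_size=1000
        # and serialized property sizes [600, 500, 300] (illustrative values):
        # the first property starts chunk 1 (600); the second would overflow
        # (600 + 500 > 1000), so chunk 1 is emitted and the second starts
        # chunk 2; the third fits alongside it (500 + 300 <= 1000).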
        else:
            # For other types, split by lines as a fallback
            lines = element.content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 1

            for line in lines:
                line_size = len(line) + 1  # +1 for the newline

                if current_size + line_size > self.chunk_size and current_chunk_lines:
                    chunk_content = "\n".join(current_chunk_lines)
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=element.element_type,
                        content=chunk_content,
                        value=chunk_content,  # Use content as value for text chunks
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk_lines),
                    )
                    chunks.append(chunk_element)

                    current_chunk_lines = [line]
                    current_size = line_size
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add remaining lines
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=element.element_type,
                    content=chunk_content,
                    value=chunk_content,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk_lines),
                )
                chunks.append(chunk_element)

        return chunks if chunks else [element]
    def merge_small_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small JSON sections to optimize chunk sizes.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged sections
        """
        if not sections:
            return []

        merged = []
        current_merged = None
        current_size = 0

        for section in sections:
            section_size = len(section.get("content", ""))

            # If the section is large enough on its own, or merging it would
            # overflow the current chunk, finalize the current merged section
            if (
                section_size >= self.min_chunk_size
                or current_size + section_size > self.chunk_size
            ) and current_merged:
                merged.append(current_merged)
                current_merged = None
                current_size = 0

            # Start a new merged section if needed
            if current_merged is None:
                current_merged = section.copy()
                current_size = section_size
            else:
                # Merge into existing section
                current_merged["content"] += "\n" + section.get("content", "")
                # Merge metadata
                if "metadata" in section:
                    current_merged.setdefault("metadata", {}).update(
                        section["metadata"]
                    )
                current_size += section_size + 1  # +1 for the newline

        # Add the final merged section
        if current_merged:
            merged.append(current_merged)

        return merged
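
# A minimal usage sketch, assuming a fully initialized Settings object and a
# parser that yields JSONElement instances (the `parser.parse` call below is
# hypothetical; only JSONSectionSplitter's own methods are from this module):
#
#     splitter = JSONSectionSplitter(settings)
#     elements = parser.parse(document.content)  # hypothetical parser call
#     chunks = splitter.split_json_elements(elements)
#     sections = splitter.merge_small_sections(
#         [{"content": c.content, "metadata": {"path": c.path}} for c in chunks]
#     )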