Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 98%

316 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""JSON-specific chunking strategy for structured data.""" 

2 

3import json 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import TYPE_CHECKING, Any, Optional 

7 

8import structlog 

9 

10from qdrant_loader.config import Settings 

11from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

12from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

13from qdrant_loader.core.document import Document 

14 

15if TYPE_CHECKING: 

16 pass 

17 

18logger = structlog.get_logger(__name__) 

19 

20# Performance constants to prevent timeouts 

21MAX_JSON_SIZE_FOR_PARSING = 1_000_000 # 1MB limit for JSON parsing 

22MAX_OBJECTS_TO_PROCESS = 200 # Reduced limit for objects to prevent timeouts 

23MAX_CHUNK_SIZE_FOR_NLP = 20_000 # 20KB limit for NLP processing 

24MAX_RECURSION_DEPTH = 5 # Limit recursion depth for nested structures 

25MAX_ARRAY_ITEMS_TO_PROCESS = 50 # Limit array items to process 

26MAX_OBJECT_KEYS_TO_PROCESS = 100 # Limit object keys to process 

27SIMPLE_CHUNKING_THRESHOLD = 500_000 # Use simple chunking for files larger than 500KB 

28 

29 
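# Note on how these limits interact (derived from the code below): chunk_document()
# already falls back to plain text chunking for content above SIMPLE_CHUNKING_THRESHOLD
# (500KB), so the larger MAX_JSON_SIZE_FOR_PARSING guard inside _parse_json_structure()
# mainly protects callers that invoke the parser directly.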

class JSONElementType(Enum):
    """Types of JSON elements."""

    OBJECT = "object"
    ARRAY = "array"
    ARRAY_ITEM = "array_item"
    PROPERTY = "property"
    VALUE = "value"
    ROOT = "root"

@dataclass
class JSONElement:
    """Represents a JSON element with its metadata."""

    name: str
    element_type: JSONElementType
    content: str
    value: Any
    path: str  # JSON path like "root.users[0].name"
    level: int = 0
    parent: Optional["JSONElement"] = None
    children: list["JSONElement"] = field(default_factory=list)
    size: int = 0  # Size in characters
    item_count: int = 0  # Number of items for arrays/objects

    def add_child(self, child: "JSONElement"):
        """Add a child element."""
        self.children.append(child)
        child.parent = self

class JSONChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking JSON documents based on structure.

    This strategy parses JSON structure and creates chunks based on:
    - Top-level objects and arrays
    - Large nested objects
    - Array items (grouped when small)
    - Preserving JSON structure and hierarchy
    """

    def __init__(self, settings: Settings):
        """Initialize the JSON chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Cache for processed chunks
        self._processed_chunks = {}

        # Minimum size for standalone chunks
        self.min_chunk_size = 200

        # Maximum items to group together in arrays
        self.max_array_items_per_chunk = 50

    def _parse_json_structure(self, content: str) -> JSONElement | None:
        """Parse JSON content into structured elements.

        Args:
            content: JSON content to parse

        Returns:
            Root JSON element or None if parsing fails
        """
        # Performance check: skip parsing for very large files
        if len(content) > MAX_JSON_SIZE_FOR_PARSING:
            self.logger.info(
                f"JSON too large for structured parsing ({len(content)} bytes)"
            )
            return None

        try:
            data = json.loads(content)
            root_element = self._create_json_element(
                "root", data, JSONElementType.ROOT, "root"
            )
            # Initialize processed count for this document
            processed_count = [0]
            self._extract_json_elements(root_element, data, "root", 0, processed_count)
            self.logger.debug(
                f"Processed {processed_count[0]} JSON elements (limit: {MAX_OBJECTS_TO_PROCESS})"
            )
            return root_element

        except json.JSONDecodeError as e:
            self.logger.warning(f"Failed to parse JSON: {e}")
            return None
        except Exception as e:
            self.logger.warning(f"Error parsing JSON structure: {e}")
            return None

    def _create_json_element(
        self,
        name: str,
        value: Any,
        element_type: JSONElementType,
        path: str,
        level: int = 0,
    ) -> JSONElement:
        """Create a JSON element from a value.

        Args:
            name: Element name
            value: JSON value
            element_type: Type of JSON element
            path: JSON path
            level: Nesting level

        Returns:
            JSONElement instance
        """
        # Convert value to JSON string for content
        try:
            content = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            content = str(value)

        # Calculate size and item count
        size = len(content)
        item_count = 0

        if isinstance(value, dict):
            item_count = len(value)
        elif isinstance(value, list):
            item_count = len(value)

        return JSONElement(
            name=name,
            element_type=element_type,
            content=content,
            value=value,
            path=path,
            level=level,
            size=size,
            item_count=item_count,
        )

    def _extract_json_elements(
        self,
        parent_element: JSONElement,
        data: Any,
        path: str,
        level: int = 0,
        processed_count: list[int] | None = None,
    ):
        """Recursively extract JSON elements.

        Args:
            parent_element: Parent JSON element
            data: JSON data to process
            path: Current JSON path
            level: Current nesting level
            processed_count: Mutable list to track total processed objects
        """
        if processed_count is None:
            processed_count = [0]

        # Performance checks
        if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
            return
        if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:  # Global limit
            return
        if len(parent_element.children) >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Local limit
            return

        if isinstance(data, dict):
            for i, (key, value) in enumerate(data.items()):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_OBJECT_KEYS_TO_PROCESS:  # Limit keys per object
                    break

                processed_count[0] += 1
                child_path = f"{path}.{key}"

                if isinstance(value, dict | list):
                    # Create element for complex values
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(value, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        key, value, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, value, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple values
                    child_element = self._create_json_element(
                        key, value, JSONElementType.PROPERTY, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

        elif isinstance(data, list):
            for i, item in enumerate(data):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Limit array items
                    break

                processed_count[0] += 1
                child_path = f"{path}[{i}]"

                if isinstance(item, dict | list):
                    # Create element for complex array items
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(item, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        f"item_{i}", item, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, item, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple array items
                    child_element = self._create_json_element(
                        f"item_{i}",
                        item,
                        JSONElementType.ARRAY_ITEM,
                        child_path,
                        level + 1,
                    )
                    parent_element.add_child(child_element)
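    # Example of the element tree this produces (derived from the logic above): for
    # {"users": [{"name": "Ada"}]}, the "users" key becomes an ARRAY element at path
    # "root.users", its first item becomes an OBJECT element ("item_0") at
    # "root.users[0]", and the "name" key becomes a PROPERTY element at
    # "root.users[0].name".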

    def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Group small JSON elements into larger chunks.

        Args:
            elements: List of JSON elements

        Returns:
            List of grouped elements
        """
        if not elements:
            return []

        grouped = []
        current_group = []
        current_size = 0

        for element in elements:
            # If element is large enough or is a significant structure, keep it separate
            if (
                element.size >= self.min_chunk_size
                or element.element_type
                in [JSONElementType.OBJECT, JSONElementType.ARRAY]
                or element.item_count > MAX_OBJECT_KEYS_TO_PROCESS
            ):
                # First, add any accumulated small elements
                if current_group:
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                grouped.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element.size

                # If accumulated size is large enough, create a grouped element
                if (
                    current_size >= self.min_chunk_size
                    or len(current_group) >= self.max_array_items_per_chunk
                ):
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            grouped_element = self._create_grouped_element(current_group)
            grouped.append(grouped_element)

        return grouped

    def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
        """Create a grouped element from multiple small elements.

        Args:
            elements: List of elements to group

        Returns:
            Grouped JSON element
        """
        if not elements:
            raise ValueError("Cannot group empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create grouped content
        if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
            # Group array items into an array
            grouped_value = [elem.value for elem in elements]
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.ARRAY
            name = f"grouped_items_{len(elements)}"
        else:
            # Group mixed elements into an object
            grouped_value = {}
            for elem in elements:
                key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
                grouped_value[key] = elem.value
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.OBJECT
            name = f"grouped_elements_{len(elements)}"

        # Use the first element's path as base
        base_path = elements[0].path
        parent_path = (
            ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
        )
        grouped_path = f"{parent_path}.{name}"

        grouped_element = JSONElement(
            name=name,
            element_type=element_type,
            content=grouped_content,
            value=grouped_value,
            path=grouped_path,
            level=min(elem.level for elem in elements),
            size=len(grouped_content),
            item_count=len(elements),
        )

        return grouped_element

    def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
        """Split a large JSON element into smaller chunks.

        Args:
            element: Large JSON element to split

        Returns:
            List of smaller elements
        """
        if element.size <= self.chunk_size:
            return [element]

        chunks = []

        if element.element_type == JSONElementType.ARRAY and isinstance(
            element.value, list
        ):
            # Split array into smaller arrays
            items = element.value
            chunk_size = self.max_array_items_per_chunk

            for i in range(0, len(items), chunk_size):
                chunk_items = items[i : i + chunk_size]
                chunk_content = json.dumps(chunk_items, indent=2, ensure_ascii=False)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{i//chunk_size + 1}",
                    element_type=JSONElementType.ARRAY,
                    content=chunk_content,
                    value=chunk_items,
                    path=f"{element.path}_chunk_{i//chunk_size + 1}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(chunk_items),
                )
                chunks.append(chunk_element)

        elif element.element_type == JSONElementType.OBJECT and isinstance(
            element.value, dict
        ):
            # Split object by grouping properties
            items = list(element.value.items())
            current_chunk = {}
            current_size = 0
            chunk_index = 1

            for key, value in items:
                item_content = json.dumps({key: value}, indent=2, ensure_ascii=False)
                item_size = len(item_content)

                if current_size + item_size > self.chunk_size and current_chunk:
                    # Create chunk from current items
                    chunk_content = json.dumps(
                        current_chunk, indent=2, ensure_ascii=False
                    )
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=JSONElementType.OBJECT,
                        content=chunk_content,
                        value=current_chunk.copy(),
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk),
                    )
                    chunks.append(chunk_element)

                    # Start new chunk
                    current_chunk = {key: value}
                    current_size = item_size
                    chunk_index += 1
                else:
                    current_chunk[key] = value
                    current_size += item_size

            # Add remaining items
            if current_chunk:
                chunk_content = json.dumps(current_chunk, indent=2, ensure_ascii=False)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=JSONElementType.OBJECT,
                    content=chunk_content,
                    value=current_chunk,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk),
                )
                chunks.append(chunk_element)
        else:
            # For other types, split by lines as fallback
            lines = element.content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 1

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > self.chunk_size and current_chunk_lines:
                    chunk_content = "\n".join(current_chunk_lines)
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=element.element_type,
                        content=chunk_content,
                        value=chunk_content,  # Use content as value for text chunks
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk_lines),
                    )
                    chunks.append(chunk_element)

                    current_chunk_lines = [line]
                    current_size = line_size
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add remaining lines
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=element.element_type,
                    content=chunk_content,
                    value=chunk_content,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk_lines),
                )
                chunks.append(chunk_element)

        return chunks if chunks else [element]
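    # Worked example (derived from the logic above): an ARRAY element holding 120
    # items whose serialized size exceeds self.chunk_size is split into three
    # elements of 50, 50, and 20 items, named "<name>_chunk_1" through
    # "<name>_chunk_3" with matching "_chunk_N" suffixes on their paths.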

    def _extract_json_metadata(self, element: JSONElement) -> dict[str, Any]:
        """Extract metadata from a JSON element.

        Args:
            element: JSON element to analyze

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "path": element.path,
            "level": element.level,
            "size": element.size,
            "item_count": element.item_count,
            "has_nested_objects": False,
            "has_arrays": False,
            "data_types": [],
        }

        # Analyze value types
        if isinstance(element.value, dict):
            metadata["data_types"] = list(
                {type(v).__name__ for v in element.value.values()}
            )
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value.values()
            )
            metadata["has_arrays"] = any(
                isinstance(v, list) for v in element.value.values()
            )
        elif isinstance(element.value, list) and element.value:
            metadata["data_types"] = list({type(v).__name__ for v in element.value})
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value
            )
            metadata["has_arrays"] = any(isinstance(v, list) for v in element.value)
        else:
            metadata["data_types"] = [type(element.value).__name__]

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_path": element.parent.path,
                }
            )

        return metadata

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a JSON document using structural boundaries.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Performance check: for very large files, use simple chunking
            if len(document.content) > SIMPLE_CHUNKING_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id, f"Large JSON file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Parse JSON structure
            root_element = self._parse_json_structure(document.content)

            if not root_element:
                self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
                return self._fallback_chunking(document)

            # Get all elements to chunk
            elements_to_chunk = []

            if root_element.children:
                # Use top-level children as chunks
                elements_to_chunk = root_element.children
            else:
                # Use root element if no children
                elements_to_chunk = [root_element]

            # Group small elements and split large ones
            grouped_elements = self._group_small_elements(elements_to_chunk)
            final_elements = []

            for element in grouped_elements:
                if element.size > self.chunk_size:
                    # Split large elements
                    split_elements = self._split_large_element(element)
                    final_elements.extend(split_elements)
                else:
                    final_elements.append(element)

            # Limit total elements
            final_elements = final_elements[:MAX_OBJECTS_TO_PROCESS]

            if not final_elements:
                self.progress_tracker.finish_chunking(document.id, 0, "json")
                return []

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": element.size,
                    },
                )

                # Create chunk document with optimized metadata processing
                skip_nlp = element.size > MAX_CHUNK_SIZE_FOR_NLP

                if skip_nlp:
                    # Create chunk without expensive NLP processing
                    chunk_doc = self._create_optimized_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                        skip_nlp=True,
                    )
                else:
                    # Use normal processing for smaller chunks
                    chunk_doc = self._create_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                    )

                # Add JSON-specific metadata
                json_metadata = self._extract_json_metadata(element)
                json_metadata["chunking_strategy"] = "json"
                json_metadata["chunking_method"] = "structured_json"
                json_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(json_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "json"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"JSON processing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _create_optimized_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with optimized processing.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        if skip_nlp:
            # Skip expensive NLP processing for large chunks
            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": "chunk_too_large",
                }
            )
        else:
            try:
                # Process the chunk text to get additional features
                processed = self._process_text(chunk_content)
                metadata.update(
                    {
                        "entities": processed["entities"],
                        "pos_tags": processed["pos_tags"],
                        "nlp_skipped": False,
                    }
                )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when JSON parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for JSON document")

        # Use simple line-based splitting for JSON
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:MAX_OBJECTS_TO_PROCESS]):
            chunk_doc = self._create_optimized_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                skip_nlp=len(chunk_content) > MAX_CHUNK_SIZE_FOR_NLP,
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks (required by base class).

        Args:
            text: Text to split

        Returns:
            List of text chunks
        """
        # This method is required by the base class but not used in our implementation
        # We override chunk_document instead
        return [text]
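
The sketch below shows one way this strategy can be driven end to end. It is a minimal illustration, not part of the covered module: it assumes a settings object already produced by qdrant_loader's configuration layer (the loading call is not shown in this file), and the Document field values ("example-source", "localfile", "json", the placeholder URL) are hypothetical stand-ins for whatever your pipeline actually supplies.

import json

from qdrant_loader.core.chunking.strategy.json_strategy import JSONChunkingStrategy
from qdrant_loader.core.document import Document


def chunk_json_payload(settings, payload: dict) -> list[Document]:
    """Chunk an in-memory JSON payload with the structural strategy (illustrative)."""
    strategy = JSONChunkingStrategy(settings)
    document = Document(
        content=json.dumps(payload, indent=2),
        metadata={"file_name": "payload.json"},  # placeholder metadata
        source="example-source",  # placeholder source values
        source_type="localfile",
        url="file:///tmp/payload.json",
        title="payload.json",
        content_type="json",
    )
    chunks = strategy.chunk_document(document)
    for chunk in chunks:
        # Structurally parsed chunks carry JSON metadata such as "path" and
        # "element_type"; fallback chunks may not, hence .get().
        print(
            chunk.metadata.get("path"),
            chunk.metadata.get("element_type"),
            len(chunk.content),
        )
    return chunks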