Coverage for src/qdrant_loader/core/chunking/strategy/json_strategy.py: 98%

318 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""JSON-specific chunking strategy for structured data.""" 

2 

3import json 

4import re 

5from dataclasses import dataclass, field 

6from enum import Enum 

7from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union 

8 

9import structlog 

10 

11from qdrant_loader.config import Settings 

12from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

13from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

14from qdrant_loader.core.document import Document 

15from qdrant_loader.utils.logging import LoggingConfig 

16 

17if TYPE_CHECKING: 

18 pass 

19 

20logger = structlog.get_logger(__name__) 

21 

22# Performance constants to prevent timeouts 

23MAX_JSON_SIZE_FOR_PARSING = 1_000_000 # 1MB limit for JSON parsing 

24MAX_OBJECTS_TO_PROCESS = 200 # Reduced limit for objects to prevent timeouts 

25MAX_CHUNK_SIZE_FOR_NLP = 20_000 # 20KB limit for NLP processing 

26MAX_RECURSION_DEPTH = 5 # Limit recursion depth for nested structures 

27MAX_ARRAY_ITEMS_TO_PROCESS = 50 # Limit array items to process 

28MAX_OBJECT_KEYS_TO_PROCESS = 100 # Limit object keys to process 

29SIMPLE_CHUNKING_THRESHOLD = 500_000 # Use simple chunking for files larger than 500KB 

30 

31 

32class JSONElementType(Enum): 

33 """Types of JSON elements.""" 

34 

35 OBJECT = "object" 

36 ARRAY = "array" 

37 ARRAY_ITEM = "array_item" 

38 PROPERTY = "property" 

39 VALUE = "value" 

40 ROOT = "root" 

41 

42 

43@dataclass 

44class JSONElement: 

45 """Represents a JSON element with its metadata.""" 

46 

47 name: str 

48 element_type: JSONElementType 

49 content: str 

50 value: Any 

51 path: str # JSON path like "root.users[0].name" 

52 level: int = 0 

53 parent: Optional["JSONElement"] = None 

54 children: list["JSONElement"] = field(default_factory=list) 

55 size: int = 0 # Size in characters 

56 item_count: int = 0 # Number of items for arrays/objects 

57 

58 def add_child(self, child: "JSONElement"): 

59 """Add a child element.""" 

60 self.children.append(child) 

61 child.parent = self 

62 

63 
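# Illustrative usage sketch (not part of the original module): assembling a tiny
# element tree by hand; the names and values below are assumptions for clarity.
# >>> root = JSONElement("root", JSONElementType.ROOT, "{}", {}, "root")
# >>> users = JSONElement("users", JSONElementType.ARRAY, "[]", [], "root.users", level=1)
# >>> root.add_child(users)
# >>> users.parent is root, [c.name for c in root.children]
# (True, ['users'])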

class JSONChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking JSON documents based on structure.

    This strategy parses JSON structure and creates chunks based on:
    - Top-level objects and arrays
    - Large nested objects
    - Array items (grouped when small)
    - Preserving JSON structure and hierarchy
    """

    def __init__(self, settings: Settings):
        """Initialize the JSON chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Cache for processed chunks
        self._processed_chunks = {}

        # Minimum size for standalone chunks
        self.min_chunk_size = 200

        # Maximum items to group together in arrays
        self.max_array_items_per_chunk = 50

    def _parse_json_structure(self, content: str) -> JSONElement | None:
        """Parse JSON content into structured elements.

        Args:
            content: JSON content to parse

        Returns:
            Root JSON element or None if parsing fails
        """
        # Performance check: skip parsing for very large files
        if len(content) > MAX_JSON_SIZE_FOR_PARSING:
            self.logger.info(
                f"JSON too large for structured parsing ({len(content)} bytes)"
            )
            return None

        try:
            data = json.loads(content)
            root_element = self._create_json_element(
                "root", data, JSONElementType.ROOT, "root"
            )
            # Initialize processed count for this document
            processed_count = [0]
            self._extract_json_elements(root_element, data, "root", 0, processed_count)
            self.logger.debug(
                f"Processed {processed_count[0]} JSON elements (limit: {MAX_OBJECTS_TO_PROCESS})"
            )
            return root_element

        except json.JSONDecodeError as e:
            self.logger.warning(f"Failed to parse JSON: {e}")
            return None
        except Exception as e:
            self.logger.warning(f"Error parsing JSON structure: {e}")
            return None

    def _create_json_element(
        self,
        name: str,
        value: Any,
        element_type: JSONElementType,
        path: str,
        level: int = 0,
    ) -> JSONElement:
        """Create a JSON element from a value.

        Args:
            name: Element name
            value: JSON value
            element_type: Type of JSON element
            path: JSON path
            level: Nesting level

        Returns:
            JSONElement instance
        """
        # Convert value to JSON string for content
        try:
            content = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            content = str(value)

        # Calculate size and item count
        size = len(content)
        item_count = 0

        if isinstance(value, dict):
            item_count = len(value)
        elif isinstance(value, list):
            item_count = len(value)

        return JSONElement(
            name=name,
            element_type=element_type,
            content=content,
            value=value,
            path=path,
            level=level,
            size=size,
            item_count=item_count,
        )

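    # Illustrative sketch (not part of the original module): shape of the element
    # returned for a small dict, assuming "strategy" is an initialized
    # JSONChunkingStrategy and default json.dumps(indent=2) formatting.
    # >>> elem = strategy._create_json_element("user", {"id": 1}, JSONElementType.OBJECT, "root.user", level=1)
    # >>> elem.content
    # '{\n  "id": 1\n}'
    # >>> elem.size, elem.item_count
    # (13, 1)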

    def _extract_json_elements(
        self,
        parent_element: JSONElement,
        data: Any,
        path: str,
        level: int = 0,
        processed_count: list[int] | None = None,
    ):
        """Recursively extract JSON elements.

        Args:
            parent_element: Parent JSON element
            data: JSON data to process
            path: Current JSON path
            level: Current nesting level
            processed_count: Mutable list to track total processed objects
        """
        if processed_count is None:
            processed_count = [0]

        # Performance checks
        if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
            return
        if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:  # Global limit
            return
        if len(parent_element.children) >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Local limit
            return

        if isinstance(data, dict):
            for i, (key, value) in enumerate(data.items()):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_OBJECT_KEYS_TO_PROCESS:  # Limit keys per object
                    break

                processed_count[0] += 1
                child_path = f"{path}.{key}"

                if isinstance(value, dict | list):
                    # Create element for complex values
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(value, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        key, value, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, value, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple values
                    child_element = self._create_json_element(
                        key, value, JSONElementType.PROPERTY, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

        elif isinstance(data, list):
            for i, item in enumerate(data):
                if processed_count[0] >= MAX_OBJECTS_TO_PROCESS:
                    break
                if i >= MAX_ARRAY_ITEMS_TO_PROCESS:  # Limit array items
                    break

                processed_count[0] += 1
                child_path = f"{path}[{i}]"

                if isinstance(item, dict | list):
                    # Create element for complex array items
                    element_type = (
                        JSONElementType.OBJECT
                        if isinstance(item, dict)
                        else JSONElementType.ARRAY
                    )
                    child_element = self._create_json_element(
                        f"item_{i}", item, element_type, child_path, level + 1
                    )
                    parent_element.add_child(child_element)

                    # Recursively process if not too large
                    if child_element.size < self.chunk_size:
                        self._extract_json_elements(
                            child_element, item, child_path, level + 1, processed_count
                        )
                else:
                    # Create element for simple array items
                    child_element = self._create_json_element(
                        f"item_{i}",
                        item,
                        JSONElementType.ARRAY_ITEM,
                        child_path,
                        level + 1,
                    )
                    parent_element.add_child(child_element)

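    # Illustrative sketch (not part of the original module): JSON paths assigned by
    # the recursion for a small document (key names are assumptions), assuming
    # chunk_size is large enough that recursion is not cut short:
    #     {"users": [{"name": "a"}]}  ->  elements discovered:
    #         root.users           (ARRAY)
    #         root.users[0]        (OBJECT, named "item_0")
    #         root.users[0].name   (PROPERTY)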

    def _group_small_elements(self, elements: list[JSONElement]) -> list[JSONElement]:
        """Group small JSON elements into larger chunks.

        Args:
            elements: List of JSON elements

        Returns:
            List of grouped elements
        """
        if not elements:
            return []

        grouped = []
        current_group = []
        current_size = 0

        for element in elements:
            # If element is large enough or is a significant structure, keep it separate
            if (
                element.size >= self.min_chunk_size
                or element.element_type
                in [JSONElementType.OBJECT, JSONElementType.ARRAY]
                or element.item_count > MAX_OBJECT_KEYS_TO_PROCESS
            ):

                # First, add any accumulated small elements
                if current_group:
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                grouped.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element.size

                # If accumulated size is large enough, create a grouped element
                if (
                    current_size >= self.min_chunk_size
                    or len(current_group) >= self.max_array_items_per_chunk
                ):
                    grouped_element = self._create_grouped_element(current_group)
                    grouped.append(grouped_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            grouped_element = self._create_grouped_element(current_group)
            grouped.append(grouped_element)

        return grouped

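    # Illustrative sketch (not part of the original module): small PROPERTY/ARRAY_ITEM
    # elements are accumulated until they reach min_chunk_size (200 chars) or 50 items,
    # then emitted as one grouped element; OBJECT/ARRAY elements stay standalone.
    # Assuming "strategy" is an initialized JSONChunkingStrategy:
    # >>> small = [strategy._create_json_element(f"k{i}", i, JSONElementType.PROPERTY, f"root.k{i}") for i in range(3)]
    # >>> grouped = strategy._group_small_elements(small)
    # >>> len(grouped), grouped[0].name
    # (1, 'grouped_elements_3')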

    def _create_grouped_element(self, elements: list[JSONElement]) -> JSONElement:
        """Create a grouped element from multiple small elements.

        Args:
            elements: List of elements to group

        Returns:
            Grouped JSON element
        """
        if not elements:
            raise ValueError("Cannot group empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create grouped content
        if all(elem.element_type == JSONElementType.ARRAY_ITEM for elem in elements):
            # Group array items into an array
            grouped_value = [elem.value for elem in elements]
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.ARRAY
            name = f"grouped_items_{len(elements)}"
        else:
            # Group mixed elements into an object
            grouped_value = {}
            for elem in elements:
                key = elem.name if elem.name != "root" else f"item_{len(grouped_value)}"
                grouped_value[key] = elem.value
            grouped_content = json.dumps(grouped_value, indent=2, ensure_ascii=False)
            element_type = JSONElementType.OBJECT
            name = f"grouped_elements_{len(elements)}"

        # Use the first element's path as base
        base_path = elements[0].path
        parent_path = (
            ".".join(base_path.split(".")[:-1]) if "." in base_path else "root"
        )
        grouped_path = f"{parent_path}.{name}"

        grouped_element = JSONElement(
            name=name,
            element_type=element_type,
            content=grouped_content,
            value=grouped_value,
            path=grouped_path,
            level=min(elem.level for elem in elements),
            size=len(grouped_content),
            item_count=len(elements),
        )

        return grouped_element

    def _split_large_element(self, element: JSONElement) -> list[JSONElement]:
        """Split a large JSON element into smaller chunks.

        Args:
            element: Large JSON element to split

        Returns:
            List of smaller elements
        """
        if element.size <= self.chunk_size:
            return [element]

        chunks = []

        if element.element_type == JSONElementType.ARRAY and isinstance(
            element.value, list
        ):
            # Split array into smaller arrays
            items = element.value
            chunk_size = self.max_array_items_per_chunk

            for i in range(0, len(items), chunk_size):
                chunk_items = items[i : i + chunk_size]
                chunk_content = json.dumps(chunk_items, indent=2, ensure_ascii=False)

                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{i//chunk_size + 1}",
                    element_type=JSONElementType.ARRAY,
                    content=chunk_content,
                    value=chunk_items,
                    path=f"{element.path}_chunk_{i//chunk_size + 1}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(chunk_items),
                )
                chunks.append(chunk_element)

        elif element.element_type == JSONElementType.OBJECT and isinstance(
            element.value, dict
        ):
            # Split object by grouping properties
            items = list(element.value.items())
            current_chunk = {}
            current_size = 0
            chunk_index = 1

            for key, value in items:
                item_content = json.dumps({key: value}, indent=2, ensure_ascii=False)
                item_size = len(item_content)

                if current_size + item_size > self.chunk_size and current_chunk:
                    # Create chunk from current items
                    chunk_content = json.dumps(
                        current_chunk, indent=2, ensure_ascii=False
                    )
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=JSONElementType.OBJECT,
                        content=chunk_content,
                        value=current_chunk.copy(),
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk),
                    )
                    chunks.append(chunk_element)

                    # Start new chunk
                    current_chunk = {key: value}
                    current_size = item_size
                    chunk_index += 1
                else:
                    current_chunk[key] = value
                    current_size += item_size

            # Add remaining items
            if current_chunk:
                chunk_content = json.dumps(current_chunk, indent=2, ensure_ascii=False)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=JSONElementType.OBJECT,
                    content=chunk_content,
                    value=current_chunk,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk),
                )
                chunks.append(chunk_element)
        else:
            # For other types, split by lines as fallback
            lines = element.content.split("\n")
            current_chunk_lines = []
            current_size = 0
            chunk_index = 1

            for line in lines:
                line_size = len(line) + 1  # +1 for newline

                if current_size + line_size > self.chunk_size and current_chunk_lines:
                    chunk_content = "\n".join(current_chunk_lines)
                    chunk_element = JSONElement(
                        name=f"{element.name}_chunk_{chunk_index}",
                        element_type=element.element_type,
                        content=chunk_content,
                        value=chunk_content,  # Use content as value for text chunks
                        path=f"{element.path}_chunk_{chunk_index}",
                        level=element.level,
                        size=len(chunk_content),
                        item_count=len(current_chunk_lines),
                    )
                    chunks.append(chunk_element)

                    current_chunk_lines = [line]
                    current_size = line_size
                    chunk_index += 1
                else:
                    current_chunk_lines.append(line)
                    current_size += line_size

            # Add remaining lines
            if current_chunk_lines:
                chunk_content = "\n".join(current_chunk_lines)
                chunk_element = JSONElement(
                    name=f"{element.name}_chunk_{chunk_index}",
                    element_type=element.element_type,
                    content=chunk_content,
                    value=chunk_content,
                    path=f"{element.path}_chunk_{chunk_index}",
                    level=element.level,
                    size=len(chunk_content),
                    item_count=len(current_chunk_lines),
                )
                chunks.append(chunk_element)

        return chunks if chunks else [element]

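    # Illustrative sketch (not part of the original module): an oversized ARRAY element
    # is split into sub-arrays of at most max_array_items_per_chunk (50) items, so a
    # 120-item array whose serialized size exceeds chunk_size would yield elements named
    # "<name>_chunk_1" (50 items), "<name>_chunk_2" (50) and "<name>_chunk_3" (20).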

    def _extract_json_metadata(self, element: JSONElement) -> dict[str, Any]:
        """Extract metadata from a JSON element.

        Args:
            element: JSON element to analyze

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "path": element.path,
            "level": element.level,
            "size": element.size,
            "item_count": element.item_count,
            "has_nested_objects": False,
            "has_arrays": False,
            "data_types": [],
        }

        # Analyze value types
        if isinstance(element.value, dict):
            metadata["data_types"] = list(
                {type(v).__name__ for v in element.value.values()}
            )
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value.values()
            )
            metadata["has_arrays"] = any(
                isinstance(v, list) for v in element.value.values()
            )
        elif isinstance(element.value, list) and element.value:
            metadata["data_types"] = list({type(v).__name__ for v in element.value})
            metadata["has_nested_objects"] = any(
                isinstance(v, dict) for v in element.value
            )
            metadata["has_arrays"] = any(isinstance(v, list) for v in element.value)
        else:
            metadata["data_types"] = [type(element.value).__name__]

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_path": element.parent.path,
                }
            )

        return metadata

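    # Illustrative sketch (not part of the original module): metadata produced for an
    # OBJECT element, assuming "strategy" is an initialized JSONChunkingStrategy.
    # >>> elem = strategy._create_json_element("user", {"id": 1, "tags": []}, JSONElementType.OBJECT, "root.user")
    # >>> meta = strategy._extract_json_metadata(elem)
    # >>> meta["element_type"], meta["has_arrays"], sorted(meta["data_types"])
    # ('object', True, ['int', 'list'])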

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a JSON document using structural boundaries.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Performance check: for very large files, use simple chunking
            if len(document.content) > SIMPLE_CHUNKING_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id, f"Large JSON file ({len(document.content)} bytes)"
                )
                return self._fallback_chunking(document)

            # Parse JSON structure
            root_element = self._parse_json_structure(document.content)

            if not root_element:
                self.progress_tracker.log_fallback(document.id, "JSON parsing failed")
                return self._fallback_chunking(document)

            # Get all elements to chunk
            elements_to_chunk = []

            if root_element.children:
                # Use top-level children as chunks
                elements_to_chunk = root_element.children
            else:
                # Use root element if no children
                elements_to_chunk = [root_element]

            # Group small elements and split large ones
            grouped_elements = self._group_small_elements(elements_to_chunk)
            final_elements = []

            for element in grouped_elements:
                if element.size > self.chunk_size:
                    # Split large elements
                    split_elements = self._split_large_element(element)
                    final_elements.extend(split_elements)
                else:
                    final_elements.append(element)

            # Limit total elements
            final_elements = final_elements[:MAX_OBJECTS_TO_PROCESS]

            if not final_elements:
                self.progress_tracker.finish_chunking(document.id, 0, "json")
                return []

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": element.size,
                    },
                )

                # Create chunk document with optimized metadata processing
                skip_nlp = element.size > MAX_CHUNK_SIZE_FOR_NLP

                if skip_nlp:
                    # Create chunk without expensive NLP processing
                    chunk_doc = self._create_optimized_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                        skip_nlp=True,
                    )
                else:
                    # Use normal processing for smaller chunks
                    chunk_doc = self._create_chunk_document(
                        original_doc=document,
                        chunk_content=element.content,
                        chunk_index=i,
                        total_chunks=len(final_elements),
                    )

                # Add JSON-specific metadata
                json_metadata = self._extract_json_metadata(element)
                json_metadata["chunking_strategy"] = "json"
                json_metadata["chunking_method"] = "structured_json"
                json_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(json_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "json"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"JSON processing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

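    # Illustrative usage sketch (not part of the original module), assuming an
    # initialized Settings object and a Document whose content is JSON text:
    # >>> strategy = JSONChunkingStrategy(settings)
    # >>> chunks = strategy.chunk_document(document)
    # >>> chunks[0].metadata["chunking_method"]  # structured path, not the fallback
    # 'structured_json'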

    def _create_optimized_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with optimized processing.

        Args:
            original_doc: Original document
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document: New document instance for the chunk
        """
        # Create enhanced metadata
        metadata = original_doc.metadata.copy()
        metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
            }
        )

        if skip_nlp:
            # Skip expensive NLP processing for large chunks
            metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": "chunk_too_large",
                }
            )
        else:
            try:
                # Process the chunk text to get additional features
                processed = self._process_text(chunk_content)
                metadata.update(
                    {
                        "entities": processed["entities"],
                        "pos_tags": processed["pos_tags"],
                        "nlp_skipped": False,
                    }
                )
            except Exception as e:
                self.logger.warning(
                    f"NLP processing failed for chunk {chunk_index}: {e}"
                )
                metadata.update(
                    {
                        "entities": [],
                        "pos_tags": [],
                        "nlp_skipped": True,
                        "skip_reason": "nlp_error",
                    }
                )

        return Document(
            content=chunk_content,
            metadata=metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            title=original_doc.title,
            content_type=original_doc.content_type,
        )

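    # Illustrative sketch (not part of the original module): with skip_nlp=True the
    # chunk metadata carries empty NLP fields instead of extracted annotations:
    #     {"entities": [], "pos_tags": [], "nlp_skipped": True, "skip_reason": "chunk_too_large"}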

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when JSON parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for JSON document")

        # Use simple line-based splitting for JSON
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:MAX_OBJECTS_TO_PROCESS]):
            chunk_doc = self._create_optimized_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
                skip_nlp=len(chunk_content) > MAX_CHUNK_SIZE_FOR_NLP,
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _split_text(self, text: str) -> list[str]:
        """Split text into chunks (required by base class).

        Args:
            text: Text to split

        Returns:
            List of text chunks
        """
        # This method is required by the base class but not used in our implementation
        # We override chunk_document instead
        return [text]