Coverage for src/qdrant_loader/core/chunking/strategy/code_strategy.py: 81%

344 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Code-specific chunking strategy for programming languages.""" 

2 

3import ast 

4import re 

5from dataclasses import dataclass, field 

6from enum import Enum 

7from typing import Any, Optional 

8 

9import structlog 

10 

11# Tree-sitter imports with error handling 

12try: 

13 from tree_sitter_languages import get_language, get_parser 

14 

15 TREE_SITTER_AVAILABLE = True 

16except ImportError: 

17 TREE_SITTER_AVAILABLE = False 

18 get_language = None 

19 get_parser = None 

20 

21from qdrant_loader.config import Settings 

22from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

23from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

24from qdrant_loader.core.document import Document 

25 

26logger = structlog.get_logger(__name__) 

27 

28# Performance constants - Universal limits for all code files 

29MAX_FILE_SIZE_FOR_AST = ( 

30 75_000 # 75KB limit for AST parsing (balanced for all languages) 

31) 

32MAX_ELEMENTS_TO_PROCESS = 800 # Limit number of elements to prevent timeouts 

33CHUNK_SIZE_THRESHOLD = 40_000 # Files larger than this use simple chunking 

34MAX_RECURSION_DEPTH = 8 # Limit AST recursion depth 

35MAX_ELEMENT_SIZE = 20_000 # Skip individual elements larger than this 

36 

37 

38class CodeElementType(Enum): 

39 """Types of code elements.""" 

40 

41 MODULE = "module" 

42 CLASS = "class" 

43 FUNCTION = "function" 

44 METHOD = "method" 

45 PROPERTY = "property" 

46 VARIABLE = "variable" 

47 IMPORT = "import" 

48 COMMENT = "comment" 

49 DOCSTRING = "docstring" 

50 DECORATOR = "decorator" 

51 CONSTANT = "constant" 

52 INTERFACE = "interface" 

53 ENUM = "enum" 

54 STRUCT = "struct" 

55 NAMESPACE = "namespace" 

56 PACKAGE = "package" 

57 

58 

59@dataclass 

60class CodeElement: 

61 """Represents a code element with its metadata.""" 

62 

63 name: str 

64 element_type: CodeElementType 

65 content: str 

66 start_line: int 

67 end_line: int 

68 level: int = 0 

69 parent: Optional["CodeElement"] = None 

70 children: list["CodeElement"] = field(default_factory=list) 

71 docstring: str | None = None 

72 decorators: list[str] = field(default_factory=list) 

73 parameters: list[str] = field(default_factory=list) 

74 return_type: str | None = None 

75 visibility: str = "public" # public, private, protected 

76 is_async: bool = False 

77 is_static: bool = False 

78 is_abstract: bool = False 

79 complexity: int = 0 # Cyclomatic complexity 

80 dependencies: list[str] = field(default_factory=list) 

81 

82 def add_child(self, child: "CodeElement"): 

83 """Add a child element.""" 

84 self.children.append(child) 

85 child.parent = self 

86 

87 

88class CodeChunkingStrategy(BaseChunkingStrategy): 

89 """Strategy for chunking code files based on programming language structure. 

90 

91 This strategy uses AST parsing (primarily tree-sitter) to split code files into 

92 chunks based on semantic code elements, preserving the code structure and hierarchy. 

93 """ 

94 

95 def __init__(self, settings: Settings): 

96 """Initialize the code chunking strategy. 

97 

98 Args: 

99 settings: Configuration settings 

100 """ 

101 super().__init__(settings) 

102 self.logger = logger 

103 self.progress_tracker = ChunkingProgressTracker(logger) 

104 

105 # Language detection patterns 

106 self.language_patterns = { 

107 ".py": "python", 

108 ".pyx": "python", 

109 ".pyi": "python", 

110 ".java": "java", 

111 ".js": "javascript", 

112 ".jsx": "javascript", 

113 ".mjs": "javascript", 

114 ".ts": "typescript", 

115 ".tsx": "typescript", 

116 ".go": "go", 

117 ".rs": "rust", 

118 ".cpp": "cpp", 

119 ".cc": "cpp", 

120 ".cxx": "cpp", 

121 ".c": "c", 

122 ".h": "c", 

123 ".cs": "c_sharp", 

124 ".php": "php", 

125 ".rb": "ruby", 

126 ".kt": "kotlin", 

127 ".scala": "scala", 

128 ".swift": "swift", 

129 ".dart": "dart", 

130 } 

131 

132 # Cache for Tree-sitter parsers 

133 self._parsers = {} 

134 

135 # Check tree-sitter availability 

136 if not TREE_SITTER_AVAILABLE: 

137 self.logger.warning("Tree-sitter not available, will use fallback parsing") 

138 

139 def _detect_language(self, file_path: str, content: str) -> str: 

140 """Detect programming language from file extension. 

141 

142 Args: 

143 file_path: Path to the file 

144 content: File content (for future content-based detection) 

145 

146 Returns: 

147 Detected language name or "unknown" 

148 """ 

149 # Get file extension 

150 ext = f".{file_path.lower().split('.')[-1]}" if "." in file_path else "" 

151 

152 return self.language_patterns.get(ext, "unknown") 

153 
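    # Illustrative example (not part of the original module): detection is a pure
    # extension lookup against `language_patterns`, so for a hypothetical strategy
    # instance `strategy` it behaves like:
    #
    #   strategy._detect_language("src/handlers/payment.ts", "")  # -> "typescript"
    #   strategy._detect_language("lib/util.cc", "")              # -> "cpp"
    #   strategy._detect_language("Makefile", "")                 # -> "unknown" (no extension)
    #
    # The `content` argument is currently unused; it is reserved for the
    # content-based detection mentioned in the docstring.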

    def _get_tree_sitter_parser(self, language: str):
        """Get or create a Tree-sitter parser for the given language.

        Args:
            language: Tree-sitter language name

        Returns:
            Tree-sitter parser or None if not available
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None

        if language in self._parsers:
            return self._parsers[language]

        try:
            parser = get_parser(language)
            self._parsers[language] = parser
            return parser
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse code using Tree-sitter AST.

        Args:
            content: Source code content
            language: Programming language

        Returns:
            List of code elements
        """
        # Performance check: universal size limit for all languages
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), using fallback"
            )
            return []

        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []

        try:
            tree = parser.parse(content.encode("utf-8"))
            root_node = tree.root_node

            elements = []
            self._extract_ast_elements(root_node, content, elements, language)

            # Limit number of elements to prevent timeouts (universal limit)
            if len(elements) > MAX_ELEMENTS_TO_PROCESS:
                self.logger.warning(
                    f"Too many {language} elements ({len(elements)}), truncating to {MAX_ELEMENTS_TO_PROCESS}"
                )
                elements = elements[:MAX_ELEMENTS_TO_PROCESS]

            return elements

        except Exception as e:
            self.logger.warning(f"Failed to parse with Tree-sitter for {language}: {e}")
            return []

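    # Illustrative example (not part of the original module): a minimal sketch of
    # how the Tree-sitter path is typically exercised, assuming the optional
    # `tree_sitter_languages` package is installed and `strategy` is an instance
    # of this class. Inputs above MAX_FILE_SIZE_FOR_AST short-circuit to [].
    #
    #   java_source = 'class Greeter { String hello() { return "hi"; } }'
    #   elements = strategy._parse_with_tree_sitter(java_source, "java")
    #   # Expected shape (approximate): CodeElement entries for the class
    #   # declaration and the method declaration, each carrying start/end lines.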

    def _extract_ast_elements(
        self,
        node,
        content: str,
        elements: list[CodeElement],
        language: str,
        level: int = 0,
    ):
        """Extract code elements from Tree-sitter AST node.

        Args:
            node: Tree-sitter AST node
            content: Source code content
            elements: List to append elements to
            language: Programming language
            level: Nesting level
        """
        # Performance check: limit recursion depth
        if level > MAX_RECURSION_DEPTH:  # Prevent deep recursion
            return

        # Performance check: limit total elements (universal limit)
        if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
            return

        # Define node types that represent code elements for different languages
        element_mappings = {
            "python": {
                "function_definition": CodeElementType.FUNCTION,
                "async_function_definition": CodeElementType.FUNCTION,
                "class_definition": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "import_from_statement": CodeElementType.IMPORT,
            },
            "java": {
                "method_declaration": CodeElementType.METHOD,
                "constructor_declaration": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_declaration": CodeElementType.IMPORT,
            },
            "javascript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "variable_declaration": CodeElementType.VARIABLE,
            },
            "typescript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_statement": CodeElementType.IMPORT,
            },
            "go": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_declaration": CodeElementType.METHOD,
                "type_declaration": CodeElementType.STRUCT,
                "import_declaration": CodeElementType.IMPORT,
            },
            "rust": {
                "function_item": CodeElementType.FUNCTION,
                "impl_item": CodeElementType.CLASS,
                "struct_item": CodeElementType.STRUCT,
                "enum_item": CodeElementType.ENUM,
                "trait_item": CodeElementType.INTERFACE,
                "use_declaration": CodeElementType.IMPORT,
            },
            "cpp": {
                "function_definition": CodeElementType.FUNCTION,
                "class_specifier": CodeElementType.CLASS,
                "struct_specifier": CodeElementType.STRUCT,
                "namespace_definition": CodeElementType.NAMESPACE,
                "preproc_include": CodeElementType.IMPORT,
            },
            "c": {
                "function_definition": CodeElementType.FUNCTION,
                "struct_specifier": CodeElementType.STRUCT,
                "preproc_include": CodeElementType.IMPORT,
            },
        }

        # Get element types for this language
        lang_elements = element_mappings.get(language, {})

        # Check if this node represents a code element
        if node.type in lang_elements:
            element_type = lang_elements[node.type]

            # Extract element name
            name = self._extract_element_name(node, language)

            # Get node text
            start_byte = node.start_byte
            end_byte = node.end_byte
            element_content = content[start_byte:end_byte]

            # Skip very large elements to prevent timeouts (universal limit)
            if len(element_content) > MAX_ELEMENT_SIZE:
                self.logger.debug(
                    f"Skipping large {language} element {name} ({len(element_content)} bytes)"
                )
                return

            # Create code element
            element = CodeElement(
                name=name,
                element_type=element_type,
                content=element_content,
                start_line=node.start_point[0] + 1,
                end_line=node.end_point[0] + 1,
                level=level,
            )

            # Extract additional metadata (simplified for performance)
            if element.element_type in [
                CodeElementType.FUNCTION,
                CodeElementType.METHOD,
            ]:
                params_node = node.child_by_field_name("parameters")
                if params_node:
                    element.parameters = self._extract_parameters_from_node(params_node)

            elements.append(element)

            # Process children with increased level (limited depth)
            if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level + 1
                    )
        else:
            # Process children at same level (limited depth)
            if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level
                    )

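    # Illustrative note (not part of the original module): `element_mappings` is the
    # dispatch table that decides which Tree-sitter node types become chunks. For a
    # Rust file, for instance, an `impl_item` at level 0 yields a CLASS element and
    # the `function_item` nodes inside it yield FUNCTION elements at level 1; other
    # node types (expressions, blocks, attributes) are only traversed so that nested
    # elements up to MAX_RECURSION_DEPTH levels deep can still be found.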

    def _extract_element_name(self, node, language: str) -> str:
        """Extract the name of a code element from Tree-sitter node.

        Args:
            node: Tree-sitter AST node
            language: Programming language

        Returns:
            Element name or "unknown"
        """
        # Common patterns for finding names in different node types
        name_fields = ["name", "identifier", "field_identifier"]

        for field_name in name_fields:
            name_node = node.child_by_field_name(field_name)
            if name_node:
                return name_node.text.decode("utf-8")

        # Fallback: look for identifier children (limited search)
        for i, child in enumerate(node.children):
            if i > 5:  # Limit search to first few children
                break
            if child.type == "identifier":
                return child.text.decode("utf-8")

        return "unknown"

    def _extract_parameters_from_node(self, params_node) -> list[str]:
        """Extract parameter names from a parameters node.

        Args:
            params_node: Tree-sitter parameters node

        Returns:
            List of parameter names
        """
        parameters = []
        for i, child in enumerate(params_node.children):
            if i > 20:  # Limit to prevent timeouts
                break
            if child.type in ["identifier", "parameter", "typed_parameter"]:
                if child.type == "identifier":
                    parameters.append(child.text.decode("utf-8"))
                else:
                    # Look for identifier within parameter
                    for subchild in child.children:
                        if subchild.type == "identifier":
                            parameters.append(subchild.text.decode("utf-8"))
                            break
        return parameters

    def _parse_python_ast(self, content: str) -> list[CodeElement]:
        """Parse Python code using Python's built-in AST as fallback.

        Args:
            content: Python source code

        Returns:
            List of code elements
        """
        # Performance check: skip AST parsing for very large files
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"Python file too large for AST parsing ({len(content)} bytes)"
            )
            return []

        elements = []

        try:
            tree = ast.parse(content)
        except SyntaxError as e:
            self.logger.warning(f"Failed to parse Python AST: {e}")
            return []

        def extract_docstring(node) -> str | None:
            """Extract docstring from a node."""
            if (
                isinstance(node, ast.FunctionDef | ast.ClassDef | ast.AsyncFunctionDef)
                and node.body
                and isinstance(node.body[0], ast.Expr)
                and isinstance(node.body[0].value, ast.Constant)
                and isinstance(node.body[0].value.value, str)
            ):
                return node.body[0].value.value
            return None

        def get_decorators(node) -> list[str]:
            """Extract decorator names from a node."""
            decorators = []
            if hasattr(node, "decorator_list"):
                for decorator in node.decorator_list[:5]:  # Limit decorators
                    if isinstance(decorator, ast.Name):
                        decorators.append(decorator.id)
                    elif isinstance(decorator, ast.Attribute):
                        decorators.append(f"{decorator.attr}")
            return decorators

        def get_parameters(node) -> list[str]:
            """Extract parameter names from a function node."""
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                return []

            params = []
            for arg in node.args.args[:20]:  # Limit parameters
                params.append(arg.arg)
            return params

        def visit_node(node, level=0, parent_element=None):
            """Recursively visit AST nodes."""
            # Performance checks
            if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
                return
            if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
                return

            element = None

            if isinstance(node, ast.ClassDef):
                element = CodeElement(
                    name=node.name,
                    element_type=CodeElementType.CLASS,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                )

            elif isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                element_type = (
                    CodeElementType.METHOD if level > 0 else CodeElementType.FUNCTION
                )
                element = CodeElement(
                    name=node.name,
                    element_type=element_type,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                    parameters=get_parameters(node),
                    is_async=isinstance(node, ast.AsyncFunctionDef),
                )

            elif isinstance(node, ast.Import | ast.ImportFrom):
                import_names = []
                if isinstance(node, ast.Import):
                    import_names = [
                        alias.name for alias in node.names[:10]
                    ]  # Limit imports
                else:
                    module = node.module or ""
                    import_names = [
                        f"{module}.{alias.name}" for alias in node.names[:10]
                    ]

                element = CodeElement(
                    name=", ".join(import_names),
                    element_type=CodeElementType.IMPORT,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    dependencies=import_names,
                )

            if element:
                # Skip very large elements
                if len(element.content) > MAX_ELEMENT_SIZE:
                    return

                if parent_element:
                    parent_element.add_child(element)
                else:
                    elements.append(element)

                # Recursively process children (limited depth)
                if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level + 1, element)
            else:
                # For nodes we don't handle, still process their children (limited depth)
                if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level, parent_element)

        # Start processing from the root
        for node in ast.iter_child_nodes(tree):
            visit_node(node)

        return elements

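    # Illustrative example (not part of the original module): a rough sketch of the
    # pure-Python AST fallback on a tiny snippet; `strategy` and the source below
    # are hypothetical.
    #
    #   src = "import os\n\nclass Greeter:\n    def hello(self, name):\n        return name\n"
    #   elements = strategy._parse_python_ast(src)
    #   # Expected (approximate): an IMPORT element named "os" and a CLASS element
    #   # "Greeter" whose children include the METHOD "hello" with parameters
    #   # ["self", "name"], since nested defs are attached via add_child().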

    def _extract_code_metadata(
        self, element: CodeElement, language: str
    ) -> dict[str, Any]:
        """Extract metadata from a code element.

        Args:
            element: The code element to analyze
            language: Programming language

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "language": language,
            "start_line": element.start_line,
            "end_line": element.end_line,
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": element.docstring is not None,
            "docstring_length": len(element.docstring) if element.docstring else 0,
            "parameter_count": len(element.parameters),
            "decorator_count": len(element.decorators),
            "child_count": len(element.children),
            "dependency_count": len(element.dependencies),
        }

        # Add specific metadata based on element type
        if element.element_type in [CodeElementType.FUNCTION, CodeElementType.METHOD]:
            metadata.update(
                {
                    "parameters": element.parameters,
                    "return_type": element.return_type,
                    "decorators": element.decorators,
                }
            )

        if element.element_type == CodeElementType.CLASS:
            metadata.update(
                {
                    "method_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.METHOD
                        ]
                    ),
                    "property_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.PROPERTY
                        ]
                    ),
                }
            )

        if element.element_type == CodeElementType.IMPORT:
            metadata.update({"dependencies": element.dependencies})

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_level": element.parent.level,
                }
            )

        return metadata

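    # Illustrative example (not part of the original module): for a METHOD element
    # named "hello" spanning lines 4-5 with parameters ["self", "name"] and a CLASS
    # parent "Greeter", the returned mapping would look roughly like:
    #
    #   {"element_type": "method", "name": "hello", "language": "python",
    #    "start_line": 4, "end_line": 5, "line_count": 2, "level": 1,
    #    "visibility": "public", "is_async": False, ...,
    #    "parameters": ["self", "name"], "return_type": None, "decorators": [],
    #    "parent_name": "Greeter", "parent_type": "class", "parent_level": 0}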

    def _merge_small_elements(
        self, elements: list[CodeElement], min_size: int = 200
    ) -> list[CodeElement]:
        """Merge small code elements to create more meaningful chunks.

        Args:
            elements: List of code elements
            min_size: Minimum size for standalone elements

        Returns:
            List of merged elements
        """
        if not elements:
            return []

        merged = []
        current_group = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # If element is large enough or is a significant code structure, keep it separate
            if (
                element_size >= min_size
                or element.element_type
                in [CodeElementType.CLASS, CodeElementType.FUNCTION]
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            ):
                # First, add any accumulated small elements
                if current_group:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                merged.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element_size

                # If accumulated size is large enough, create a merged element
                if current_size >= min_size:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            merged_element = self._create_merged_element(current_group)
            merged.append(merged_element)

        return merged

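    # Illustrative note (not part of the original module): with the default
    # min_size of 200, a run of short IMPORT elements (say three imports of ~40
    # characters each) is accumulated and eventually emitted as a single merged
    # MODULE chunk, while any CLASS or FUNCTION element, and any METHOD above
    # 100 characters, is always kept as its own chunk regardless of size.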

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create merged element
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies
        all_dependencies = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(set(all_dependencies))

        return merged_element

    def _split_text(self, content: str) -> list[dict[str, Any]]:
        """Split code content into chunks based on programming language structure.

        Args:
            content: The code content to split

        Returns:
            List of dictionaries with chunk content and metadata
        """
        # This method is required by the base class but not used in our implementation
        # We override chunk_document instead
        return [{"content": content, "metadata": {"element_type": "unknown"}}]

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a code document using AST parsing.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Detect language from file path first for language-specific optimizations
            file_path = document.metadata.get("file_name", "") or document.source
            language = self._detect_language(file_path, document.content)

            # Performance check: universal threshold for all code files
            if len(document.content) > CHUNK_SIZE_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id,
                    f"Large {language} file ({len(document.content)} bytes)",
                )
                return self._fallback_chunking(document)

            self.logger.debug(f"Detected language: {language}")

            # Parse code structure using AST
            elements = []
            parsing_method = "unknown"

            if language == "python":
                # Try Python AST first for Python files
                self.logger.debug("Parsing Python with built-in AST")
                elements = self._parse_python_ast(document.content)
                parsing_method = "python_ast"

                # Fallback to tree-sitter if Python AST fails
                if not elements and TREE_SITTER_AVAILABLE:
                    self.logger.debug("Falling back to Tree-sitter for Python")
                    elements = self._parse_with_tree_sitter(document.content, language)
                    parsing_method = "tree_sitter"
            elif language != "unknown" and TREE_SITTER_AVAILABLE:
                # Use tree-sitter for other supported languages
                self.logger.debug(f"Parsing {language} with Tree-sitter")
                elements = self._parse_with_tree_sitter(document.content, language)
                parsing_method = "tree_sitter"

            if not elements:
                self.progress_tracker.log_fallback(
                    document.id, f"No {language} elements found"
                )
                return self._fallback_chunking(document)

            # Merge small elements to optimize chunk size
            final_elements = self._merge_small_elements(elements)
            if len(final_elements) > 100:  # Limit total chunks
                final_elements = final_elements[:100]

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": len(element.content),
                    },
                )

                # Create chunk document with optimized metadata processing
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=element.content,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    skip_nlp=False,
                )

                # Add code-specific metadata
                code_metadata = self._extract_code_metadata(element, language)
                code_metadata["parsing_method"] = parsing_method
                code_metadata["chunking_strategy"] = "code"
                code_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(code_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), f"code ({language})"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Code parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)


    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when AST parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for code document")

        # Use simple line-based splitting for code (optimized)
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:50]):  # Limit chunks
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs
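
# Illustrative usage sketch (not part of the original module). The wiring below is
# hedged: constructing Settings and Document depends on project configuration not
# shown in this file, so the placeholders are assumptions for illustration only.
#
#   from qdrant_loader.config import Settings
#   from qdrant_loader.core.document import Document
#
#   settings = Settings(...)   # project-specific configuration (assumed)
#   strategy = CodeChunkingStrategy(settings)
#   doc = Document(...)        # a code document whose metadata carries "file_name"
#   chunks = strategy.chunk_document(doc)
#   # Each returned chunk is a Document whose metadata includes element_type,
#   # parsing_method ("python_ast" or "tree_sitter"), chunking_strategy="code",
#   # and parent_document_id, or chunking_method="fallback_text" when the
#   # strategy fell back to line-based splitting.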