Coverage for src/qdrant_loader/core/chunking/strategy/code_strategy.py: 81%

343 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Code-specific chunking strategy for programming languages.""" 

2 

3import ast 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import Any, Optional 

7 

8import structlog 

9 

10# Tree-sitter imports with error handling 

11try: 

12 from tree_sitter_languages import get_language, get_parser 

13 

14 TREE_SITTER_AVAILABLE = True 

15except ImportError: 

16 TREE_SITTER_AVAILABLE = False 

17 get_language = None 

18 get_parser = None 

19 

20from qdrant_loader.config import Settings 

21from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker 

22from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy 

23from qdrant_loader.core.document import Document 

24 

25logger = structlog.get_logger(__name__) 

# Performance constants - Universal limits for all code files
MAX_FILE_SIZE_FOR_AST = (
    75_000  # 75KB limit for AST parsing (balanced for all languages)
)
MAX_ELEMENTS_TO_PROCESS = 800  # Limit number of elements to prevent timeouts
CHUNK_SIZE_THRESHOLD = 40_000  # Files larger than this use simple chunking
MAX_RECURSION_DEPTH = 8  # Limit AST recursion depth
MAX_ELEMENT_SIZE = 20_000  # Skip individual elements larger than this
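
# Worked example of how these limits interact (figures are illustrative, not from
# the original source): a 50_000-byte Python file is under MAX_FILE_SIZE_FOR_AST
# (75_000) but over CHUNK_SIZE_THRESHOLD (40_000), so chunk_document() below falls
# back to line-based chunking before any AST parsing is attempted; a 30_000-byte
# file is parsed, and any single element whose text exceeds MAX_ELEMENT_SIZE
# (20_000 bytes) is skipped during extraction.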


class CodeElementType(Enum):
    """Types of code elements."""

    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    METHOD = "method"
    PROPERTY = "property"
    VARIABLE = "variable"
    IMPORT = "import"
    COMMENT = "comment"
    DOCSTRING = "docstring"
    DECORATOR = "decorator"
    CONSTANT = "constant"
    INTERFACE = "interface"
    ENUM = "enum"
    STRUCT = "struct"
    NAMESPACE = "namespace"
    PACKAGE = "package"


@dataclass
class CodeElement:
    """Represents a code element with its metadata."""

    name: str
    element_type: CodeElementType
    content: str
    start_line: int
    end_line: int
    level: int = 0
    parent: Optional["CodeElement"] = None
    children: list["CodeElement"] = field(default_factory=list)
    docstring: str | None = None
    decorators: list[str] = field(default_factory=list)
    parameters: list[str] = field(default_factory=list)
    return_type: str | None = None
    visibility: str = "public"  # public, private, protected
    is_async: bool = False
    is_static: bool = False
    is_abstract: bool = False
    complexity: int = 0  # Cyclomatic complexity
    dependencies: list[str] = field(default_factory=list)

    def add_child(self, child: "CodeElement"):
        """Add a child element."""
        self.children.append(child)
        child.parent = self
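
    # Illustrative sketch (not part of the original source): add_child() wires the
    # parent/child hierarchy that the metadata extraction relies on later.
    #
    #   cls = CodeElement(
    #       name="Greeter",
    #       element_type=CodeElementType.CLASS,
    #       content="class Greeter: ...",
    #       start_line=1,
    #       end_line=4,
    #   )
    #   method = CodeElement(
    #       name="greet",
    #       element_type=CodeElementType.METHOD,
    #       content="def greet(self): ...",
    #       start_line=2,
    #       end_line=3,
    #       level=1,
    #   )
    #   cls.add_child(method)
    #   assert method.parent is cls and cls.children == [method]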


class CodeChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking code files based on programming language structure.

    This strategy uses AST parsing (primarily tree-sitter) to split code files into
    chunks based on semantic code elements, preserving the code structure and hierarchy.
    """

    def __init__(self, settings: Settings):
        """Initialize the code chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Language detection patterns
        self.language_patterns = {
            ".py": "python",
            ".pyx": "python",
            ".pyi": "python",
            ".java": "java",
            ".js": "javascript",
            ".jsx": "javascript",
            ".mjs": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".go": "go",
            ".rs": "rust",
            ".cpp": "cpp",
            ".cc": "cpp",
            ".cxx": "cpp",
            ".c": "c",
            ".h": "c",
            ".cs": "c_sharp",
            ".php": "php",
            ".rb": "ruby",
            ".kt": "kotlin",
            ".scala": "scala",
            ".swift": "swift",
            ".dart": "dart",
        }

        # Cache for Tree-sitter parsers
        self._parsers = {}

        # Check tree-sitter availability
        if not TREE_SITTER_AVAILABLE:
            self.logger.warning("Tree-sitter not available, will use fallback parsing")

    def _detect_language(self, file_path: str, content: str) -> str:
        """Detect programming language from file extension.

        Args:
            file_path: Path to the file
            content: File content (for future content-based detection)

        Returns:
            Detected language name or "unknown"
        """
        # Get file extension
        ext = f".{file_path.lower().split('.')[-1]}" if "." in file_path else ""

        return self.language_patterns.get(ext, "unknown")
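
        # Examples (illustrative): the lookup is case-insensitive and keyed on the
        # text after the last dot, so:
        #   "src/Main.PY"       -> ".py" -> "python"
        #   "lib/utils.test.ts" -> ".ts" -> "typescript"
        #   "Makefile"          -> ""    -> "unknown"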

    def _get_tree_sitter_parser(self, language: str):
        """Get or create a Tree-sitter parser for the given language.

        Args:
            language: Tree-sitter language name

        Returns:
            Tree-sitter parser or None if not available
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None

        if language in self._parsers:
            return self._parsers[language]

        try:
            parser = get_parser(language)
            self._parsers[language] = parser
            return parser
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse code using Tree-sitter AST.

        Args:
            content: Source code content
            language: Programming language

        Returns:
            List of code elements
        """
        # Performance check: universal size limit for all languages
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), using fallback"
            )
            return []

        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []

        try:
            tree = parser.parse(content.encode("utf-8"))
            root_node = tree.root_node

            elements = []
            self._extract_ast_elements(root_node, content, elements, language)

            # Limit number of elements to prevent timeouts (universal limit)
            if len(elements) > MAX_ELEMENTS_TO_PROCESS:
                self.logger.warning(
                    f"Too many {language} elements ({len(elements)}), truncating to {MAX_ELEMENTS_TO_PROCESS}"
                )
                elements = elements[:MAX_ELEMENTS_TO_PROCESS]

            return elements

        except Exception as e:
            self.logger.warning(f"Failed to parse with Tree-sitter for {language}: {e}")
            return []

    def _extract_ast_elements(
        self,
        node,
        content: str,
        elements: list[CodeElement],
        language: str,
        level: int = 0,
    ):
        """Extract code elements from Tree-sitter AST node.

        Args:
            node: Tree-sitter AST node
            content: Source code content
            elements: List to append elements to
            language: Programming language
            level: Nesting level
        """
        # Performance check: limit recursion depth
        if level > MAX_RECURSION_DEPTH:  # Prevent deep recursion
            return

        # Performance check: limit total elements (universal limit)
        if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
            return

        # Define node types that represent code elements for different languages
        element_mappings = {
            "python": {
                "function_definition": CodeElementType.FUNCTION,
                "async_function_definition": CodeElementType.FUNCTION,
                "class_definition": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "import_from_statement": CodeElementType.IMPORT,
            },
            "java": {
                "method_declaration": CodeElementType.METHOD,
                "constructor_declaration": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_declaration": CodeElementType.IMPORT,
            },
            "javascript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "variable_declaration": CodeElementType.VARIABLE,
            },
            "typescript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_statement": CodeElementType.IMPORT,
            },
            "go": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_declaration": CodeElementType.METHOD,
                "type_declaration": CodeElementType.STRUCT,
                "import_declaration": CodeElementType.IMPORT,
            },
            "rust": {
                "function_item": CodeElementType.FUNCTION,
                "impl_item": CodeElementType.CLASS,
                "struct_item": CodeElementType.STRUCT,
                "enum_item": CodeElementType.ENUM,
                "trait_item": CodeElementType.INTERFACE,
                "use_declaration": CodeElementType.IMPORT,
            },
            "cpp": {
                "function_definition": CodeElementType.FUNCTION,
                "class_specifier": CodeElementType.CLASS,
                "struct_specifier": CodeElementType.STRUCT,
                "namespace_definition": CodeElementType.NAMESPACE,
                "preproc_include": CodeElementType.IMPORT,
            },
            "c": {
                "function_definition": CodeElementType.FUNCTION,
                "struct_specifier": CodeElementType.STRUCT,
                "preproc_include": CodeElementType.IMPORT,
            },
        }

        # Get element types for this language
        lang_elements = element_mappings.get(language, {})

        # Check if this node represents a code element
        if node.type in lang_elements:
            element_type = lang_elements[node.type]

            # Extract element name
            name = self._extract_element_name(node, language)

            # Get node text
            start_byte = node.start_byte
            end_byte = node.end_byte
            element_content = content[start_byte:end_byte]

            # Skip very large elements to prevent timeouts (universal limit)
            if len(element_content) > MAX_ELEMENT_SIZE:
                self.logger.debug(
                    f"Skipping large {language} element {name} ({len(element_content)} bytes)"
                )
                return

            # Create code element
            element = CodeElement(
                name=name,
                element_type=element_type,
                content=element_content,
                start_line=node.start_point[0] + 1,
                end_line=node.end_point[0] + 1,
                level=level,
            )

            # Extract additional metadata (simplified for performance)
            if element.element_type in [
                CodeElementType.FUNCTION,
                CodeElementType.METHOD,
            ]:
                params_node = node.child_by_field_name("parameters")
                if params_node:
                    element.parameters = self._extract_parameters_from_node(params_node)

            elements.append(element)

            # Process children with increased level (limited depth)
            if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level + 1
                    )
        else:
            # Process children at same level (limited depth)
            if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level
                    )
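
        # Illustrative example based on the mappings above: a small JavaScript file
        # such as
        #
        #   import { x } from "./x";
        #   class Foo {
        #       bar() { return x; }
        #   }
        #
        # would yield an IMPORT element (import_statement), a CLASS element
        # (class_declaration) and a METHOD element (method_definition), each
        # carrying its own source slice and 1-based start/end lines.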

    def _extract_element_name(self, node, language: str) -> str:
        """Extract the name of a code element from Tree-sitter node.

        Args:
            node: Tree-sitter AST node
            language: Programming language

        Returns:
            Element name or "unknown"
        """
        # Common patterns for finding names in different node types
        name_fields = ["name", "identifier", "field_identifier"]

        for field_name in name_fields:
            name_node = node.child_by_field_name(field_name)
            if name_node:
                return name_node.text.decode("utf-8")

        # Fallback: look for identifier children (limited search)
        for i, child in enumerate(node.children):
            if i > 5:  # Limit search to first few children
                break
            if child.type == "identifier":
                return child.text.decode("utf-8")

        return "unknown"

    def _extract_parameters_from_node(self, params_node) -> list[str]:
        """Extract parameter names from a parameters node.

        Args:
            params_node: Tree-sitter parameters node

        Returns:
            List of parameter names
        """
        parameters = []
        for i, child in enumerate(params_node.children):
            if i > 20:  # Limit to prevent timeouts
                break
            if child.type in ["identifier", "parameter", "typed_parameter"]:
                if child.type == "identifier":
                    parameters.append(child.text.decode("utf-8"))
                else:
                    # Look for identifier within parameter
                    for subchild in child.children:
                        if subchild.type == "identifier":
                            parameters.append(subchild.text.decode("utf-8"))
                            break
        return parameters

    def _parse_python_ast(self, content: str) -> list[CodeElement]:
        """Parse Python code using Python's built-in AST as fallback.

        Args:
            content: Python source code

        Returns:
            List of code elements
        """
        # Performance check: skip AST parsing for very large files
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"Python file too large for AST parsing ({len(content)} bytes)"
            )
            return []

        elements = []

        try:
            tree = ast.parse(content)
        except SyntaxError as e:
            self.logger.warning(f"Failed to parse Python AST: {e}")
            return []

        def extract_docstring(node) -> str | None:
            """Extract docstring from a node."""
            if (
                isinstance(node, ast.FunctionDef | ast.ClassDef | ast.AsyncFunctionDef)
                and node.body
                and isinstance(node.body[0], ast.Expr)
                and isinstance(node.body[0].value, ast.Constant)
                and isinstance(node.body[0].value.value, str)
            ):
                return node.body[0].value.value
            return None

        def get_decorators(node) -> list[str]:
            """Extract decorator names from a node."""
            decorators = []
            if hasattr(node, "decorator_list"):
                for decorator in node.decorator_list[:5]:  # Limit decorators
                    if isinstance(decorator, ast.Name):
                        decorators.append(decorator.id)
                    elif isinstance(decorator, ast.Attribute):
                        decorators.append(f"{decorator.attr}")
            return decorators

        def get_parameters(node) -> list[str]:
            """Extract parameter names from a function node."""
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                return []

            params = []
            for arg in node.args.args[:20]:  # Limit parameters
                params.append(arg.arg)
            return params

        def visit_node(node, level=0, parent_element=None):
            """Recursively visit AST nodes."""
            # Performance checks
            if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
                return
            if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
                return

            element = None

            if isinstance(node, ast.ClassDef):
                element = CodeElement(
                    name=node.name,
                    element_type=CodeElementType.CLASS,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                )

            elif isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                element_type = (
                    CodeElementType.METHOD if level > 0 else CodeElementType.FUNCTION
                )
                element = CodeElement(
                    name=node.name,
                    element_type=element_type,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                    parameters=get_parameters(node),
                    is_async=isinstance(node, ast.AsyncFunctionDef),
                )

            elif isinstance(node, ast.Import | ast.ImportFrom):
                import_names = []
                if isinstance(node, ast.Import):
                    import_names = [
                        alias.name for alias in node.names[:10]
                    ]  # Limit imports
                else:
                    module = node.module or ""
                    import_names = [
                        f"{module}.{alias.name}" for alias in node.names[:10]
                    ]

                element = CodeElement(
                    name=", ".join(import_names),
                    element_type=CodeElementType.IMPORT,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    dependencies=import_names,
                )

            if element:
                # Skip very large elements
                if len(element.content) > MAX_ELEMENT_SIZE:
                    return

                if parent_element:
                    parent_element.add_child(element)
                else:
                    elements.append(element)

                # Recursively process children (limited depth)
                if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level + 1, element)
            else:
                # For nodes we don't handle, still process their children (limited depth)
                if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level, parent_element)

        # Start processing from the root
        for node in ast.iter_child_nodes(tree):
            visit_node(node)

        return elements
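
        # Illustrative example: for a module containing
        #
        #   import os
        #
        #   class Greeter:
        #       """Say hello."""
        #
        #       def greet(self, name):
        #           return f"hello {name}"
        #
        # the returned top-level list is [IMPORT("os"), CLASS("Greeter")]; greet is
        # attached to the class via add_child() as a METHOD (it is visited at
        # level > 0) rather than appearing as a separate top-level element.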

    def _extract_code_metadata(
        self, element: CodeElement, language: str
    ) -> dict[str, Any]:
        """Extract metadata from a code element.

        Args:
            element: The code element to analyze
            language: Programming language

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "language": language,
            "start_line": element.start_line,
            "end_line": element.end_line,
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": element.docstring is not None,
            "docstring_length": len(element.docstring) if element.docstring else 0,
            "parameter_count": len(element.parameters),
            "decorator_count": len(element.decorators),
            "child_count": len(element.children),
            "dependency_count": len(element.dependencies),
        }

        # Add specific metadata based on element type
        if element.element_type in [CodeElementType.FUNCTION, CodeElementType.METHOD]:
            metadata.update(
                {
                    "parameters": element.parameters,
                    "return_type": element.return_type,
                    "decorators": element.decorators,
                }
            )

        if element.element_type == CodeElementType.CLASS:
            metadata.update(
                {
                    "method_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.METHOD
                        ]
                    ),
                    "property_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.PROPERTY
                        ]
                    ),
                }
            )

        if element.element_type == CodeElementType.IMPORT:
            metadata.update({"dependencies": element.dependencies})

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_level": element.parent.level,
                }
            )

        return metadata
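
        # Illustrative example: for a FUNCTION element named "parse_config" spanning
        # lines 10-14, with parameters ["path", "strict"] and a docstring, the
        # returned dict includes (among other keys):
        #   {"element_type": "function", "name": "parse_config",
        #    "language": "python", "start_line": 10, "end_line": 14,
        #    "line_count": 5, "has_docstring": True, "parameter_count": 2,
        #    "parameters": ["path", "strict"], "return_type": None,
        #    "decorators": []}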

    def _merge_small_elements(
        self, elements: list[CodeElement], min_size: int = 200
    ) -> list[CodeElement]:
        """Merge small code elements to create more meaningful chunks.

        Args:
            elements: List of code elements
            min_size: Minimum size for standalone elements

        Returns:
            List of merged elements
        """
        if not elements:
            return []

        merged = []
        current_group = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # If element is large enough or is a significant code structure, keep it separate
            if (
                element_size >= min_size
                or element.element_type
                in [CodeElementType.CLASS, CodeElementType.FUNCTION]
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            ):
                # First, add any accumulated small elements
                if current_group:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                merged.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element_size

                # If accumulated size is large enough, create a merged element
                if current_size >= min_size:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            merged_element = self._create_merged_element(current_group)
            merged.append(merged_element)

        return merged
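
        # Illustrative example: with the default min_size of 200, three consecutive
        # IMPORT elements of ~80 bytes each accumulate until the running total
        # reaches 240 >= 200 and are emitted as a single merged MODULE element,
        # whereas a 50-byte FUNCTION element is always kept as its own chunk
        # because functions count as significant structures regardless of size.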

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create merged element
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies
        all_dependencies = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(set(all_dependencies))

        return merged_element

    def _split_text(self, content: str) -> list[dict[str, Any]]:
        """Split code content into chunks based on programming language structure.

        Args:
            content: The code content to split

        Returns:
            List of dictionaries with chunk content and metadata
        """
        # This method is required by the base class but not used in our implementation
        # We override chunk_document instead
        return [{"content": content, "metadata": {"element_type": "unknown"}}]

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a code document using AST parsing.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Detect language from file path first for language-specific optimizations
            file_path = document.metadata.get("file_name", "") or document.source
            language = self._detect_language(file_path, document.content)

            # Performance check: universal threshold for all code files
            if len(document.content) > CHUNK_SIZE_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id,
                    f"Large {language} file ({len(document.content)} bytes)",
                )
                return self._fallback_chunking(document)

            self.logger.debug(f"Detected language: {language}")

            # Parse code structure using AST
            elements = []
            parsing_method = "unknown"

            if language == "python":
                # Try Python AST first for Python files
                self.logger.debug("Parsing Python with built-in AST")
                elements = self._parse_python_ast(document.content)
                parsing_method = "python_ast"

                # Fallback to tree-sitter if Python AST fails
                if not elements and TREE_SITTER_AVAILABLE:
                    self.logger.debug("Falling back to Tree-sitter for Python")
                    elements = self._parse_with_tree_sitter(document.content, language)
                    parsing_method = "tree_sitter"
            elif language != "unknown" and TREE_SITTER_AVAILABLE:
                # Use tree-sitter for other supported languages
                self.logger.debug(f"Parsing {language} with Tree-sitter")
                elements = self._parse_with_tree_sitter(document.content, language)
                parsing_method = "tree_sitter"

            if not elements:
                self.progress_tracker.log_fallback(
                    document.id, f"No {language} elements found"
                )
                return self._fallback_chunking(document)

            # Merge small elements to optimize chunk size
            final_elements = self._merge_small_elements(elements)
            if len(final_elements) > 100:  # Limit total chunks
                final_elements = final_elements[:100]

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": len(element.content),
                    },
                )

                # Create chunk document with optimized metadata processing
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=element.content,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    skip_nlp=False,
                )

                # Add code-specific metadata
                code_metadata = self._extract_code_metadata(element, language)
                code_metadata["parsing_method"] = parsing_method
                code_metadata["chunking_strategy"] = "code"
                code_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(code_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), f"code ({language})"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Code parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)
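
    # Illustrative usage sketch (how the settings and documents are constructed is
    # application-specific and assumed here, not defined by this module):
    #
    #   strategy = CodeChunkingStrategy(settings)
    #   chunks = strategy.chunk_document(document)
    #   for chunk in chunks:
    #       print(chunk.metadata["element_type"], chunk.metadata["name"])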

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when AST parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for code document")

        # Use simple line-based splitting for code (optimized)
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:50]):  # Limit chunks
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs
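
# Illustrative arithmetic for the fallback path above (the real chunk_size comes
# from the configured settings; 1_000 is used here only as an example figure): a
# 2_500-character file made of ~100-character lines is packed greedily, so each
# fallback chunk holds about 9 lines and the document ends up as 3 chunks, each
# tagged with chunking_method="fallback_text".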