Coverage for src/qdrant_loader/core/chunking/strategy/code/code_document_parser.py: 81%

207 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""Code document parser for AST analysis and language detection.""" 

2 

3import ast 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import Any, Optional 

7 

8import structlog 

9 

10# Tree-sitter imports with error handling 

11try: 

12 from tree_sitter_languages import get_parser 

13 

14 TREE_SITTER_AVAILABLE = True 

15except ImportError: 

16 TREE_SITTER_AVAILABLE = False 

17 get_parser = None 

18 

19from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser 

20 

21logger = structlog.get_logger(__name__) 

22 

# Performance guards applied uniformly to every code file, whatever its language.

# Largest source (measured with len() on the text) that is handed to AST parsing.
MAX_FILE_SIZE_FOR_AST = 75_000
# Upper bound on extracted elements kept per file, to prevent timeouts.
MAX_ELEMENTS_TO_PROCESS = 800
# Deepest AST level the recursive extractors will descend to.
MAX_RECURSION_DEPTH = 8
# Individual elements whose text is longer than this are skipped.
MAX_ELEMENT_SIZE = 20_000

28 

29 

class CodeElementType(Enum):
    """Closed set of categories a parsed code element can belong to.

    Each member's value is the lowercase string form of its name.
    """

    # Containers and callables
    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    METHOD = "method"

    # Members and statements
    PROPERTY = "property"
    VARIABLE = "variable"
    IMPORT = "import"

    # Documentation and annotations
    COMMENT = "comment"
    DOCSTRING = "docstring"
    DECORATOR = "decorator"

    # Further declaration kinds
    CONSTANT = "constant"
    INTERFACE = "interface"
    ENUM = "enum"
    STRUCT = "struct"
    NAMESPACE = "namespace"
    PACKAGE = "package"

49 

50 

@dataclass
class CodeElement:
    """Represents a code element with its metadata.

    Elements form a tree: ``children`` holds nested elements and ``parent``
    is the back-reference that ``add_child`` maintains.

    The ``parent`` back-reference is excluded from the generated ``__eq__``
    and ``__repr__``. With it included, comparing two distinct but equal
    elements that have parents recursed parent -> children -> parent until
    ``RecursionError``. Equality is still structural through ``children``.
    """

    name: str
    element_type: CodeElementType
    content: str
    start_line: int
    end_line: int
    level: int = 0
    # Back-reference only: kept out of comparison and repr (see class docstring).
    parent: Optional["CodeElement"] = field(default=None, repr=False, compare=False)
    children: list["CodeElement"] = field(default_factory=list)
    docstring: str | None = None
    decorators: list[str] = field(default_factory=list)
    parameters: list[str] = field(default_factory=list)
    return_type: str | None = None
    visibility: str = "public"  # public, private, protected
    is_async: bool = False
    is_static: bool = False
    is_abstract: bool = False
    complexity: int = 0  # Cyclomatic complexity
    dependencies: list[str] = field(default_factory=list)

    def add_child(self, child: "CodeElement") -> None:
        """Attach ``child`` to this element.

        Appends ``child`` to ``self.children`` and points ``child.parent``
        back at this element.

        Args:
            child: Element to nest under this one
        """
        self.children.append(child)
        child.parent = self

78 

79 

80class CodeDocumentParser(BaseDocumentParser): 

81 """Parser for code documents with AST analysis and language detection.""" 

82 

83 def __init__(self, settings): 

84 """Initialize the code document parser. 

85 

86 Args: 

87 settings: Configuration settings 

88 """ 

89 self.settings = settings 

90 self.logger = logger 

91 

92 # Language detection patterns 

93 self.language_patterns = { 

94 ".py": "python", 

95 ".pyx": "python", 

96 ".pyi": "python", 

97 ".java": "java", 

98 ".js": "javascript", 

99 ".jsx": "javascript", 

100 ".mjs": "javascript", 

101 ".ts": "typescript", 

102 ".tsx": "typescript", 

103 ".go": "go", 

104 ".rs": "rust", 

105 ".cpp": "cpp", 

106 ".cc": "cpp", 

107 ".cxx": "cpp", 

108 ".c": "c", 

109 ".h": "c", 

110 ".cs": "c_sharp", 

111 ".php": "php", 

112 ".rb": "ruby", 

113 ".kt": "kotlin", 

114 ".scala": "scala", 

115 ".swift": "swift", 

116 ".dart": "dart", 

117 } 

118 

119 # Cache for Tree-sitter parsers 

120 self._parsers = {} 

121 

122 # Check tree-sitter availability 

123 if not TREE_SITTER_AVAILABLE: 

124 self.logger.warning("Tree-sitter not available, will use fallback parsing") 

125 

126 def parse_document_structure(self, content: str) -> dict[str, Any]: 

127 """Parse code document structure and extract programming language information. 

128 

129 Args: 

130 content: Source code content 

131 

132 Returns: 

133 Dictionary containing code document structure information 

134 """ 

135 # For base analysis, we return general structure info 

136 lines = content.split("\n") 

137 non_empty_lines = [line for line in lines if line.strip()] 

138 

139 # Basic code metrics 

140 structure = { 

141 "total_lines": len(lines), 

142 "non_empty_lines": len(non_empty_lines), 

143 "blank_lines": len(lines) - len(non_empty_lines), 

144 "avg_line_length": ( 

145 sum(len(line) for line in lines) / len(lines) if lines else 0 

146 ), 

147 "max_line_length": max(len(line) for line in lines) if lines else 0, 

148 "structure_type": "code", 

149 "has_comments": any( 

150 line.strip().startswith(("#", "//", "/*", "--")) for line in lines 

151 ), 

152 "has_docstrings": '"""' in content or "'''" in content, 

153 "complexity_indicators": { 

154 "if_statements": content.count("if "), 

155 "loop_statements": content.count("for ") + content.count("while "), 

156 "function_definitions": content.count("def ") 

157 + content.count("function "), 

158 "class_definitions": content.count("class "), 

159 }, 

160 } 

161 

162 return structure 

163 

164 def extract_section_metadata(self, element: CodeElement) -> dict[str, Any]: 

165 """Extract metadata from a code element. 

166 

167 Args: 

168 element: Code element to extract metadata from 

169 

170 Returns: 

171 Dictionary containing element metadata 

172 """ 

173 metadata = { 

174 "element_type": element.element_type.value, 

175 "element_name": element.name, 

176 "start_line": element.start_line, 

177 "end_line": element.end_line, 

178 "line_count": element.end_line - element.start_line + 1, 

179 "level": element.level, 

180 "visibility": element.visibility, 

181 "is_async": element.is_async, 

182 "is_static": element.is_static, 

183 "is_abstract": element.is_abstract, 

184 "complexity": element.complexity, 

185 "has_docstring": bool(element.docstring), 

186 "decorator_count": len(element.decorators), 

187 "parameter_count": len(element.parameters), 

188 "dependency_count": len(element.dependencies), 

189 "child_count": len(element.children), 

190 } 

191 

192 # Add optional fields if present 

193 if element.docstring: 

194 metadata["docstring_length"] = len(element.docstring) 

195 if element.decorators: 

196 metadata["decorators"] = element.decorators 

197 if element.parameters: 

198 metadata["parameters"] = element.parameters 

199 if element.return_type: 

200 metadata["return_type"] = element.return_type 

201 if element.dependencies: 

202 metadata["dependencies"] = element.dependencies 

203 

204 return metadata 

205 

206 def detect_language(self, file_path: str, content: str) -> str: 

207 """Detect programming language from file extension. 

208 

209 Args: 

210 file_path: Path to the file 

211 content: File content (for future content-based detection) 

212 

213 Returns: 

214 Detected language name or "unknown" 

215 """ 

216 # Get file extension 

217 ext = f".{file_path.lower().split('.')[-1]}" if "." in file_path else "" 

218 return self.language_patterns.get(ext, "unknown") 

219 

220 def parse_code_elements(self, content: str, language: str) -> list[CodeElement]: 

221 """Parse code content into structured elements using AST. 

222 

223 Args: 

224 content: Source code content 

225 language: Programming language 

226 

227 Returns: 

228 List of code elements 

229 """ 

230 # Performance check: universal size limit for all languages 

231 if len(content) > MAX_FILE_SIZE_FOR_AST: 

232 self.logger.info( 

233 f"{language.title()} file too large for AST parsing ({len(content)} bytes), skipping" 

234 ) 

235 return [] 

236 

237 elements = [] 

238 

239 # Try language-specific parsing 

240 if language == "python": 

241 # Try Python AST first for Python files 

242 self.logger.debug("Parsing Python with built-in AST") 

243 elements = self._parse_python_ast(content) 

244 

245 # Fallback to tree-sitter if Python AST fails 

246 if not elements and TREE_SITTER_AVAILABLE: 

247 self.logger.debug("Falling back to Tree-sitter for Python") 

248 elements = self._parse_with_tree_sitter(content, language) 

249 

250 elif language != "unknown" and TREE_SITTER_AVAILABLE: 

251 # Use tree-sitter for other supported languages 

252 self.logger.debug(f"Parsing {language} with Tree-sitter") 

253 elements = self._parse_with_tree_sitter(content, language) 

254 

255 return elements 

256 

257 def _get_tree_sitter_parser(self, language: str): 

258 """Get or create a Tree-sitter parser for the given language. 

259 

260 Args: 

261 language: Tree-sitter language name 

262 

263 Returns: 

264 Tree-sitter parser or None if not available 

265 """ 

266 if not TREE_SITTER_AVAILABLE or get_parser is None: 

267 return None 

268 

269 if language in self._parsers: 

270 return self._parsers[language] 

271 

272 try: 

273 parser = get_parser(language) 

274 self._parsers[language] = parser 

275 return parser 

276 except Exception as e: 

277 self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}") 

278 return None 

279 

280 def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]: 

281 """Parse code using Tree-sitter AST. 

282 

283 Args: 

284 content: Source code content 

285 language: Programming language 

286 

287 Returns: 

288 List of code elements 

289 """ 

290 parser = self._get_tree_sitter_parser(language) 

291 if not parser: 

292 return [] 

293 

294 try: 

295 tree = parser.parse(content.encode("utf-8")) 

296 root_node = tree.root_node 

297 

298 elements = [] 

299 self._extract_tree_sitter_elements( 

300 root_node, content.encode("utf-8"), elements, language, level=0 

301 ) 

302 

303 # Limit elements to prevent timeouts 

304 if len(elements) > MAX_ELEMENTS_TO_PROCESS: 

305 self.logger.warning( 

306 f"Too many elements ({len(elements)}), limiting to {MAX_ELEMENTS_TO_PROCESS}" 

307 ) 

308 elements = elements[:MAX_ELEMENTS_TO_PROCESS] 

309 

310 return elements 

311 

312 except Exception as e: 

313 self.logger.warning(f"Tree-sitter parsing failed for {language}: {e}") 

314 return [] 

315 

316 def _extract_tree_sitter_elements( 

317 self, 

318 node, 

319 content_bytes: bytes, 

320 elements: list[CodeElement], 

321 language: str, 

322 level: int = 0, 

323 ): 

324 """Extract elements from Tree-sitter AST node. 

325 

326 Args: 

327 node: Tree-sitter node 

328 content_bytes: Source code as bytes 

329 elements: List to append elements to 

330 language: Programming language 

331 level: Current nesting level 

332 """ 

333 if level > MAX_RECURSION_DEPTH: 

334 return 

335 

336 # Define element type mapping based on node type 

337 element_type_map = { 

338 "function_definition": CodeElementType.FUNCTION, 

339 "method_definition": CodeElementType.METHOD, 

340 "class_definition": CodeElementType.CLASS, 

341 "interface_declaration": CodeElementType.INTERFACE, 

342 "enum_declaration": CodeElementType.ENUM, 

343 "struct_declaration": CodeElementType.STRUCT, 

344 "variable_declaration": CodeElementType.VARIABLE, 

345 "import_statement": CodeElementType.IMPORT, 

346 "comment": CodeElementType.COMMENT, 

347 } 

348 

349 # Map node type to code element type 

350 element_type = element_type_map.get(node.type) 

351 

352 if element_type: 

353 # Extract element content 

354 element_content = content_bytes[node.start_byte : node.end_byte].decode( 

355 "utf-8" 

356 ) 

357 

358 # Skip overly large elements 

359 if len(element_content) > MAX_ELEMENT_SIZE: 

360 self.logger.debug( 

361 f"Skipping large {node.type} element ({len(element_content)} chars)" 

362 ) 

363 return 

364 

365 # Extract element name 

366 element_name = self._extract_element_name(node, content_bytes, language) 

367 

368 # Create code element 

369 element = CodeElement( 

370 name=element_name, 

371 element_type=element_type, 

372 content=element_content, 

373 start_line=node.start_point[0] + 1, 

374 end_line=node.end_point[0] + 1, 

375 level=level, 

376 ) 

377 

378 # Extract additional metadata 

379 self._enrich_element_metadata(element, node, content_bytes, language) 

380 

381 elements.append(element) 

382 

383 # Recursively process child nodes 

384 for child in node.children: 

385 self._extract_tree_sitter_elements( 

386 child, content_bytes, elements, language, level + 1 

387 ) 

388 

389 def _extract_element_name(self, node, content_bytes: bytes, language: str) -> str: 

390 """Extract element name from Tree-sitter node. 

391 

392 Args: 

393 node: Tree-sitter node 

394 content_bytes: Source code as bytes 

395 language: Programming language 

396 

397 Returns: 

398 Element name or default name 

399 """ 

400 # Try to find identifier child node 

401 for child in node.children: 

402 if child.type == "identifier": 

403 return content_bytes[child.start_byte : child.end_byte].decode("utf-8") 

404 

405 # Fallback to node type 

406 return f"unnamed_{node.type}" 

407 

408 def _enrich_element_metadata( 

409 self, element: CodeElement, node, content_bytes: bytes, language: str 

410 ): 

411 """Enrich code element with additional metadata from AST node. 

412 

413 Args: 

414 element: Code element to enrich 

415 node: Tree-sitter node 

416 content_bytes: Source code as bytes 

417 language: Programming language 

418 """ 

419 # Extract decorators, parameters, etc. based on language 

420 if language == "python": 

421 self._enrich_python_metadata(element, node, content_bytes) 

422 elif language in ["javascript", "typescript"]: 

423 self._enrich_javascript_metadata(element, node, content_bytes) 

424 # Add more language-specific enrichment as needed 

425 

426 def _enrich_python_metadata(self, element: CodeElement, node, content_bytes: bytes): 

427 """Enrich element with Python-specific metadata.""" 

428 # Look for decorators 

429 for child in node.children: 

430 if child.type == "decorator": 

431 decorator_name = content_bytes[ 

432 child.start_byte : child.end_byte 

433 ].decode("utf-8") 

434 element.decorators.append(decorator_name.strip()) 

435 

436 # Check for async 

437 element.is_async = any(child.type == "async" for child in node.children) 

438 

439 def _enrich_javascript_metadata( 

440 self, element: CodeElement, node, content_bytes: bytes 

441 ): 

442 """Enrich element with JavaScript/TypeScript-specific metadata.""" 

443 # Check for async 

444 element.is_async = any(child.type == "async" for child in node.children) 

445 

446 # Check for static 

447 element.is_static = any(child.type == "static" for child in node.children) 

448 

449 def _parse_python_ast(self, content: str) -> list[CodeElement]: 

450 """Parse Python code using built-in AST module. 

451 

452 Args: 

453 content: Python source code 

454 

455 Returns: 

456 List of code elements 

457 """ 

458 try: 

459 tree = ast.parse(content) 

460 elements = [] 

461 self._extract_ast_elements(tree, content, elements, level=0) 

462 return elements 

463 

464 except SyntaxError as e: 

465 self.logger.warning(f"Python AST parsing failed: {e}") 

466 return [] 

467 except Exception as e: 

468 self.logger.warning(f"Unexpected error in Python AST parsing: {e}") 

469 return [] 

470 

471 def _extract_ast_elements( 

472 self, node: ast.AST, content: str, elements: list[CodeElement], level: int = 0 

473 ): 

474 """Extract elements from Python AST node. 

475 

476 Args: 

477 node: AST node 

478 content: Source code content 

479 elements: List to append elements to 

480 level: Current nesting level 

481 """ 

482 if level > MAX_RECURSION_DEPTH: 

483 return 

484 

485 lines = content.split("\n") 

486 

487 if isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef): 

488 element_type = ( 

489 CodeElementType.METHOD if level > 0 else CodeElementType.FUNCTION 

490 ) 

491 

492 # Extract function content 

493 start_line = node.lineno 

494 end_line = node.end_lineno or start_line 

495 element_content = "\n".join(lines[start_line - 1 : end_line]) 

496 

497 element = CodeElement( 

498 name=node.name, 

499 element_type=element_type, 

500 content=element_content, 

501 start_line=start_line, 

502 end_line=end_line, 

503 level=level, 

504 is_async=isinstance(node, ast.AsyncFunctionDef), 

505 parameters=[arg.arg for arg in node.args.args], 

506 decorators=[self._get_decorator_name(d) for d in node.decorator_list], 

507 ) 

508 

509 # Extract docstring 

510 if ( 

511 node.body 

512 and isinstance(node.body[0], ast.Expr) 

513 and isinstance(node.body[0].value, ast.Constant) 

514 and isinstance(node.body[0].value.value, str) 

515 ): 

516 element.docstring = node.body[0].value.value 

517 

518 elements.append(element) 

519 

520 elif isinstance(node, ast.ClassDef): 

521 start_line = node.lineno 

522 end_line = node.end_lineno or start_line 

523 element_content = "\n".join(lines[start_line - 1 : end_line]) 

524 

525 element = CodeElement( 

526 name=node.name, 

527 element_type=CodeElementType.CLASS, 

528 content=element_content, 

529 start_line=start_line, 

530 end_line=end_line, 

531 level=level, 

532 decorators=[self._get_decorator_name(d) for d in node.decorator_list], 

533 ) 

534 

535 # Extract docstring 

536 if ( 

537 node.body 

538 and isinstance(node.body[0], ast.Expr) 

539 and isinstance(node.body[0].value, ast.Constant) 

540 and isinstance(node.body[0].value.value, str) 

541 ): 

542 element.docstring = node.body[0].value.value 

543 

544 elements.append(element) 

545 

546 # Recursively process child nodes 

547 for child in ast.iter_child_nodes(node): 

548 self._extract_ast_elements(child, content, elements, level + 1) 

549 

550 def _get_decorator_name(self, decorator: ast.AST) -> str: 

551 """Extract decorator name from AST node. 

552 

553 Args: 

554 decorator: AST decorator node 

555 

556 Returns: 

557 Decorator name as string 

558 """ 

559 if isinstance(decorator, ast.Name): 

560 return decorator.id 

561 elif isinstance(decorator, ast.Attribute): 

562 return f"{self._get_decorator_name(decorator.value)}.{decorator.attr}" 

563 elif isinstance(decorator, ast.Call): 

564 return f"{self._get_decorator_name(decorator.func)}()" 

565 else: 

566 return "unknown_decorator"