Coverage for src/qdrant_loader/core/chunking/strategy/code/code_document_parser.py: 76%

85 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Code document parser for AST analysis and language detection.""" 

2 

3from typing import Any 

4 

5import structlog 

6 

7# Tree-sitter imports with error handling 

8try: 

9 from tree_sitter_languages import get_parser 

10 

11 TREE_SITTER_AVAILABLE = True 

12except ImportError: 

13 TREE_SITTER_AVAILABLE = False 

14 get_parser = None 

15 

16from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser 

17from qdrant_loader.core.chunking.strategy.code.parser.common import ( 

18 CodeElement, # re-export for backward compatibility 

19) 

20from qdrant_loader.core.chunking.strategy.code.parser.python_ast import parse_python_ast 

21from qdrant_loader.core.chunking.strategy.code.parser.tree_sitter import ( 

22 extract_tree_sitter_elements, 

23) 

24 

# Module-level structlog logger; CodeDocumentParser instances reuse it as self.logger.
logger = structlog.get_logger(__name__)

# Performance constants - Universal limits for all code files

# Compared against len(content) in parse_code_elements; larger inputs are not parsed.
MAX_FILE_SIZE_FOR_AST = 75_000  # 75KB limit for AST parsing

# Passed to parse_python_ast and used to truncate the Tree-sitter result list.
MAX_ELEMENTS_TO_PROCESS = 800  # Limit number of elements to prevent timeouts

# Forwarded to extract_tree_sitter_elements as max_recursion_depth.
MAX_RECURSION_DEPTH = 8  # Limit AST recursion depth

# Forwarded to extract_tree_sitter_elements as max_element_size.
MAX_ELEMENT_SIZE = 20_000  # Skip individual elements larger than this

32 

33 

class CodeDocumentParser(BaseDocumentParser):
    """Parser for code documents with AST analysis and language detection.

    Python sources are parsed with ``parse_python_ast``; other recognised
    languages (and Python, when the AST pass yields nothing) go through
    Tree-sitter when it could be imported. Every parsing entry point returns
    an empty list instead of raising when parsing is skipped or fails.
    """

    def __init__(self, settings):
        """Store the settings and set up the extension map and parser cache.

        Args:
            settings: Configuration object; kept on the instance unchanged.
        """
        self.settings = settings
        self.logger = logger
        # Lower-cased file extension (with leading dot) -> language name.
        self.language_patterns = {
            ".py": "python",
            ".pyx": "python",
            ".pyi": "python",
            ".java": "java",
            ".js": "javascript",
            ".jsx": "javascript",
            ".mjs": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".go": "go",
            ".rs": "rust",
            ".cpp": "cpp",
            ".cc": "cpp",
            ".cxx": "cpp",
            ".c": "c",
            ".h": "c",
            ".cs": "c_sharp",
            ".php": "php",
            ".rb": "ruby",
            ".kt": "kotlin",
            ".scala": "scala",
            ".swift": "swift",
            ".dart": "dart",
        }
        # Tree-sitter parsers that were obtained successfully, keyed by language.
        self._parsers: dict[str, Any] = {}
        if not TREE_SITTER_AVAILABLE:
            self.logger.warning("Tree-sitter not available, will use fallback parsing")

    def parse_document_structure(self, content: str) -> dict[str, Any]:
        """Compute line statistics and rough, text-based complexity counts.

        The complexity indicators are plain substring counts (for example
        ``"if "``), so they are heuristics rather than syntactic facts.

        Args:
            content: Full source text of the document.

        Returns:
            A dict with line counts, average/maximum line length, comment and
            docstring flags, and a nested ``complexity_indicators`` dict.
        """
        # str.split with an explicit separator always returns at least one
        # element (``"".split("\n") == [""]``), so no empty-list guard is needed
        # before dividing or calling max().
        lines = content.split("\n")
        line_lengths = [len(line) for line in lines]
        non_empty_count = sum(1 for line in lines if line.strip())

        return {
            "total_lines": len(lines),
            "non_empty_lines": non_empty_count,
            "blank_lines": len(lines) - non_empty_count,
            "avg_line_length": sum(line_lengths) / len(lines),
            "max_line_length": max(line_lengths),
            "structure_type": "code",
            "has_comments": any(
                line.strip().startswith(("#", "//", "/*", "--")) for line in lines
            ),
            "has_docstrings": '"""' in content or "'''" in content,
            "complexity_indicators": {
                "if_statements": content.count("if "),
                "loop_statements": content.count("for ") + content.count("while "),
                "function_definitions": content.count("def ")
                + content.count("function "),
                "class_definitions": content.count("class "),
            },
        }

    def extract_section_metadata(self, element: CodeElement) -> dict[str, Any]:
        """Flatten a code element into a metadata dict.

        Args:
            element: The parsed code element to describe.

        Returns:
            A dict of the element's scalar attributes and collection sizes.
            ``docstring_length``, ``decorators``, ``parameters``,
            ``return_type`` and ``dependencies`` are included only when the
            corresponding attribute is truthy.
        """
        metadata = {
            "element_type": element.element_type.value,
            "element_name": element.name,
            "start_line": element.start_line,
            "end_line": element.end_line,
            # Both line bounds are counted, hence the +1.
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": bool(element.docstring),
            "decorator_count": len(element.decorators),
            "parameter_count": len(element.parameters),
            "dependency_count": len(element.dependencies),
            "child_count": len(element.children),
        }
        if element.docstring:
            metadata["docstring_length"] = len(element.docstring)
        if element.decorators:
            metadata["decorators"] = element.decorators
        if element.parameters:
            metadata["parameters"] = element.parameters
        if element.return_type:
            metadata["return_type"] = element.return_type
        if element.dependencies:
            metadata["dependencies"] = element.dependencies
        return metadata

    def detect_language(self, file_path: str, content: str) -> str:
        """Map a file path to a language name using its extension.

        Args:
            file_path: Path or file name; matching is case-insensitive.
            content: Unused; detection is based on the path only.

        Returns:
            The language name from ``self.language_patterns``, or
            ``"unknown"`` when the path has no dot or the text after the last
            dot is not a known extension.
        """
        _, dot, suffix = file_path.lower().rpartition(".")
        extension = f".{suffix}" if dot else ""
        return self.language_patterns.get(extension, "unknown")

    def parse_code_elements(self, content: str, language: str) -> list[CodeElement]:
        """Extract code elements from source text for the given language.

        Args:
            content: Source text to parse.
            language: Language name; ``"python"`` uses the built-in AST
                parser first, ``"unknown"`` is never parsed.

        Returns:
            The extracted elements, or an empty list when the content exceeds
            ``MAX_FILE_SIZE_FOR_AST``, the language is unknown, Tree-sitter is
            needed but unavailable, or parsing produced nothing.
        """
        # NOTE(review): len(content) counts characters, not encoded bytes, so
        # the limit (and the "bytes" figure in the message) is in characters.
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), skipping"
            )
            return []

        elements: list[CodeElement] = []
        if language == "python":
            self.logger.debug("Parsing Python with built-in AST")
            elements = parse_python_ast(
                content, max_elements_to_process=MAX_ELEMENTS_TO_PROCESS
            )
            # An empty AST result falls through to Tree-sitter when available.
            if not elements and TREE_SITTER_AVAILABLE:
                self.logger.debug("Falling back to Tree-sitter for Python")
                elements = self._parse_with_tree_sitter(content, language)
        elif language != "unknown" and TREE_SITTER_AVAILABLE:
            self.logger.debug(f"Parsing {language} with Tree-sitter")
            elements = self._parse_with_tree_sitter(content, language)
        return elements

    def _get_tree_sitter_parser(self, language: str):
        """Return a Tree-sitter parser for ``language``, caching successes.

        Args:
            language: Language name handed to ``get_parser``.

        Returns:
            The parser, or ``None`` when Tree-sitter is unavailable or
            ``get_parser`` raises. Failures are logged and not cached, so a
            later call tries again.
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None
        if language in self._parsers:
            return self._parsers[language]
        try:
            parser = get_parser(language)
            self._parsers[language] = parser
            return parser
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse ``content`` with Tree-sitter and extract code elements.

        Args:
            content: Source text to parse.
            language: Language name used to look up the parser.

        Returns:
            At most ``MAX_ELEMENTS_TO_PROCESS`` elements, or an empty list
            when no parser is available or any step raises.
        """
        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []
        try:
            # Encode once and share the buffer between the parser and the
            # extractor. Kept inside the try so an encoding error is handled
            # like any other parsing failure.
            source_bytes = content.encode("utf-8")
            tree = parser.parse(source_bytes)
            elements = extract_tree_sitter_elements(
                tree.root_node,
                source_bytes,
                language=language,
                max_recursion_depth=MAX_RECURSION_DEPTH,
                max_element_size=MAX_ELEMENT_SIZE,
            )
            return elements[:MAX_ELEMENTS_TO_PROCESS]
        except Exception as e:
            self.logger.warning(
                f"Tree-sitter parsing failed for {language}: {e}. Using fallback."
            )
            return []