Coverage for src/qdrant_loader/core/chunking/strategy/code/code_document_parser.py: 76%
85 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Code document parser for AST analysis and language detection."""
3from typing import Any
5import structlog
7# Tree-sitter imports with error handling
8try:
9 from tree_sitter_languages import get_parser
11 TREE_SITTER_AVAILABLE = True
12except ImportError:
13 TREE_SITTER_AVAILABLE = False
14 get_parser = None
16from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser
17from qdrant_loader.core.chunking.strategy.code.parser.common import (
18 CodeElement, # re-export for backward compatibility
19)
20from qdrant_loader.core.chunking.strategy.code.parser.python_ast import parse_python_ast
21from qdrant_loader.core.chunking.strategy.code.parser.tree_sitter import (
22 extract_tree_sitter_elements,
23)
25logger = structlog.get_logger(__name__)
# Performance constants - Universal limits for all code files

# 75KB limit for AST parsing
MAX_FILE_SIZE_FOR_AST = 75_000
# Limit number of elements to prevent timeouts
MAX_ELEMENTS_TO_PROCESS = 800
# Limit AST recursion depth
MAX_RECURSION_DEPTH = 8
# Skip individual elements larger than this
MAX_ELEMENT_SIZE = 20_000
class CodeDocumentParser(BaseDocumentParser):
    """Parser for code documents with AST analysis and language detection.

    Python sources are handed to ``parse_python_ast`` first; other recognised
    languages (and Python, when that call yields nothing) are parsed with
    Tree-sitter if it could be imported.
    """

    def __init__(self, settings):
        """Store *settings* and prepare the extension map and parser cache.

        Args:
            settings: Configuration object. It is kept on ``self.settings``;
                none of the methods defined in this class read it.
        """
        self.settings = settings
        self.logger = logger
        # Lower-cased file extension (leading dot included) -> language name
        # passed to the parsers.
        self.language_patterns = {
            ".py": "python",
            ".pyx": "python",
            ".pyi": "python",
            ".java": "java",
            ".js": "javascript",
            ".jsx": "javascript",
            ".mjs": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".go": "go",
            ".rs": "rust",
            ".cpp": "cpp",
            ".cc": "cpp",
            ".cxx": "cpp",
            ".c": "c",
            ".h": "c",
            ".cs": "c_sharp",
            ".php": "php",
            ".rb": "ruby",
            ".kt": "kotlin",
            ".scala": "scala",
            ".swift": "swift",
            ".dart": "dart",
        }
        # Tree-sitter parsers keyed by language name, filled on first use.
        self._parsers = {}
        if not TREE_SITTER_AVAILABLE:
            self.logger.warning("Tree-sitter not available, will use fallback parsing")

    def parse_document_structure(self, content: str) -> dict[str, Any]:
        """Compute cheap, text-only statistics about a piece of source code.

        No parsing is involved: the counts under ``complexity_indicators`` are
        plain substring counts (so ``"if "`` also matches inside ``"elif "``).

        Args:
            content: Full text of the code document.

        Returns:
            A dict with line counts, average and maximum line length, the
            fixed ``structure_type`` ``"code"``, comment/docstring flags and
            the nested ``complexity_indicators`` dict.
        """
        lines = content.split("\n")
        # split("\n") always returns at least one item (even for ""), so the
        # average and maximum below never see an empty sequence.
        line_lengths = [len(line) for line in lines]
        non_empty_count = sum(1 for line in lines if line.strip())
        comment_prefixes = ("#", "//", "/*", "--")

        return {
            "total_lines": len(lines),
            "non_empty_lines": non_empty_count,
            "blank_lines": len(lines) - non_empty_count,
            "avg_line_length": sum(line_lengths) / len(lines),
            "max_line_length": max(line_lengths),
            "structure_type": "code",
            "has_comments": any(
                line.strip().startswith(comment_prefixes) for line in lines
            ),
            "has_docstrings": '"""' in content or "'''" in content,
            "complexity_indicators": {
                "if_statements": content.count("if "),
                "loop_statements": content.count("for ") + content.count("while "),
                "function_definitions": content.count("def ")
                + content.count("function "),
                "class_definitions": content.count("class "),
            },
        }

    def extract_section_metadata(self, element: CodeElement) -> dict[str, Any]:
        """Flatten a ``CodeElement`` into a metadata dict.

        Args:
            element: Parsed code element to describe.

        Returns:
            A dict that always holds the element's type, name, line span,
            flags and counts. ``docstring_length``, ``decorators``,
            ``parameters``, ``return_type`` and ``dependencies`` are added
            only when the corresponding attribute is truthy.
        """
        metadata = {
            "element_type": element.element_type.value,
            "element_name": element.name,
            "start_line": element.start_line,
            "end_line": element.end_line,
            # Both boundary lines are counted, hence the + 1.
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": bool(element.docstring),
            "decorator_count": len(element.decorators),
            "parameter_count": len(element.parameters),
            "dependency_count": len(element.dependencies),
            "child_count": len(element.children),
        }

        if element.docstring:
            metadata["docstring_length"] = len(element.docstring)

        # Optional details, copied over only when they carry a value.
        for key in ("decorators", "parameters", "return_type", "dependencies"):
            value = getattr(element, key)
            if value:
                metadata[key] = value

        return metadata

    def detect_language(self, file_path: str, content: str) -> str:
        """Map a file path to a language name using its extension.

        Args:
            file_path: Path or file name; only the text after its last ``.``
                is used, compared case-insensitively.
            content: File content. Not used by this implementation.

        Returns:
            The language registered in ``self.language_patterns`` for the
            extension, or ``"unknown"`` when there is no match.
        """
        if "." in file_path:
            ext = "." + file_path.lower().rpartition(".")[2]
        else:
            ext = ""
        return self.language_patterns.get(ext, "unknown")

    def parse_code_elements(self, content: str, language: str) -> list[CodeElement]:
        """Parse *content* into code elements with the parser for *language*.

        Args:
            content: Source text to parse.
            language: Language name as returned by ``detect_language``.

        Returns:
            The elements found. An empty list is returned when the content
            exceeds ``MAX_FILE_SIZE_FOR_AST``, when the language is
            ``"unknown"``, or when a non-Python language is requested and
            Tree-sitter is unavailable.
        """
        # NOTE(review): len(content) counts characters, while the log message
        # below reports it as bytes; the two differ for non-ASCII sources.
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), skipping"
            )
            return []

        if language == "python":
            self.logger.debug("Parsing Python with built-in AST")
            elements = parse_python_ast(
                content, max_elements_to_process=MAX_ELEMENTS_TO_PROCESS
            )
            # Give Tree-sitter a chance when the first parser found nothing.
            if not elements and TREE_SITTER_AVAILABLE:
                self.logger.debug("Falling back to Tree-sitter for Python")
                elements = self._parse_with_tree_sitter(content, language)
            return elements

        if language != "unknown" and TREE_SITTER_AVAILABLE:
            self.logger.debug(f"Parsing {language} with Tree-sitter")
            return self._parse_with_tree_sitter(content, language)

        return []

    def _get_tree_sitter_parser(self, language: str):
        """Return the Tree-sitter parser for *language*, creating it on first use.

        Successfully created parsers are kept in ``self._parsers``. A failed
        lookup is logged and not remembered, so it is attempted again on the
        next call.

        Args:
            language: Language name passed to ``get_parser``.

        Returns:
            The parser, or ``None`` when Tree-sitter is unavailable or
            ``get_parser`` raised.
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None

        if language in self._parsers:
            return self._parsers[language]

        try:
            parser = get_parser(language)
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None

        self._parsers[language] = parser
        return parser

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse *content* with Tree-sitter and extract its code elements.

        Args:
            content: Source text to parse.
            language: Language name used to select the parser and forwarded
                to ``extract_tree_sitter_elements``.

        Returns:
            At most ``MAX_ELEMENTS_TO_PROCESS`` elements, or an empty list
            when no parser is available or any step raised (the failure is
            logged as a warning).
        """
        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []

        try:
            # Encode once and share the bytes between the parser and the
            # extractor. Kept inside the try so an encoding failure is
            # handled like any other parsing error.
            source_bytes = content.encode("utf-8")
            tree = parser.parse(source_bytes)
            elements = extract_tree_sitter_elements(
                tree.root_node,
                source_bytes,
                language=language,
                max_recursion_depth=MAX_RECURSION_DEPTH,
                max_element_size=MAX_ELEMENT_SIZE,
            )
            return elements[:MAX_ELEMENTS_TO_PROCESS]
        except Exception as e:
            self.logger.warning(
                f"Tree-sitter parsing failed for {language}: {e}. Using fallback."
            )
            return []