Coverage for src/qdrant_loader/core/chunking/strategy/code/code_document_parser.py: 81%
207 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Code document parser for AST analysis and language detection."""
3import ast
4from dataclasses import dataclass, field
5from enum import Enum
6from typing import Any, Optional
8import structlog
10# Tree-sitter imports with error handling
11try:
12 from tree_sitter_languages import get_parser
14 TREE_SITTER_AVAILABLE = True
15except ImportError:
16 TREE_SITTER_AVAILABLE = False
17 get_parser = None
19from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser
21logger = structlog.get_logger(__name__)
# Universal safety limits, applied to every code file regardless of language.
MAX_FILE_SIZE_FOR_AST = 75_000  # content longer than this (len(content)) skips AST parsing
MAX_ELEMENTS_TO_PROCESS = 800  # cap on the elements kept from one Tree-sitter parse
MAX_RECURSION_DEPTH = 8  # deepest tree level the element extractors descend to
MAX_ELEMENT_SIZE = 20_000  # Tree-sitter elements with longer text are skipped
class CodeElementType(Enum):
    """Kinds of structural elements a parsed code document can contain.

    Each member's value is the lowercase string that is written into element
    metadata (see ``element_type.value``).
    """

    # Containers and callables
    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    METHOD = "method"
    PROPERTY = "property"

    # Statements and annotations
    VARIABLE = "variable"
    IMPORT = "import"
    COMMENT = "comment"
    DOCSTRING = "docstring"
    DECORATOR = "decorator"
    CONSTANT = "constant"

    # Type-level and packaging constructs
    INTERFACE = "interface"
    ENUM = "enum"
    STRUCT = "struct"
    NAMESPACE = "namespace"
    PACKAGE = "package"
@dataclass
class CodeElement:
    """A structural element (class, function, ...) found in source code.

    Elements form a tree: ``add_child`` appends to ``children`` and sets the
    child's ``parent`` back-reference.
    """

    name: str
    element_type: CodeElementType
    content: str
    start_line: int  # 1-based line of the element's first line
    end_line: int  # 1-based line of the element's last line (inclusive)
    level: int = 0
    # Back-reference only. It is excluded from ``==`` because comparing it
    # would bounce parent -> children -> parent between two linked trees
    # until RecursionError.
    parent: Optional["CodeElement"] = field(default=None, compare=False)
    children: list["CodeElement"] = field(default_factory=list)
    docstring: Optional[str] = None
    decorators: list[str] = field(default_factory=list)
    parameters: list[str] = field(default_factory=list)
    return_type: Optional[str] = None
    visibility: str = "public"  # public, private, protected
    is_async: bool = False
    is_static: bool = False
    is_abstract: bool = False
    complexity: int = 0  # Cyclomatic complexity
    dependencies: list[str] = field(default_factory=list)

    def add_child(self, child: "CodeElement"):
        """Attach ``child`` to this element and point its ``parent`` here.

        Args:
            child: Element to append to ``children``
        """
        self.children.append(child)
        child.parent = self
80class CodeDocumentParser(BaseDocumentParser):
81 """Parser for code documents with AST analysis and language detection."""
def __init__(self, settings):
    """Store the settings and build the extension table and parser cache.

    A warning is logged when the Tree-sitter import at module load failed.

    Args:
        settings: Configuration settings
    """
    self.settings = settings
    self.logger = logger

    # File extensions grouped by the language name they resolve to.
    extensions_by_language = {
        "python": (".py", ".pyx", ".pyi"),
        "java": (".java",),
        "javascript": (".js", ".jsx", ".mjs"),
        "typescript": (".ts", ".tsx"),
        "go": (".go",),
        "rust": (".rs",),
        "cpp": (".cpp", ".cc", ".cxx"),
        "c": (".c", ".h"),
        "c_sharp": (".cs",),
        "php": (".php",),
        "ruby": (".rb",),
        "kotlin": (".kt",),
        "scala": (".scala",),
        "swift": (".swift",),
        "dart": (".dart",),
    }
    # Flattened to extension -> language, which is what lookups use.
    self.language_patterns = {
        extension: language
        for language, extensions in extensions_by_language.items()
        for extension in extensions
    }

    # Tree-sitter parsers created so far, keyed by language name.
    self._parsers = {}

    if not TREE_SITTER_AVAILABLE:
        self.logger.warning("Tree-sitter not available, will use fallback parsing")
def parse_document_structure(self, content: str) -> dict[str, Any]:
    """Compute language-agnostic line metrics and rough complexity counts.

    All counts are plain substring/prefix heuristics over the raw text; no
    parsing is performed.

    Args:
        content: Source code content

    Returns:
        Dictionary containing code document structure information
    """
    lines = content.split("\n")
    line_lengths = [len(line) for line in lines]
    non_empty_count = sum(1 for line in lines if line.strip())
    comment_prefixes = ("#", "//", "/*", "--")

    if lines:
        average_length = sum(line_lengths) / len(lines)
        longest_line = max(line_lengths)
    else:
        average_length = 0
        longest_line = 0

    loop_count = content.count("for ") + content.count("while ")
    function_count = content.count("def ") + content.count("function ")

    return {
        "total_lines": len(lines),
        "non_empty_lines": non_empty_count,
        "blank_lines": len(lines) - non_empty_count,
        "avg_line_length": average_length,
        "max_line_length": longest_line,
        "structure_type": "code",
        "has_comments": any(
            line.strip().startswith(comment_prefixes) for line in lines
        ),
        "has_docstrings": '"""' in content or "'''" in content,
        "complexity_indicators": {
            "if_statements": content.count("if "),
            "loop_statements": loop_count,
            "function_definitions": function_count,
            "class_definitions": content.count("class "),
        },
    }
def extract_section_metadata(self, element: CodeElement) -> dict[str, Any]:
    """Flatten a code element into a metadata dictionary.

    Fixed keys are always present; the optional keys (``docstring_length``,
    ``decorators``, ``parameters``, ``return_type``, ``dependencies``) are
    added only when the corresponding attribute is truthy.

    Args:
        element: Code element to extract metadata from

    Returns:
        Dictionary containing element metadata
    """
    first_line, last_line = element.start_line, element.end_line

    metadata = {
        "element_type": element.element_type.value,
        "element_name": element.name,
        "start_line": first_line,
        "end_line": last_line,
        "line_count": last_line - first_line + 1,
        "level": element.level,
        "visibility": element.visibility,
        "is_async": element.is_async,
        "is_static": element.is_static,
        "is_abstract": element.is_abstract,
        "complexity": element.complexity,
        "has_docstring": bool(element.docstring),
        "decorator_count": len(element.decorators),
        "parameter_count": len(element.parameters),
        "dependency_count": len(element.dependencies),
        "child_count": len(element.children),
    }

    if element.docstring:
        metadata["docstring_length"] = len(element.docstring)

    # These attributes are copied through verbatim under their own name.
    for attribute in ("decorators", "parameters", "return_type", "dependencies"):
        value = getattr(element, attribute)
        if value:
            metadata[attribute] = value

    return metadata
def detect_language(self, file_path: str, content: str) -> str:
    """Map a file path to a language name using its extension only.

    The extension is the lower-cased text after the last ``.`` in the path.

    Args:
        file_path: Path to the file
        content: File content (accepted but not inspected)

    Returns:
        Detected language name or "unknown"
    """
    _, dot, suffix = file_path.lower().rpartition(".")
    extension = f".{suffix}" if dot else ""
    return self.language_patterns.get(extension, "unknown")
def parse_code_elements(self, content: str, language: str) -> list[CodeElement]:
    """Parse code content into structured elements using AST.

    Python is parsed with the built-in ``ast`` module first and falls back
    to Tree-sitter only when that yields no elements. Every other known
    language goes straight to Tree-sitter. Content whose ``len()`` exceeds
    ``MAX_FILE_SIZE_FOR_AST`` is not parsed at all.

    Args:
        content: Source code content
        language: Programming language

    Returns:
        List of code elements (empty when nothing could be parsed)
    """
    size = len(content)
    if size > MAX_FILE_SIZE_FOR_AST:
        self.logger.info(
            f"{language.title()} file too large for AST parsing ({size} bytes), skipping"
        )
        return []

    if language == "python":
        self.logger.debug("Parsing Python with built-in AST")
        parsed = self._parse_python_ast(content)
        if parsed or not TREE_SITTER_AVAILABLE:
            return parsed
        self.logger.debug("Falling back to Tree-sitter for Python")
        return self._parse_with_tree_sitter(content, language)

    # Unknown languages, or no Tree-sitter at all: nothing to parse with.
    if language == "unknown" or not TREE_SITTER_AVAILABLE:
        return []

    self.logger.debug(f"Parsing {language} with Tree-sitter")
    return self._parse_with_tree_sitter(content, language)
def _get_tree_sitter_parser(self, language: str):
    """Get or create a Tree-sitter parser for the given language.

    Results are cached per language in ``self._parsers``. A failed lookup is
    cached as ``None`` as well, so an unsupported language is attempted (and
    warned about) once rather than once per file.

    Args:
        language: Tree-sitter language name

    Returns:
        Tree-sitter parser or None if not available
    """
    if not TREE_SITTER_AVAILABLE or get_parser is None:
        return None

    # A cached value may be None: that records an earlier failed lookup.
    if language in self._parsers:
        return self._parsers[language]

    try:
        parser = get_parser(language)
    except Exception as e:
        self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
        parser = None

    self._parsers[language] = parser
    return parser
def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
    """Parse code using Tree-sitter AST.

    Any exception raised while encoding, parsing or extracting is logged as
    a warning and results in an empty list.

    Args:
        content: Source code content
        language: Programming language

    Returns:
        List of code elements, truncated to ``MAX_ELEMENTS_TO_PROCESS``
    """
    parser = self._get_tree_sitter_parser(language)
    if not parser:
        return []

    try:
        # Encode once; the same buffer feeds both the parser and the
        # byte-offset slicing done during extraction.
        content_bytes = content.encode("utf-8")
        tree = parser.parse(content_bytes)

        elements = []
        self._extract_tree_sitter_elements(
            tree.root_node, content_bytes, elements, language, level=0
        )

        # The cap is applied after extraction: it bounds what callers
        # receive, not the traversal itself.
        if len(elements) > MAX_ELEMENTS_TO_PROCESS:
            self.logger.warning(
                f"Too many elements ({len(elements)}), limiting to {MAX_ELEMENTS_TO_PROCESS}"
            )
            elements = elements[:MAX_ELEMENTS_TO_PROCESS]

        return elements

    except Exception as e:
        self.logger.warning(f"Tree-sitter parsing failed for {language}: {e}")
        return []
def _extract_tree_sitter_elements(
    self,
    node,
    content_bytes: bytes,
    elements: list[CodeElement],
    language: str,
    level: int = 0,
):
    """Walk a Tree-sitter subtree depth-first and collect code elements.

    A node whose type is in the mapping below becomes a ``CodeElement``;
    other nodes produce nothing but their children are still visited. A
    mapped node whose text exceeds ``MAX_ELEMENT_SIZE`` is skipped together
    with its whole subtree. ``level`` is the node's depth in the syntax
    tree, and the walk stops below ``MAX_RECURSION_DEPTH``.

    Args:
        node: Tree-sitter node
        content_bytes: Source code as bytes
        elements: List to append elements to
        language: Programming language
        level: Current nesting level
    """
    if level > MAX_RECURSION_DEPTH:
        return

    # Tree-sitter node type -> element type reported for it.
    node_type_to_element_type = {
        "function_definition": CodeElementType.FUNCTION,
        "method_definition": CodeElementType.METHOD,
        "class_definition": CodeElementType.CLASS,
        "interface_declaration": CodeElementType.INTERFACE,
        "enum_declaration": CodeElementType.ENUM,
        "struct_declaration": CodeElementType.STRUCT,
        "variable_declaration": CodeElementType.VARIABLE,
        "import_statement": CodeElementType.IMPORT,
        "comment": CodeElementType.COMMENT,
    }
    mapped_type = node_type_to_element_type.get(node.type)

    if mapped_type:
        source_text = content_bytes[node.start_byte : node.end_byte].decode("utf-8")
        text_length = len(source_text)

        if text_length > MAX_ELEMENT_SIZE:
            self.logger.debug(
                f"Skipping large {node.type} element ({text_length} chars)"
            )
            # Returning here also skips every descendant of this node.
            return

        found = CodeElement(
            name=self._extract_element_name(node, content_bytes, language),
            element_type=mapped_type,
            content=source_text,
            # Tree-sitter points are zero-based (row, column) pairs.
            start_line=node.start_point[0] + 1,
            end_line=node.end_point[0] + 1,
            level=level,
        )
        self._enrich_element_metadata(found, node, content_bytes, language)
        elements.append(found)

    next_level = level + 1
    for child_node in node.children:
        self._extract_tree_sitter_elements(
            child_node, content_bytes, elements, language, next_level
        )
def _extract_element_name(self, node, content_bytes: bytes, language: str) -> str:
    """Return the text of the node's first direct ``identifier`` child.

    Args:
        node: Tree-sitter node
        content_bytes: Source code as bytes
        language: Programming language (not consulted)

    Returns:
        Element name, or ``unnamed_<node type>`` when no direct child is an
        identifier
    """
    identifier = next(
        (child for child in node.children if child.type == "identifier"), None
    )
    if identifier is None:
        return f"unnamed_{node.type}"
    return content_bytes[identifier.start_byte : identifier.end_byte].decode("utf-8")
def _enrich_element_metadata(
    self, element: CodeElement, node, content_bytes: bytes, language: str
):
    """Dispatch to the language-specific metadata enrichment, if there is one.

    Python has its own enricher; JavaScript and TypeScript share one. Any
    other language leaves the element untouched.

    Args:
        element: Code element to enrich
        node: Tree-sitter node
        content_bytes: Source code as bytes
        language: Programming language
    """
    if language == "python":
        enrich = self._enrich_python_metadata
    elif language in ("javascript", "typescript"):
        enrich = self._enrich_javascript_metadata
    else:
        # No language-specific enrichment available.
        return

    enrich(element, node, content_bytes)
def _enrich_python_metadata(self, element: CodeElement, node, content_bytes: bytes):
    """Enrich element with Python-specific metadata (decorators, async flag).

    In the Tree-sitter Python grammar a decorated function or class is
    wrapped in a ``decorated_definition`` node whose children are the
    ``decorator`` nodes followed by the definition itself. Decorators are
    therefore looked up on that wrapper when ``node`` has one, in addition
    to the node's own children.

    Args:
        element: Code element to enrich; ``decorators`` is appended to and
            ``is_async`` is overwritten
        node: Tree-sitter node of the definition
        content_bytes: Source code as bytes
    """
    candidates = []
    wrapper = getattr(node, "parent", None)
    if wrapper is not None and getattr(wrapper, "type", None) == "decorated_definition":
        candidates.extend(wrapper.children)
    candidates.extend(node.children)

    for child in candidates:
        if child.type == "decorator":
            # Decorator text is kept as written in the source, "@" included.
            decorator_text = content_bytes[child.start_byte : child.end_byte].decode(
                "utf-8"
            )
            element.decorators.append(decorator_text.strip())

    # "async" appears as a direct child of the definition node.
    element.is_async = any(child.type == "async" for child in node.children)
def _enrich_javascript_metadata(
    self, element: CodeElement, node, content_bytes: bytes
):
    """Set the async/static flags of a JavaScript or TypeScript element.

    A flag is true when the node has a direct child of type ``async`` or
    ``static`` respectively.

    Args:
        element: Code element to enrich
        node: Tree-sitter node
        content_bytes: Source code as bytes (not consulted)
    """
    child_types = [child.type for child in node.children]
    element.is_async = "async" in child_types
    element.is_static = "static" in child_types
def _parse_python_ast(self, content: str) -> list[CodeElement]:
    """Parse Python code using built-in AST module.

    Syntax errors and any other exception raised while parsing or
    extracting are logged as warnings and yield an empty list.

    Args:
        content: Python source code

    Returns:
        List of code elements
    """
    collected = []
    try:
        module = ast.parse(content)
        self._extract_ast_elements(module, content, collected, level=0)
    except SyntaxError as e:
        self.logger.warning(f"Python AST parsing failed: {e}")
        return []
    except Exception as e:
        self.logger.warning(f"Unexpected error in Python AST parsing: {e}")
        return []
    return collected
def _extract_ast_elements(
    self,
    node: ast.AST,
    content: str,
    elements: list[CodeElement],
    level: int = 0,
    parent: Optional[ast.AST] = None,
):
    """Walk a Python AST depth-first and collect function/class elements.

    ``level`` is the node's depth in the AST (the module is 0, so top-level
    definitions are at 1); the walk stops below ``MAX_RECURSION_DEPTH``.
    A function is reported as a METHOD only when it sits directly in a class
    body; everything else is a FUNCTION. Only regular positional parameters
    (``node.args.args``) are recorded.

    Args:
        node: AST node
        content: Source code content
        elements: List to append elements to
        level: Current nesting level
        parent: AST node that contains ``node``. Supplied by the recursion;
            when omitted, a function falls back to the previous
            ``level > 0`` rule so direct callers see unchanged results.
    """
    if level > MAX_RECURSION_DEPTH:
        return

    is_function = isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    if is_function or isinstance(node, ast.ClassDef):
        start_line = node.lineno
        end_line = node.end_lineno or start_line
        # Split only for nodes that become elements, not for every AST node
        # visited. Whole source lines are kept, including indentation.
        element_content = "\n".join(content.split("\n")[start_line - 1 : end_line])

        details = {}
        if is_function:
            if parent is None:
                in_class_body = level > 0
            else:
                in_class_body = isinstance(parent, ast.ClassDef)
            element_type = (
                CodeElementType.METHOD if in_class_body else CodeElementType.FUNCTION
            )
            details["is_async"] = isinstance(node, ast.AsyncFunctionDef)
            details["parameters"] = [arg.arg for arg in node.args.args]
        else:
            element_type = CodeElementType.CLASS

        element = CodeElement(
            name=node.name,
            element_type=element_type,
            content=element_content,
            start_line=start_line,
            end_line=end_line,
            level=level,
            decorators=[self._get_decorator_name(d) for d in node.decorator_list],
            **details,
        )
        # clean=False keeps the raw string constant, without re-indentation.
        element.docstring = ast.get_docstring(node, clean=False)
        elements.append(element)

    for child in ast.iter_child_nodes(node):
        self._extract_ast_elements(child, content, elements, level + 1, parent=node)
550 def _get_decorator_name(self, decorator: ast.AST) -> str:
551 """Extract decorator name from AST node.
553 Args:
554 decorator: AST decorator node
556 Returns:
557 Decorator name as string
558 """
559 if isinstance(decorator, ast.Name):
560 return decorator.id
561 elif isinstance(decorator, ast.Attribute):
562 return f"{self._get_decorator_name(decorator.value)}.{decorator.attr}"
563 elif isinstance(decorator, ast.Call):
564 return f"{self._get_decorator_name(decorator.func)}()"
565 else:
566 return "unknown_decorator"