Coverage for src / qdrant_loader / core / chunking / strategy / code / parser / python_ast.py: 100%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1from __future__ import annotations
3import ast
5from qdrant_loader.core.chunking.strategy.code.parser.common import (
6 CodeElement,
7 CodeElementType,
8)
def parse_python_ast(
    content: str,
    *,
    max_elements_to_process: int,
) -> list[CodeElement]:
    """Split Python source into non-overlapping top-level code elements.

    Only the direct children of the module are inspected. Each top-level
    class becomes a ``CLASS`` element and each top-level (async) function a
    ``FUNCTION`` element. Every run of consecutive other statements (imports,
    constants, ``if __name__ == "__main__"`` blocks, ...) is merged into a
    single ``MODULE`` element named ``"module"``. All elements have level 0.

    Args:
        content: Python source code to parse.
        max_elements_to_process: Upper bound on the number of elements
            returned; processing stops as soon as the bound is reached.

    Returns:
        The extracted elements in source order, or an empty list when
        ``content`` cannot be parsed.
    """
    try:
        tree = ast.parse(content)
    except Exception:
        # Deliberate best-effort: unparsable input yields no elements
        # instead of propagating the parser error to the caller.
        return []

    elements: list[CodeElement] = []
    content_lines = content.split("\n")

    def first_source_line(node: ast.AST) -> int:
        """Return the first line of *node*, including any decorators.

        Since Python 3.8 the ``lineno`` of a class or function definition
        points at the ``class``/``def`` keyword, so decorator lines have to
        be taken into account explicitly or they are cut off the snippet.
        """
        own_line: int = node.lineno  # type: ignore[attr-defined]
        decorators = getattr(node, "decorator_list", ())
        return min([own_line, *(decorator.lineno for decorator in decorators)])

    def add_element(node: ast.AST, elem_type: CodeElementType, level: int) -> bool:
        """Add a code element from an AST node. Returns False if limit reached."""
        if len(elements) >= max_elements_to_process:
            return False
        try:
            start_line = first_source_line(node)
            end_line: int = node.end_lineno  # type: ignore[attr-defined]
        except AttributeError:
            # Node carries no position information; skip it but keep going.
            return True
        snippet = "\n".join(content_lines[start_line - 1 : end_line])
        if not snippet.strip():
            return True
        elements.append(
            CodeElement(
                # Fall back to the node class name for nodes without a name.
                name=getattr(node, "name", type(node).__name__),
                element_type=elem_type,
                content=snippet,
                start_line=start_line,
                end_line=end_line,
                level=level,
            )
        )
        return True

    def flush_module_group(group: list[ast.AST]) -> bool:
        """Combine consecutive module-level statements into one MODULE element.

        Returns False if the element limit prevented the group from being
        added, True otherwise (including when there was nothing to add).
        """
        if not group:
            return True
        try:
            start_line: int = group[0].lineno  # type: ignore[attr-defined]
            end_line: int = group[-1].end_lineno  # type: ignore[attr-defined]
        except AttributeError:
            return True
        snippet = "\n".join(content_lines[start_line - 1 : end_line])
        if not snippet.strip():
            return True
        if len(elements) >= max_elements_to_process:
            return False
        elements.append(
            CodeElement(
                name="module",
                element_type=CodeElementType.MODULE,
                content=snippet,
                start_line=start_line,
                end_line=end_line,
                level=0,
            )
        )
        return True

    # Walk only the top-level statements of the module so we get exactly
    # one element per top-level class / function, with no overlap from
    # nested methods, assignments, or control-flow nodes.
    # Non-class/function statements (imports, constants, if __name__ == ..., etc.)
    # are grouped together and emitted as a MODULE element.
    definition_nodes = (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)
    current_group: list[ast.AST] = []
    for node in tree.body:
        if not isinstance(node, definition_nodes):
            current_group.append(node)
            continue
        # A definition ends the current run of module-level statements.
        if not flush_module_group(current_group):
            break
        current_group = []
        if isinstance(node, ast.ClassDef):
            elem_type = CodeElementType.CLASS
        else:
            elem_type = CodeElementType.FUNCTION
        if not add_element(node, elem_type, 0):
            break

    # Flush any trailing module-level code (e.g. if __name__ == "__main__")
    flush_module_group(current_group)

    return elements