Coverage for src / qdrant_loader / core / chunking / strategy / code / parser / python_ast.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1from __future__ import annotations 

2 

3import ast 

4 

5from qdrant_loader.core.chunking.strategy.code.parser.common import ( 

6 CodeElement, 

7 CodeElementType, 

8) 

9 

10 

def parse_python_ast(
    content: str,
    *,
    max_elements_to_process: int,
) -> list[CodeElement]:
    """Split Python source into top-level code elements using ``ast``.

    Only the direct children of the module are visited. Each top-level class
    and each top-level (async) function becomes its own element, including
    any decorator lines attached to it. Every run of consecutive other
    statements (imports, constants, ``if __name__ == ...``, etc.) is merged
    into a single ``MODULE`` element named ``"module"``. Nested definitions
    are not emitted separately; they stay inside their parent's content.
    All elements are emitted with ``level=0``.

    Args:
        content: Python source text to parse.
        max_elements_to_process: Maximum number of elements to return.
            Processing stops as soon as this many have been collected.

    Returns:
        The collected elements in source order, with 1-based inclusive
        ``start_line`` / ``end_line``. An empty list is returned when
        ``content`` cannot be parsed.
    """
    try:
        tree = ast.parse(content)
    except Exception:
        # Deliberate best-effort: any parse failure yields no elements
        # instead of propagating to the caller.
        return []

    elements: list[CodeElement] = []
    content_lines = content.split("\n")

    def extract_span(first: ast.AST, last: ast.AST) -> tuple[int, int, str] | None:
        """Return ``(start_line, end_line, snippet)`` or None if unusable."""
        try:
            start_line: int = first.lineno  # type: ignore[attr-defined]
            end_line: int = last.end_lineno  # type: ignore[attr-defined]
        except AttributeError:
            return None
        # ``lineno`` of a decorated class/function points at the ``def`` /
        # ``class`` keyword, so widen the span to the earliest decorator;
        # otherwise decorator lines would be missing from every element.
        for decorator in getattr(first, "decorator_list", ()):
            start_line = min(start_line, decorator.lineno)
        snippet = "\n".join(content_lines[start_line - 1 : end_line])
        if not snippet.strip():
            return None
        return start_line, end_line, snippet

    def add_element(node: ast.AST, elem_type: CodeElementType, level: int) -> bool:
        """Add a code element from an AST node. Returns False if limit reached."""
        if len(elements) >= max_elements_to_process:
            return False
        span = extract_span(node, node)
        if span is None:
            # Nothing usable to emit for this node; keep walking.
            return True
        start_line, end_line, snippet = span
        elements.append(
            CodeElement(
                name=getattr(node, "name", type(node).__name__),
                element_type=elem_type,
                content=snippet,
                start_line=start_line,
                end_line=end_line,
                level=level,
            )
        )
        return True

    def flush_module_group(group: list[ast.AST]) -> bool:
        """Combine consecutive module-level statements into one MODULE element.

        Returns False only when a non-empty group could not be emitted
        because the element limit has been reached.
        """
        if not group:
            return True
        span = extract_span(group[0], group[-1])
        if span is None:
            return True
        if len(elements) >= max_elements_to_process:
            return False
        start_line, end_line, snippet = span
        elements.append(
            CodeElement(
                name="module",
                element_type=CodeElementType.MODULE,
                content=snippet,
                start_line=start_line,
                end_line=end_line,
                level=0,
            )
        )
        return True

    # Walk only the top-level statements of the module so we get exactly
    # one element per top-level class / function, with no overlap from
    # nested methods, assignments, or control-flow nodes.
    # Non-class/function statements are accumulated and emitted together
    # as a MODULE element right before the next class / function.
    current_group: list[ast.AST] = []
    for node in tree.body:
        is_class = isinstance(node, ast.ClassDef)
        is_function = isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        if not (is_class or is_function):
            current_group.append(node)
            continue
        if not flush_module_group(current_group):
            break
        current_group = []
        elem_type = CodeElementType.CLASS if is_class else CodeElementType.FUNCTION
        if not add_element(node, elem_type, 0):
            break

    # Flush any trailing module-level code (e.g. if __name__ == "__main__").
    # If the loop stopped because the limit was hit, this call is a no-op.
    flush_module_group(current_group)

    return elements