Coverage for src/qdrant_loader/core/chunking/strategy/code/code_section_splitter.py: 89%

113 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""Code section splitter for intelligent code element extraction and merging.""" 

2 

3from typing import Any 

4 

5import structlog 

6 

7from qdrant_loader.core.chunking.strategy.base.section_splitter import ( 

8 BaseSectionSplitter, 

9) 

10from qdrant_loader.core.document import Document 

11 

12from .code_document_parser import CodeDocumentParser, CodeElement, CodeElementType 

13 

14logger = structlog.get_logger(__name__) 

15 

16 

class CodeSectionSplitter(BaseSectionSplitter):
    """Section splitter for code documents with intelligent element merging.

    Source code is parsed into structural elements through a
    ``CodeDocumentParser``; undersized elements are grouped together, and a
    regex/line-based splitter is used when the file is too large for parsing
    or when parsing yields no elements.
    """

    def __init__(self, settings):
        """Initialize the code section splitter.

        Args:
            settings: Configuration settings. The code-strategy options are
                read from ``settings.global_config.chunking.strategies.code``
                when that attribute exists.
        """
        super().__init__(settings)
        self.logger = logger
        self.document_parser = CodeDocumentParser(settings)

        # Code-specific configuration (None when no "code" strategy is configured)
        self.code_config = getattr(
            settings.global_config.chunking.strategies, "code", None
        )
        # Content longer than this skips element parsing entirely; getattr on a
        # missing/None config falls back to the 40000 default.
        self.chunk_size_threshold = getattr(
            self.code_config, "max_file_size_for_ast", 40000
        )
        # Minimum size for standalone elements.
        # NOTE(review): self.chunk_size is not assigned in this class; it is
        # presumably provided by BaseSectionSplitter.__init__ - confirm.
        self.min_element_size = max(100, self.chunk_size // 10)

    def split_sections(
        self, content: str, document: Document = None
    ) -> list[dict[str, Any]]:
        """Split code content into sections based on programming language structure.

        Args:
            content: Source code content
            document: Document being processed (for metadata); when omitted the
                language stays "unknown"

        Returns:
            List of section dictionaries, each with a "content" string and a
            "metadata" dict
        """
        # Whitespace-only input is returned unchanged as one "empty" section.
        if not content.strip():
            return [
                {
                    "content": content,
                    "metadata": {
                        "section_type": "empty",
                        "element_type": "empty",
                        "language": "unknown",
                        "parsing_method": "none",
                    },
                }
            ]

        # Performance check: use simple splitting for very large files
        if len(content) > self.chunk_size_threshold:
            self.logger.info(
                f"Code file too large ({len(content)} bytes), using simple text-based splitting"
            )
            return self._fallback_text_split(content)

        # Detect language from document metadata or filename
        language = "unknown"
        if document:
            file_path = (
                document.metadata.get("file_name")
                or document.source
                or document.title
                or ""
            )
            language = self.document_parser.detect_language(file_path, content)

        # Parse code elements using the document parser
        elements = self.document_parser.parse_code_elements(content, language)

        if not elements:
            self.logger.debug(f"No {language} elements found, using fallback splitting")
            return self._fallback_text_split(content)

        # Merge small elements to optimize chunk sizes
        merged_elements = self._merge_small_elements(elements)

        # Limit total number of sections (elements past the limit are dropped).
        # NOTE(review): self.max_chunks_per_document is not assigned in this
        # class; presumably provided by the base class - confirm.
        if len(merged_elements) > self.max_chunks_per_document:
            self.logger.warning(
                f"Too many code elements ({len(merged_elements)}), "
                f"limiting to {self.max_chunks_per_document}"
            )
            merged_elements = merged_elements[: self.max_chunks_per_document]

        # Convert elements to section dictionaries
        sections = []
        for i, element in enumerate(merged_elements):
            section_metadata = self.document_parser.extract_section_metadata(element)
            # These keys override any same-named keys produced by the parser.
            section_metadata.update(
                {
                    "section_index": i,
                    "language": language,
                    "parsing_method": "ast",
                    "section_type": "code_element",
                }
            )
            sections.append({"content": element.content, "metadata": section_metadata})

        self.logger.debug(
            f"Split {language} code into {len(sections)} sections using AST parsing"
        )

        return sections

    def _merge_small_elements(self, elements: list[CodeElement]) -> list[CodeElement]:
        """Merge small elements to optimize chunk sizes.

        Classes, functions, interfaces and enums are always kept on their own,
        as is any element of at least ``min_element_size`` characters and any
        method longer than 100 characters. Everything else is accumulated, in
        order, into groups that are merged once a group reaches
        ``min_element_size`` or a standalone element interrupts it.

        Args:
            elements: List of code elements to merge

        Returns:
            List of merged elements optimized for chunk size
        """
        if not elements:
            return []

        standalone_types = (
            CodeElementType.CLASS,
            CodeElementType.FUNCTION,
            CodeElementType.INTERFACE,
            CodeElementType.ENUM,
        )

        merged: list[CodeElement] = []
        pending: list[CodeElement] = []
        pending_size = 0

        for element in elements:
            element_size = len(element.content)

            keep_separate = (
                element_size >= self.min_element_size
                or element.element_type in standalone_types
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            )

            if keep_separate:
                # Flush accumulated small elements first so source order is kept
                if pending:
                    merged.append(self._create_merged_element(pending))
                    pending = []
                    pending_size = 0
                merged.append(element)
                continue

            # Accumulate small elements
            pending.append(element)
            pending_size += element_size

            # If accumulated size is large enough, create a merged element
            if pending_size >= self.min_element_size:
                merged.append(self._create_merged_element(pending))
                pending = []
                pending_size = 0

        # Handle remaining small elements
        if pending:
            merged.append(self._create_merged_element(pending))

        return merged

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        A single-element list is returned as that element, unchanged. Otherwise
        the contents are joined with blank lines and wrapped in a new
        ``MODULE``-typed element spanning the first element's start line to the
        last element's end line.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element

        Raises:
            ValueError: If ``elements`` is empty
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]
        # Only the first three names appear in the synthetic name
        name_preview = ", ".join(merged_names[:3])
        name_suffix = "..." if len(merged_names) > 3 else ""

        merged_element = CodeElement(
            name=f"merged_({name_preview}{name_suffix})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Deduplicate with dict.fromkeys rather than set(): same members, but in
        # first-seen order, so the result does not change between runs.
        merged_element.dependencies = list(
            dict.fromkeys(
                dependency
                for element in elements
                for dependency in element.dependencies
            )
        )
        merged_element.decorators = list(
            dict.fromkeys(
                decorator for element in elements for decorator in element.decorators
            )
        )

        # Set merged element properties
        merged_element.is_async = any(element.is_async for element in elements)
        merged_element.is_static = any(element.is_static for element in elements)
        merged_element.complexity = sum(element.complexity for element in elements)

        return merged_element

    @staticmethod
    def _build_regex_fallback_section(
        section_lines: list[str], start_line: int, end_line: int
    ) -> dict[str, Any]:
        """Build one "regex_fallback" section dictionary from consecutive lines."""
        return {
            "content": "\n".join(section_lines),
            "metadata": {
                "section_type": "code_block",
                "element_type": "code_block",
                "start_line": start_line,
                "end_line": end_line,
                "line_count": len(section_lines),
                "parsing_method": "regex_fallback",
                "language": "unknown",
            },
        }

    def _fallback_text_split(self, content: str) -> list[dict[str, Any]]:
        """Fallback to simple text-based splitting for large files or parsing failures.

        Lines are accumulated into a section until either a line that looks
        like a function/class definition starts, or the accumulated text
        exceeds ``self.chunk_size`` characters. Line numbers in the metadata
        are 1-based and inclusive.

        Args:
            content: Source code content

        Returns:
            List of section dictionaries; never empty
        """
        # Split by functions and classes using simple regex patterns
        import re

        # Common patterns for different languages
        function_patterns = [
            r"^\s*(def\s+\w+|function\s+\w+|func\s+\w+)",  # Python, JS, Go
            r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(",  # Java, C#
            r"^\s*class\s+\w+",  # Class definitions
        ]
        # NOTE(review): the Java/C# pattern also appears to match ordinary
        # statements of the form "word word(" - confirm this is intended.
        # Compiled once here instead of being looked up for every line.
        boundary = re.compile("|".join(function_patterns))

        sections = []
        lines = content.split("\n")
        current_section: list[str] = []
        current_start_line = 1
        # Length of "\n".join(current_section), maintained incrementally so the
        # section is not re-joined on every line just to measure it.
        current_length = 0

        for i, line in enumerate(lines, 1):
            if current_section and boundary.match(line):
                # Start of a new function/class, save current section
                section = self._build_regex_fallback_section(
                    current_section, current_start_line, i - 1
                )
                # Whitespace-only runs between definitions are dropped
                if section["content"].strip():
                    sections.append(section)

                # Start new section
                current_section = [line]
                current_start_line = i
                current_length = len(line)
            else:
                # +1 accounts for the newline separator joining this line
                current_length += len(line) + (1 if current_section else 0)
                current_section.append(line)

            # Limit section size to prevent overly large chunks
            if current_length > self.chunk_size:
                sections.append(
                    self._build_regex_fallback_section(
                        current_section, current_start_line, i
                    )
                )
                current_section = []
                current_start_line = i + 1
                current_length = 0

        # Add remaining content
        if current_section:
            section = self._build_regex_fallback_section(
                current_section, current_start_line, len(lines)
            )
            if section["content"].strip():
                sections.append(section)

        # If no sections found, return the entire content as one section
        if not sections:
            sections.append(
                {
                    "content": content,
                    "metadata": {
                        "section_type": "code_block",
                        "element_type": "unknown",
                        "start_line": 1,
                        "end_line": len(lines),
                        "line_count": len(lines),
                        "parsing_method": "fallback_single",
                        "language": "unknown",
                    },
                }
            )

        return sections