Coverage for src/qdrant_loader/core/chunking/strategy/code/code_section_splitter.py: 89%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Code section splitter for intelligent code element extraction and merging.""" 

2 

3from typing import Any 

4 

5import structlog 

6 

7from qdrant_loader.core.chunking.strategy.base.section_splitter import ( 

8 BaseSectionSplitter, 

9) 

10from qdrant_loader.core.chunking.strategy.code.parser.common import ( 

11 CodeElement, 

12 CodeElementType, 

13) 

14from qdrant_loader.core.document import Document 

15 

16from .code_document_parser import CodeDocumentParser 

17 

18logger = structlog.get_logger(__name__) 

19 

20 

21class CodeSectionSplitter(BaseSectionSplitter): 

22 """Section splitter for code documents with intelligent element merging.""" 

23 

def __init__(self, settings):
    """Set up the splitter, its document parser, and code-specific limits.

    Args:
        settings: Configuration settings. Forwarded to the base splitter and
            to ``CodeDocumentParser``; also read for the optional ``code``
            entry under ``global_config.chunking.strategies``.
    """
    super().__init__(settings)
    self.logger = logger
    self.document_parser = CodeDocumentParser(settings)

    # Optional code-strategy configuration; stays None when not present.
    strategies = settings.global_config.chunking.strategies
    self.code_config = getattr(strategies, "code", None)

    # Content longer than this is not AST-parsed by split_sections.
    # Defaults to 40000 when the option (or the whole config) is missing.
    self.chunk_size_threshold = getattr(
        self.code_config, "max_file_size_for_ast", 40000
    )

    # Elements smaller than this are merge candidates; floor of 100.
    tenth_of_chunk = self.chunk_size // 10
    self.min_element_size = tenth_of_chunk if tenth_of_chunk > 100 else 100

44 

45 def split_sections( 

46 self, content: str, document: Document = None 

47 ) -> list[dict[str, Any]]: 

48 """Split code content into sections based on programming language structure. 

49 

50 Args: 

51 content: Source code content 

52 document: Document being processed (for metadata) 

53 

54 Returns: 

55 List of section dictionaries with content and metadata 

56 """ 

57 if not content.strip(): 

58 return [ 

59 { 

60 "content": content, 

61 "metadata": { 

62 "section_type": "empty", 

63 "element_type": "empty", 

64 "language": "unknown", 

65 "parsing_method": "none", 

66 }, 

67 } 

68 ] 

69 

70 # Performance check: use simple splitting for very large files 

71 if len(content) > self.chunk_size_threshold: 

72 self.logger.info( 

73 f"Code file too large ({len(content)} bytes), using simple text-based splitting" 

74 ) 

75 return self._fallback_text_split(content) 

76 

77 # Detect language from document metadata or filename 

78 language = "unknown" 

79 if document: 

80 file_path = ( 

81 document.metadata.get("file_name") 

82 or document.source 

83 or document.title 

84 or "" 

85 ) 

86 language = self.document_parser.detect_language(file_path, content) 

87 

88 # Parse code elements using AST 

89 elements = self.document_parser.parse_code_elements(content, language) 

90 

91 if not elements: 

92 self.logger.debug(f"No {language} elements found, using fallback splitting") 

93 return self._fallback_text_split(content) 

94 

95 # Merge small elements to optimize chunk sizes 

96 merged_elements = self._merge_small_elements(elements) 

97 

98 # Limit total number of sections 

99 if len(merged_elements) > self.max_chunks_per_document: 

100 self.logger.warning( 

101 f"Too many code elements ({len(merged_elements)}), " 

102 f"limiting to {self.max_chunks_per_document}" 

103 ) 

104 merged_elements = merged_elements[: self.max_chunks_per_document] 

105 

106 # Convert elements to section dictionaries 

107 sections = [] 

108 for i, element in enumerate(merged_elements): 

109 section_metadata = self.document_parser.extract_section_metadata(element) 

110 section_metadata.update( 

111 { 

112 "section_index": i, 

113 "language": language, 

114 "parsing_method": "ast", 

115 "section_type": "code_element", 

116 } 

117 ) 

118 

119 sections.append({"content": element.content, "metadata": section_metadata}) 

120 

121 self.logger.debug( 

122 f"Split {language} code into {len(sections)} sections using AST parsing" 

123 ) 

124 

125 return sections 

126 

127 def _merge_small_elements(self, elements: list[CodeElement]) -> list[CodeElement]: 

128 """Merge small elements to optimize chunk sizes. 

129 

130 Args: 

131 elements: List of code elements to merge 

132 

133 Returns: 

134 List of merged elements optimized for chunk size 

135 """ 

136 if not elements: 

137 return [] 

138 

139 merged = [] 

140 current_group = [] 

141 current_size = 0 

142 

143 for element in elements: 

144 element_size = len(element.content) 

145 

146 # If element is large enough or is a significant code structure, keep it separate 

147 if ( 

148 element_size >= self.min_element_size 

149 or element.element_type 

150 in [ 

151 CodeElementType.CLASS, 

152 CodeElementType.FUNCTION, 

153 CodeElementType.INTERFACE, 

154 CodeElementType.ENUM, 

155 ] 

156 or ( 

157 element.element_type == CodeElementType.METHOD 

158 and element_size > 100 

159 ) 

160 ): 

161 # First, add any accumulated small elements 

162 if current_group: 

163 merged_element = self._create_merged_element(current_group) 

164 merged.append(merged_element) 

165 current_group = [] 

166 current_size = 0 

167 

168 # Add the large element 

169 merged.append(element) 

170 else: 

171 # Accumulate small elements 

172 current_group.append(element) 

173 current_size += element_size 

174 

175 # If accumulated size is large enough, create a merged element 

176 if current_size >= self.min_element_size: 

177 merged_element = self._create_merged_element(current_group) 

178 merged.append(merged_element) 

179 current_group = [] 

180 current_size = 0 

181 

182 # Handle remaining small elements 

183 if current_group: 

184 merged_element = self._create_merged_element(current_group) 

185 merged.append(merged_element) 

186 

187 return merged 

188 

189 def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement: 

190 """Create a merged element from a list of small elements. 

191 

192 Args: 

193 elements: List of elements to merge 

194 

195 Returns: 

196 Merged code element 

197 """ 

198 if not elements: 

199 raise ValueError("Cannot merge empty list of elements") 

200 

201 if len(elements) == 1: 

202 return elements[0] 

203 

204 # Create merged element 

205 merged_content = "\n\n".join(element.content for element in elements) 

206 merged_names = [element.name for element in elements] 

207 

208 merged_element = CodeElement( 

209 name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})", 

210 element_type=CodeElementType.MODULE, # Use module as generic container 

211 content=merged_content, 

212 start_line=elements[0].start_line, 

213 end_line=elements[-1].end_line, 

214 level=min(element.level for element in elements), 

215 ) 

216 

217 # Merge dependencies 

218 all_dependencies = [] 

219 for element in elements: 

220 all_dependencies.extend(element.dependencies) 

221 merged_element.dependencies = list(set(all_dependencies)) 

222 

223 # Aggregate decorators 

224 all_decorators = [] 

225 for element in elements: 

226 all_decorators.extend(element.decorators) 

227 merged_element.decorators = list(set(all_decorators)) 

228 

229 # Set merged element properties 

230 merged_element.is_async = any(element.is_async for element in elements) 

231 merged_element.is_static = any(element.is_static for element in elements) 

232 merged_element.complexity = sum(element.complexity for element in elements) 

233 

234 return merged_element 

235 

236 def _fallback_text_split(self, content: str) -> list[dict[str, Any]]: 

237 """Fallback to simple text-based splitting for large files or parsing failures. 

238 

239 Args: 

240 content: Source code content 

241 

242 Returns: 

243 List of section dictionaries 

244 """ 

245 # Split by functions and classes using simple regex patterns 

246 import re 

247 

248 sections = [] 

249 lines = content.split("\n") 

250 current_section = [] 

251 current_start_line = 1 

252 

253 # Common patterns for different languages 

254 function_patterns = [ 

255 r"^\s*(def\s+\w+|function\s+\w+|func\s+\w+)", # Python, JS, Go 

256 r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(", # Java, C# 

257 r"^\s*class\s+\w+", # Class definitions 

258 ] 

259 

260 pattern = "|".join(function_patterns) 

261 

262 for i, line in enumerate(lines, 1): 

263 if re.match(pattern, line) and current_section: 

264 # Start of a new function/class, save current section 

265 section_content = "\n".join(current_section) 

266 if section_content.strip(): 

267 sections.append( 

268 { 

269 "content": section_content, 

270 "metadata": { 

271 "section_type": "code_block", 

272 "element_type": "code_block", 

273 "start_line": current_start_line, 

274 "end_line": i - 1, 

275 "line_count": len(current_section), 

276 "parsing_method": "regex_fallback", 

277 "language": "unknown", 

278 }, 

279 } 

280 ) 

281 

282 # Start new section 

283 current_section = [line] 

284 current_start_line = i 

285 else: 

286 current_section.append(line) 

287 

288 # Limit section size to prevent overly large chunks 

289 if len("\n".join(current_section)) > self.chunk_size and current_section: 

290 section_content = "\n".join(current_section) 

291 sections.append( 

292 { 

293 "content": section_content, 

294 "metadata": { 

295 "section_type": "code_block", 

296 "element_type": "code_block", 

297 "start_line": current_start_line, 

298 "end_line": i, 

299 "line_count": len(current_section), 

300 "parsing_method": "regex_fallback", 

301 "language": "unknown", 

302 }, 

303 } 

304 ) 

305 current_section = [] 

306 current_start_line = i + 1 

307 

308 # Add remaining content 

309 if current_section: 

310 section_content = "\n".join(current_section) 

311 if section_content.strip(): 

312 sections.append( 

313 { 

314 "content": section_content, 

315 "metadata": { 

316 "section_type": "code_block", 

317 "element_type": "code_block", 

318 "start_line": current_start_line, 

319 "end_line": len(lines), 

320 "line_count": len(current_section), 

321 "parsing_method": "regex_fallback", 

322 "language": "unknown", 

323 }, 

324 } 

325 ) 

326 

327 # If no sections found, return the entire content as one section 

328 if not sections: 

329 sections.append( 

330 { 

331 "content": content, 

332 "metadata": { 

333 "section_type": "code_block", 

334 "element_type": "unknown", 

335 "start_line": 1, 

336 "end_line": len(lines), 

337 "line_count": len(lines), 

338 "parsing_method": "fallback_single", 

339 "language": "unknown", 

340 }, 

341 } 

342 ) 

343 

344 return sections