Coverage for src/qdrant_loader/core/chunking/strategy/code/code_section_splitter.py: 89% (114 statements)
1"""Code section splitter for intelligent code element extraction and merging."""
3from typing import Any
5import structlog
7from qdrant_loader.core.chunking.strategy.base.section_splitter import (
8 BaseSectionSplitter,
9)
10from qdrant_loader.core.chunking.strategy.code.parser.common import (
11 CodeElement,
12 CodeElementType,
13)
14from qdrant_loader.core.document import Document
16from .code_document_parser import CodeDocumentParser
18logger = structlog.get_logger(__name__)

class CodeSectionSplitter(BaseSectionSplitter):
    """Section splitter for code documents with intelligent element merging."""

    def __init__(self, settings):
        """Initialize the code section splitter.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.document_parser = CodeDocumentParser(settings)

        # Code-specific configuration
        self.code_config = getattr(
            settings.global_config.chunking.strategies, "code", None
        )
        self.chunk_size_threshold = getattr(
            self.code_config, "max_file_size_for_ast", 40000
        )
        self.min_element_size = max(
            100, self.chunk_size // 10
        )  # Minimum size for standalone elements

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split code content into sections based on programming language structure.

        Args:
            content: Source code content
            document: Document being processed (for metadata)

        Returns:
            List of section dictionaries with content and metadata
        """
        if not content.strip():
            return [
                {
                    "content": content,
                    "metadata": {
                        "section_type": "empty",
                        "element_type": "empty",
                        "language": "unknown",
                        "parsing_method": "none",
                    },
                }
            ]

        # Performance check: use simple splitting for very large files
        if len(content) > self.chunk_size_threshold:
            self.logger.info(
                f"Code file too large ({len(content)} bytes), using simple text-based splitting"
            )
            return self._fallback_text_split(content)

        # Detect language from document metadata or filename
        language = "unknown"
        if document:
            file_path = (
                document.metadata.get("file_name")
                or document.source
                or document.title
                or ""
            )
            language = self.document_parser.detect_language(file_path, content)

        # Parse code elements using AST
        elements = self.document_parser.parse_code_elements(content, language)

        if not elements:
            self.logger.debug(f"No {language} elements found, using fallback splitting")
            return self._fallback_text_split(content)

        # Merge small elements to optimize chunk sizes
        merged_elements = self._merge_small_elements(elements)

        # Limit total number of sections
        if len(merged_elements) > self.max_chunks_per_document:
            self.logger.warning(
                f"Too many code elements ({len(merged_elements)}), "
                f"limiting to {self.max_chunks_per_document}"
            )
            merged_elements = merged_elements[: self.max_chunks_per_document]

        # Convert elements to section dictionaries
        sections = []
        for i, element in enumerate(merged_elements):
            section_metadata = self.document_parser.extract_section_metadata(element)
            section_metadata.update(
                {
                    "section_index": i,
                    "language": language,
                    "parsing_method": "ast",
                    "section_type": "code_element",
                }
            )

            sections.append({"content": element.content, "metadata": section_metadata})

        self.logger.debug(
            f"Split {language} code into {len(sections)} sections using AST parsing"
        )

        return sections
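
    # Illustrative sketch of the return shape (values are hypothetical, not from
    # a real run). Only the keys set in split_sections() above are guaranteed;
    # extract_section_metadata() contributes additional, parser-dependent fields.
    #
    #     {
    #         "content": "def load(path):\n    ...",
    #         "metadata": {
    #             "section_index": 0,
    #             "language": "python",
    #             "parsing_method": "ast",
    #             "section_type": "code_element",
    #             # ...plus parser-provided fields (element name, type, etc.)
    #         },
    #     }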

    def _merge_small_elements(self, elements: list[CodeElement]) -> list[CodeElement]:
        """Merge small elements to optimize chunk sizes.

        Args:
            elements: List of code elements to merge

        Returns:
            List of merged elements optimized for chunk size
        """
        if not elements:
            return []

        merged = []
        current_group = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # If element is large enough or is a significant code structure, keep it separate
            if (
                element_size >= self.min_element_size
                or element.element_type
                in [
                    CodeElementType.CLASS,
                    CodeElementType.FUNCTION,
                    CodeElementType.INTERFACE,
                    CodeElementType.ENUM,
                ]
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            ):
                # First, add any accumulated small elements
                if current_group:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                merged.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element_size

                # If accumulated size is large enough, create a merged element
                if current_size >= self.min_element_size:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            merged_element = self._create_merged_element(current_group)
            merged.append(merged_element)

        return merged
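
    # Worked example of the merge policy (sizes are hypothetical; assuming
    # chunk_size=1500, min_element_size = max(100, 1500 // 10) = 150): a 40-byte
    # import block and a 60-byte constant are accumulated (100 < 150), then a
    # following 80-byte comment block raises the group to 180 >= 150, so the
    # three are flushed as a single merged element. CLASS, FUNCTION, INTERFACE
    # and ENUM elements are always kept separate, regardless of size.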

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create merged element
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies
        all_dependencies = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(set(all_dependencies))

        # Aggregate decorators
        all_decorators = []
        for element in elements:
            all_decorators.extend(element.decorators)
        merged_element.decorators = list(set(all_decorators))

        # Set merged element properties
        merged_element.is_async = any(element.is_async for element in elements)
        merged_element.is_static = any(element.is_static for element in elements)
        merged_element.complexity = sum(element.complexity for element in elements)

        return merged_element
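
    # Naming sketch (hypothetical element names): merging elements named
    # "imports", "VERSION", "helpers" and "constants" yields
    # name="merged_(imports, VERSION, helpers...)", i.e. only the first three
    # names are listed and "..." marks the truncation.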

    def _fallback_text_split(self, content: str) -> list[dict[str, Any]]:
        """Fall back to simple text-based splitting for large files or parsing failures.

        Args:
            content: Source code content

        Returns:
            List of section dictionaries
        """
        # Split by functions and classes using simple regex patterns
        import re

        sections = []
        lines = content.split("\n")
        current_section = []
        current_start_line = 1

        # Common patterns for different languages
        function_patterns = [
            r"^\s*(def\s+\w+|function\s+\w+|func\s+\w+)",  # Python, JS, Go
            r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(",  # Java, C#
            r"^\s*class\s+\w+",  # Class definitions
        ]

        pattern = "|".join(function_patterns)

        for i, line in enumerate(lines, 1):
            if re.match(pattern, line) and current_section:
                # Start of a new function/class, save current section
                section_content = "\n".join(current_section)
                if section_content.strip():
                    sections.append(
                        {
                            "content": section_content,
                            "metadata": {
                                "section_type": "code_block",
                                "element_type": "code_block",
                                "start_line": current_start_line,
                                "end_line": i - 1,
                                "line_count": len(current_section),
                                "parsing_method": "regex_fallback",
                                "language": "unknown",
                            },
                        }
                    )

                # Start new section
                current_section = [line]
                current_start_line = i
            else:
                current_section.append(line)

            # Limit section size to prevent overly large chunks
            if len("\n".join(current_section)) > self.chunk_size and current_section:
                section_content = "\n".join(current_section)
                sections.append(
                    {
                        "content": section_content,
                        "metadata": {
                            "section_type": "code_block",
                            "element_type": "code_block",
                            "start_line": current_start_line,
                            "end_line": i,
                            "line_count": len(current_section),
                            "parsing_method": "regex_fallback",
                            "language": "unknown",
                        },
                    }
                )
                current_section = []
                current_start_line = i + 1

        # Add remaining content
        if current_section:
            section_content = "\n".join(current_section)
            if section_content.strip():
                sections.append(
                    {
                        "content": section_content,
                        "metadata": {
                            "section_type": "code_block",
                            "element_type": "code_block",
                            "start_line": current_start_line,
                            "end_line": len(lines),
                            "line_count": len(current_section),
                            "parsing_method": "regex_fallback",
                            "language": "unknown",
                        },
                    }
                )

        # If no sections found, return the entire content as one section
        if not sections:
            sections.append(
                {
                    "content": content,
                    "metadata": {
                        "section_type": "code_block",
                        "element_type": "unknown",
                        "start_line": 1,
                        "end_line": len(lines),
                        "line_count": len(lines),
                        "parsing_method": "fallback_single",
                        "language": "unknown",
                    },
                }
            )

        return sections
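

# Minimal, illustrative sketch of the regex fallback above (not part of the
# production API): it re-declares the same boundary patterns instead of
# instantiating CodeSectionSplitter, which needs real settings, and runs them
# against a small hypothetical snippet to show which lines start a new section.
if __name__ == "__main__":
    import re

    boundary_pattern = "|".join(
        [
            r"^\s*(def\s+\w+|function\s+\w+|func\s+\w+)",  # Python, JS, Go
            r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(",  # Java, C#
            r"^\s*class\s+\w+",  # Class definitions
        ]
    )
    sample = "class Greeter:\n    def greet(self):\n        return 'hi'\n"
    for line in sample.split("\n"):
        # True means the fallback splitter would treat this line as the start
        # of a new code_block section (if a previous section is in progress).
        print(bool(re.match(boundary_pattern, line)), repr(line))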