Coverage for src/qdrant_loader/core/chunking/strategy/code/code_section_splitter.py: 89%
113 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Code section splitter for intelligent code element extraction and merging."""
3from typing import Any
5import structlog
7from qdrant_loader.core.chunking.strategy.base.section_splitter import (
8 BaseSectionSplitter,
9)
10from qdrant_loader.core.document import Document
12from .code_document_parser import CodeDocumentParser, CodeElement, CodeElementType
14logger = structlog.get_logger(__name__)
class CodeSectionSplitter(BaseSectionSplitter):
    """Section splitter for code documents with intelligent element merging.

    Splits source code into chunk-sized sections by parsing the language
    structure (AST) when possible, merging small elements into combined
    chunks, and falling back to regex-based line splitting for very large
    files or when no elements can be parsed.
    """

    def __init__(self, settings):
        """Initialize the code section splitter.

        Args:
            settings: Configuration settings. Provides the base chunking
                limits (``chunk_size``, ``max_chunks_per_document``) and the
                optional code strategy configuration.
        """
        super().__init__(settings)
        self.logger = logger
        self.document_parser = CodeDocumentParser(settings)

        # Code-specific configuration; may be absent, in which case the
        # getattr defaults below apply.
        self.code_config = getattr(
            settings.global_config.chunking.strategies, "code", None
        )
        # Files larger than this (in bytes) skip AST parsing entirely.
        self.chunk_size_threshold = getattr(
            self.code_config, "max_file_size_for_ast", 40000
        )
        # Minimum size for standalone elements; smaller ones get merged.
        self.min_element_size = max(100, self.chunk_size // 10)

    def split_sections(
        self, content: str, document: Document | None = None
    ) -> list[dict[str, Any]]:
        """Split code content into sections based on programming language structure.

        Args:
            content: Source code content.
            document: Document being processed; its file name / source /
                title are used for language detection when provided.

        Returns:
            List of section dictionaries with content and metadata.
        """
        if not content.strip():
            # Preserve the (whitespace-only) content in a single empty section.
            return [
                {
                    "content": content,
                    "metadata": {
                        "section_type": "empty",
                        "element_type": "empty",
                        "language": "unknown",
                        "parsing_method": "none",
                    },
                }
            ]

        # Performance check: use simple splitting for very large files.
        if len(content) > self.chunk_size_threshold:
            self.logger.info(
                f"Code file too large ({len(content)} bytes), using simple text-based splitting"
            )
            return self._fallback_text_split(content)

        # Detect language from document metadata or filename.
        language = "unknown"
        if document:
            file_path = (
                document.metadata.get("file_name")
                or document.source
                or document.title
                or ""
            )
            language = self.document_parser.detect_language(file_path, content)

        # Parse code elements using AST.
        elements = self.document_parser.parse_code_elements(content, language)

        if not elements:
            self.logger.debug(f"No {language} elements found, using fallback splitting")
            return self._fallback_text_split(content)

        # Merge small elements to optimize chunk sizes.
        merged_elements = self._merge_small_elements(elements)

        # Limit total number of sections.
        if len(merged_elements) > self.max_chunks_per_document:
            self.logger.warning(
                f"Too many code elements ({len(merged_elements)}), "
                f"limiting to {self.max_chunks_per_document}"
            )
            merged_elements = merged_elements[: self.max_chunks_per_document]

        # Convert elements to section dictionaries.
        sections = []
        for i, element in enumerate(merged_elements):
            section_metadata = self.document_parser.extract_section_metadata(element)
            section_metadata.update(
                {
                    "section_index": i,
                    "language": language,
                    "parsing_method": "ast",
                    "section_type": "code_element",
                }
            )
            sections.append({"content": element.content, "metadata": section_metadata})

        self.logger.debug(
            f"Split {language} code into {len(sections)} sections using AST parsing"
        )

        return sections

    def _merge_small_elements(self, elements: list[CodeElement]) -> list[CodeElement]:
        """Merge small elements to optimize chunk sizes.

        Significant structures (classes, functions, interfaces, enums, and
        methods over 100 bytes) are kept as standalone sections; everything
        smaller is accumulated and flushed as a merged element once the
        accumulated size reaches ``min_element_size``.

        Args:
            elements: List of code elements to merge.

        Returns:
            List of merged elements optimized for chunk size.
        """
        if not elements:
            return []

        merged: list[CodeElement] = []
        current_group: list[CodeElement] = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # Large enough, or a significant code structure: keep it separate.
            is_significant = element.element_type in [
                CodeElementType.CLASS,
                CodeElementType.FUNCTION,
                CodeElementType.INTERFACE,
                CodeElementType.ENUM,
            ]
            is_sizable_method = (
                element.element_type == CodeElementType.METHOD and element_size > 100
            )
            if (
                element_size >= self.min_element_size
                or is_significant
                or is_sizable_method
            ):
                # Flush any accumulated small elements first to preserve order.
                if current_group:
                    merged.append(self._create_merged_element(current_group))
                    current_group = []
                    current_size = 0

                merged.append(element)
            else:
                # Accumulate small elements.
                current_group.append(element)
                current_size += element_size

                # Flush once the accumulated size is large enough.
                if current_size >= self.min_element_size:
                    merged.append(self._create_merged_element(current_group))
                    current_group = []
                    current_size = 0

        # Handle remaining small elements.
        if current_group:
            merged.append(self._create_merged_element(current_group))

        return merged

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge (must be non-empty).

        Returns:
            Merged code element. A single-element list is returned as-is.

        Raises:
            ValueError: If ``elements`` is empty.
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create merged element; the name previews up to three member names.
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies, deduplicated with first-seen order preserved
        # (dict.fromkeys instead of set() so metadata is deterministic).
        all_dependencies: list = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(dict.fromkeys(all_dependencies))

        # Aggregate decorators, deduplicated the same deterministic way.
        all_decorators: list = []
        for element in elements:
            all_decorators.extend(element.decorators)
        merged_element.decorators = list(dict.fromkeys(all_decorators))

        # Set merged element properties.
        merged_element.is_async = any(element.is_async for element in elements)
        merged_element.is_static = any(element.is_static for element in elements)
        merged_element.complexity = sum(element.complexity for element in elements)

        return merged_element

    @staticmethod
    def _fallback_section(
        content: str, start_line: int, end_line: int, line_count: int
    ) -> dict[str, Any]:
        """Build a section dict with regex-fallback metadata."""
        return {
            "content": content,
            "metadata": {
                "section_type": "code_block",
                "element_type": "code_block",
                "start_line": start_line,
                "end_line": end_line,
                "line_count": line_count,
                "parsing_method": "regex_fallback",
                "language": "unknown",
            },
        }

    def _fallback_text_split(self, content: str) -> list[dict[str, Any]]:
        """Fallback to simple text-based splitting for large files or parsing failures.

        Splits on lines that look like function/class definitions (regex
        heuristics for several languages) and additionally flushes a section
        whenever it grows past ``chunk_size``.

        Args:
            content: Source code content.

        Returns:
            List of section dictionaries.
        """
        # Split by functions and classes using simple regex patterns.
        import re

        sections: list[dict[str, Any]] = []
        lines = content.split("\n")
        current_section: list[str] = []
        current_start_line = 1
        # Running value of len("\n".join(current_section)), maintained
        # incrementally to avoid re-joining on every line (O(n^2)).
        current_len = 0

        # Common patterns for different languages.
        function_patterns = [
            r"^\s*(def\s+\w+|function\s+\w+|func\s+\w+)",  # Python, JS, Go
            r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(",  # Java, C#
            r"^\s*class\s+\w+",  # Class definitions
        ]
        pattern = re.compile("|".join(function_patterns))

        for i, line in enumerate(lines, 1):
            if pattern.match(line) and current_section:
                # Start of a new function/class: save the current section
                # (only if it has non-whitespace content).
                section_content = "\n".join(current_section)
                if section_content.strip():
                    sections.append(
                        self._fallback_section(
                            section_content,
                            current_start_line,
                            i - 1,
                            len(current_section),
                        )
                    )

                # Start new section with this definition line.
                current_section = [line]
                current_len = len(line)
                current_start_line = i
            else:
                # Each appended line adds a joining "\n" except the first.
                current_len = current_len + len(line) + 1 if current_section else len(line)
                current_section.append(line)

            # Limit section size to prevent overly large chunks.
            if current_len > self.chunk_size and current_section:
                sections.append(
                    self._fallback_section(
                        "\n".join(current_section),
                        current_start_line,
                        i,
                        len(current_section),
                    )
                )
                current_section = []
                current_len = 0
                current_start_line = i + 1

        # Add remaining content (if it has non-whitespace content).
        if current_section:
            section_content = "\n".join(current_section)
            if section_content.strip():
                sections.append(
                    self._fallback_section(
                        section_content,
                        current_start_line,
                        len(lines),
                        len(current_section),
                    )
                )

        # If no sections were found, return the entire content as one section.
        if not sections:
            sections.append(
                {
                    "content": content,
                    "metadata": {
                        "section_type": "code_block",
                        "element_type": "unknown",
                        "start_line": 1,
                        "end_line": len(lines),
                        "line_count": len(lines),
                        "parsing_method": "fallback_single",
                        "language": "unknown",
                    },
                }
            )

        return sections