Coverage for src/qdrant_loader/core/chunking/strategy/code_strategy.py: 81%
343 statements
1"""Code-specific chunking strategy for programming languages."""
3import ast
4from dataclasses import dataclass, field
5from enum import Enum
6from typing import Any, Optional
8import structlog
10# Tree-sitter imports with error handling
11try:
12 from tree_sitter_languages import get_language, get_parser
14 TREE_SITTER_AVAILABLE = True
15except ImportError:
16 TREE_SITTER_AVAILABLE = False
17 get_language = None
18 get_parser = None
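
# A minimal sketch of the pattern callers are expected to follow (illustrative
# only, not part of this module): every tree-sitter call is gated on the flag
# set above.
#
#   if TREE_SITTER_AVAILABLE and get_parser is not None:
#       parser = get_parser("python")  # import succeeded, safe to call
#   else:
#       parser = None  # downstream code falls back to text-based chunking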

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
from qdrant_loader.core.document import Document

logger = structlog.get_logger(__name__)

# Performance constants - Universal limits for all code files
MAX_FILE_SIZE_FOR_AST = (
    75_000  # 75KB limit for AST parsing (balanced for all languages)
)
MAX_ELEMENTS_TO_PROCESS = 800  # Limit number of elements to prevent timeouts
CHUNK_SIZE_THRESHOLD = 40_000  # Files larger than this use simple chunking
MAX_RECURSION_DEPTH = 8  # Limit AST recursion depth
MAX_ELEMENT_SIZE = 20_000  # Skip individual elements larger than this


class CodeElementType(Enum):
    """Types of code elements."""

    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    METHOD = "method"
    PROPERTY = "property"
    VARIABLE = "variable"
    IMPORT = "import"
    COMMENT = "comment"
    DOCSTRING = "docstring"
    DECORATOR = "decorator"
    CONSTANT = "constant"
    INTERFACE = "interface"
    ENUM = "enum"
    STRUCT = "struct"
    NAMESPACE = "namespace"
    PACKAGE = "package"


@dataclass
class CodeElement:
    """Represents a code element with its metadata."""

    name: str
    element_type: CodeElementType
    content: str
    start_line: int
    end_line: int
    level: int = 0
    parent: Optional["CodeElement"] = None
    children: list["CodeElement"] = field(default_factory=list)
    docstring: str | None = None
    decorators: list[str] = field(default_factory=list)
    parameters: list[str] = field(default_factory=list)
    return_type: str | None = None
    visibility: str = "public"  # public, private, protected
    is_async: bool = False
    is_static: bool = False
    is_abstract: bool = False
    complexity: int = 0  # Cyclomatic complexity
    dependencies: list[str] = field(default_factory=list)

    def add_child(self, child: "CodeElement"):
        """Add a child element."""
        self.children.append(child)
        child.parent = self
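
# Illustrative usage sketch (hypothetical values, not part of this module):
# building a tiny element tree by hand and linking a method to its class.
#
#   cls = CodeElement(
#       name="Greeter",
#       element_type=CodeElementType.CLASS,
#       content="class Greeter:\n    def greet(self): ...",
#       start_line=1,
#       end_line=2,
#   )
#   method = CodeElement(
#       name="greet",
#       element_type=CodeElementType.METHOD,
#       content="def greet(self): ...",
#       start_line=2,
#       end_line=2,
#       level=1,
#   )
#   cls.add_child(method)  # appends to cls.children and sets method.parent = cls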


class CodeChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking code files based on programming language structure.

    This strategy uses AST parsing (primarily tree-sitter) to split code files into
    chunks based on semantic code elements, preserving the code structure and hierarchy.
    """

    def __init__(self, settings: Settings):
        """Initialize the code chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Language detection patterns
        self.language_patterns = {
            ".py": "python",
            ".pyx": "python",
            ".pyi": "python",
            ".java": "java",
            ".js": "javascript",
            ".jsx": "javascript",
            ".mjs": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".go": "go",
            ".rs": "rust",
            ".cpp": "cpp",
            ".cc": "cpp",
            ".cxx": "cpp",
            ".c": "c",
            ".h": "c",
            ".cs": "c_sharp",
            ".php": "php",
            ".rb": "ruby",
            ".kt": "kotlin",
            ".scala": "scala",
            ".swift": "swift",
            ".dart": "dart",
        }

        # Cache for Tree-sitter parsers
        self._parsers = {}

        # Check tree-sitter availability
        if not TREE_SITTER_AVAILABLE:
            self.logger.warning("Tree-sitter not available, will use fallback parsing")

    def _detect_language(self, file_path: str, content: str) -> str:
        """Detect programming language from file extension.

        Args:
            file_path: Path to the file
            content: File content (for future content-based detection)

        Returns:
            Detected language name or "unknown"
        """
        # Get file extension
        ext = f".{file_path.lower().split('.')[-1]}" if "." in file_path else ""

        return self.language_patterns.get(ext, "unknown")
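
    # Illustrative examples (values follow self.language_patterns above):
    #
    #   self._detect_language("src/app/main.py", "...")  -> "python"
    #   self._detect_language("Service.java", "...")     -> "java"
    #   self._detect_language("Makefile", "...")         -> "unknown" (no extension)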

    def _get_tree_sitter_parser(self, language: str):
        """Get or create a Tree-sitter parser for the given language.

        Args:
            language: Tree-sitter language name

        Returns:
            Tree-sitter parser or None if not available
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None

        if language in self._parsers:
            return self._parsers[language]

        try:
            parser = get_parser(language)
            self._parsers[language] = parser
            return parser
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None
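
    # Illustrative sketch (assumes tree_sitter_languages is installed): a parser
    # is built once per language and then served from the self._parsers cache.
    #
    #   parser = self._get_tree_sitter_parser("python")    # builds and caches
    #   same = self._get_tree_sitter_parser("python")      # cache hit, same object
    #   none = self._get_tree_sitter_parser("not-a-lang")  # logs a warning, returns None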

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse code using Tree-sitter AST.

        Args:
            content: Source code content
            language: Programming language

        Returns:
            List of code elements
        """
        # Performance check: universal size limit for all languages
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), using fallback"
            )
            return []

        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []

        try:
            tree = parser.parse(content.encode("utf-8"))
            root_node = tree.root_node

            elements = []
            self._extract_ast_elements(root_node, content, elements, language)

            # Limit number of elements to prevent timeouts (universal limit)
            if len(elements) > MAX_ELEMENTS_TO_PROCESS:
                self.logger.warning(
                    f"Too many {language} elements ({len(elements)}), truncating to {MAX_ELEMENTS_TO_PROCESS}"
                )
                elements = elements[:MAX_ELEMENTS_TO_PROCESS]

            return elements

        except Exception as e:
            self.logger.warning(f"Failed to parse with Tree-sitter for {language}: {e}")
            return []

    def _extract_ast_elements(
        self,
        node,
        content: str,
        elements: list[CodeElement],
        language: str,
        level: int = 0,
    ):
        """Extract code elements from Tree-sitter AST node.

        Args:
            node: Tree-sitter AST node
            content: Source code content
            elements: List to append elements to
            language: Programming language
            level: Nesting level
        """
        # Performance check: limit recursion depth
        if level > MAX_RECURSION_DEPTH:  # Prevent deep recursion
            return

        # Performance check: limit total elements (universal limit)
        if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
            return

        # Define node types that represent code elements for different languages
        element_mappings = {
            "python": {
                "function_definition": CodeElementType.FUNCTION,
                "async_function_definition": CodeElementType.FUNCTION,
                "class_definition": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "import_from_statement": CodeElementType.IMPORT,
            },
            "java": {
                "method_declaration": CodeElementType.METHOD,
                "constructor_declaration": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_declaration": CodeElementType.IMPORT,
            },
            "javascript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "variable_declaration": CodeElementType.VARIABLE,
            },
            "typescript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_statement": CodeElementType.IMPORT,
            },
            "go": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_declaration": CodeElementType.METHOD,
                "type_declaration": CodeElementType.STRUCT,
                "import_declaration": CodeElementType.IMPORT,
            },
            "rust": {
                "function_item": CodeElementType.FUNCTION,
                "impl_item": CodeElementType.CLASS,
                "struct_item": CodeElementType.STRUCT,
                "enum_item": CodeElementType.ENUM,
                "trait_item": CodeElementType.INTERFACE,
                "use_declaration": CodeElementType.IMPORT,
            },
            "cpp": {
                "function_definition": CodeElementType.FUNCTION,
                "class_specifier": CodeElementType.CLASS,
                "struct_specifier": CodeElementType.STRUCT,
                "namespace_definition": CodeElementType.NAMESPACE,
                "preproc_include": CodeElementType.IMPORT,
            },
            "c": {
                "function_definition": CodeElementType.FUNCTION,
                "struct_specifier": CodeElementType.STRUCT,
                "preproc_include": CodeElementType.IMPORT,
            },
        }

        # Get element types for this language
        lang_elements = element_mappings.get(language, {})

        # Check if this node represents a code element
        if node.type in lang_elements:
            element_type = lang_elements[node.type]

            # Extract element name
            name = self._extract_element_name(node, language)

            # Get node text (tree-sitter offsets are byte-based; slicing the
            # str directly assumes a (near-)ASCII source)
            start_byte = node.start_byte
            end_byte = node.end_byte
            element_content = content[start_byte:end_byte]

            # Skip very large elements to prevent timeouts (universal limit)
            if len(element_content) > MAX_ELEMENT_SIZE:
                self.logger.debug(
                    f"Skipping large {language} element {name} ({len(element_content)} bytes)"
                )
                return

            # Create code element
            element = CodeElement(
                name=name,
                element_type=element_type,
                content=element_content,
                start_line=node.start_point[0] + 1,
                end_line=node.end_point[0] + 1,
                level=level,
            )

            # Extract additional metadata (simplified for performance)
            if element.element_type in [
                CodeElementType.FUNCTION,
                CodeElementType.METHOD,
            ]:
                params_node = node.child_by_field_name("parameters")
                if params_node:
                    element.parameters = self._extract_parameters_from_node(params_node)

            elements.append(element)

            # Process children with increased level (limited depth)
            if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level + 1
                    )
        else:
            # Process children at same level (limited depth)
            if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level
                    )
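
    # Illustrative sketch (hypothetical values): for the Python source
    #
    #   def greet(name):
    #       return f"hello {name}"
    #
    # tree-sitter yields a node with node.type == "function_definition"; the
    # mapping above turns it into CodeElement(name="greet",
    # element_type=CodeElementType.FUNCTION, start_line=1, end_line=2), with
    # parameters taken from the node's "parameters" field.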

    def _extract_element_name(self, node, language: str) -> str:
        """Extract the name of a code element from Tree-sitter node.

        Args:
            node: Tree-sitter AST node
            language: Programming language

        Returns:
            Element name or "unknown"
        """
        # Common patterns for finding names in different node types
        name_fields = ["name", "identifier", "field_identifier"]

        for field_name in name_fields:
            name_node = node.child_by_field_name(field_name)
            if name_node:
                return name_node.text.decode("utf-8")

        # Fallback: look for identifier children (limited search)
        for i, child in enumerate(node.children):
            if i > 5:  # Limit search to first few children
                break
            if child.type == "identifier":
                return child.text.decode("utf-8")

        return "unknown"

    def _extract_parameters_from_node(self, params_node) -> list[str]:
        """Extract parameter names from a parameters node.

        Args:
            params_node: Tree-sitter parameters node

        Returns:
            List of parameter names
        """
        parameters = []
        for i, child in enumerate(params_node.children):
            if i > 20:  # Limit to prevent timeouts
                break
            if child.type in ["identifier", "parameter", "typed_parameter"]:
                if child.type == "identifier":
                    parameters.append(child.text.decode("utf-8"))
                else:
                    # Look for identifier within parameter
                    for subchild in child.children:
                        if subchild.type == "identifier":
                            parameters.append(subchild.text.decode("utf-8"))
                            break
        return parameters

    def _parse_python_ast(self, content: str) -> list[CodeElement]:
        """Parse Python code using Python's built-in AST as fallback.

        Args:
            content: Python source code

        Returns:
            List of code elements
        """
        # Performance check: skip AST parsing for very large files
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"Python file too large for AST parsing ({len(content)} bytes)"
            )
            return []

        elements = []

        try:
            tree = ast.parse(content)
        except SyntaxError as e:
            self.logger.warning(f"Failed to parse Python AST: {e}")
            return []

        def extract_docstring(node) -> str | None:
            """Extract docstring from a node."""
            if (
                isinstance(node, ast.FunctionDef | ast.ClassDef | ast.AsyncFunctionDef)
                and node.body
                and isinstance(node.body[0], ast.Expr)
                and isinstance(node.body[0].value, ast.Constant)
                and isinstance(node.body[0].value.value, str)
            ):
                return node.body[0].value.value
            return None

        def get_decorators(node) -> list[str]:
            """Extract decorator names from a node."""
            decorators = []
            if hasattr(node, "decorator_list"):
                for decorator in node.decorator_list[:5]:  # Limit decorators
                    if isinstance(decorator, ast.Name):
                        decorators.append(decorator.id)
                    elif isinstance(decorator, ast.Attribute):
                        decorators.append(decorator.attr)
            return decorators

        def get_parameters(node) -> list[str]:
            """Extract parameter names from a function node."""
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                return []

            params = []
            for arg in node.args.args[:20]:  # Limit parameters
                params.append(arg.arg)
            return params

        def visit_node(node, level=0, parent_element=None):
            """Recursively visit AST nodes."""
            # Performance checks
            if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
                return
            if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
                return

            element = None

            if isinstance(node, ast.ClassDef):
                element = CodeElement(
                    name=node.name,
                    element_type=CodeElementType.CLASS,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                )

            elif isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                element_type = (
                    CodeElementType.METHOD if level > 0 else CodeElementType.FUNCTION
                )
                element = CodeElement(
                    name=node.name,
                    element_type=element_type,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                    parameters=get_parameters(node),
                    is_async=isinstance(node, ast.AsyncFunctionDef),
                )

            elif isinstance(node, ast.Import | ast.ImportFrom):
                import_names = []
                if isinstance(node, ast.Import):
                    import_names = [
                        alias.name for alias in node.names[:10]
                    ]  # Limit imports
                else:
                    module = node.module or ""
                    import_names = [
                        f"{module}.{alias.name}" for alias in node.names[:10]
                    ]

                element = CodeElement(
                    name=", ".join(import_names),
                    element_type=CodeElementType.IMPORT,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    dependencies=import_names,
                )

            if element:
                # Skip very large elements
                if len(element.content) > MAX_ELEMENT_SIZE:
                    return

                if parent_element:
                    parent_element.add_child(element)
                else:
                    elements.append(element)

                # Recursively process children (limited depth)
                if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level + 1, element)
            else:
                # For nodes we don't handle, still process their children (limited depth)
                if level < MAX_RECURSION_DEPTH:  # Use full depth limit
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level, parent_element)

        # Start processing from the root
        for node in ast.iter_child_nodes(tree):
            visit_node(node)

        return elements
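
    # Illustrative sketch (hypothetical input): for the source
    #
    #   import os
    #
    #   def main():
    #       """Entry point."""
    #
    # this method returns two top-level elements: an IMPORT element named "os"
    # (dependencies=["os"]) and a FUNCTION element named "main" with
    # docstring="Entry point." and parameters=[].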

    def _extract_code_metadata(
        self, element: CodeElement, language: str
    ) -> dict[str, Any]:
        """Extract metadata from a code element.

        Args:
            element: The code element to analyze
            language: Programming language

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "language": language,
            "start_line": element.start_line,
            "end_line": element.end_line,
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": element.docstring is not None,
            "docstring_length": len(element.docstring) if element.docstring else 0,
            "parameter_count": len(element.parameters),
            "decorator_count": len(element.decorators),
            "child_count": len(element.children),
            "dependency_count": len(element.dependencies),
        }

        # Add specific metadata based on element type
        if element.element_type in [CodeElementType.FUNCTION, CodeElementType.METHOD]:
            metadata.update(
                {
                    "parameters": element.parameters,
                    "return_type": element.return_type,
                    "decorators": element.decorators,
                }
            )

        if element.element_type == CodeElementType.CLASS:
            metadata.update(
                {
                    "method_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.METHOD
                        ]
                    ),
                    "property_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.PROPERTY
                        ]
                    ),
                }
            )

        if element.element_type == CodeElementType.IMPORT:
            metadata.update({"dependencies": element.dependencies})

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_level": element.parent.level,
                }
            )

        return metadata
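
    # Illustrative sketch (hypothetical values): for an async function
    # fetch(url) with a docstring, the returned dictionary looks roughly like
    #
    #   {
    #       "element_type": "function", "name": "fetch", "language": "python",
    #       "start_line": 10, "end_line": 12, "line_count": 3, "level": 0,
    #       "visibility": "public", "is_async": True, "has_docstring": True,
    #       "parameter_count": 1, "parameters": ["url"], "return_type": None,
    #       ...
    #   }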

    def _merge_small_elements(
        self, elements: list[CodeElement], min_size: int = 200
    ) -> list[CodeElement]:
        """Merge small code elements to create more meaningful chunks.

        Args:
            elements: List of code elements
            min_size: Minimum size for standalone elements

        Returns:
            List of merged elements
        """
        if not elements:
            return []

        merged = []
        current_group = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # If element is large enough or is a significant code structure, keep it separate
            if (
                element_size >= min_size
                or element.element_type
                in [CodeElementType.CLASS, CodeElementType.FUNCTION]
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            ):
                # First, add any accumulated small elements
                if current_group:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                merged.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element_size

                # If accumulated size is large enough, create a merged element
                if current_size >= min_size:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

        # Handle remaining small elements
        if current_group:
            merged_element = self._create_merged_element(current_group)
            merged.append(merged_element)

        return merged
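
    # Illustrative sketch (hypothetical sizes, min_size=200): given three
    # 60-byte IMPORT elements followed by a 500-byte FUNCTION, the imports
    # accumulate to 180 bytes (below min_size, so no flush yet), then the
    # FUNCTION flushes them into one merged MODULE chunk and is kept as its
    # own chunk, yielding two chunks in total.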

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element
        """
        if not elements:
            raise ValueError("Cannot merge empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create merged element
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies
        all_dependencies = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(set(all_dependencies))

        return merged_element

    def _split_text(self, content: str) -> list[dict[str, Any]]:
        """Split code content into chunks based on programming language structure.

        Args:
            content: The code content to split

        Returns:
            List of dictionaries with chunk content and metadata
        """
        # This method is required by the base class but not used in our
        # implementation; we override chunk_document instead.
        return [{"content": content, "metadata": {"element_type": "unknown"}}]

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a code document using AST parsing.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Detect language from file path first for language-specific optimizations
            file_path = document.metadata.get("file_name", "") or document.source
            language = self._detect_language(file_path, document.content)

            # Performance check: universal threshold for all code files
            if len(document.content) > CHUNK_SIZE_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id,
                    f"Large {language} file ({len(document.content)} bytes)",
                )
                return self._fallback_chunking(document)

            self.logger.debug(f"Detected language: {language}")

            # Parse code structure using AST
            elements = []
            parsing_method = "unknown"

            if language == "python":
                # Try Python AST first for Python files
                self.logger.debug("Parsing Python with built-in AST")
                elements = self._parse_python_ast(document.content)
                parsing_method = "python_ast"

                # Fallback to tree-sitter if Python AST fails
                if not elements and TREE_SITTER_AVAILABLE:
                    self.logger.debug("Falling back to Tree-sitter for Python")
                    elements = self._parse_with_tree_sitter(document.content, language)
                    parsing_method = "tree_sitter"
            elif language != "unknown" and TREE_SITTER_AVAILABLE:
                # Use tree-sitter for other supported languages
                self.logger.debug(f"Parsing {language} with Tree-sitter")
                elements = self._parse_with_tree_sitter(document.content, language)
                parsing_method = "tree_sitter"

            if not elements:
                self.progress_tracker.log_fallback(
                    document.id, f"No {language} elements found"
                )
                return self._fallback_chunking(document)

            # Merge small elements to optimize chunk size
            final_elements = self._merge_small_elements(elements)
            if len(final_elements) > 100:  # Limit total chunks
                final_elements = final_elements[:100]

            # Create chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": len(element.content),
                    },
                )

                # Create chunk document with optimized metadata processing
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=element.content,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    skip_nlp=False,
                )

                # Add code-specific metadata
                code_metadata = self._extract_code_metadata(element, language)
                code_metadata["parsing_method"] = parsing_method
                code_metadata["chunking_strategy"] = "code"
                code_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(code_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), f"code ({language})"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Code parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)
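
    # Illustrative end-to-end sketch (hypothetical settings and document, not a
    # test): wiring the strategy into a pipeline.
    #
    #   strategy = CodeChunkingStrategy(settings)   # settings: qdrant_loader Settings
    #   chunks = strategy.chunk_document(document)  # document: a code Document
    #   for chunk in chunks:
    #       print(chunk.metadata["element_type"], chunk.metadata["name"])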

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fallback to simple text-based chunking when AST parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for code document")

        # Use simple line-based splitting for code (optimized)
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:50]):  # Limit chunks
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs
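
    # Illustrative sketch of the fallback math (hypothetical chunk_size=1000):
    # with 50-character lines, each line costs 51 bytes, so a chunk flushes
    # after 19 lines (19 * 51 = 969; adding a 20th would exceed 1000). A
    # 49-line file therefore splits into chunks of 19, 19, and 11 lines, and
    # at most the first 50 chunks of any file are materialized.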