Coverage for src/qdrant_loader/core/chunking/strategy/code_strategy.py: 81% (344 statements)
1"""Code-specific chunking strategy for programming languages."""
3import ast
4import re
5from dataclasses import dataclass, field
6from enum import Enum
7from typing import Any, Optional
9import structlog
11# Tree-sitter imports with error handling
12try:
13 from tree_sitter_languages import get_language, get_parser
15 TREE_SITTER_AVAILABLE = True
16except ImportError:
17 TREE_SITTER_AVAILABLE = False
18 get_language = None
19 get_parser = None
21from qdrant_loader.config import Settings
22from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
23from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
24from qdrant_loader.core.document import Document
26logger = structlog.get_logger(__name__)
28# Performance constants - Universal limits for all code files
29MAX_FILE_SIZE_FOR_AST = (
30 75_000 # 75KB limit for AST parsing (balanced for all languages)
31)
32MAX_ELEMENTS_TO_PROCESS = 800 # Limit number of elements to prevent timeouts
33CHUNK_SIZE_THRESHOLD = 40_000 # Files larger than this use simple chunking
34MAX_RECURSION_DEPTH = 8 # Limit AST recursion depth
35MAX_ELEMENT_SIZE = 20_000 # Skip individual elements larger than this
38class CodeElementType(Enum):
39 """Types of code elements."""
41 MODULE = "module"
42 CLASS = "class"
43 FUNCTION = "function"
44 METHOD = "method"
45 PROPERTY = "property"
46 VARIABLE = "variable"
47 IMPORT = "import"
48 COMMENT = "comment"
49 DOCSTRING = "docstring"
50 DECORATOR = "decorator"
51 CONSTANT = "constant"
52 INTERFACE = "interface"
53 ENUM = "enum"
54 STRUCT = "struct"
55 NAMESPACE = "namespace"
56 PACKAGE = "package"
59@dataclass
60class CodeElement:
61 """Represents a code element with its metadata."""
63 name: str
64 element_type: CodeElementType
65 content: str
66 start_line: int
67 end_line: int
68 level: int = 0
69 parent: Optional["CodeElement"] = None
70 children: list["CodeElement"] = field(default_factory=list)
71 docstring: str | None = None
72 decorators: list[str] = field(default_factory=list)
73 parameters: list[str] = field(default_factory=list)
74 return_type: str | None = None
75 visibility: str = "public" # public, private, protected
76 is_async: bool = False
77 is_static: bool = False
78 is_abstract: bool = False
79 complexity: int = 0 # Cyclomatic complexity
80 dependencies: list[str] = field(default_factory=list)
82 def add_child(self, child: "CodeElement"):
83 """Add a child element."""
84 self.children.append(child)
85 child.parent = self
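
# Illustrative sketch: assembling a small CodeElement tree by hand. The names
# "Greeter" and "greet" are hypothetical examples, not part of this module;
# kept as a comment so import-time behavior is unchanged.
#
#   cls = CodeElement(
#       name="Greeter",
#       element_type=CodeElementType.CLASS,
#       content="class Greeter: ...",
#       start_line=1,
#       end_line=4,
#   )
#   method = CodeElement(
#       name="greet",
#       element_type=CodeElementType.METHOD,
#       content="def greet(self): ...",
#       start_line=2,
#       end_line=3,
#       level=1,
#   )
#   cls.add_child(method)  # appends to cls.children and sets method.parent = cls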


class CodeChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking code files based on programming language structure.

    This strategy uses AST parsing (primarily tree-sitter) to split code files into
    chunks based on semantic code elements, preserving the code structure and hierarchy.
    """

    def __init__(self, settings: Settings):
        """Initialize the code chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.logger = logger
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Language detection patterns
        self.language_patterns = {
            ".py": "python",
            ".pyx": "python",
            ".pyi": "python",
            ".java": "java",
            ".js": "javascript",
            ".jsx": "javascript",
            ".mjs": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".go": "go",
            ".rs": "rust",
            ".cpp": "cpp",
            ".cc": "cpp",
            ".cxx": "cpp",
            ".c": "c",
            ".h": "c",
            ".cs": "c_sharp",
            ".php": "php",
            ".rb": "ruby",
            ".kt": "kotlin",
            ".scala": "scala",
            ".swift": "swift",
            ".dart": "dart",
        }

        # Cache for Tree-sitter parsers
        self._parsers = {}

        # Check tree-sitter availability
        if not TREE_SITTER_AVAILABLE:
            self.logger.warning("Tree-sitter not available, will use fallback parsing")

    def _detect_language(self, file_path: str, content: str) -> str:
        """Detect the programming language from the file extension.

        Args:
            file_path: Path to the file
            content: File content (reserved for future content-based detection)

        Returns:
            Detected language name or "unknown"
        """
        # Get the file extension (everything after the last dot)
        ext = f".{file_path.lower().split('.')[-1]}" if "." in file_path else ""

        return self.language_patterns.get(ext, "unknown")
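
    # A hedged example of the mapping above (paths are hypothetical):
    #   _detect_language("src/main.py", "...")  -> "python"      (".py")
    #   _detect_language("web/App.tsx", "...")  -> "typescript"  (".tsx")
    #   _detect_language("Makefile", "...")     -> "unknown"     (no extension)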

    def _get_tree_sitter_parser(self, language: str):
        """Get or create a Tree-sitter parser for the given language.

        Args:
            language: Tree-sitter language name

        Returns:
            Tree-sitter parser, or None if not available
        """
        if not TREE_SITTER_AVAILABLE or get_parser is None:
            return None

        if language in self._parsers:
            return self._parsers[language]

        try:
            parser = get_parser(language)
            self._parsers[language] = parser
            return parser
        except Exception as e:
            self.logger.warning(f"Failed to get Tree-sitter parser for {language}: {e}")
            return None

    def _parse_with_tree_sitter(self, content: str, language: str) -> list[CodeElement]:
        """Parse code using the Tree-sitter AST.

        Args:
            content: Source code content
            language: Programming language

        Returns:
            List of code elements
        """
        # Performance check: universal size limit for all languages
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"{language.title()} file too large for AST parsing ({len(content)} bytes), using fallback"
            )
            return []

        parser = self._get_tree_sitter_parser(language)
        if not parser:
            return []

        try:
            tree = parser.parse(content.encode("utf-8"))
            root_node = tree.root_node

            elements = []
            self._extract_ast_elements(root_node, content, elements, language)

            # Limit the number of elements to prevent timeouts (universal limit)
            if len(elements) > MAX_ELEMENTS_TO_PROCESS:
                self.logger.warning(
                    f"Too many {language} elements ({len(elements)}), truncating to {MAX_ELEMENTS_TO_PROCESS}"
                )
                elements = elements[:MAX_ELEMENTS_TO_PROCESS]

            return elements

        except Exception as e:
            self.logger.warning(f"Failed to parse with Tree-sitter for {language}: {e}")
            return []

    def _extract_ast_elements(
        self,
        node,
        content: str,
        elements: list[CodeElement],
        language: str,
        level: int = 0,
    ):
        """Extract code elements from a Tree-sitter AST node.

        Args:
            node: Tree-sitter AST node
            content: Source code content
            elements: List to append elements to
            language: Programming language
            level: Nesting level
        """
        # Performance check: limit recursion depth
        if level > MAX_RECURSION_DEPTH:
            return

        # Performance check: limit total elements (universal limit)
        if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
            return

        # Node types that represent code elements in each language's grammar
        element_mappings = {
            "python": {
                "function_definition": CodeElementType.FUNCTION,
                "async_function_definition": CodeElementType.FUNCTION,
                "class_definition": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "import_from_statement": CodeElementType.IMPORT,
            },
            "java": {
                "method_declaration": CodeElementType.METHOD,
                "constructor_declaration": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_declaration": CodeElementType.IMPORT,
            },
            "javascript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "import_statement": CodeElementType.IMPORT,
                "variable_declaration": CodeElementType.VARIABLE,
            },
            "typescript": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_definition": CodeElementType.METHOD,
                "class_declaration": CodeElementType.CLASS,
                "interface_declaration": CodeElementType.INTERFACE,
                "import_statement": CodeElementType.IMPORT,
            },
            "go": {
                "function_declaration": CodeElementType.FUNCTION,
                "method_declaration": CodeElementType.METHOD,
                "type_declaration": CodeElementType.STRUCT,
                "import_declaration": CodeElementType.IMPORT,
            },
            "rust": {
                "function_item": CodeElementType.FUNCTION,
                "impl_item": CodeElementType.CLASS,
                "struct_item": CodeElementType.STRUCT,
                "enum_item": CodeElementType.ENUM,
                "trait_item": CodeElementType.INTERFACE,
                "use_declaration": CodeElementType.IMPORT,
            },
            "cpp": {
                "function_definition": CodeElementType.FUNCTION,
                "class_specifier": CodeElementType.CLASS,
                "struct_specifier": CodeElementType.STRUCT,
                "namespace_definition": CodeElementType.NAMESPACE,
                "preproc_include": CodeElementType.IMPORT,
            },
            "c": {
                "function_definition": CodeElementType.FUNCTION,
                "struct_specifier": CodeElementType.STRUCT,
                "preproc_include": CodeElementType.IMPORT,
            },
        }

        # Get the element types for this language
        lang_elements = element_mappings.get(language, {})

        # Check if this node represents a code element
        if node.type in lang_elements:
            element_type = lang_elements[node.type]

            # Extract the element name
            name = self._extract_element_name(node, language)

            # Get the node text
            start_byte = node.start_byte
            end_byte = node.end_byte
            element_content = content[start_byte:end_byte]

            # Skip very large elements to prevent timeouts (universal limit)
            if len(element_content) > MAX_ELEMENT_SIZE:
                self.logger.debug(
                    f"Skipping large {language} element {name} ({len(element_content)} bytes)"
                )
                return

            # Create the code element
            element = CodeElement(
                name=name,
                element_type=element_type,
                content=element_content,
                start_line=node.start_point[0] + 1,
                end_line=node.end_point[0] + 1,
                level=level,
            )

            # Extract additional metadata (simplified for performance)
            if element.element_type in [
                CodeElementType.FUNCTION,
                CodeElementType.METHOD,
            ]:
                params_node = node.child_by_field_name("parameters")
                if params_node:
                    element.parameters = self._extract_parameters_from_node(params_node)

            elements.append(element)

            # Process children with an increased level (limited depth)
            if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level + 1
                    )
        else:
            # Process children at the same level (limited depth)
            if level < MAX_RECURSION_DEPTH:  # Use the full depth limit
                for child in node.children:
                    self._extract_ast_elements(
                        child, content, elements, language, level
                    )

    def _extract_element_name(self, node, language: str) -> str:
        """Extract the name of a code element from a Tree-sitter node.

        Args:
            node: Tree-sitter AST node
            language: Programming language

        Returns:
            Element name or "unknown"
        """
        # Common field names that hold identifiers across node types
        name_fields = ["name", "identifier", "field_identifier"]

        for field_name in name_fields:
            name_node = node.child_by_field_name(field_name)
            if name_node:
                return name_node.text.decode("utf-8")

        # Fallback: look for identifier children (limited search)
        for i, child in enumerate(node.children):
            if i > 5:  # Limit the search to the first few children
                break
            if child.type == "identifier":
                return child.text.decode("utf-8")

        return "unknown"

    def _extract_parameters_from_node(self, params_node) -> list[str]:
        """Extract parameter names from a parameters node.

        Args:
            params_node: Tree-sitter parameters node

        Returns:
            List of parameter names
        """
        parameters = []
        for i, child in enumerate(params_node.children):
            if i > 20:  # Limit to prevent timeouts
                break
            if child.type in ["identifier", "parameter", "typed_parameter"]:
                if child.type == "identifier":
                    parameters.append(child.text.decode("utf-8"))
                else:
                    # Look for an identifier within the parameter
                    for subchild in child.children:
                        if subchild.type == "identifier":
                            parameters.append(subchild.text.decode("utf-8"))
                            break
        return parameters

    def _parse_python_ast(self, content: str) -> list[CodeElement]:
        """Parse Python code using Python's built-in AST as a fallback.

        Args:
            content: Python source code

        Returns:
            List of code elements
        """
        # Performance check: skip AST parsing for very large files
        if len(content) > MAX_FILE_SIZE_FOR_AST:
            self.logger.info(
                f"Python file too large for AST parsing ({len(content)} bytes)"
            )
            return []

        elements = []

        try:
            tree = ast.parse(content)
        except SyntaxError as e:
            self.logger.warning(f"Failed to parse Python AST: {e}")
            return []

        def extract_docstring(node) -> str | None:
            """Extract the docstring from a node."""
            if (
                isinstance(node, ast.FunctionDef | ast.ClassDef | ast.AsyncFunctionDef)
                and node.body
                and isinstance(node.body[0], ast.Expr)
                and isinstance(node.body[0].value, ast.Constant)
                and isinstance(node.body[0].value.value, str)
            ):
                return node.body[0].value.value
            return None

        def get_decorators(node) -> list[str]:
            """Extract decorator names from a node."""
            decorators = []
            if hasattr(node, "decorator_list"):
                for decorator in node.decorator_list[:5]:  # Limit decorators
                    if isinstance(decorator, ast.Name):
                        decorators.append(decorator.id)
                    elif isinstance(decorator, ast.Attribute):
                        decorators.append(decorator.attr)
            return decorators

        def get_parameters(node) -> list[str]:
            """Extract parameter names from a function node."""
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                return []

            params = []
            for arg in node.args.args[:20]:  # Limit parameters
                params.append(arg.arg)
            return params

        def visit_node(node, level=0, parent_element=None):
            """Recursively visit AST nodes."""
            # Performance checks
            if level > MAX_RECURSION_DEPTH:  # Limit recursion depth
                return
            if len(elements) >= MAX_ELEMENTS_TO_PROCESS:
                return

            element = None

            if isinstance(node, ast.ClassDef):
                element = CodeElement(
                    name=node.name,
                    element_type=CodeElementType.CLASS,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                )

            elif isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                element_type = (
                    CodeElementType.METHOD if level > 0 else CodeElementType.FUNCTION
                )
                element = CodeElement(
                    name=node.name,
                    element_type=element_type,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    docstring=extract_docstring(node),
                    decorators=get_decorators(node),
                    parameters=get_parameters(node),
                    is_async=isinstance(node, ast.AsyncFunctionDef),
                )

            elif isinstance(node, ast.Import | ast.ImportFrom):
                import_names = []
                if isinstance(node, ast.Import):
                    import_names = [alias.name for alias in node.names[:10]]  # Limit imports
                else:
                    module = node.module or ""
                    import_names = [
                        f"{module}.{alias.name}" for alias in node.names[:10]
                    ]

                element = CodeElement(
                    name=", ".join(import_names),
                    element_type=CodeElementType.IMPORT,
                    content=ast.get_source_segment(content, node) or "",
                    start_line=node.lineno,
                    end_line=node.end_lineno or node.lineno,
                    level=level,
                    dependencies=import_names,
                )

            if element:
                # Skip very large elements
                if len(element.content) > MAX_ELEMENT_SIZE:
                    return

                if parent_element:
                    parent_element.add_child(element)
                else:
                    elements.append(element)

                # Recursively process children (limited depth)
                if level < MAX_RECURSION_DEPTH - 3:  # Leave room for deeper nesting
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level + 1, element)
            else:
                # For unhandled nodes, still process their children (limited depth)
                if level < MAX_RECURSION_DEPTH:  # Use the full depth limit
                    for child in ast.iter_child_nodes(node):
                        visit_node(child, level, parent_element)

        # Start processing from the root
        for node in ast.iter_child_nodes(tree):
            visit_node(node)

        return elements
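
    # Illustrative sketch (not executed): for a hypothetical snippet such as
    #   "import os\n\nclass A:\n    def m(self):\n        return 1\n"
    # _parse_python_ast returns two top-level elements -- an IMPORT ("os") and
    # a CLASS ("A"); the method "m" is attached via add_child() and therefore
    # appears in the class element's .children, not in the returned list.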

    def _extract_code_metadata(
        self, element: CodeElement, language: str
    ) -> dict[str, Any]:
        """Extract metadata from a code element.

        Args:
            element: The code element to analyze
            language: Programming language

        Returns:
            Dictionary containing element metadata
        """
        metadata = {
            "element_type": element.element_type.value,
            "name": element.name,
            "language": language,
            "start_line": element.start_line,
            "end_line": element.end_line,
            "line_count": element.end_line - element.start_line + 1,
            "level": element.level,
            "visibility": element.visibility,
            "is_async": element.is_async,
            "is_static": element.is_static,
            "is_abstract": element.is_abstract,
            "complexity": element.complexity,
            "has_docstring": element.docstring is not None,
            "docstring_length": len(element.docstring) if element.docstring else 0,
            "parameter_count": len(element.parameters),
            "decorator_count": len(element.decorators),
            "child_count": len(element.children),
            "dependency_count": len(element.dependencies),
        }

        # Add specific metadata based on the element type
        if element.element_type in [CodeElementType.FUNCTION, CodeElementType.METHOD]:
            metadata.update(
                {
                    "parameters": element.parameters,
                    "return_type": element.return_type,
                    "decorators": element.decorators,
                }
            )

        if element.element_type == CodeElementType.CLASS:
            metadata.update(
                {
                    "method_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.METHOD
                        ]
                    ),
                    "property_count": len(
                        [
                            c
                            for c in element.children
                            if c.element_type == CodeElementType.PROPERTY
                        ]
                    ),
                }
            )

        if element.element_type == CodeElementType.IMPORT:
            metadata.update({"dependencies": element.dependencies})

        # Add parent context
        if element.parent:
            metadata.update(
                {
                    "parent_name": element.parent.name,
                    "parent_type": element.parent.element_type.value,
                    "parent_level": element.parent.level,
                }
            )

        return metadata
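
    # A hedged sample of the dictionary produced above for a plain function
    # element (all values hypothetical):
    #   {"element_type": "function", "name": "greet", "language": "python",
    #    "start_line": 10, "end_line": 14, "line_count": 5, "level": 0,
    #    "visibility": "public", "is_async": False, "has_docstring": True,
    #    "parameter_count": 2, "parameters": ["name", "greeting"],
    #    "return_type": None, "decorators": []}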

    def _merge_small_elements(
        self, elements: list[CodeElement], min_size: int = 200
    ) -> list[CodeElement]:
        """Merge small code elements to create more meaningful chunks.

        Args:
            elements: List of code elements
            min_size: Minimum size for standalone elements

        Returns:
            List of merged elements
        """
        if not elements:
            return []

        merged = []
        current_group = []
        current_size = 0

        for element in elements:
            element_size = len(element.content)

            # If the element is large enough or is a significant code structure,
            # keep it separate
            if (
                element_size >= min_size
                or element.element_type
                in [CodeElementType.CLASS, CodeElementType.FUNCTION]
                or (
                    element.element_type == CodeElementType.METHOD
                    and element_size > 100
                )
            ):
                # First, flush any accumulated small elements
                if current_group:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

                # Add the large element
                merged.append(element)
            else:
                # Accumulate small elements
                current_group.append(element)
                current_size += element_size

                # Once the accumulated size is large enough, create a merged element
                if current_size >= min_size:
                    merged_element = self._create_merged_element(current_group)
                    merged.append(merged_element)
                    current_group = []
                    current_size = 0

        # Handle any remaining small elements
        if current_group:
            merged_element = self._create_merged_element(current_group)
            merged.append(merged_element)

        return merged

    def _create_merged_element(self, elements: list[CodeElement]) -> CodeElement:
        """Create a merged element from a list of small elements.

        Args:
            elements: List of elements to merge

        Returns:
            Merged code element
        """
        if not elements:
            raise ValueError("Cannot merge an empty list of elements")

        if len(elements) == 1:
            return elements[0]

        # Create the merged element
        merged_content = "\n\n".join(element.content for element in elements)
        merged_names = [element.name for element in elements]

        merged_element = CodeElement(
            name=f"merged_({', '.join(merged_names[:3])}{'...' if len(merged_names) > 3 else ''})",
            element_type=CodeElementType.MODULE,  # Use module as a generic container
            content=merged_content,
            start_line=elements[0].start_line,
            end_line=elements[-1].end_line,
            level=min(element.level for element in elements),
        )

        # Merge dependencies
        all_dependencies = []
        for element in elements:
            all_dependencies.extend(element.dependencies)
        merged_element.dependencies = list(set(all_dependencies))

        return merged_element

    def _split_text(self, content: str) -> list[dict[str, Any]]:
        """Return the content as a single chunk.

        Required by the base class but not used by this strategy, which
        overrides chunk_document instead.

        Args:
            content: The code content to split

        Returns:
            List of dictionaries with chunk content and metadata
        """
        return [{"content": content, "metadata": {"element_type": "unknown"}}]

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a code document using AST parsing.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Detect the language from the file path first for language-specific optimizations
            file_path = document.metadata.get("file_name", "") or document.source
            language = self._detect_language(file_path, document.content)

            # Performance check: universal threshold for all code files
            if len(document.content) > CHUNK_SIZE_THRESHOLD:
                self.progress_tracker.log_fallback(
                    document.id,
                    f"Large {language} file ({len(document.content)} bytes)",
                )
                return self._fallback_chunking(document)

            self.logger.debug(f"Detected language: {language}")

            # Parse the code structure using AST
            elements = []
            parsing_method = "unknown"

            if language == "python":
                # Try Python's built-in AST first for Python files
                self.logger.debug("Parsing Python with built-in AST")
                elements = self._parse_python_ast(document.content)
                parsing_method = "python_ast"

                # Fall back to Tree-sitter if Python AST parsing fails
                if not elements and TREE_SITTER_AVAILABLE:
                    self.logger.debug("Falling back to Tree-sitter for Python")
                    elements = self._parse_with_tree_sitter(document.content, language)
                    parsing_method = "tree_sitter"
            elif language != "unknown" and TREE_SITTER_AVAILABLE:
                # Use Tree-sitter for other supported languages
                self.logger.debug(f"Parsing {language} with Tree-sitter")
                elements = self._parse_with_tree_sitter(document.content, language)
                parsing_method = "tree_sitter"

            if not elements:
                self.progress_tracker.log_fallback(
                    document.id, f"No {language} elements found"
                )
                return self._fallback_chunking(document)

            # Merge small elements to optimize chunk size
            final_elements = self._merge_small_elements(elements)
            if len(final_elements) > 100:  # Limit total chunks
                final_elements = final_elements[:100]

            # Create the chunked documents
            chunked_docs = []
            for i, element in enumerate(final_elements):
                self.logger.debug(
                    f"Processing element {i+1}/{len(final_elements)}",
                    extra={
                        "element_name": element.name,
                        "element_type": element.element_type.value,
                        "content_size": len(element.content),
                    },
                )

                # Create the chunk document with optimized metadata processing
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=element.content,
                    chunk_index=i,
                    total_chunks=len(final_elements),
                    skip_nlp=False,
                )

                # Add code-specific metadata
                code_metadata = self._extract_code_metadata(element, language)
                code_metadata["parsing_method"] = parsing_method
                code_metadata["chunking_strategy"] = "code"
                code_metadata["parent_document_id"] = document.id
                chunk_doc.metadata.update(code_metadata)

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), f"code ({language})"
            )
            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fall back to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Code parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Fall back to simple text-based chunking when AST parsing fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        self.logger.warning("Falling back to simple text chunking for code document")

        # Use simple line-based splitting for code (optimized)
        lines = document.content.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for the newline

            if current_size + line_size > self.chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Add the remaining lines
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        # Create the chunk documents (limited)
        chunked_docs = []
        for i, chunk_content in enumerate(chunks[:50]):  # Limit chunks
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )

            chunk_doc.id = Document.generate_chunk_id(document.id, i)
            chunk_doc.metadata["parent_document_id"] = document.id
            chunk_doc.metadata["chunking_method"] = "fallback_text"

            chunked_docs.append(chunk_doc)

        return chunked_docs
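

# A minimal usage sketch, assuming a configured Settings instance (its
# construction is project-specific and not shown); the file path and Document
# fields below are hypothetical:
#
#   settings = Settings(...)  # loaded from qdrant_loader configuration
#   strategy = CodeChunkingStrategy(settings)
#   document = Document(
#       content=open("example.py").read(),
#       metadata={"file_name": "example.py"},
#       ...,  # remaining Document fields per qdrant_loader.core.document
#   )
#   for chunk in strategy.chunk_document(document):
#       print(chunk.metadata["element_type"], chunk.metadata.get("name"))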