Coverage for src/qdrant_loader/core/chunking/strategy/code/code_chunk_processor.py: 75%
251 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Code chunk processor for creating enhanced code chunk documents."""
3from typing import Any
5import structlog
7from qdrant_loader.core.chunking.strategy.base.chunk_processor import BaseChunkProcessor
8from qdrant_loader.core.document import Document
10logger = structlog.get_logger(__name__)
13class CodeChunkProcessor(BaseChunkProcessor):
14 """Chunk processor for code documents with programming language context."""
16 def __init__(self, settings):
17 """Initialize the code chunk processor.
19 Args:
20 settings: Configuration settings
21 """
22 super().__init__(settings)
23 self.logger = logger
25 # Code-specific configuration
26 self.code_config = getattr(
27 settings.global_config.chunking.strategies, "code", None
28 )
29 self.max_chunk_size_for_nlp = getattr(
30 self.code_config, "max_chunk_size_for_nlp", 20000
31 )
33 # NLP skip conditions for code
34 self.skip_conditions = {
35 "large_content": self.max_chunk_size_for_nlp,
36 "binary_patterns": ["\x00", "\xff", "\xfe"],
37 "minified_code_threshold": 0.1, # Ratio of meaningful chars
38 "generated_code_patterns": [
39 "auto-generated",
40 "do not edit",
41 "generated by",
42 ],
43 }
45 def create_chunk_document(
46 self,
47 original_doc: Document,
48 chunk_content: str,
49 chunk_index: int,
50 total_chunks: int,
51 chunk_metadata: dict[str, Any],
52 skip_nlp: bool = False,
53 ) -> Document:
54 """Create a document for a code chunk with enhanced metadata.
56 Args:
57 original_doc: The original document being chunked
58 chunk_content: The content of this chunk
59 chunk_index: Index of this chunk (0-based)
60 total_chunks: Total number of chunks
61 chunk_metadata: Metadata specific to this chunk
62 skip_nlp: Whether to skip semantic analysis for this chunk
64 Returns:
65 Document instance representing the code chunk
66 """
67 # Generate unique chunk ID
68 chunk_id = self.generate_chunk_id(original_doc, chunk_index)
70 # Create base metadata
71 base_metadata = self.create_base_chunk_metadata(
72 original_doc, chunk_index, total_chunks, chunk_metadata
73 )
75 # Add code-specific metadata
76 code_metadata = self._create_code_specific_metadata(
77 chunk_content, chunk_metadata, original_doc
78 )
79 base_metadata.update(code_metadata)
81 # Determine if we should skip NLP for this chunk
82 if not skip_nlp:
83 skip_nlp, skip_reason = self.should_skip_semantic_analysis(
84 chunk_content, chunk_metadata
85 )
86 if skip_nlp:
87 base_metadata["nlp_skip_reason"] = skip_reason
89 # Create chunk document
90 chunk_doc = Document(
91 id=chunk_id,
92 content=chunk_content,
93 metadata=base_metadata,
94 source=original_doc.source,
95 source_type=original_doc.source_type,
96 url=original_doc.url,
97 content_type=original_doc.content_type,
98 title=self._generate_chunk_title(original_doc, chunk_metadata, chunk_index),
99 )
101 return chunk_doc
103 def should_skip_semantic_analysis(
104 self, chunk_content: str, chunk_metadata: dict[str, Any]
105 ) -> tuple[bool, str]:
106 """Determine whether to skip semantic analysis for a code chunk.
108 Args:
109 chunk_content: Content of the chunk
110 chunk_metadata: Metadata for the chunk
112 Returns:
113 Tuple of (should_skip, reason)
114 """
115 content_length = len(chunk_content)
117 # Skip if content is too large
118 if content_length > self.skip_conditions["large_content"]:
119 return True, "content_too_large"
121 # Skip if content appears to be binary
122 if any(
123 pattern in chunk_content
124 for pattern in self.skip_conditions["binary_patterns"]
125 ):
126 return True, "binary_content"
128 # Skip if content appears to be minified
129 if self._is_minified_code(chunk_content):
130 return True, "minified_code"
132 # Skip if content appears to be auto-generated
133 if self._is_generated_code(chunk_content):
134 return True, "generated_code"
136 # Skip if it's mostly comments (low semantic value)
137 if self._is_mostly_comments(chunk_content):
138 return True, "mostly_comments"
140 # Skip test files with many assertions (low semantic complexity)
141 if chunk_metadata.get("element_type") == "test" and content_length < 500:
142 return True, "simple_test_code"
144 # Skip configuration or data files
145 if chunk_metadata.get("language") in ["json", "yaml", "xml", "ini"]:
146 return True, "configuration_file"
148 return False, "suitable_for_nlp"
150 def _create_code_specific_metadata(
151 self, content: str, chunk_metadata: dict[str, Any], original_doc: Document
152 ) -> dict[str, Any]:
153 """Create code-specific metadata for the chunk.
155 Args:
156 content: Chunk content
157 chunk_metadata: Existing chunk metadata
158 original_doc: Original document
160 Returns:
161 Code-specific metadata dictionary
162 """
163 metadata = {
164 "content_analysis": self._analyze_code_content(content),
165 "language_context": self._extract_language_context(content, chunk_metadata),
166 "code_quality": self._assess_code_quality(content, chunk_metadata),
167 "educational_value": self._assess_educational_value(
168 content, chunk_metadata
169 ),
170 "reusability_score": self._calculate_reusability_score(
171 content, chunk_metadata
172 ),
173 "chunking_strategy": "code_modular",
174 }
176 # Add element-specific context
177 element_type = chunk_metadata.get("element_type", "unknown")
178 if element_type != "unknown":
179 metadata["element_context"] = self._extract_element_context(
180 content, element_type
181 )
183 return metadata
185 def _analyze_code_content(self, content: str) -> dict[str, Any]:
186 """Analyze the code content characteristics.
188 Args:
189 content: Code content
191 Returns:
192 Content analysis metrics
193 """
194 lines = content.split("\n")
195 non_empty_lines = [line for line in lines if line.strip()]
196 comment_lines = [
197 line for line in lines if line.strip().startswith(("#", "//", "/*", "--"))
198 ]
200 return {
201 "total_lines": len(lines),
202 "code_lines": len(non_empty_lines) - len(comment_lines),
203 "comment_lines": len(comment_lines),
204 "blank_lines": len(lines) - len(non_empty_lines),
205 "comment_ratio": (
206 len(comment_lines) / len(non_empty_lines) if non_empty_lines else 0
207 ),
208 "avg_line_length": (
209 sum(len(line) for line in lines) / len(lines) if lines else 0
210 ),
211 "max_line_length": max(len(line) for line in lines) if lines else 0,
212 "indentation_consistency": self._check_indentation_consistency(lines),
213 "has_documentation": '"""' in content
214 or "'''" in content
215 or "/*" in content,
216 }
218 def _extract_language_context(
219 self, content: str, chunk_metadata: dict[str, Any]
220 ) -> dict[str, Any]:
221 """Extract programming language context.
223 Args:
224 content: Code content
225 chunk_metadata: Chunk metadata
227 Returns:
228 Language context information
229 """
230 language = chunk_metadata.get("language", "unknown")
232 context = {
233 "language": language,
234 "paradigm": self._identify_programming_paradigm(content, language),
235 "framework_indicators": self._identify_frameworks(content, language),
236 "version_indicators": self._identify_language_version(content, language),
237 "style_conventions": self._analyze_style_conventions(content, language),
238 }
240 return context
242 def _assess_code_quality(
243 self, content: str, chunk_metadata: dict[str, Any]
244 ) -> dict[str, Any]:
245 """Assess code quality indicators.
247 Args:
248 content: Code content
249 chunk_metadata: Chunk metadata
251 Returns:
252 Code quality assessment
253 """
254 # Get complexity from metadata if available
255 complexity = chunk_metadata.get("complexity", 0)
257 quality_score = 100 # Start with perfect score
259 # Deduct points for various quality issues
260 if complexity > 10:
261 quality_score -= 20
262 elif complexity > 5:
263 quality_score -= 10
265 # Check for long lines
266 lines = content.split("\n")
267 long_lines = [line for line in lines if len(line) > 120]
268 if len(long_lines) > len(lines) * 0.3:
269 quality_score -= 15
271 # Check for documentation
272 has_docs = '"""' in content or "'''" in content
273 if not has_docs and len(content) > 500:
274 quality_score -= 10
276 # Check for meaningful naming
277 if self._has_meaningful_names(content):
278 quality_score += 5
279 else:
280 quality_score -= 10
282 return {
283 "quality_score": max(0, quality_score),
284 "complexity_level": (
285 "low" if complexity < 3 else "medium" if complexity < 8 else "high"
286 ),
287 "readability_indicators": {
288 "has_documentation": has_docs,
289 "reasonable_line_length": (
290 len(long_lines) / len(lines) < 0.1 if lines else True
291 ),
292 "meaningful_names": self._has_meaningful_names(content),
293 },
294 }
296 def _assess_educational_value(
297 self, content: str, chunk_metadata: dict[str, Any]
298 ) -> dict[str, Any]:
299 """Assess educational value of the code chunk.
301 Args:
302 content: Code content
303 chunk_metadata: Chunk metadata
305 Returns:
306 Educational value assessment
307 """
308 educational_indicators = []
310 # Check for common educational patterns
311 if "example" in content.lower() or "demo" in content.lower():
312 educational_indicators.append("example_code")
314 if '"""' in content or "'''" in content:
315 educational_indicators.append("well_documented")
317 if "TODO" in content or "FIXME" in content:
318 educational_indicators.append("learning_opportunity")
320 # Check complexity level for learning
321 complexity = chunk_metadata.get("complexity", 0)
322 if 2 <= complexity <= 6:
323 educational_indicators.append("good_complexity_for_learning")
325 # Check for design patterns
326 element_type = chunk_metadata.get("element_type", "unknown")
327 if element_type in ["class", "interface"]:
328 educational_indicators.append("object_oriented_concepts")
330 return {
331 "educational_indicators": educational_indicators,
332 "learning_level": self._determine_learning_level(content, chunk_metadata),
333 "concepts_demonstrated": self._identify_programming_concepts(content),
334 }
336 def _calculate_reusability_score(
337 self, content: str, chunk_metadata: dict[str, Any]
338 ) -> int:
339 """Calculate reusability score for the code chunk.
341 Args:
342 content: Code content
343 chunk_metadata: Chunk metadata
345 Returns:
346 Reusability score (0-100)
347 """
348 score = 50 # Base score
350 # Higher score for certain element types
351 element_type = chunk_metadata.get("element_type", "unknown")
352 if element_type in ["function", "class", "interface"]:
353 score += 20
354 elif element_type == "method":
355 score += 10
357 # Higher score for documented code
358 if '"""' in content or "'''" in content:
359 score += 15
361 # Higher score for parameterized code
362 if "def " in content and "(" in content:
363 param_count = content.count(",") + 1 if "(" in content else 0
364 if param_count > 0:
365 score += min(15, param_count * 3)
367 # Lower score for hardcoded values
368 if any(
369 pattern in content
370 for pattern in ["localhost", "127.0.0.1", "C:\\", "/tmp/"]
371 ):
372 score -= 10
374 # Lower score for very specific implementations
375 if any(
376 keyword in content.lower()
377 for keyword in ["specific", "hardcode", "hack", "temporary"]
378 ):
379 score -= 15
381 return max(0, min(100, score))
383 def _generate_chunk_title(
384 self, original_doc: Document, chunk_metadata: dict[str, Any], chunk_index: int
385 ) -> str:
386 """Generate a descriptive title for the code chunk.
388 Args:
389 original_doc: Original document
390 chunk_metadata: Chunk metadata
391 chunk_index: Chunk index
393 Returns:
394 Generated chunk title
395 """
396 base_title = original_doc.title
398 # Try to use element name if available
399 element_name = chunk_metadata.get("element_name")
400 element_type = chunk_metadata.get("element_type", "code")
401 language = chunk_metadata.get("language", "unknown")
403 if element_name and element_name != "unknown":
404 if element_type in ["function", "method"]:
405 return f"{base_title} - {element_type.title()}: {element_name}()"
406 elif element_type == "class":
407 return f"{base_title} - Class: {element_name}"
408 else:
409 return f"{base_title} - {element_type.title()}: {element_name}"
411 # Fallback to generic naming
412 if language != "unknown":
413 return f"{base_title} - {language.title()} Code Chunk {chunk_index + 1}"
414 else:
415 return f"{base_title} - Code Chunk {chunk_index + 1}"
417 def _is_minified_code(self, content: str) -> bool:
418 """Check if code appears to be minified.
420 Args:
421 content: Code content
423 Returns:
424 True if code appears minified
425 """
426 lines = content.split("\n")
427 if not lines:
428 return False
430 # Check for very long lines (typical of minified code)
431 avg_line_length = sum(len(line) for line in lines) / len(lines)
432 max_line_length = max(len(line) for line in lines)
434 # Check ratio of meaningful characters
435 meaningful_chars = sum(1 for char in content if char.isalnum() or char in "_$")
436 total_chars = len(content)
437 meaningful_ratio = meaningful_chars / total_chars if total_chars > 0 else 0
439 return (
440 avg_line_length > 200
441 or max_line_length > 1000
442 or meaningful_ratio < self.skip_conditions["minified_code_threshold"]
443 )
445 def _is_generated_code(self, content: str) -> bool:
446 """Check if code appears to be auto-generated.
448 Args:
449 content: Code content
451 Returns:
452 True if code appears auto-generated
453 """
454 content_lower = content.lower()
455 return any(
456 pattern in content_lower
457 for pattern in self.skip_conditions["generated_code_patterns"]
458 )
460 def _is_mostly_comments(self, content: str) -> bool:
461 """Check if content is mostly comments.
463 Args:
464 content: Code content
466 Returns:
467 True if content is mostly comments
468 """
469 lines = content.split("\n")
470 comment_lines = sum(
471 1 for line in lines if line.strip().startswith(("#", "//", "/*", "--"))
472 )
473 non_empty_lines = sum(1 for line in lines if line.strip())
475 return comment_lines / non_empty_lines > 0.8 if non_empty_lines > 0 else False
477 def _check_indentation_consistency(self, lines: list) -> bool:
478 """Check if indentation is consistent.
480 Args:
481 lines: List of code lines
483 Returns:
484 True if indentation is consistent
485 """
486 indentations = []
487 for line in lines:
488 if line.strip(): # Only check non-empty lines
489 leading_spaces = len(line) - len(line.lstrip())
490 if leading_spaces > 0:
491 indentations.append(leading_spaces)
493 if not indentations:
494 return True
496 # Check if indentations follow a pattern (multiples of 2, 4, or 8)
497 for base in [2, 4, 8]:
498 if all(indent % base == 0 for indent in indentations):
499 return True
501 return False
503 def _identify_programming_paradigm(self, content: str, language: str) -> str:
504 """Identify the programming paradigm used.
506 Args:
507 content: Code content
508 language: Programming language
510 Returns:
511 Identified paradigm
512 """
513 paradigms = []
515 if "class " in content:
516 paradigms.append("object_oriented")
517 if any(keyword in content for keyword in ["def ", "function ", "func "]):
518 paradigms.append("procedural")
519 if any(
520 keyword in content for keyword in ["lambda", "map(", "filter(", "reduce("]
521 ):
522 paradigms.append("functional")
523 if "async" in content or "await" in content:
524 paradigms.append("asynchronous")
526 return paradigms[0] if paradigms else "unknown"
528 def _identify_frameworks(self, content: str, language: str) -> list:
529 """Identify frameworks used in the code.
531 Args:
532 content: Code content
533 language: Programming language
535 Returns:
536 List of identified frameworks
537 """
538 frameworks = []
539 content_lower = content.lower()
541 # Python frameworks
542 if language == "python":
543 framework_indicators = {
544 "django": ["django", "models.model", "request.get"],
545 "flask": ["flask", "app.route", "@app."],
546 "fastapi": ["fastapi", "pydantic", "async def"],
547 "pandas": ["pandas", "dataframe", "pd."],
548 "numpy": ["numpy", "np.", "array"],
549 "tensorflow": ["tensorflow", "tf.", "keras"],
550 "pytorch": ["torch", "pytorch", "tensor"],
551 }
552 elif language in ["javascript", "typescript"]:
553 framework_indicators = {
554 "react": ["react", "usestate", "component"],
555 "vue": ["vue", "v-if", "v-for"],
556 "angular": ["angular", "@component", "ngfor"],
557 "express": ["express", "app.get", "middleware"],
558 "jquery": ["jquery", "$", ".click"],
559 }
560 else:
561 framework_indicators = {}
563 for framework, indicators in framework_indicators.items():
564 if any(indicator in content_lower for indicator in indicators):
565 frameworks.append(framework)
567 return frameworks
569 def _identify_language_version(self, content: str, language: str) -> str:
570 """Identify language version indicators.
572 Args:
573 content: Code content
574 language: Programming language
576 Returns:
577 Version indicators
578 """
579 if language == "python":
580 if ":=" in content:
581 return "3.8+"
582 elif 'f"' in content or "f'" in content:
583 return "3.6+"
584 elif "async def" in content:
585 return "3.5+"
586 elif "yield from" in content:
587 return "3.3+"
588 elif language == "javascript":
589 if "=>" in content:
590 return "ES6+"
591 elif "const " in content or "let " in content:
592 return "ES6+"
594 return "unknown"
596 def _analyze_style_conventions(self, content: str, language: str) -> dict[str, Any]:
597 """Analyze coding style conventions.
599 Args:
600 content: Code content
601 language: Programming language
603 Returns:
604 Style analysis
605 """
606 conventions = {}
608 if language == "python":
609 # Check naming conventions
610 conventions["snake_case_functions"] = bool(
611 re.search(r"def [a-z_]+\(", content)
612 )
613 conventions["pascal_case_classes"] = bool(
614 re.search(r"class [A-Z][a-zA-Z]*", content)
615 )
617 elif language in ["javascript", "typescript"]:
618 # Check naming conventions
619 conventions["camel_case_functions"] = bool(
620 re.search(r"function [a-z][a-zA-Z]*\(", content)
621 )
622 conventions["pascal_case_classes"] = bool(
623 re.search(r"class [A-Z][a-zA-Z]*", content)
624 )
626 return conventions
628 def _has_meaningful_names(self, content: str) -> bool:
629 """Check if the code uses meaningful variable/function names.
631 Args:
632 content: Code content
634 Returns:
635 True if names appear meaningful
636 """
637 # Extract identifiers
638 import re
640 identifiers = re.findall(r"\b[a-zA-Z_][a-zA-Z0-9_]*\b", content)
642 # Filter out keywords and single character names
643 meaningful_names = [
644 name
645 for name in identifiers
646 if len(name) > 2
647 and name not in ["def", "class", "for", "if", "else", "try", "except"]
648 ]
650 # Check for non-descriptive patterns
651 non_descriptive = [
652 name for name in meaningful_names if re.match(r"^[a-z]{1,2}\d*$", name)
653 ]
655 if not meaningful_names:
656 return True # No names to judge
658 return len(non_descriptive) / len(meaningful_names) < 0.3
660 def _determine_learning_level(
661 self, content: str, chunk_metadata: dict[str, Any]
662 ) -> str:
663 """Determine the learning level of the code.
665 Args:
666 content: Code content
667 chunk_metadata: Chunk metadata
669 Returns:
670 Learning level (beginner, intermediate, advanced)
671 """
672 complexity = chunk_metadata.get("complexity", 0)
673 element_type = chunk_metadata.get("element_type", "unknown")
675 # Advanced indicators
676 advanced_patterns = [
677 "metaclass",
678 "decorator",
679 "generator",
680 "async",
681 "threading",
682 "multiprocessing",
683 ]
684 if any(pattern in content.lower() for pattern in advanced_patterns):
685 return "advanced"
687 # Intermediate indicators
688 if complexity > 5 or element_type in ["class", "interface"]:
689 return "intermediate"
691 # Simple function or straightforward code
692 if complexity <= 3 and len(content.split("\n")) < 20:
693 return "beginner"
695 return "intermediate"
697 def _identify_programming_concepts(self, content: str) -> list:
698 """Identify programming concepts demonstrated in the code.
700 Args:
701 content: Code content
703 Returns:
704 List of programming concepts
705 """
706 concepts = []
707 content_lower = content.lower()
709 # Basic concepts
710 if "if " in content_lower:
711 concepts.append("conditionals")
712 if "for " in content_lower or "while " in content_lower:
713 concepts.append("loops")
714 if "def " in content_lower or "function " in content_lower:
715 concepts.append("functions")
716 if "class " in content_lower:
717 concepts.append("classes")
719 # Advanced concepts
720 if "try:" in content_lower or "except:" in content_lower:
721 concepts.append("exception_handling")
722 if "async" in content_lower:
723 concepts.append("asynchronous_programming")
724 if "yield" in content_lower:
725 concepts.append("generators")
726 if "@" in content:
727 concepts.append("decorators")
728 if "lambda" in content_lower:
729 concepts.append("lambda_functions")
731 return concepts
733 def _extract_element_context(
734 self, content: str, element_type: str
735 ) -> dict[str, Any]:
736 """Extract context specific to the code element type.
738 Args:
739 content: Code content
740 element_type: Type of code element
742 Returns:
743 Element-specific context
744 """
745 context = {"element_type": element_type}
747 if element_type in ["function", "method"]:
748 context.update(
749 {
750 "parameter_count": content.count(",") + 1 if "(" in content else 0,
751 "has_return_statement": "return " in content,
752 "has_docstring": '"""' in content or "'''" in content,
753 "is_recursive": content.count(self._extract_function_name(content))
754 > 1,
755 }
756 )
757 elif element_type == "class":
758 context.update(
759 {
760 "method_count": content.count("def "),
761 "has_constructor": "__init__" in content
762 or "constructor" in content,
763 "has_inheritance": "extends" in content
764 or "(" in content.split("class")[1].split(":")[0],
765 "has_docstring": '"""' in content or "'''" in content,
766 }
767 )
769 return context
771 def _extract_function_name(self, content: str) -> str:
772 """Extract function name from content.
774 Args:
775 content: Code content
777 Returns:
778 Function name or empty string
779 """
780 import re
782 match = re.search(r"def\s+([a-zA-Z_][a-zA-Z0-9_]*)", content)
783 return match.group(1) if match else ""