Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 100% (49 statements)
1"""Chunk processing coordination for markdown strategy."""
3from typing import TYPE_CHECKING, Any
5import structlog
7from qdrant_loader.core.document import Document
8from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer
10if TYPE_CHECKING:
11 from qdrant_loader.config import Settings
13logger = structlog.get_logger(__name__)
class ChunkProcessor:
    """Handles chunk processing coordination, including parallel execution and semantic analysis."""

    def __init__(self, settings: "Settings"):
        """Initialize the chunk processor.

        Args:
            settings: Configuration settings
        """
        self.settings = settings

        # Initialize the semantic analyzer only if enabled
        self._semantic_analysis_enabled = (
            settings.global_config.chunking.enable_semantic_analysis
        )
        self._enhanced_semantic_analysis_enabled = (
            settings.global_config.chunking.enable_enhanced_semantic_analysis
        )

        if self._semantic_analysis_enabled:
            self.semantic_analyzer = SemanticAnalyzer(
                spacy_model=settings.global_config.semantic_analysis.spacy_model,
                num_topics=settings.global_config.semantic_analysis.num_topics,
                passes=settings.global_config.semantic_analysis.lda_passes,
            )
        else:
            self.semantic_analyzer = None
            logger.info(
                "Semantic analysis disabled — skipping spaCy/LDA initialization"
            )

        # Cache of analysis results keyed by chunk text, to avoid recomputation
        self._processed_chunks: dict[str, dict[str, Any]] = {}
    def process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk in parallel.

        Args:
            chunk: The chunk to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary containing processing results
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Return cached results when this exact chunk text was already analyzed
        cached = self._processed_chunks.get(chunk)
        if cached is not None:
            logger.debug("Using cached analysis for chunk", chunk_index=chunk_index)
            return cached

        # Perform semantic analysis only if enabled
        if self._semantic_analysis_enabled and self.semantic_analyzer:
            logger.debug(
                "Starting semantic analysis for chunk", chunk_index=chunk_index
            )
            analysis_result = self.semantic_analyzer.analyze_text(
                chunk,
                doc_id=f"chunk_{chunk_index}",
                include_enhanced=self._enhanced_semantic_analysis_enabled,
            )
            results = {
                "entities": analysis_result.entities,
                "topics": analysis_result.topics,
                "key_phrases": analysis_result.key_phrases,
            }
            if self._enhanced_semantic_analysis_enabled:
                results.update(
                    {
                        "pos_tags": analysis_result.pos_tags,
                        "dependencies": analysis_result.dependencies,
                        "document_similarity": analysis_result.document_similarity,
                    }
                )
            logger.debug(
                "Completed semantic analysis for chunk", chunk_index=chunk_index
            )
        else:
            results = {
                "entities": [],
                "topics": [],
                "key_phrases": [],
            }
            logger.debug(
                "Semantic analysis skipped for chunk", chunk_index=chunk_index
            )

        self._processed_chunks[chunk] = results
        return results
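
    # Hedged usage sketch (illustrative only; `processor` and `section_text`
    # are hypothetical names): a caller typically feeds each pre-split section
    # through process_chunk and merges the result into chunk metadata, e.g.
    #
    #     results = processor.process_chunk(section_text, 0, total_chunks)
    #     results["entities"], results["topics"], results["key_phrases"]
    #
    # With semantic analysis disabled, the same call returns the empty-list
    # defaults, so downstream metadata handling stays uniform.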
    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with enhanced metadata.

        Args:
            original_doc: Original document being chunked
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            chunk_metadata: Chunk-specific metadata
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document representing the chunk
        """
        # Create the base chunk document
        chunk_doc = Document(
            content=chunk_content,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            metadata=original_doc.metadata.copy(),
        )

        # FIX: manually assign the chunk ID (following the pattern from other strategies)
        chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index)

        # Add chunk-specific metadata
        chunk_doc.metadata.update(chunk_metadata)
        chunk_doc.metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "chunk_size": len(chunk_content),
                "parent_document_id": original_doc.id,
                "chunking_strategy": "markdown",
            }
        )

        # Perform semantic analysis unless explicitly skipped
        if not skip_nlp:
            semantic_results = self.process_chunk(
                chunk_content, chunk_index, total_chunks
            )
            chunk_doc.metadata.update(semantic_results)

        return chunk_doc
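
    # Illustrative sketch (hypothetical names): given a parsed parent document
    # and its pre-split sections, a strategy would build chunk documents like
    #
    #     chunk_doc = processor.create_chunk_document(
    #         original_doc=parent_doc,
    #         chunk_content=sections[0],
    #         chunk_index=0,
    #         total_chunks=len(sections),
    #         chunk_metadata={"section_title": "Introduction"},
    #     )
    #
    # chunk_doc.metadata then carries the section metadata, the bookkeeping
    # fields added above, and (unless skip_nlp=True) the semantic results.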
    def estimate_chunk_count(self, content: str) -> int:
        """Estimate the number of chunks that will be generated.

        Args:
            content: The content to estimate chunks for

        Returns:
            int: Estimated number of chunks
        """
        chunk_size = self.settings.global_config.chunking.chunk_size

        # Simple estimation: total chars / chunk_size. This is approximate,
        # since the splitter works on paragraph boundaries and adds overlap.
        estimated = len(content) // chunk_size

        # Apply the configured estimation buffer to account for overlap and
        # paragraph boundaries
        buffer_factor = (
            1.0
            + self.settings.global_config.chunking.strategies.markdown.estimation_buffer
        )
        estimated = int(estimated * buffer_factor)

        return max(1, estimated)  # At least one chunk
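
    # Worked example: 30,000 characters with chunk_size=1500 gives a base
    # estimate of 30000 // 1500 = 20; with a markdown estimation_buffer of
    # 0.2 the buffer factor is 1.2, so the estimate becomes int(20 * 1.2) = 24.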
    def shutdown(self):
        """Shutdown and clean up resources."""
        if getattr(self, "semantic_analyzer", None) is not None:
            self.semantic_analyzer.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        # Never let cleanup errors propagate out of the finalizer
        try:
            self.shutdown()
        except Exception:
            pass
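

# Minimal runnable sketch, assuming a SimpleNamespace can stand in for Settings
# (it only needs the attribute paths read above; real callers would pass the
# qdrant_loader Settings object). With semantic analysis disabled, no spaCy/LDA
# models are loaded, so this exercises only the coordination logic.
if __name__ == "__main__":
    from types import SimpleNamespace

    demo_settings = SimpleNamespace(
        global_config=SimpleNamespace(
            chunking=SimpleNamespace(
                enable_semantic_analysis=False,
                enable_enhanced_semantic_analysis=False,
                chunk_size=1500,
                strategies=SimpleNamespace(
                    markdown=SimpleNamespace(estimation_buffer=0.2)
                ),
            )
        )
    )

    processor = ChunkProcessor(demo_settings)  # type: ignore[arg-type]
    print(processor.estimate_chunk_count("x" * 30_000))  # -> 24
    print(processor.process_chunk("# Title\n\nSome text.", 0, 1))
    # -> {'entities': [], 'topics': [], 'key_phrases': []}
    processor.shutdown()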