Coverage for src/qdrant_loader/core/chunking/strategy/base/chunk_processor.py: 94%
88 statements
coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Base class for chunk processing and analysis coordination."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING, Any
6if TYPE_CHECKING:
7 from qdrant_loader.config import Settings
8 from qdrant_loader.core.document import Document


class BaseChunkProcessor(ABC):
    """Base class for chunk processing and analysis coordination.

    This class defines the interface for processing chunks, coordinating
    semantic analysis, and creating final chunk documents. Each strategy
    implements its own chunk processing logic while following common patterns.
    """

    def __init__(self, settings: "Settings"):
        """Initialize the chunk processor.

        Args:
            settings: Configuration settings
        """
        self.settings = settings
        self.chunk_size = settings.global_config.chunking.chunk_size
        self.max_chunks_per_document = (
            settings.global_config.chunking.max_chunks_per_document
        )

    @abstractmethod
    def create_chunk_document(
        self,
        original_doc: "Document",
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> "Document":
        """Create a document for a chunk with all necessary metadata and processing.

        This method should:
        1. Create a new Document instance for the chunk
        2. Apply semantic analysis if not skipped
        3. Add chunk-specific metadata (index, total chunks, etc.)
        4. Preserve original document metadata
        5. Generate a unique chunk ID

        Args:
            original_doc: The original document being chunked
            chunk_content: The content of this chunk
            chunk_index: Index of this chunk (0-based)
            total_chunks: Total number of chunks
            chunk_metadata: Metadata specific to this chunk
            skip_nlp: Whether to skip semantic analysis for this chunk

        Returns:
            Document instance representing the chunk

        Raises:
            NotImplementedError: If a subclass does not override this method
        """
        raise NotImplementedError(
            "Chunk processor must implement create_chunk_document method"
        )
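
    # A minimal override sketch (illustrative only; the exact Document
    # constructor fields are assumptions, not taken from this module):
    #
    #     def create_chunk_document(self, original_doc, chunk_content,
    #                               chunk_index, total_chunks,
    #                               chunk_metadata, skip_nlp=False):
    #         metadata = self.create_base_chunk_metadata(
    #             original_doc, chunk_index, total_chunks, chunk_metadata
    #         )
    #         return Document(
    #             id=self.generate_chunk_id(original_doc, chunk_index),
    #             content=chunk_content,
    #             metadata=metadata,
    #         )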

    def estimate_chunk_count(self, content: str) -> int:
        """Estimate the number of chunks that will be created from content.

        This is a utility method that provides a rough estimate of chunk count
        based on content size and chunk configuration.

        Args:
            content: The content to estimate chunks for

        Returns:
            Estimated number of chunks
        """
        if not content:
            return 0

        content_size = len(content)
        if content_size <= self.chunk_size:
            return 1

        # Account for overlap in estimation
        effective_chunk_size = max(
            1, self.chunk_size - self.settings.global_config.chunking.chunk_overlap
        )
        estimated = max(
            1, (content_size + effective_chunk_size - 1) // effective_chunk_size
        )

        # Cap at maximum allowed chunks
        return min(estimated, self.max_chunks_per_document)
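
    # Worked example (values are assumptions for illustration): with
    # chunk_size=1000 and chunk_overlap=200, a 2500-character input gives
    # effective_chunk_size = 800, so ceil(2500 / 800) = 4 estimated chunks
    # (before the max_chunks_per_document cap is applied).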

    def generate_chunk_id(self, original_doc: "Document", chunk_index: int) -> str:
        """Generate a unique ID for a chunk.

        Args:
            original_doc: The original document
            chunk_index: Index of the chunk

        Returns:
            Unique chunk ID
        """
        import uuid

        # Create a deterministic chunk ID based on the original doc ID and chunk index
        base_id = f"{original_doc.id}_chunk_{chunk_index}"
        # Generate a UUID5 for consistency
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, base_id))
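
    # UUID5 is a deterministic hash, so the same (document ID, chunk index)
    # pair always produces the same chunk ID across runs, which keeps
    # re-ingestion idempotent. For a hypothetical document id "doc-1":
    #     generate_chunk_id(doc, 0) == str(
    #         uuid.uuid5(uuid.NAMESPACE_DNS, "doc-1_chunk_0")
    #     )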

    def create_base_chunk_metadata(
        self,
        original_doc: "Document",
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
    ) -> dict[str, Any]:
        """Create base metadata that all chunks should have.

        Args:
            original_doc: The original document
            chunk_index: Index of this chunk
            total_chunks: Total number of chunks
            chunk_metadata: Strategy-specific chunk metadata

        Returns:
            Combined metadata dictionary
        """
        # Start with the original document metadata
        base_metadata = original_doc.metadata.copy()

        # Add chunk-specific metadata
        base_metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "is_chunk": True,
                "parent_document_id": original_doc.id,
                "chunk_creation_timestamp": self._get_current_timestamp(),
                "chunking_strategy": self._get_strategy_name(),
            }
        )

        # Merge with strategy-specific metadata (which wins on key collisions)
        base_metadata.update(chunk_metadata)

        return base_metadata
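
    # Illustrative result (all values hypothetical): for the second of three
    # chunks of a document with id "doc-1", the returned dict is the original
    # document's metadata plus entries like:
    #     {"chunk_index": 1, "total_chunks": 3, "is_chunk": True,
    #      "parent_document_id": "doc-1",
    #      "chunk_creation_timestamp": "2025-08-13T09:19:00",
    #      "chunking_strategy": "base"}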

    def validate_chunk_content(self, content: str) -> bool:
        """Validate that chunk content meets quality requirements.

        Args:
            content: The chunk content to validate

        Returns:
            True if content is valid, False otherwise
        """
        if not content or not content.strip():
            return False

        # Check minimum content length
        if len(content.strip()) < 10:
            return False

        # Check maximum content length (safety check)
        if len(content) > self.chunk_size * 3:  # Allow up to 3x chunk size
            return False

        return True
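
    # Examples of the three rejection paths, using the thresholds above:
    #     validate_chunk_content("   ")        -> False  (whitespace only)
    #     validate_chunk_content("too short")  -> False  (< 10 chars stripped)
    #     validate_chunk_content("x" * (chunk_size * 3 + 1)) -> False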

    def should_skip_semantic_analysis(
        self, content: str, chunk_metadata: dict[str, Any]
    ) -> bool:
        """Determine if semantic analysis should be skipped for this chunk.

        This method provides default heuristics for when to skip expensive
        semantic analysis operations. It can be overridden by specific
        processors.

        Args:
            content: The chunk content
            chunk_metadata: Chunk metadata

        Returns:
            True if semantic analysis should be skipped
        """
        # Skip for very short content
        if len(content) < 100:
            return True

        # Skip for content with too few words
        if len(content.split()) < 20:
            return True

        # Skip for very simple structure (fewer than 3 line breaks)
        if content.count("\n") < 3:
            return True

        # Skip if explicitly marked in metadata
        if chunk_metadata.get("skip_semantic_analysis", False):
            return True

        return False
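
    # Net effect of the defaults: a chunk must be at least 100 characters,
    # 20 words, and contain at least 3 line breaks before NLP runs. A short
    # one-line note is skipped; a multi-paragraph section is analyzed unless
    # its metadata sets "skip_semantic_analysis": True.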

    def _get_current_timestamp(self) -> str:
        """Get the current timestamp in ISO format.

        Returns:
            ISO formatted timestamp string
        """
        from datetime import datetime

        return datetime.now().isoformat()

    def _get_strategy_name(self) -> str:
        """Get the name of the chunking strategy.

        This should be overridden by specific processors to return
        the appropriate strategy name.

        Returns:
            Strategy name string
        """
        return self.__class__.__name__.replace("ChunkProcessor", "").lower()
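
    # For example, a hypothetical MarkdownChunkProcessor subclass would
    # report "markdown" from this default without overriding the method.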

    def calculate_content_similarity(self, content1: str, content2: str) -> float:
        """Calculate similarity between two content pieces.

        This is a utility method that can be used for overlap detection
        or duplicate content identification.

        Args:
            content1: First content piece
            content2: Second content piece

        Returns:
            Similarity score between 0.0 and 1.0
        """
        # Handle empty content cases
        if not content1 and not content2:
            return 1.0  # Both empty = identical
        if not content1 or not content2:
            return 0.0  # One empty, one not = different

        # Simple word-based (Jaccard) similarity over lowercased word sets
        words1 = set(content1.lower().split())
        words2 = set(content2.lower().split())

        if not words1 and not words2:
            return 1.0

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        return len(intersection) / len(union) if union else 0.0
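
    # Worked example: "the quick fox" vs. "the lazy fox" share {"the", "fox"}
    # out of a union {"the", "quick", "lazy", "fox"}, giving 2 / 4 = 0.5.
    # Word order and repetition are ignored by this set-based measure.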

    def optimize_chunk_boundaries(self, chunks: list[str]) -> list[str]:
        """Optimize chunk boundaries to improve content flow.

        This is a utility method that can be used by processors to
        post-process chunks and improve their boundaries.

        Args:
            chunks: List of chunk content strings

        Returns:
            Optimized list of chunks
        """
        if len(chunks) <= 1:
            return chunks

        optimized = []
        for i, chunk in enumerate(chunks):
            # Remove leading/trailing whitespace
            chunk = chunk.strip()

            # Skip empty chunks
            if not chunk:
                continue

            # Try to fix broken sentences at boundaries
            if i > 0 and optimized:
                # A chunk that starts with a lowercase word is likely a
                # continuation of the previous chunk's last sentence
                words = chunk.split()
                if words and words[0][0].islower():
                    # Look for a good spot to move content to the previous chunk
                    sentence_end = chunk.find(". ")
                    if sentence_end > 0 and sentence_end < len(chunk) // 2:
                        # Move the first sentence to the previous chunk
                        optimized[-1] += " " + chunk[: sentence_end + 1]
                        chunk = chunk[sentence_end + 2 :].strip()

            if chunk:  # Only add non-empty chunks
                optimized.append(chunk)

        return optimized
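
    # Worked example: ["It was late. The dog", "barked loudly. Then it
    # slept."] becomes ["It was late. The dog barked loudly.",
    # "Then it slept."], because the second chunk starts with a lowercase
    # word and its first sentence ends within the chunk's first half.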

    def shutdown(self):
        """Shut down the processor and clean up resources.

        This method should be called when the processor is no longer needed
        to clean up any resources (thread pools, connections, etc.).
        """
        # Default implementation - can be overridden by specific processors
        pass
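
    # Override sketch for a processor that owns a thread pool (the _executor
    # attribute is an assumption, not part of this base class):
    #
    #     def shutdown(self):
    #         if getattr(self, "_executor", None) is not None:
    #             self._executor.shutdown(wait=False)
    #             self._executor = None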

    def __del__(self):
        """Cleanup on deletion."""
        try:
            self.shutdown()
        except Exception:
            # Ignore errors during cleanup
            pass