Coverage for src/qdrant_loader/core/chunking/strategy/base/chunk_processor.py: 94%

88 statements  

coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""Base class for chunk processing and analysis coordination.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING, Any 

5 

6if TYPE_CHECKING: 

7 from qdrant_loader.config import Settings 

8 from qdrant_loader.core.document import Document 

9 

10 

11class BaseChunkProcessor(ABC): 

12 """Base class for chunk processing and analysis coordination. 

13 

14 This class defines the interface for processing chunks, coordinating 

15 semantic analysis, and creating final chunk documents. Each strategy 

16 implements its own chunk processing logic while following common patterns. 

17 """ 

18 

19 def __init__(self, settings: "Settings"): 

20 """Initialize the chunk processor. 

21 

22 Args: 

23 settings: Configuration settings 

24 """ 

25 self.settings = settings 

26 self.chunk_size = settings.global_config.chunking.chunk_size 

27 self.max_chunks_per_document = ( 

28 settings.global_config.chunking.max_chunks_per_document 

29 ) 

30 

31 @abstractmethod 

32 def create_chunk_document( 

33 self, 

34 original_doc: "Document", 

35 chunk_content: str, 

36 chunk_index: int, 

37 total_chunks: int, 

38 chunk_metadata: dict[str, Any], 

39 skip_nlp: bool = False, 

40 ) -> "Document": 

41 """Create a document for a chunk with all necessary metadata and processing. 

42 

43 This method should: 

44 1. Create a new Document instance for the chunk 

45 2. Apply semantic analysis if not skipped 

46 3. Add chunk-specific metadata (index, total chunks, etc.) 

47 4. Preserve original document metadata 

48 5. Generate unique chunk ID 

49 

50 Args: 

51 original_doc: The original document being chunked 

52 chunk_content: The content of this chunk 

53 chunk_index: Index of this chunk (0-based) 

54 total_chunks: Total number of chunks 

55 chunk_metadata: Metadata specific to this chunk 

56 skip_nlp: Whether to skip semantic analysis for this chunk 

57 

58 Returns: 

59 Document instance representing the chunk 

60 

61 Raises: 

62 NotImplementedError: If the processor doesn't implement this method 

63 """ 

64 raise NotImplementedError( 

65 "Chunk processor must implement create_chunk_document method" 

66 ) 

67 

68 def estimate_chunk_count(self, content: str) -> int: 

69 """Estimate the number of chunks that will be created from content. 

70 

71 This is a utility method that provides a rough estimate of chunk count 

72 based on content size and chunk configuration. 

73 

74 Args: 

75 content: The content to estimate chunks for 

76 

77 Returns: 

78 Estimated number of chunks 

79 """ 

80 if not content: 

81 return 0 

82 

83 content_size = len(content) 

84 if content_size <= self.chunk_size: 

85 return 1 

86 

87 # Account for overlap in estimation 

88 effective_chunk_size = max( 

89 1, self.chunk_size - self.settings.global_config.chunking.chunk_overlap 

90 ) 

91 estimated = max( 

92 1, (content_size + effective_chunk_size - 1) // effective_chunk_size 

93 ) 

94 

95 # Cap at maximum allowed chunks 

96 return min(estimated, self.max_chunks_per_document) 

97 
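
    # Worked example: with a hypothetical chunk_size of 1500 and chunk_overlap
    # of 200, the effective chunk size is 1300, so 4000 characters of content
    # estimate to ceil(4000 / 1300) = 4 chunks before the cap is applied.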

    def generate_chunk_id(self, original_doc: "Document", chunk_index: int) -> str:
        """Generate a unique ID for a chunk.

        Args:
            original_doc: The original document
            chunk_index: Index of the chunk

        Returns:
            Unique chunk ID
        """
        import uuid

        # Create deterministic chunk ID based on original doc ID and chunk index
        base_id = f"{original_doc.id}_chunk_{chunk_index}"
        # Generate UUID5 for consistency
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, base_id))
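
    # Illustrative note: the UUID5 above is deterministic, so re-chunking the
    # same document yields stable chunk IDs; e.g. a document with id "doc-1"
    # always maps chunk 0 to uuid5(NAMESPACE_DNS, "doc-1_chunk_0").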

    def create_base_chunk_metadata(
        self,
        original_doc: "Document",
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
    ) -> dict[str, Any]:
        """Create base metadata that all chunks should have.

        Args:
            original_doc: The original document
            chunk_index: Index of this chunk
            total_chunks: Total number of chunks
            chunk_metadata: Strategy-specific chunk metadata

        Returns:
            Combined metadata dictionary
        """
        # Start with original document metadata
        base_metadata = original_doc.metadata.copy()

        # Add chunk-specific metadata
        base_metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "is_chunk": True,
                "parent_document_id": original_doc.id,
                "chunk_creation_timestamp": self._get_current_timestamp(),
                "chunking_strategy": self._get_strategy_name(),
            }
        )

        # Merge with strategy-specific metadata
        base_metadata.update(chunk_metadata)

        return base_metadata
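
    # Illustrative note: chunk_metadata is merged last, so a strategy can
    # intentionally override any of the base fields set above (for example,
    # by supplying its own "chunking_strategy" value).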

    def validate_chunk_content(self, content: str) -> bool:
        """Validate that chunk content meets quality requirements.

        Args:
            content: The chunk content to validate

        Returns:
            True if content is valid, False otherwise
        """
        if not content or not content.strip():
            return False

        # Check minimum content length
        if len(content.strip()) < 10:
            return False

        # Check maximum content length (safety check)
        if len(content) > self.chunk_size * 3:  # Allow up to 3x chunk size
            return False

        return True
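
    # Illustrative note: under these thresholds a chunk whose stripped text is
    # shorter than 10 characters is rejected, as is any chunk longer than
    # three times the configured chunk_size.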

    def should_skip_semantic_analysis(
        self, content: str, chunk_metadata: dict[str, Any]
    ) -> bool:
        """Determine if semantic analysis should be skipped for this chunk.

        This method provides default heuristics for when to skip expensive
        semantic analysis operations. Can be overridden by specific processors.

        Args:
            content: The chunk content
            chunk_metadata: Chunk metadata

        Returns:
            True if semantic analysis should be skipped
        """
        # Skip for very short content
        if len(content) < 100:
            return True

        # Skip for content with too few words
        if len(content.split()) < 20:
            return True

        # Skip for very simple structure
        if content.count("\n") < 3:
            return True

        # Skip if explicitly marked in metadata
        if chunk_metadata.get("skip_semantic_analysis", False):
            return True

        return False
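
    # Illustrative note: a 250-character, 40-word chunk on a single line is
    # still skipped by the newline heuristic (fewer than 3 line breaks), so
    # processors for dense single-line content may wish to override this.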

    def _get_current_timestamp(self) -> str:
        """Get current timestamp in ISO format.

        Returns:
            ISO formatted timestamp string
        """
        from datetime import datetime

        return datetime.now().isoformat()

    def _get_strategy_name(self) -> str:
        """Get the name of the chunking strategy.

        This should be overridden by specific processors to return
        the appropriate strategy name.

        Returns:
            Strategy name string
        """
        return self.__class__.__name__.replace("ChunkProcessor", "").lower()

    def calculate_content_similarity(self, content1: str, content2: str) -> float:
        """Calculate similarity between two content pieces.

        This is a utility method that can be used for overlap detection
        or duplicate content identification.

        Args:
            content1: First content piece
            content2: Second content piece

        Returns:
            Similarity score between 0.0 and 1.0
        """
        # Handle empty content cases
        if not content1 and not content2:
            return 1.0  # Both empty = identical

        if not content1 or not content2:
            return 0.0  # One empty, one not = different

        # Simple word-based similarity
        words1 = set(content1.lower().split())
        words2 = set(content2.lower().split())

        if not words1 and not words2:
            return 1.0

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        return len(intersection) / len(union) if union else 0.0
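
    # Worked example: "the quick fox" vs. "the slow fox" share {"the", "fox"}
    # out of 4 distinct words overall, giving a Jaccard-style similarity of
    # 2 / 4 = 0.5.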

    def optimize_chunk_boundaries(self, chunks: list[str]) -> list[str]:
        """Optimize chunk boundaries to improve content flow.

        This is a utility method that can be used by processors to
        post-process chunks and improve their boundaries.

        Args:
            chunks: List of chunk content strings

        Returns:
            Optimized list of chunks
        """
        if len(chunks) <= 1:
            return chunks

        optimized = []
        for i, chunk in enumerate(chunks):
            # Remove leading/trailing whitespace
            chunk = chunk.strip()

            # Skip empty chunks
            if not chunk:
                continue

            # Try to fix broken sentences at boundaries
            if i > 0 and optimized:
                # Check if this chunk starts with a lowercase word
                # indicating it might be a continuation
                words = chunk.split()
                if words and words[0][0].islower():
                    # Look for a good spot to move content to previous chunk
                    sentence_end = chunk.find(". ")
                    if sentence_end > 0 and sentence_end < len(chunk) // 2:
                        # Move the first sentence to previous chunk
                        optimized[-1] += " " + chunk[: sentence_end + 1]
                        chunk = chunk[sentence_end + 2 :].strip()

            if chunk:  # Only add non-empty chunks
                optimized.append(chunk)

        return optimized
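
    # Illustrative note: given ["The report ends", "with a summary. Next
    # section starts here"], the lowercase-start heuristic appends
    # "with a summary." to the first chunk and keeps "Next section starts
    # here" as the second.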

    def shutdown(self):
        """Shutdown the processor and clean up resources.

        This method should be called when the processor is no longer needed
        to clean up any resources (thread pools, connections, etc.).
        """
        # Default implementation - can be overridden by specific processors
        pass

    def __del__(self):
        """Cleanup on deletion."""
        try:
            self.shutdown()
        except Exception:
            # Ignore errors during cleanup
            pass
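

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the covered module): a minimal concrete
# subclass showing how a strategy-specific processor might satisfy the
# abstract interface using the helpers above. The way the chunk Document is
# built below (deep-copying the original and overwriting id, content, and
# metadata) is an assumption for illustration; the real Document API in
# qdrant_loader.core.document may differ.
# ---------------------------------------------------------------------------
class PlainTextChunkProcessor(BaseChunkProcessor):
    """Hypothetical processor that emits chunks without semantic enrichment."""

    def create_chunk_document(
        self,
        original_doc: "Document",
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> "Document":
        import copy

        # Assemble metadata with the base helper; strategy-specific keys win.
        metadata = self.create_base_chunk_metadata(
            original_doc, chunk_index, total_chunks, chunk_metadata
        )
        # Assumed construction: copy the original document, then overwrite the
        # chunk-specific fields.
        chunk_doc = copy.deepcopy(original_doc)
        chunk_doc.id = self.generate_chunk_id(original_doc, chunk_index)
        chunk_doc.content = chunk_content
        chunk_doc.metadata = metadata
        return chunk_doc


# Usage would then look roughly like:
#   processor = PlainTextChunkProcessor(settings)
#   chunk_doc = processor.create_chunk_document(doc, text, 0, total_chunks, {})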