Coverage for src/qdrant_loader/core/pipeline/workers/chunking_worker.py: 94%

112 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Chunking worker for processing documents into chunks.""" 

2 

3import asyncio 

4import concurrent.futures 

5from collections.abc import AsyncIterator 

6 

7import psutil 

8 

9from qdrant_loader.core.chunking.chunking_service import ChunkingService 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.core.monitoring import prometheus_metrics 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .base_worker import BaseWorker 

15 

16logger = LoggingConfig.get_logger(__name__) 

17 

18 

class ChunkingWorker(BaseWorker):
    """Handles document chunking with controlled concurrency."""

    def __init__(
        self,
        chunking_service: ChunkingService,
        chunk_executor: concurrent.futures.ThreadPoolExecutor,
        max_workers: int = 10,
        queue_size: int = 1000,
        shutdown_event: asyncio.Event | None = None,
    ):
        super().__init__(max_workers, queue_size)
        self.chunking_service = chunking_service
        self.chunk_executor = chunk_executor
        self.shutdown_event = shutdown_event or asyncio.Event()

    async def process(self, document: Document) -> list:
        """Process a single document into chunks.

        Args:
            document: The document to chunk

        Returns:
            List of chunks
        """
        logger.debug(f"Chunker_worker started for doc {document.id}")

        try:
            # Check for shutdown signal
            if self.shutdown_event.is_set():
                logger.debug(f"Chunker_worker {document.id} exiting due to shutdown")
                return []

            # Update metrics
            prometheus_metrics.CPU_USAGE.set(psutil.cpu_percent())
            prometheus_metrics.MEMORY_USAGE.set(psutil.virtual_memory().percent)

            # Run chunking in a thread pool for true parallelism
            with prometheus_metrics.CHUNKING_DURATION.time():
                # Calculate adaptive timeout based on document size
                adaptive_timeout = self._calculate_adaptive_timeout(document)

                # Log timeout decision for debugging
                logger.debug(
                    f"Adaptive timeout for {document.url}: {adaptive_timeout:.1f}s "
                    f"(size: {len(document.content)} bytes)"
                )

                # Add timeout to prevent hanging on chunking
                chunks = await asyncio.wait_for(
                    asyncio.get_running_loop().run_in_executor(
                        self.chunk_executor,
                        self.chunking_service.chunk_document,
                        document,
                    ),
                    timeout=adaptive_timeout,
                )

            # Check for shutdown before returning chunks
            if self.shutdown_event.is_set():
                logger.debug(
                    f"Chunker_worker {document.id} exiting due to shutdown after chunking"
                )
                return []

            # Add document reference to chunk for later state tracking
            for chunk in chunks:
                chunk.metadata["parent_document"] = document

            logger.debug(f"Chunked doc {document.id} into {len(chunks)} chunks")
            return chunks

        except asyncio.CancelledError:
            logger.debug(f"Chunker_worker {document.id} cancelled")
            raise
        except TimeoutError:
            # Provide more detailed timeout information
            doc_size = len(document.content)
            timeout_used = self._calculate_adaptive_timeout(document)
            logger.error(
                f"Chunking timed out for document '{document.url}' "
                f"(size: {doc_size:,} bytes, timeout: {timeout_used:.1f}s). "
                f"Consider increasing chunking timeout or checking document complexity."
            )
            raise
        except Exception as e:
            logger.error(f"Chunking failed for doc {document.url}: {e}")
            raise

    async def process_documents(self, documents: list[Document]) -> AsyncIterator:
        """Process documents into chunks.

        Args:
            documents: List of documents to process

        Yields:
            Chunks from processed documents
        """
        logger.debug("ChunkingWorker started")
        logger.info(f"🔄 Processing {len(documents)} documents for chunking...")

        try:
            # Process documents with controlled concurrency but stream results
            semaphore = asyncio.Semaphore(self.max_workers)

            async def process_and_yield(doc, doc_index):
                """Process a single document and yield its chunks."""
                try:
                    async with semaphore:
                        if self.shutdown_event.is_set():
                            logger.debug(
                                f"ChunkingWorker exiting due to shutdown (doc {doc_index})"
                            )
                            return

                        logger.debug(
                            f"🔄 Processing document {doc_index + 1}/{len(documents)}: {doc.id}"
                        )
                        chunks = await self.process(doc)

                        if chunks:
                            logger.debug(
                                f"✓ Document {doc_index + 1}/{len(documents)} produced {len(chunks)} chunks"
                            )
                            return chunks
                        else:
                            logger.debug(
                                f"⚠️ Document {doc_index + 1}/{len(documents)} produced no chunks"
                            )
                            return []

                except Exception as e:
                    logger.error(
                        f"❌ Chunking failed for document {doc_index + 1}/{len(documents)} ({doc.id}): {e}"
                    )
                    return []

            # Create tasks for all documents
            tasks = [process_and_yield(doc, i) for i, doc in enumerate(documents)]

            # Process tasks as they complete and yield chunks immediately
            chunk_count = 0
            completed_docs = 0

            for coro in asyncio.as_completed(tasks):
                if self.shutdown_event.is_set():
                    logger.debug("ChunkingWorker exiting due to shutdown")
                    break

                try:
                    chunks = await coro
                    completed_docs += 1

                    if chunks:
                        for chunk in chunks:
                            if not self.shutdown_event.is_set():
                                chunk_count += 1
                                yield chunk
                            else:
                                logger.debug("ChunkingWorker exiting due to shutdown")
                                return

                    # Log progress every 10 documents or at completion
                    if completed_docs % 10 == 0 or completed_docs == len(documents):
                        logger.info(
                            f"🔄 Chunking progress: {completed_docs}/{len(documents)} documents, {chunk_count} chunks generated"
                        )

                except Exception as e:
                    logger.error(f"❌ Error processing chunking task: {e}")
                    completed_docs += 1

            logger.info(
                f"✅ Chunking completed: {completed_docs}/{len(documents)} documents processed, {chunk_count} total chunks"
            )

        except asyncio.CancelledError:
            logger.debug("ChunkingWorker cancelled")
            raise
        finally:
            logger.debug("ChunkingWorker exited")

    def _calculate_adaptive_timeout(self, document: Document) -> float:
        """Calculate adaptive timeout based on document characteristics.

        Args:
            document: The document to calculate timeout for

        Returns:
            Timeout in seconds
        """
        doc_size = len(document.content)

        # More generous base timeouts to reduce false positives
        if doc_size < 1_000:  # Very small files (< 1KB)
            base_timeout = 30.0  # Increased from 10.0
        elif doc_size < 10_000:  # Small files (< 10KB)
            base_timeout = 60.0  # Increased from 20.0
        elif doc_size < 50_000:  # Medium files (10-50KB)
            base_timeout = 120.0  # Increased from 60.0
        elif doc_size < 100_000:  # Large files (50-100KB)
            base_timeout = 240.0  # Increased from 120.0
        else:  # Very large files (> 100KB)
            base_timeout = 360.0  # Increased from 180.0

        # Special handling for HTML files which can have complex structures
        if document.content_type and document.content_type.lower() == "html":
            base_timeout *= 1.5  # Give HTML files 50% more time

        # Special handling for converted files which often have complex markdown
        if hasattr(document, "metadata") and document.metadata.get("conversion_method"):
            base_timeout *= 1.5  # Give converted files 50% more time

        # Additional scaling factors
        size_factor = min(doc_size / 50000, 4.0)  # Up to 4x for very large files

        # Final adaptive timeout
        adaptive_timeout = base_timeout * (1 + size_factor)

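        # Illustrative arithmetic (comment only, not executed): a 20 KB plain-text
        # document gets base_timeout = 120.0 and size_factor = 0.4, i.e. 120 * 1.4 = 168 s;
        # a 75 KB HTML document gets 240 * 1.5 = 360 with size_factor = 1.5,
        # i.e. 360 * 2.5 = 900 s, which the cap below reduces to 600 s.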

        # Increased maximum timeout to handle complex documents
        return min(adaptive_timeout, 600.0)  # 10 minute maximum (increased from 5 minutes)
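
For orientation, a minimal sketch of how this worker might be driven follows. It assumes a configured ChunkingService and a list of Document instances prepared elsewhere; the surrounding pipeline code is not part of this report, so the names run_chunking and executor below are illustrative, not identifiers from this module.

# Illustrative usage sketch (not part of chunking_worker.py):
import asyncio
import concurrent.futures

from qdrant_loader.core.pipeline.workers.chunking_worker import ChunkingWorker


async def run_chunking(chunking_service, documents):
    # A dedicated thread pool lets chunk_document run off the event loop.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    worker = ChunkingWorker(
        chunking_service=chunking_service,
        chunk_executor=executor,
        max_workers=10,
        queue_size=1000,
    )
    try:
        async for chunk in worker.process_documents(documents):
            ...  # hand each chunk to the next pipeline stage (e.g. embedding)
    finally:
        executor.shutdown(wait=True)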