Coverage for src/qdrant_loader/core/pipeline/workers/chunking_worker.py: 94%

108 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Chunking worker for processing documents into chunks.""" 

2 

3import asyncio 

4import concurrent.futures 

5from collections.abc import AsyncIterator 

6 

7import psutil 

8 

9from qdrant_loader.core.chunking.chunking_service import ChunkingService 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.core.monitoring import prometheus_metrics 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .base_worker import BaseWorker 

15 

16logger = LoggingConfig.get_logger(__name__) 

17 

18 

class ChunkingWorker(BaseWorker):
    """Handles document chunking with controlled concurrency."""

    def __init__(
        self,
        chunking_service: ChunkingService,
        chunk_executor: concurrent.futures.ThreadPoolExecutor,
        max_workers: int = 10,
        queue_size: int = 1000,
        shutdown_event: asyncio.Event | None = None,
    ):
        super().__init__(max_workers, queue_size)
        self.chunking_service = chunking_service
        self.chunk_executor = chunk_executor
        self.shutdown_event = shutdown_event or asyncio.Event()

    async def process(self, document: Document) -> list:
        """Process a single document into chunks.

        Args:
            document: The document to chunk

        Returns:
            List of chunks
        """
        logger.debug(f"Chunker_worker started for doc {document.id}")

        try:
            # Check for shutdown signal
            if self.shutdown_event.is_set():
                logger.debug(f"Chunker_worker {document.id} exiting due to shutdown")
                return []

            # Update metrics
            prometheus_metrics.CPU_USAGE.set(psutil.cpu_percent())
            prometheus_metrics.MEMORY_USAGE.set(psutil.virtual_memory().percent)

            # Run chunking in a thread pool for true parallelism
            with prometheus_metrics.CHUNKING_DURATION.time():
                # Calculate adaptive timeout based on document size
                adaptive_timeout = self._calculate_adaptive_timeout(document)

                # Log timeout decision for debugging
                logger.debug(
                    f"Adaptive timeout for {document.url}: {adaptive_timeout:.1f}s "
                    f"(size: {len(document.content)} bytes)"
                )

                # Add timeout to prevent hanging on chunking
                chunks = await asyncio.wait_for(
                    asyncio.get_running_loop().run_in_executor(
                        self.chunk_executor,
                        self.chunking_service.chunk_document,
                        document,
                    ),
                    timeout=adaptive_timeout,
                )

            # Check for shutdown before returning chunks
            if self.shutdown_event.is_set():
                logger.debug(
                    f"Chunker_worker {document.id} exiting due to shutdown after chunking"
                )
                return []

            # Add document reference to each chunk for later state tracking
            for chunk in chunks:
                chunk.metadata["parent_document"] = document

            logger.debug(f"Chunked doc {document.id} into {len(chunks)} chunks")
            return chunks

        except asyncio.CancelledError:
            logger.debug(f"Chunker_worker {document.id} cancelled")
            raise
        except TimeoutError:
            logger.error(f"Chunking timed out for doc {document.url}")
            raise
        except Exception as e:
            logger.error(f"Chunking failed for doc {document.url}: {e}")
            raise
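    # Hypothetical direct call of process() (illustration only, not part of this
    # module), assuming `worker` is a constructed ChunkingWorker and `doc` a Document:
    #
    #     chunks = await worker.process(doc)
    #     # every returned chunk carries a back-reference to its source document
    #     assert all(c.metadata["parent_document"] is doc for c in chunks)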

    async def process_documents(self, documents: list[Document]) -> AsyncIterator:
        """Process documents into chunks.

        Args:
            documents: List of documents to process

        Yields:
            Chunks from processed documents
        """
        logger.debug("ChunkingWorker started")
        logger.info(f"🔄 Processing {len(documents)} documents for chunking...")

        try:
            # Process documents with controlled concurrency but stream results
            semaphore = asyncio.Semaphore(self.max_workers)

            async def process_and_yield(doc, doc_index):
                """Process a single document and yield its chunks."""
                try:
                    async with semaphore:
                        if self.shutdown_event.is_set():
                            logger.debug(
                                f"ChunkingWorker exiting due to shutdown (doc {doc_index})"
                            )
                            return

                        logger.debug(
                            f"🔄 Processing document {doc_index + 1}/{len(documents)}: {doc.id}"
                        )
                        chunks = await self.process(doc)

                        if chunks:
                            logger.debug(
                                f"✓ Document {doc_index + 1}/{len(documents)} produced {len(chunks)} chunks"
                            )
                            return chunks
                        else:
                            logger.debug(
                                f"⚠️ Document {doc_index + 1}/{len(documents)} produced no chunks"
                            )
                            return []

                except Exception as e:
                    logger.error(
                        f"❌ Chunking failed for document {doc_index + 1}/{len(documents)} ({doc.id}): {e}"
                    )
                    return []

            # Create tasks for all documents
            tasks = [process_and_yield(doc, i) for i, doc in enumerate(documents)]

            # Process tasks as they complete and yield chunks immediately
            chunk_count = 0
            completed_docs = 0

            for coro in asyncio.as_completed(tasks):
                if self.shutdown_event.is_set():
                    logger.debug("ChunkingWorker exiting due to shutdown")
                    break

                try:
                    chunks = await coro
                    completed_docs += 1

                    if chunks:
                        for chunk in chunks:
                            if not self.shutdown_event.is_set():
                                chunk_count += 1
                                yield chunk
                            else:
                                logger.debug("ChunkingWorker exiting due to shutdown")
                                return

                    # Log progress every 10 documents or at completion
                    if completed_docs % 10 == 0 or completed_docs == len(documents):
                        logger.info(
                            f"🔄 Chunking progress: {completed_docs}/{len(documents)} documents, {chunk_count} chunks generated"
                        )

                except Exception as e:
                    logger.error(f"❌ Error processing chunking task: {e}")
                    completed_docs += 1

            logger.info(
                f"✅ Chunking completed: {completed_docs}/{len(documents)} documents processed, {chunk_count} total chunks"
            )

        except asyncio.CancelledError:
            logger.debug("ChunkingWorker cancelled")
            raise
        finally:
            logger.debug("ChunkingWorker exited")
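    # Hypothetical consumption sketch (illustrative, not part of this module): a
    # downstream stage could drain the stream as documents finish, assuming a
    # `worker`, a `documents` list, and an `embedding_queue` exist:
    #
    #     async for chunk in worker.process_documents(documents):
    #         await embedding_queue.put(chunk)
    #
    # Chunks are yielded as soon as each document completes, so later pipeline
    # stages can start before the whole batch has been chunked.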

    def _calculate_adaptive_timeout(self, document: Document) -> float:
        """Calculate adaptive timeout based on document characteristics.

        Args:
            document: The document to calculate timeout for

        Returns:
            Timeout in seconds
        """
        doc_size = len(document.content)

        # Universal scaling based on document characteristics
        if doc_size < 1_000:  # Very small files (< 1KB)
            base_timeout = 10.0
        elif doc_size < 10_000:  # Small files (< 10KB)
            base_timeout = 20.0
        elif doc_size < 50_000:  # Medium files (10-50KB)
            base_timeout = 60.0
        elif doc_size < 100_000:  # Large files (50-100KB)
            base_timeout = 120.0
        else:  # Very large files (> 100KB)
            base_timeout = 180.0

        # Special handling for HTML files which can have complex structures
        if document.content_type and document.content_type.lower() == "html":
            base_timeout *= 1.5  # Give HTML files 50% more time

        # Additional scaling factors
        size_factor = min(doc_size / 50000, 4.0)  # Up to 4x for very large files

        # Final adaptive timeout
        adaptive_timeout = base_timeout * (1 + size_factor)

        # Cap maximum timeout to prevent indefinite hanging
        return min(adaptive_timeout, 300.0)  # 5 minute maximum
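
As a worked example of the adaptive timeout above (the document size and content type here are hypothetical): a 75,000-byte HTML document falls in the 50-100KB tier, so base_timeout = 120.0, raised to 180.0 by the HTML multiplier; size_factor = min(75000 / 50000, 4.0) = 1.5, giving 180.0 * (1 + 1.5) = 450.0 s, which the final cap reduces to 300.0 s.

A minimal construction sketch, assuming the surrounding pipeline supplies a configured ChunkingService and a list of Document objects; the function name, executor size, and variable names are illustrative, not taken from this module:

    import concurrent.futures

    async def chunk_all(chunking_service, documents):
        # chunking_service and documents are assumed to be provided by the caller
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        worker = ChunkingWorker(
            chunking_service=chunking_service,
            chunk_executor=executor,
            max_workers=10,
            queue_size=1000,
        )
        # Collect the streamed chunks; a real pipeline would forward them downstream.
        # The caller drives this coroutine, e.g. with asyncio.run(chunk_all(...)).
        return [chunk async for chunk in worker.process_documents(documents)]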