Coverage for src/qdrant_loader/core/pipeline/workers/chunking_worker.py: 94%
112 statements
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Chunking worker for processing documents into chunks."""
3import asyncio
4import concurrent.futures
5from collections.abc import AsyncIterator
7import psutil
9from qdrant_loader.core.chunking.chunking_service import ChunkingService
10from qdrant_loader.core.document import Document
11from qdrant_loader.core.monitoring import prometheus_metrics
12from qdrant_loader.utils.logging import LoggingConfig
14from .base_worker import BaseWorker
16logger = LoggingConfig.get_logger(__name__)
19class ChunkingWorker(BaseWorker):
20 """Handles document chunking with controlled concurrency."""
22 def __init__(
23 self,
24 chunking_service: ChunkingService,
25 chunk_executor: concurrent.futures.ThreadPoolExecutor,
26 max_workers: int = 10,
27 queue_size: int = 1000,
28 shutdown_event: asyncio.Event | None = None,
29 ):
30 super().__init__(max_workers, queue_size)
31 self.chunking_service = chunking_service
32 self.chunk_executor = chunk_executor
33 self.shutdown_event = shutdown_event or asyncio.Event()
35 async def process(self, document: Document) -> list:
36 """Process a single document into chunks.
38 Args:
39 document: The document to chunk
41 Returns:
42 List of chunks
43 """
44 logger.debug(f"Chunker_worker started for doc {document.id}")
46 try:
47 # Check for shutdown signal
48 if self.shutdown_event.is_set():
49 logger.debug(f"Chunker_worker {document.id} exiting due to shutdown")
50 return []
52 # Update metrics
53 prometheus_metrics.CPU_USAGE.set(psutil.cpu_percent())
54 prometheus_metrics.MEMORY_USAGE.set(psutil.virtual_memory().percent)
56 # Run chunking in a thread pool for true parallelism
57 with prometheus_metrics.CHUNKING_DURATION.time():
58 # Calculate adaptive timeout based on document size
59 adaptive_timeout = self._calculate_adaptive_timeout(document)
61 # Log timeout decision for debugging
62 logger.debug(
63 f"Adaptive timeout for {document.url}: {adaptive_timeout:.1f}s "
64 f"(size: {len(document.content)} bytes)"
65 )
67 # Add timeout to prevent hanging on chunking
68 chunks = await asyncio.wait_for(
69 asyncio.get_running_loop().run_in_executor(
70 self.chunk_executor,
71 self.chunking_service.chunk_document,
72 document,
73 ),
74 timeout=adaptive_timeout,
75 )
77 # Check for shutdown before returning chunks
78 if self.shutdown_event.is_set():
79 logger.debug(
80 f"Chunker_worker {document.id} exiting due to shutdown after chunking"
81 )
82 return []
84 # Add document reference to chunk for later state tracking
85 for chunk in chunks:
86 chunk.metadata["parent_document"] = document
88 logger.debug(f"Chunked doc {document.id} into {len(chunks)} chunks")
89 return chunks
91 except asyncio.CancelledError:
92 logger.debug(f"Chunker_worker {document.id} cancelled")
93 raise
94 except TimeoutError:
95 # Provide more detailed timeout information
96 doc_size = len(document.content)
97 timeout_used = self._calculate_adaptive_timeout(document)
98 logger.error(
99 f"Chunking timed out for document '{document.url}' "
100 f"(size: {doc_size:,} bytes, timeout: {timeout_used:.1f}s). "
101 f"Consider increasing chunking timeout or checking document complexity."
102 )
103 raise
104 except Exception as e:
105 logger.error(f"Chunking failed for doc {document.url}: {e}")
106 raise
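
The core pattern in process() — off-loading a blocking call to a thread pool with run_in_executor and bounding it with wait_for — can be exercised in isolation. The snippet below is a minimal, self-contained sketch of that pattern; cpu_bound_work is a hypothetical stand-in for ChunkingService.chunk_document and is not part of this module.

import asyncio
import concurrent.futures
import time


def cpu_bound_work(text: str) -> list[str]:
    # Hypothetical stand-in for a blocking, CPU-heavy chunking call.
    time.sleep(0.1)
    return text.split()


async def main() -> None:
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()
    try:
        # Run the blocking function in the executor and give up after 5 seconds,
        # mirroring the wait_for(run_in_executor(...)) call used in process().
        words = await asyncio.wait_for(
            loop.run_in_executor(executor, cpu_bound_work, "some document text"),
            timeout=5.0,
        )
        print(words)
    except TimeoutError:
        print("work timed out")
    finally:
        executor.shutdown(wait=False)


asyncio.run(main())
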
    async def process_documents(self, documents: list[Document]) -> AsyncIterator:
        """Process documents into chunks.

        Args:
            documents: List of documents to process

        Yields:
            Chunks from processed documents
        """
        logger.debug("ChunkingWorker started")
        logger.info(f"🔄 Processing {len(documents)} documents for chunking...")

        try:
            # Process documents with controlled concurrency but stream results
            semaphore = asyncio.Semaphore(self.max_workers)

            async def process_and_yield(doc, doc_index):
                """Process a single document and yield its chunks."""
                try:
                    async with semaphore:
                        if self.shutdown_event.is_set():
                            logger.debug(
                                f"ChunkingWorker exiting due to shutdown (doc {doc_index})"
                            )
                            return

                        logger.debug(
                            f"🔄 Processing document {doc_index + 1}/{len(documents)}: {doc.id}"
                        )
                        chunks = await self.process(doc)

                        if chunks:
                            logger.debug(
                                f"✓ Document {doc_index + 1}/{len(documents)} produced {len(chunks)} chunks"
                            )
                            return chunks
                        else:
                            logger.debug(
                                f"⚠️ Document {doc_index + 1}/{len(documents)} produced no chunks"
                            )
                            return []

                except Exception as e:
                    logger.error(
                        f"❌ Chunking failed for document {doc_index + 1}/{len(documents)} ({doc.id}): {e}"
                    )
                    return []

            # Create tasks for all documents
            tasks = [process_and_yield(doc, i) for i, doc in enumerate(documents)]

            # Process tasks as they complete and yield chunks immediately
            chunk_count = 0
            completed_docs = 0

            for coro in asyncio.as_completed(tasks):
                if self.shutdown_event.is_set():
                    logger.debug("ChunkingWorker exiting due to shutdown")
                    break

                try:
                    chunks = await coro
                    completed_docs += 1

                    if chunks:
                        for chunk in chunks:
                            if not self.shutdown_event.is_set():
                                chunk_count += 1
                                yield chunk
                            else:
                                logger.debug("ChunkingWorker exiting due to shutdown")
                                return

                    # Log progress every 10 documents or at completion
                    if completed_docs % 10 == 0 or completed_docs == len(documents):
                        logger.info(
                            f"🔄 Chunking progress: {completed_docs}/{len(documents)} documents, {chunk_count} chunks generated"
                        )

                except Exception as e:
                    logger.error(f"❌ Error processing chunking task: {e}")
                    completed_docs += 1

            logger.info(
                f"✅ Chunking completed: {completed_docs}/{len(documents)} documents processed, {chunk_count} total chunks"
            )

        except asyncio.CancelledError:
            logger.debug("ChunkingWorker cancelled")
            raise
        finally:
            logger.debug("ChunkingWorker exited")
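
process_documents() is an async generator, so a caller consumes chunks with async for as individual documents finish, rather than waiting for the whole batch. Below is a minimal driver sketch; the chunking_service instance and the documents list are assumed to come from the surrounding pipeline and are not constructed here.

import asyncio
import concurrent.futures


async def run_chunking(chunking_service, documents):
    # Assumed inputs: a ChunkingService instance and a list[Document] built elsewhere.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    shutdown_event = asyncio.Event()
    worker = ChunkingWorker(
        chunking_service=chunking_service,
        chunk_executor=executor,
        max_workers=10,
        shutdown_event=shutdown_event,
    )
    chunks = []
    try:
        # Chunks stream out as each document finishes chunking.
        async for chunk in worker.process_documents(documents):
            chunks.append(chunk)
    finally:
        executor.shutdown(wait=False)
    return chunks
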
    def _calculate_adaptive_timeout(self, document: Document) -> float:
        """Calculate adaptive timeout based on document characteristics.

        Args:
            document: The document to calculate timeout for

        Returns:
            Timeout in seconds
        """
        doc_size = len(document.content)

        # More generous base timeouts to reduce false positives
        if doc_size < 1_000:  # Very small files (< 1KB)
            base_timeout = 30.0  # Increased from 10.0
        elif doc_size < 10_000:  # Small files (< 10KB)
            base_timeout = 60.0  # Increased from 20.0
        elif doc_size < 50_000:  # Medium files (10-50KB)
            base_timeout = 120.0  # Increased from 60.0
        elif doc_size < 100_000:  # Large files (50-100KB)
            base_timeout = 240.0  # Increased from 120.0
        else:  # Very large files (> 100KB)
            base_timeout = 360.0  # Increased from 180.0

        # Special handling for HTML files which can have complex structures
        if document.content_type and document.content_type.lower() == "html":
            base_timeout *= 1.5  # Give HTML files 50% more time

        # Special handling for converted files which often have complex markdown
        if hasattr(document, "metadata") and document.metadata.get("conversion_method"):
            base_timeout *= 1.5  # Give converted files 50% more time

        # Additional scaling factors
        size_factor = min(doc_size / 50000, 4.0)  # Up to 4x for very large files

        # Final adaptive timeout
        adaptive_timeout = base_timeout * (1 + size_factor)

        # Increased maximum timeout to handle complex documents
        return min(adaptive_timeout, 600.0)  # 10 minute maximum (increased from 5 minutes)
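
To make the scaling concrete, here is a worked example of the arithmetic above for a hypothetical 75,000-byte document that came through file conversion (conversion_method present in its metadata):

doc_size = 75_000
base_timeout = 240.0                                   # 50-100KB band
base_timeout *= 1.5                                    # converted file -> 360.0
size_factor = min(doc_size / 50000, 4.0)               # 75000 / 50000 = 1.5
adaptive_timeout = base_timeout * (1 + size_factor)    # 360.0 * 2.5 = 900.0
timeout = min(adaptive_timeout, 600.0)                 # capped at the ceiling -> 600.0

So although the raw formula yields 900 seconds, the method returns the 600-second cap.
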