Coverage for src/qdrant_loader/core/pipeline/workers/chunking_worker.py: 94% (108 statements)
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Chunking worker for processing documents into chunks."""
3import asyncio
4import concurrent.futures
5from collections.abc import AsyncIterator
7import psutil
9from qdrant_loader.core.chunking.chunking_service import ChunkingService
10from qdrant_loader.core.document import Document
11from qdrant_loader.core.monitoring import prometheus_metrics
12from qdrant_loader.utils.logging import LoggingConfig
14from .base_worker import BaseWorker
16logger = LoggingConfig.get_logger(__name__)


class ChunkingWorker(BaseWorker):
    """Handles document chunking with controlled concurrency."""

    def __init__(
        self,
        chunking_service: ChunkingService,
        chunk_executor: concurrent.futures.ThreadPoolExecutor,
        max_workers: int = 10,
        queue_size: int = 1000,
        shutdown_event: asyncio.Event | None = None,
    ):
        super().__init__(max_workers, queue_size)
        self.chunking_service = chunking_service
        self.chunk_executor = chunk_executor
        self.shutdown_event = shutdown_event or asyncio.Event()

    async def process(self, document: Document) -> list:
        """Process a single document into chunks.

        Args:
            document: The document to chunk

        Returns:
            List of chunks
        """
        logger.debug(f"Chunker_worker started for doc {document.id}")

        try:
            # Check for shutdown signal
            if self.shutdown_event.is_set():
                logger.debug(f"Chunker_worker {document.id} exiting due to shutdown")
                return []

            # Update metrics
            prometheus_metrics.CPU_USAGE.set(psutil.cpu_percent())
            prometheus_metrics.MEMORY_USAGE.set(psutil.virtual_memory().percent)

            # Run chunking in a thread pool for true parallelism
            with prometheus_metrics.CHUNKING_DURATION.time():
                # Calculate adaptive timeout based on document size
                adaptive_timeout = self._calculate_adaptive_timeout(document)

                # Log timeout decision for debugging
                logger.debug(
                    f"Adaptive timeout for {document.url}: {adaptive_timeout:.1f}s "
                    f"(size: {len(document.content)} bytes)"
                )

                # Add timeout to prevent hanging on chunking
                chunks = await asyncio.wait_for(
                    asyncio.get_running_loop().run_in_executor(
                        self.chunk_executor,
                        self.chunking_service.chunk_document,
                        document,
                    ),
                    timeout=adaptive_timeout,
                )

            # Check for shutdown before returning chunks
            if self.shutdown_event.is_set():
                logger.debug(
                    f"Chunker_worker {document.id} exiting due to shutdown after chunking"
                )
                return []

            # Add document reference to each chunk for later state tracking
            for chunk in chunks:
                chunk.metadata["parent_document"] = document

            logger.debug(f"Chunked doc {document.id} into {len(chunks)} chunks")
            return chunks

        except asyncio.CancelledError:
            logger.debug(f"Chunker_worker {document.id} cancelled")
            raise
        except TimeoutError:
            logger.error(f"Chunking timed out for doc {document.url}")
            raise
        except Exception as e:
            logger.error(f"Chunking failed for doc {document.url}: {e}")
            raise

    async def process_documents(self, documents: list[Document]) -> AsyncIterator:
        """Process documents into chunks.

        Args:
            documents: List of documents to process

        Yields:
            Chunks from processed documents
        """
        logger.debug("ChunkingWorker started")
        logger.info(f"🔄 Processing {len(documents)} documents for chunking...")

        try:
            # Process documents with controlled concurrency but stream results
            semaphore = asyncio.Semaphore(self.max_workers)

            async def process_and_yield(doc, doc_index):
                """Process a single document and return its chunks."""
                try:
                    async with semaphore:
                        if self.shutdown_event.is_set():
                            logger.debug(
                                f"ChunkingWorker exiting due to shutdown (doc {doc_index})"
                            )
                            return

                        logger.debug(
                            f"🔄 Processing document {doc_index + 1}/{len(documents)}: {doc.id}"
                        )
                        chunks = await self.process(doc)

                        if chunks:
                            logger.debug(
                                f"✓ Document {doc_index + 1}/{len(documents)} produced {len(chunks)} chunks"
                            )
                            return chunks
                        else:
                            logger.debug(
                                f"⚠️ Document {doc_index + 1}/{len(documents)} produced no chunks"
                            )
                            return []

                except Exception as e:
                    logger.error(
                        f"❌ Chunking failed for document {doc_index + 1}/{len(documents)} ({doc.id}): {e}"
                    )
                    return []

            # Create tasks for all documents
            tasks = [process_and_yield(doc, i) for i, doc in enumerate(documents)]

            # Process tasks as they complete and yield chunks immediately
            chunk_count = 0
            completed_docs = 0

            for coro in asyncio.as_completed(tasks):
                if self.shutdown_event.is_set():
                    logger.debug("ChunkingWorker exiting due to shutdown")
                    break

                try:
                    chunks = await coro
                    completed_docs += 1

                    if chunks:
                        for chunk in chunks:
                            if not self.shutdown_event.is_set():
                                chunk_count += 1
                                yield chunk
                            else:
                                logger.debug("ChunkingWorker exiting due to shutdown")
                                return

                    # Log progress every 10 documents or at completion
                    if completed_docs % 10 == 0 or completed_docs == len(documents):
                        logger.info(
                            f"🔄 Chunking progress: {completed_docs}/{len(documents)} documents, {chunk_count} chunks generated"
                        )

                except Exception as e:
                    logger.error(f"❌ Error processing chunking task: {e}")
                    completed_docs += 1

            logger.info(
                f"✅ Chunking completed: {completed_docs}/{len(documents)} documents processed, {chunk_count} total chunks"
            )

        except asyncio.CancelledError:
            logger.debug("ChunkingWorker cancelled")
            raise
        finally:
            logger.debug("ChunkingWorker exited")

    def _calculate_adaptive_timeout(self, document: Document) -> float:
        """Calculate adaptive timeout based on document characteristics.

        Args:
            document: The document to calculate timeout for

        Returns:
            Timeout in seconds
        """
        doc_size = len(document.content)

        # Universal scaling based on document characteristics
        if doc_size < 1_000:  # Very small files (< 1KB)
            base_timeout = 10.0
        elif doc_size < 10_000:  # Small files (< 10KB)
            base_timeout = 20.0
        elif doc_size < 50_000:  # Medium files (10-50KB)
            base_timeout = 60.0
        elif doc_size < 100_000:  # Large files (50-100KB)
            base_timeout = 120.0
        else:  # Very large files (> 100KB)
            base_timeout = 180.0

        # Special handling for HTML files which can have complex structures
        if document.content_type and document.content_type.lower() == "html":
            base_timeout *= 1.5  # Give HTML files 50% more time

        # Additional scaling factor
        size_factor = min(doc_size / 50000, 4.0)  # Up to 4x for very large files

        # Final adaptive timeout
        adaptive_timeout = base_timeout * (1 + size_factor)

        # Cap maximum timeout to prevent indefinite hanging
        return min(adaptive_timeout, 300.0)  # 5 minute maximum
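

# Minimal usage sketch (illustrative only, not part of this module). It assumes
# a configured ChunkingService and a list of already-loaded Document objects;
# `chunking_service` and `documents` below are placeholders, not names defined
# in this file.
#
#     async def run_chunking(chunking_service: ChunkingService,
#                            documents: list[Document]) -> None:
#         with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
#             worker = ChunkingWorker(
#                 chunking_service=chunking_service,
#                 chunk_executor=executor,
#                 max_workers=4,
#             )
#             async for chunk in worker.process_documents(documents):
#                 # Each chunk carries its source document in
#                 # chunk.metadata["parent_document"]; hand it to the next
#                 # pipeline stage here.
#                 ...
#
#     # asyncio.run(run_chunking(chunking_service, documents))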