Coverage for src / qdrant_loader / core / pipeline / workers / upsert_worker.py: 100%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Upsert worker for upserting embedded chunks to Qdrant."""
3import asyncio
4from collections import Counter
5from collections.abc import AsyncIterator
6from typing import Any
8from qdrant_client.http import models
10from qdrant_loader.core.monitoring import prometheus_metrics
11from qdrant_loader.core.qdrant_manager import QdrantManager
12from qdrant_loader.utils.logging import LoggingConfig
14from .base_worker import BaseWorker
16logger = LoggingConfig.get_logger(__name__)
class PipelineResult:
    """Mutable accumulator describing the outcome of a pipeline run.

    Every instance starts empty (zero counters, empty collections) and owns
    its own containers, so separate results never share state.
    """

    def __init__(self) -> None:
        # Running tallies of successful and failed items.
        self.success_count: int = 0
        self.error_count: int = 0

        # Document-level bookkeeping.
        # NOTE(review): presumably these hold document IDs — confirm with callers.
        self.successfully_processed_documents: set[str] = set()
        self.failed_document_ids: set[str] = set()

        # Error messages collected while processing.
        self.errors: list[str] = list()
class UpsertWorker(BaseWorker):
    """Handles upserting embedded chunks to Qdrant.

    Chunks arrive as ``(chunk, embedding)`` tuples, are grouped into batches
    of ``batch_size`` and written through the injected ``QdrantManager``.
    Chunk ID collisions (inside one batch, or against IDs recorded from
    earlier batches of the same run) are logged and reported as errors.
    """

    def __init__(
        self,
        qdrant_manager: QdrantManager,
        batch_size: int,
        max_workers: int = 4,
        queue_size: int = 1000,
        shutdown_event: asyncio.Event | None = None,
    ):
        """Create the worker.

        Args:
            qdrant_manager: Manager used to build point vectors and upsert points.
            batch_size: Number of embedded chunks collected before an upsert.
            max_workers: Forwarded to ``BaseWorker.__init__``.
            queue_size: Forwarded to ``BaseWorker.__init__``.
            shutdown_event: Event checked between chunks to stop early; a new
                ``asyncio.Event`` is created when none is supplied.
        """
        super().__init__(max_workers, queue_size)
        self.qdrant_manager = qdrant_manager
        self.batch_size = batch_size
        self.shutdown_event = shutdown_event or asyncio.Event()

    def _handle_duplicate_chunk_ids(
        self,
        batch: list[tuple[Any, list[float]]],
        batch_chunk_id_counts: Counter,
        duplicate_chunk_ids: set[str],
        same_batch_duplicates: set[str],
        cross_batch_duplicates: set[str],
        new_chunk_ids: set[str],
        successful_doc_ids: set[str],
        result: PipelineResult,
        errors: list[str],
    ) -> None:
        """Handle duplicate chunk IDs and update result/error bookkeeping.

        Does nothing when ``duplicate_chunk_ids`` is empty. Otherwise the
        parent documents of the colliding chunks are removed from both
        ``successful_doc_ids`` and ``result.successfully_processed_documents``,
        a warning is logged, one summary message is appended to ``errors`` and
        ``result.error_count`` grows by the number of batch entries that did
        not contribute a new chunk ID.

        Args:
            batch: The (chunk, embedding) tuples that were just upserted.
            batch_chunk_id_counts: Occurrence count per chunk ID in ``batch``.
            duplicate_chunk_ids: Union of same-batch and cross-batch duplicates.
            same_batch_duplicates: IDs appearing more than once in ``batch``.
            cross_batch_duplicates: IDs already recorded from earlier batches.
            new_chunk_ids: IDs in ``batch`` that are not duplicates of any kind.
            successful_doc_ids: Mutated in place; affected documents are removed.
            result: Mutated in place (document set and error count).
            errors: Mutated in place; receives the summary message.
        """
        if not duplicate_chunk_ids:
            return

        # Collect the parent documents of every colliding chunk.
        duplicate_doc_ids = set()
        for chunk, _ in batch:
            if str(chunk.id) in duplicate_chunk_ids:
                parent_doc = chunk.metadata.get("parent_document")
                if parent_doc:
                    duplicate_doc_ids.add(parent_doc.id)

        # In-place set difference: the caller's set object is the one updated.
        successful_doc_ids -= duplicate_doc_ids
        result.successfully_processed_documents -= duplicate_doc_ids

        # Extra occurrences beyond the first for each repeated ID in this batch.
        same_batch_duplicate_occurrences = sum(
            count - 1 for count in batch_chunk_id_counts.values() if count > 1
        )
        total_duplicate_impact = len(duplicate_doc_ids)
        duplicate_chunk_attempts = len(batch) - len(new_chunk_ids)

        logger.warning(
            "Detected chunk ID collisions during upsert; existing points will be overwritten",
            duplicate_count=len(duplicate_chunk_ids),
            same_batch_duplicate_count=len(same_batch_duplicates),
            same_batch_duplicate_occurrences=same_batch_duplicate_occurrences,
            cross_batch_duplicate_count=len(cross_batch_duplicates),
            affected_documents=total_duplicate_impact,
        )
        errors.append(
            "Detected duplicate chunk IDs during upsert: "
            f"{len(cross_batch_duplicates)} cross-batch IDs and "
            f"{same_batch_duplicate_occurrences} same-batch duplicate occurrences "
            f"across {len(same_batch_duplicates)} IDs affecting {total_duplicate_impact} document(s): "
            f"{sorted(duplicate_doc_ids)}"
        )
        result.error_count += duplicate_chunk_attempts

    def _build_point(self, chunk: Any, embedding: list[float]) -> models.PointStruct:
        """Build the Qdrant point (ID, vector, payload) for one embedded chunk."""
        # QdrantManager.build_point_vector owns the dense / dense+sparse
        # decision and has its own dense-only fallback on encode failure,
        # so no defensive wrapper is needed here.
        vector = self.qdrant_manager.build_point_vector(embedding, chunk.content)
        payload = {
            "content": chunk.content,
            "contextual_content": chunk.contextual_content,
            # The parent document object itself is kept out of the payload.
            "metadata": {
                key: value
                for key, value in chunk.metadata.items()
                if key != "parent_document"
            },
            "source": chunk.source,
            "source_type": chunk.source_type,
            "created_at": chunk.created_at.isoformat(),
            # Falls back to created_at when the chunk has no updated_at attribute.
            "updated_at": getattr(chunk, "updated_at", chunk.created_at).isoformat(),
            "title": getattr(chunk, "title", chunk.metadata.get("title", "")),
            "url": getattr(chunk, "url", chunk.metadata.get("url", "")),
            "document_id": chunk.metadata.get("parent_document_id", chunk.id),
        }
        return models.PointStruct(id=chunk.id, vector=vector, payload=payload)

    async def process(
        self, batch: list[tuple[Any, list[float]]]
    ) -> tuple[int, int, set[str], list[str]]:
        """Process a batch of embedded chunks.

        Builds one point per chunk, upserts them in a single call and records
        the parent documents of the batch as successful. Any ``Exception``
        raised along the way is caught: it is logged once per chunk, every
        chunk is reported as failed and the parent documents are withdrawn
        from the successful set.

        Args:
            batch: List of (chunk, embedding) tuples

        Returns:
            Tuple of (success_count, error_count, successful_doc_ids, errors)
        """
        if not batch:
            return 0, 0, set(), []

        success_count = 0
        error_count = 0
        successful_doc_ids: set[str] = set()
        errors: list[str] = []

        try:
            # Point construction and the upsert call are both timed.
            with prometheus_metrics.UPSERT_DURATION.time():
                points = [
                    self._build_point(chunk, embedding) for chunk, embedding in batch
                ]

                await self.qdrant_manager.upsert_points(points)
                prometheus_metrics.INGESTED_DOCUMENTS.inc(len(points))
                success_count = len(points)

                # Mark parent documents as successfully processed
                for chunk, _ in batch:
                    parent_doc = chunk.metadata.get("parent_document")
                    if parent_doc:
                        successful_doc_ids.add(parent_doc.id)

        except Exception as e:
            for chunk, _ in batch:
                message = f"Upsert failed for chunk {chunk.id}: {e}"
                logger.error(message)
                # Mark parent document as failed
                parent_doc = chunk.metadata.get("parent_document")
                if parent_doc:
                    successful_doc_ids.discard(parent_doc.id)  # Remove if it was added
                errors.append(message)
            error_count = len(batch)

        return success_count, error_count, successful_doc_ids, errors

    async def _upsert_batch_and_record(
        self,
        batch: list[tuple[Any, list[float]]],
        seen_chunk_ids: set[str],
        result: PipelineResult,
    ) -> None:
        """Upsert one batch and fold its outcome into ``result``.

        Shared by the full-batch path and the trailing partial batch so both
        apply identical duplicate detection and accounting.

        Args:
            batch: The (chunk, embedding) tuples to upsert.
            seen_chunk_ids: Chunk IDs recorded from earlier batches; updated in
                place with the non-duplicate IDs of this batch.
            result: Accumulator updated in place.
        """
        batch_chunk_id_list = [str(chunk.id) for chunk, _ in batch]
        batch_chunk_ids = set(batch_chunk_id_list)
        batch_chunk_id_counts = Counter(batch_chunk_id_list)
        success_count, error_count, successful_doc_ids, errors = await self.process(
            batch
        )

        if success_count > 0:
            same_batch_duplicates = {
                chunk_id
                for chunk_id, count in batch_chunk_id_counts.items()
                if count > 1
            }
            cross_batch_duplicates = batch_chunk_ids & seen_chunk_ids
            duplicate_chunk_ids = cross_batch_duplicates | same_batch_duplicates
            new_chunk_ids = batch_chunk_ids - seen_chunk_ids - same_batch_duplicates

            self._handle_duplicate_chunk_ids(
                batch=batch,
                batch_chunk_id_counts=batch_chunk_id_counts,
                duplicate_chunk_ids=duplicate_chunk_ids,
                same_batch_duplicates=same_batch_duplicates,
                cross_batch_duplicates=cross_batch_duplicates,
                new_chunk_ids=new_chunk_ids,
                successful_doc_ids=successful_doc_ids,
                result=result,
                errors=errors,
            )

            # Only update seen_chunk_ids with non-duplicate IDs
            seen_chunk_ids.update(new_chunk_ids)
            result.success_count += len(new_chunk_ids)

        result.error_count += error_count
        result.successfully_processed_documents.update(successful_doc_ids)
        result.errors.extend(errors)

    async def process_embedded_chunks(
        self, embedded_chunks: AsyncIterator[tuple[Any, list[float]]]
    ) -> PipelineResult:
        """Upsert embedded chunks to Qdrant.

        Consumes the iterator, upserting whenever ``batch_size`` chunks have
        accumulated, then flushes any remainder. When the shutdown event is
        set, consumption stops and the pending partial batch is not upserted.

        Args:
            embedded_chunks: AsyncIterator of (chunk, embedding) tuples

        Returns:
            PipelineResult with processing statistics

        Raises:
            asyncio.CancelledError: Re-raised after logging when cancelled.
        """
        logger.debug("UpsertWorker started")
        result = PipelineResult()
        batch: list[tuple[Any, list[float]]] = []
        seen_chunk_ids: set[str] = set()

        try:
            async for chunk_embedding in embedded_chunks:
                if self.shutdown_event.is_set():
                    logger.debug("UpsertWorker exiting due to shutdown")
                    break

                batch.append(chunk_embedding)

                # Process batch when it reaches the desired size
                if len(batch) >= self.batch_size:
                    await self._upsert_batch_and_record(batch, seen_chunk_ids, result)
                    batch = []

            # Process any remaining chunks in the final batch
            if batch and not self.shutdown_event.is_set():
                await self._upsert_batch_and_record(batch, seen_chunk_ids, result)

        except asyncio.CancelledError:
            logger.debug("UpsertWorker cancelled")
            raise
        finally:
            logger.debug("UpsertWorker exited")

        return result