Coverage for src/qdrant_loader/core/pipeline/workers/upsert_worker.py: 100%
76 statements
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Upsert worker for upserting embedded chunks to Qdrant."""
3import asyncio
4from collections.abc import AsyncIterator
5from typing import Any
7from qdrant_client.http import models
9from qdrant_loader.core.monitoring import prometheus_metrics
10from qdrant_loader.core.qdrant_manager import QdrantManager
11from qdrant_loader.utils.logging import LoggingConfig
13from .base_worker import BaseWorker
15logger = LoggingConfig.get_logger(__name__)
18class PipelineResult:
19 """Result of pipeline processing."""
21 def __init__(self):
22 self.success_count: int = 0
23 self.error_count: int = 0
24 self.successfully_processed_documents: set[str] = set()
25 self.failed_document_ids: set[str] = set()
26 self.errors: list[str] = []
29class UpsertWorker(BaseWorker):
30 """Handles upserting embedded chunks to Qdrant."""
32 def __init__(
33 self,
34 qdrant_manager: QdrantManager,
35 batch_size: int,
36 max_workers: int = 4,
37 queue_size: int = 1000,
38 shutdown_event: asyncio.Event | None = None,
39 ):
40 super().__init__(max_workers, queue_size)
41 self.qdrant_manager = qdrant_manager
42 self.batch_size = batch_size
43 self.shutdown_event = shutdown_event or asyncio.Event()
45 async def process(
46 self, batch: list[tuple[Any, list[float]]]
47 ) -> tuple[int, int, set[str], list[str]]:
48 """Process a batch of embedded chunks.
50 Args:
51 batch: List of (chunk, embedding) tuples
53 Returns:
54 Tuple of (success_count, error_count, successful_doc_ids, errors)
55 """
56 if not batch:
57 return 0, 0, set(), []
59 success_count = 0
60 error_count = 0
61 successful_doc_ids = set()
62 errors = []
64 try:
65 with prometheus_metrics.UPSERT_DURATION.time():
66 points = [
67 models.PointStruct(
68 id=chunk.id,
69 vector=embedding,
70 payload={
71 "content": chunk.content,
72 "metadata": {
73 k: v
74 for k, v in chunk.metadata.items()
75 if k != "parent_document"
76 },
77 "source": chunk.source,
78 "source_type": chunk.source_type,
79 "created_at": chunk.created_at.isoformat(),
80 "updated_at": (
81 getattr(
82 chunk, "updated_at", chunk.created_at
83 ).isoformat()
84 if hasattr(chunk, "updated_at")
85 else chunk.created_at.isoformat()
86 ),
87 "title": getattr(
88 chunk, "title", chunk.metadata.get("title", "")
89 ),
90 "url": getattr(chunk, "url", chunk.metadata.get("url", "")),
91 "document_id": chunk.metadata.get(
92 "parent_document_id", chunk.id
93 ),
94 },
95 )
96 for chunk, embedding in batch
97 ]
99 await self.qdrant_manager.upsert_points(points)
100 prometheus_metrics.INGESTED_DOCUMENTS.inc(len(points))
101 success_count = len(points)
103 # Mark parent documents as successfully processed
104 for chunk, _ in batch:
105 parent_doc = chunk.metadata.get("parent_document")
106 if parent_doc:
107 successful_doc_ids.add(parent_doc.id)
109 except Exception as e:
110 for chunk, _ in batch:
111 logger.error(f"Upsert failed for chunk {chunk.id}: {e}")
112 # Mark parent document as failed
113 parent_doc = chunk.metadata.get("parent_document")
114 if parent_doc:
115 successful_doc_ids.discard(parent_doc.id) # Remove if it was added
116 errors.append(f"Upsert failed for chunk {chunk.id}: {e}")
117 error_count = len(batch)
119 return success_count, error_count, successful_doc_ids, errors
121 async def process_embedded_chunks(
122 self, embedded_chunks: AsyncIterator[tuple[Any, list[float]]]
123 ) -> PipelineResult:
124 """Upsert embedded chunks to Qdrant.
126 Args:
127 embedded_chunks: AsyncIterator of (chunk, embedding) tuples
129 Returns:
130 PipelineResult with processing statistics
131 """
132 logger.debug("UpsertWorker started")
133 result = PipelineResult()
134 batch = []
136 try:
137 async for chunk_embedding in embedded_chunks:
138 if self.shutdown_event.is_set():
139 logger.debug("UpsertWorker exiting due to shutdown")
140 break
142 batch.append(chunk_embedding)
144 # Process batch when it reaches the desired size
145 if len(batch) >= self.batch_size:
146 success_count, error_count, successful_doc_ids, errors = (
147 await self.process(batch)
148 )
149 result.success_count += success_count
150 result.error_count += error_count
151 result.successfully_processed_documents.update(successful_doc_ids)
152 result.errors.extend(errors)
153 batch = []
155 # Process any remaining chunks in the final batch
156 if batch and not self.shutdown_event.is_set():
157 success_count, error_count, successful_doc_ids, errors = (
158 await self.process(batch)
159 )
160 result.success_count += success_count
161 result.error_count += error_count
162 result.successfully_processed_documents.update(successful_doc_ids)
163 result.errors.extend(errors)
165 except asyncio.CancelledError:
166 logger.debug("UpsertWorker cancelled")
167 raise
168 finally:
169 logger.debug("UpsertWorker exited")
171 return result
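
A minimal usage sketch of this worker follows, assuming an already configured QdrantManager and an async source of (chunk, embedding) tuples; the chunk producer and the manager's constructor arguments below are hypothetical placeholders, not part of this module.

# Usage sketch (assumptions: QdrantManager is configured elsewhere; the
# embedded-chunk generator stands in for the pipeline's embedding worker).
import asyncio

from qdrant_loader.core.pipeline.workers.upsert_worker import UpsertWorker
from qdrant_loader.core.qdrant_manager import QdrantManager


async def main():
    qdrant_manager = QdrantManager(...)  # construction details omitted

    async def embedded_chunks():
        # In the real pipeline these tuples come from the embedding worker;
        # produce_embedded_chunks() is a hypothetical helper for illustration.
        for chunk, embedding in produce_embedded_chunks():
            yield chunk, embedding

    worker = UpsertWorker(qdrant_manager=qdrant_manager, batch_size=50)
    result = await worker.process_embedded_chunks(embedded_chunks())
    print(result.success_count, result.error_count, result.errors)


asyncio.run(main())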