Coverage for src / qdrant_loader / core / pipeline / workers / upsert_worker.py: 100%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Upsert worker for upserting embedded chunks to Qdrant.""" 

2 

3import asyncio 

4from collections import Counter 

5from collections.abc import AsyncIterator 

6from typing import Any 

7 

8from qdrant_client.http import models 

9 

10from qdrant_loader.core.monitoring import prometheus_metrics 

11from qdrant_loader.core.qdrant_manager import QdrantManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .base_worker import BaseWorker 

15 

# Module-level logger obtained from the project's LoggingConfig, named after this module.
logger = LoggingConfig.get_logger(__name__)

17 

18 

class PipelineResult:
    """Mutable accumulator describing the outcome of one pipeline run.

    A new instance starts with zeroed counters and empty, per-instance
    containers; callers fill it in as batches complete.
    """

    success_count: int
    error_count: int
    successfully_processed_documents: set[str]
    failed_document_ids: set[str]
    errors: list[str]

    def __init__(self):
        # Counters start at zero; containers are created fresh for each
        # instance so no state is shared between results.
        self.success_count = self.error_count = 0
        self.successfully_processed_documents, self.failed_document_ids = set(), set()
        self.errors = []

28 

29 

class UpsertWorker(BaseWorker):
    """Handles upserting embedded chunks to Qdrant.

    Incoming ``(chunk, embedding)`` tuples are grouped into batches of
    ``batch_size`` and written through ``QdrantManager.upsert_points``.
    Alongside the writes, the worker keeps per-run bookkeeping in a
    ``PipelineResult``:

    * success/error chunk counts,
    * the parent document IDs whose chunks were all upserted without
      a failure or chunk-ID collision,
    * human-readable error strings.
    """

    def __init__(
        self,
        qdrant_manager: QdrantManager,
        batch_size: int,
        max_workers: int = 4,
        queue_size: int = 1000,
        shutdown_event: asyncio.Event | None = None,
    ):
        """Create an upsert worker.

        Args:
            qdrant_manager: Manager used to build point vectors and upsert points.
            batch_size: Number of chunks accumulated before a batch is upserted.
            max_workers: Forwarded to ``BaseWorker``.
            queue_size: Forwarded to ``BaseWorker``.
            shutdown_event: Checked before each incoming chunk. Once set, the
                worker stops consuming and does not upsert the partially
                filled batch. A fresh, unset event is created when omitted.
        """
        super().__init__(max_workers, queue_size)
        self.qdrant_manager = qdrant_manager
        self.batch_size = batch_size
        self.shutdown_event = shutdown_event or asyncio.Event()

    @staticmethod
    def _parent_doc_ids(batch: list[tuple[Any, list[float]]]) -> set[str]:
        """Return the IDs of the parent documents referenced by chunks in ``batch``."""
        doc_ids: set[str] = set()
        for chunk, _ in batch:
            parent_doc = chunk.metadata.get("parent_document")
            if parent_doc:
                doc_ids.add(parent_doc.id)
        return doc_ids

    def _handle_duplicate_chunk_ids(
        self,
        batch: list[tuple[Any, list[float]]],
        batch_chunk_id_counts: Counter,
        duplicate_chunk_ids: set[str],
        same_batch_duplicates: set[str],
        cross_batch_duplicates: set[str],
        new_chunk_ids: set[str],
        successful_doc_ids: set[str],
        result: PipelineResult,
        errors: list[str],
    ) -> set[str]:
        """Handle duplicate chunk IDs and update result/error bookkeeping.

        Every parent document owning a colliding chunk is removed from both
        ``successful_doc_ids`` (this batch) and
        ``result.successfully_processed_documents`` (earlier batches). Both are
        mutated in place. A warning is logged, a summary string is appended to
        ``errors``, and every chunk in the batch that is not in
        ``new_chunk_ids`` is counted as an error.

        Returns:
            The parent document IDs affected by the collisions (empty when
            there are none), so callers can keep them out of later batches.
        """
        if not duplicate_chunk_ids:
            return set()

        duplicate_doc_ids: set[str] = set()
        for chunk, _ in batch:
            if str(chunk.id) in duplicate_chunk_ids:
                parent_doc = chunk.metadata.get("parent_document")
                if parent_doc:
                    duplicate_doc_ids.add(parent_doc.id)

        successful_doc_ids.difference_update(duplicate_doc_ids)
        result.successfully_processed_documents.difference_update(duplicate_doc_ids)

        # Extra copies beyond the first occurrence of each repeated ID.
        same_batch_duplicate_occurrences = sum(
            count - 1 for count in batch_chunk_id_counts.values() if count > 1
        )
        total_duplicate_impact = len(duplicate_doc_ids)
        duplicate_chunk_attempts = len(batch) - len(new_chunk_ids)

        logger.warning(
            "Detected chunk ID collisions during upsert; existing points will be overwritten",
            duplicate_count=len(duplicate_chunk_ids),
            same_batch_duplicate_count=len(same_batch_duplicates),
            same_batch_duplicate_occurrences=same_batch_duplicate_occurrences,
            cross_batch_duplicate_count=len(cross_batch_duplicates),
            affected_documents=total_duplicate_impact,
        )
        errors.append(
            "Detected duplicate chunk IDs during upsert: "
            f"{len(cross_batch_duplicates)} cross-batch IDs and "
            f"{same_batch_duplicate_occurrences} same-batch duplicate occurrences "
            f"across {len(same_batch_duplicates)} IDs affecting {total_duplicate_impact} document(s): "
            f"{sorted(duplicate_doc_ids)}"
        )
        result.error_count += duplicate_chunk_attempts
        return duplicate_doc_ids

    def _build_point(self, chunk: Any, embedding: list[float]) -> models.PointStruct:
        """Build the Qdrant point (id, vector, payload) for a single chunk."""
        # QdrantManager.build_point_vector owns the dense / dense+sparse
        # decision and has its own dense-only fallback on encode failure,
        # so no defensive wrapper is needed here.
        vector = self.qdrant_manager.build_point_vector(embedding, chunk.content)
        # Fall back to created_at when updated_at is absent *or* None, so a
        # single chunk without a modification timestamp cannot fail the batch.
        updated_at = getattr(chunk, "updated_at", None) or chunk.created_at
        return models.PointStruct(
            id=chunk.id,
            vector=vector,
            payload={
                "content": chunk.content,
                "contextual_content": chunk.contextual_content,
                # The parent document object is stripped from the stored
                # metadata; only its ID is persisted (as document_id below).
                "metadata": {
                    k: v
                    for k, v in chunk.metadata.items()
                    if k != "parent_document"
                },
                "source": chunk.source,
                "source_type": chunk.source_type,
                "created_at": chunk.created_at.isoformat(),
                "updated_at": updated_at.isoformat(),
                "title": getattr(chunk, "title", chunk.metadata.get("title", "")),
                "url": getattr(chunk, "url", chunk.metadata.get("url", "")),
                "document_id": chunk.metadata.get("parent_document_id", chunk.id),
            },
        )

    async def process(
        self, batch: list[tuple[Any, list[float]]]
    ) -> tuple[int, int, set[str], list[str]]:
        """Process a batch of embedded chunks.

        Args:
            batch: List of (chunk, embedding) tuples

        Returns:
            Tuple of (success_count, error_count, successful_doc_ids, errors).
            On any exception the whole batch is reported as failed:
            success_count is 0, error_count is ``len(batch)``, no document is
            reported successful, and one error string is added per chunk.
        """
        if not batch:
            return 0, 0, set(), []

        success_count = 0
        error_count = 0
        successful_doc_ids: set[str] = set()
        errors: list[str] = []

        try:
            with prometheus_metrics.UPSERT_DURATION.time():
                points = [
                    self._build_point(chunk, embedding) for chunk, embedding in batch
                ]
                await self.qdrant_manager.upsert_points(points)
            prometheus_metrics.INGESTED_DOCUMENTS.inc(len(points))
            success_count = len(points)

            # Mark parent documents as successfully processed
            successful_doc_ids.update(self._parent_doc_ids(batch))

        except Exception as e:
            # Report the batch as entirely failed. Resetting success_count
            # keeps success + error == len(batch) even when the failure
            # happened after upsert_points returned.
            success_count = 0
            for chunk, _ in batch:
                logger.error(f"Upsert failed for chunk {chunk.id}: {e}")
                # Mark parent document as failed
                parent_doc = chunk.metadata.get("parent_document")
                if parent_doc:
                    successful_doc_ids.discard(parent_doc.id)  # Remove if it was added
                errors.append(f"Upsert failed for chunk {chunk.id}: {e}")
            error_count = len(batch)

        return success_count, error_count, successful_doc_ids, errors

    async def _upsert_and_record(
        self,
        batch: list[tuple[Any, list[float]]],
        seen_chunk_ids: set[str],
        failed_doc_ids: set[str],
        result: PipelineResult,
    ) -> None:
        """Upsert one batch and fold its outcome into ``result``.

        ``seen_chunk_ids`` (chunk IDs already written this run) and
        ``failed_doc_ids`` (documents disqualified this run) carry state
        across batches and are updated in place.
        """
        batch_chunk_id_list = [str(chunk.id) for chunk, _ in batch]
        batch_chunk_ids = set(batch_chunk_id_list)
        batch_chunk_id_counts = Counter(batch_chunk_id_list)

        success_count, error_count, successful_doc_ids, errors = await self.process(
            batch
        )

        if success_count > 0:
            same_batch_duplicates = {
                chunk_id
                for chunk_id, count in batch_chunk_id_counts.items()
                if count > 1
            }
            cross_batch_duplicates = batch_chunk_ids & seen_chunk_ids
            duplicate_chunk_ids = cross_batch_duplicates | same_batch_duplicates
            new_chunk_ids = batch_chunk_ids - seen_chunk_ids - same_batch_duplicates

            failed_doc_ids.update(
                self._handle_duplicate_chunk_ids(
                    batch=batch,
                    batch_chunk_id_counts=batch_chunk_id_counts,
                    duplicate_chunk_ids=duplicate_chunk_ids,
                    same_batch_duplicates=same_batch_duplicates,
                    cross_batch_duplicates=cross_batch_duplicates,
                    new_chunk_ids=new_chunk_ids,
                    successful_doc_ids=successful_doc_ids,
                    result=result,
                    errors=errors,
                )
            )

            # Only update seen_chunk_ids with non-duplicate IDs
            seen_chunk_ids.update(new_chunk_ids)
            result.success_count += len(new_chunk_ids)
        elif error_count > 0:
            # The whole batch failed. Like a collision, a failed chunk
            # disqualifies its document, including successes recorded for
            # it in earlier batches.
            batch_doc_ids = self._parent_doc_ids(batch)
            failed_doc_ids.update(batch_doc_ids)
            result.successfully_processed_documents.difference_update(batch_doc_ids)

        result.error_count += error_count
        # Never re-admit a document that an earlier batch already disqualified.
        result.successfully_processed_documents.update(
            successful_doc_ids - failed_doc_ids
        )
        result.errors.extend(errors)

    async def process_embedded_chunks(
        self, embedded_chunks: AsyncIterator[tuple[Any, list[float]]]
    ) -> PipelineResult:
        """Upsert embedded chunks to Qdrant.

        Args:
            embedded_chunks: AsyncIterator of (chunk, embedding) tuples

        Returns:
            PipelineResult with processing statistics. If the shutdown event
            is set, consumption stops and the pending partial batch is not
            upserted.

        Raises:
            asyncio.CancelledError: Re-raised after logging if the task is
                cancelled.
        """
        logger.debug("UpsertWorker started")
        result = PipelineResult()
        batch: list[tuple[Any, list[float]]] = []
        seen_chunk_ids: set[str] = set()
        # Documents disqualified by a failed batch or a chunk-ID collision
        # during this run; later batches must not mark them successful again.
        failed_doc_ids: set[str] = set()

        try:
            async for chunk_embedding in embedded_chunks:
                if self.shutdown_event.is_set():
                    logger.debug("UpsertWorker exiting due to shutdown")
                    break

                batch.append(chunk_embedding)

                # Process batch when it reaches the desired size
                if len(batch) >= self.batch_size:
                    await self._upsert_and_record(
                        batch, seen_chunk_ids, failed_doc_ids, result
                    )
                    batch = []

            # Process any remaining chunks in the final batch
            if batch and not self.shutdown_event.is_set():
                await self._upsert_and_record(
                    batch, seen_chunk_ids, failed_doc_ids, result
                )

        except asyncio.CancelledError:
            logger.debug("UpsertWorker cancelled")
            raise
        finally:
            logger.debug("UpsertWorker exited")

        return result