Coverage for src/qdrant_loader/core/pipeline/workers/upsert_worker.py: 100%

76 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Upsert worker for upserting embedded chunks to Qdrant.""" 

2 

3import asyncio 

4from collections.abc import AsyncIterator 

5from typing import Any 

6 

7from qdrant_client.http import models 

8 

9from qdrant_loader.core.monitoring import prometheus_metrics 

10from qdrant_loader.core.qdrant_manager import QdrantManager 

11from qdrant_loader.utils.logging import LoggingConfig 

12 

13from .base_worker import BaseWorker 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

18class PipelineResult: 

19 """Result of pipeline processing.""" 

20 

21 def __init__(self): 

22 self.success_count: int = 0 

23 self.error_count: int = 0 

24 self.successfully_processed_documents: set[str] = set() 

25 self.failed_document_ids: set[str] = set() 

26 self.errors: list[str] = [] 

27 

28 

29class UpsertWorker(BaseWorker): 

30 """Handles upserting embedded chunks to Qdrant.""" 

31 

32 def __init__( 

33 self, 

34 qdrant_manager: QdrantManager, 

35 batch_size: int, 

36 max_workers: int = 4, 

37 queue_size: int = 1000, 

38 shutdown_event: asyncio.Event | None = None, 

39 ): 

40 super().__init__(max_workers, queue_size) 

41 self.qdrant_manager = qdrant_manager 

42 self.batch_size = batch_size 

43 self.shutdown_event = shutdown_event or asyncio.Event() 

44 

45 async def process( 

46 self, batch: list[tuple[Any, list[float]]] 

47 ) -> tuple[int, int, set[str], list[str]]: 

48 """Process a batch of embedded chunks. 

49 

50 Args: 

51 batch: List of (chunk, embedding) tuples 

52 

53 Returns: 

54 Tuple of (success_count, error_count, successful_doc_ids, errors) 

55 """ 

56 if not batch: 

57 return 0, 0, set(), [] 

58 

59 success_count = 0 

60 error_count = 0 

61 successful_doc_ids = set() 

62 errors = [] 

63 

64 try: 

65 with prometheus_metrics.UPSERT_DURATION.time(): 

66 points = [ 

67 models.PointStruct( 

68 id=chunk.id, 

69 vector=embedding, 

70 payload={ 

71 "content": chunk.content, 

72 "metadata": { 

73 k: v 

74 for k, v in chunk.metadata.items() 

75 if k != "parent_document" 

76 }, 

77 "source": chunk.source, 

78 "source_type": chunk.source_type, 

79 "created_at": chunk.created_at.isoformat(), 

80 "updated_at": ( 

81 getattr( 

82 chunk, "updated_at", chunk.created_at 

83 ).isoformat() 

84 if hasattr(chunk, "updated_at") 

85 else chunk.created_at.isoformat() 

86 ), 

87 "title": getattr( 

88 chunk, "title", chunk.metadata.get("title", "") 

89 ), 

90 "url": getattr(chunk, "url", chunk.metadata.get("url", "")), 

91 "document_id": chunk.metadata.get( 

92 "parent_document_id", chunk.id 

93 ), 

94 }, 

95 ) 

96 for chunk, embedding in batch 

97 ] 

98 

99 await self.qdrant_manager.upsert_points(points) 

100 prometheus_metrics.INGESTED_DOCUMENTS.inc(len(points)) 

101 success_count = len(points) 

102 

103 # Mark parent documents as successfully processed 

104 for chunk, _ in batch: 

105 parent_doc = chunk.metadata.get("parent_document") 

106 if parent_doc: 

107 successful_doc_ids.add(parent_doc.id) 

108 

109 except Exception as e: 

110 for chunk, _ in batch: 

111 logger.error(f"Upsert failed for chunk {chunk.id}: {e}") 

112 # Mark parent document as failed 

113 parent_doc = chunk.metadata.get("parent_document") 

114 if parent_doc: 

115 successful_doc_ids.discard(parent_doc.id) # Remove if it was added 

116 errors.append(f"Upsert failed for chunk {chunk.id}: {e}") 

117 error_count = len(batch) 

118 

119 return success_count, error_count, successful_doc_ids, errors 

120 

121 async def process_embedded_chunks( 

122 self, embedded_chunks: AsyncIterator[tuple[Any, list[float]]] 

123 ) -> PipelineResult: 

124 """Upsert embedded chunks to Qdrant. 

125 

126 Args: 

127 embedded_chunks: AsyncIterator of (chunk, embedding) tuples 

128 

129 Returns: 

130 PipelineResult with processing statistics 

131 """ 

132 logger.debug("UpsertWorker started") 

133 result = PipelineResult() 

134 batch = [] 

135 

136 try: 

137 async for chunk_embedding in embedded_chunks: 

138 if self.shutdown_event.is_set(): 

139 logger.debug("UpsertWorker exiting due to shutdown") 

140 break 

141 

142 batch.append(chunk_embedding) 

143 

144 # Process batch when it reaches the desired size 

145 if len(batch) >= self.batch_size: 

146 success_count, error_count, successful_doc_ids, errors = ( 

147 await self.process(batch) 

148 ) 

149 result.success_count += success_count 

150 result.error_count += error_count 

151 result.successfully_processed_documents.update(successful_doc_ids) 

152 result.errors.extend(errors) 

153 batch = [] 

154 

155 # Process any remaining chunks in the final batch 

156 if batch and not self.shutdown_event.is_set(): 

157 success_count, error_count, successful_doc_ids, errors = ( 

158 await self.process(batch) 

159 ) 

160 result.success_count += success_count 

161 result.error_count += error_count 

162 result.successfully_processed_documents.update(successful_doc_ids) 

163 result.errors.extend(errors) 

164 

165 except asyncio.CancelledError: 

166 logger.debug("UpsertWorker cancelled") 

167 raise 

168 finally: 

169 logger.debug("UpsertWorker exited") 

170 

171 return result
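
For context, a minimal sketch of how this worker might be driven from the rest of the pipeline, assuming an already configured QdrantManager and an async generator that yields (chunk, embedding) tuples; the names make_embedded_chunks, chunks_with_embeddings, and main below are illustrative assumptions, not part of this module.

    # Hypothetical driver for UpsertWorker; only UpsertWorker and PipelineResult
    # come from this module, everything else here is an assumed stand-in.
    import asyncio


    async def make_embedded_chunks(chunks_with_embeddings):
        # Stand-in for the embedding stage: re-yield (chunk, embedding) tuples
        # from whatever produced them upstream.
        for chunk, embedding in chunks_with_embeddings:
            yield chunk, embedding


    async def main(qdrant_manager, chunks_with_embeddings):
        # batch_size controls how many points go into each upsert call.
        worker = UpsertWorker(qdrant_manager=qdrant_manager, batch_size=50)
        result = await worker.process_embedded_chunks(
            make_embedded_chunks(chunks_with_embeddings)
        )
        print(f"upserted={result.success_count} failed={result.error_count}")


    # asyncio.run(main(manager, embedded)) once a QdrantManager instance and
    # embedded chunks are available.

Because process_embedded_chunks accumulates everything into a single PipelineResult, the caller only needs to inspect success_count, error_count, and errors after the iterator is exhausted or the shutdown event fires.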