Coverage for src / qdrant_loader / core / pipeline / document_pipeline.py: 64%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Document processing pipeline that coordinates chunking, embedding, and upserting.""" 

2 

3import asyncio 

4import time 

5from dataclasses import dataclass 

6 

7from qdrant_loader.core.document import Document 

8from qdrant_loader.utils.logging import LoggingConfig 

9 

10from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker 

11from .workers.upsert_worker import PipelineResult 

12 

# Module-level logger, obtained from the project's LoggingConfig helper and
# named after this module.
logger = LoggingConfig.get_logger(__name__)

14 

15 

16@dataclass 

17class BatchResult: 

18 """Result of processing a bounded batch of documents.""" 

19 

20 success_count: int = 0 

21 failure_count: int = 0 

22 skipped_count: int = 0 

23 successfully_processed_documents: set[str] | None = None 

24 failed_document_ids: set[str] | None = None 

25 errors: list[str] | None = None 

26 

27 def __post_init__(self) -> None: 

28 if self.successfully_processed_documents is None: 

29 self.successfully_processed_documents = set() 

30 if self.failed_document_ids is None: 

31 self.failed_document_ids = set() 

32 if self.errors is None: 

33 self.errors = [] 

34 

35 

class _UpsertTimeoutError(Exception):
    """Internal signal: the awaited upsert stage exceeded its timeout."""


class DocumentPipeline:
    """Handles the chunking -> embedding -> upsert pipeline.

    The chunking worker's output is passed to the embedding worker, whose
    output is passed to the upsert worker. Only the upsert call is awaited,
    and that await is bounded by a configurable timeout.
    """

    # Defaults preserve the previously hard-coded limits.
    DEFAULT_BATCH_TIMEOUT: float = 600.0  # 10 minutes per batch
    DEFAULT_PIPELINE_TIMEOUT: float = 3600.0  # 1 hour per full run

    def __init__(
        self,
        chunking_worker: ChunkingWorker,
        embedding_worker: EmbeddingWorker,
        upsert_worker: UpsertWorker,
        *,
        batch_timeout: float = DEFAULT_BATCH_TIMEOUT,
        pipeline_timeout: float = DEFAULT_PIPELINE_TIMEOUT,
    ):
        """Store the stage workers and the per-call timeouts.

        Args:
            chunking_worker: Worker whose ``process_documents`` output feeds
                the embedding stage.
            embedding_worker: Worker whose ``process_chunks`` output feeds the
                upsert stage.
            upsert_worker: Worker whose ``process_embedded_chunks`` coroutine
                produces the final ``PipelineResult``.
            batch_timeout: Seconds allowed for the upsert stage of one
                ``process_batch`` call.
            pipeline_timeout: Seconds allowed for the upsert stage of one
                ``process_documents`` call.
        """
        self.chunking_worker = chunking_worker
        self.embedding_worker = embedding_worker
        self.upsert_worker = upsert_worker
        self.batch_timeout = batch_timeout
        self.pipeline_timeout = pipeline_timeout

    @staticmethod
    def _describe_timeout(seconds: float) -> str:
        """Render a timeout for log/error text, e.g. 600 -> "10 minutes", 3600 -> "1 hour"."""
        for unit_seconds, unit in ((3600, "hour"), (60, "minute"), (1, "second")):
            if seconds >= unit_seconds and seconds % unit_seconds == 0:
                count = int(seconds // unit_seconds)
                return f"{count} {unit}" + ("" if count == 1 else "s")
        return f"{seconds:g} seconds"

    @staticmethod
    def _failed_pipeline_result(document_count: int, message: str) -> PipelineResult:
        """Build a PipelineResult recording ``document_count`` errors and ``message``."""
        result = PipelineResult()
        result.error_count = document_count
        result.errors = [message]
        return result

    async def _run_stages(
        self, documents: list[Document], *, timeout: float, log
    ) -> PipelineResult:
        """Chain chunking -> embedding -> upsert and await the upsert stage.

        Args:
            documents: Documents handed to the chunking worker.
            timeout: Seconds allowed for the awaited upsert stage.
            log: Logger method used for the per-phase progress lines
                (``logger.debug`` for batches, ``logger.info`` for full runs).

        Returns:
            The ``PipelineResult`` produced by the upsert worker.

        Raises:
            _UpsertTimeoutError: If the upsert stage does not finish within
                ``timeout`` seconds. Any other worker exception propagates.
        """
        # perf_counter is monotonic, so durations are unaffected by system
        # clock adjustments that can skew differences of time.time().
        chunking_start = time.perf_counter()
        chunks_iter = self.chunking_worker.process_documents(documents)
        # NOTE(review): the ``*_iter`` names suggest the chunking/embedding
        # workers return lazy iterators that the upsert stage consumes; if so,
        # this "chunking" timing covers only iterator creation — confirm.
        log("🔄 Chunking completed, transitioning to embedding phase...")
        chunking_duration = time.perf_counter() - chunking_start
        log(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds")

        embedding_start = time.perf_counter()
        embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter)
        log("🔄 Embedding phase ready, starting upsert phase...")

        # asyncio.TimeoutError is an alias of the builtin TimeoutError only
        # from Python 3.11; on 3.10 wait_for raises the asyncio class, which a
        # bare ``except TimeoutError`` would miss.
        try:
            result = await asyncio.wait_for(
                self.upsert_worker.process_embedded_chunks(embedded_chunks_iter),
                timeout=timeout,
            )
        except (asyncio.TimeoutError, TimeoutError) as exc:
            raise _UpsertTimeoutError from exc

        embedding_duration = time.perf_counter() - embedding_start
        log(f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds")
        return result

    async def process_batch(self, batch: list[Document]) -> BatchResult:
        """Process a bounded batch of documents through the pipeline.

        Per-phase progress is logged at DEBUG; start/finish summaries at INFO.

        Args:
            batch: List of documents to process (bounded size, typically 256)

        Returns:
            BatchResult with processing statistics. On timeout, or on any
            other ``Exception``, every document in ``batch`` is counted as a
            failure and the reason is recorded in ``errors`` instead of the
            exception being re-raised.
        """
        logger.info(f"⚙️ Processing batch of {len(batch)} documents through pipeline")
        start_time = time.perf_counter()

        try:
            logger.debug("🔄 Starting chunking phase for batch...")
            pipeline_result = await self._run_stages(
                batch, timeout=self.batch_timeout, log=logger.debug
            )

            total_duration = time.perf_counter() - start_time
            logger.info(
                f"✅ Batch processing completed: {pipeline_result.success_count} chunks, "
                f"{pipeline_result.error_count} errors in {total_duration:.2f}s"
            )
            return BatchResult(
                success_count=pipeline_result.success_count,
                failure_count=pipeline_result.error_count,
                skipped_count=0,
                successfully_processed_documents=pipeline_result.successfully_processed_documents,
                failed_document_ids=pipeline_result.failed_document_ids,
                errors=pipeline_result.errors,
            )

        # Must precede the generic handler: _UpsertTimeoutError is an Exception.
        except _UpsertTimeoutError:
            message = (
                "Batch processing timed out after "
                f"{self._describe_timeout(self.batch_timeout)}"
            )
            logger.error(f"❌ {message}")
            return BatchResult(failure_count=len(batch), errors=[message])

        except Exception as e:
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"❌ Batch processing failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            return BatchResult(
                failure_count=len(batch),
                errors=[f"Batch processing failed: {e}"],
            )

    async def process_documents(self, documents: list[Document]) -> PipelineResult:
        """Process documents through the pipeline.

        Per-phase progress and summaries are logged at INFO.

        Args:
            documents: List of documents to process

        Returns:
            PipelineResult with processing statistics. On timeout, or on any
            other ``Exception``, ``error_count`` is set to ``len(documents)``
            and the reason is recorded in ``errors`` instead of the exception
            being re-raised.
        """
        logger.info(f"⚙️ Processing {len(documents)} documents through pipeline")
        start_time = time.perf_counter()

        try:
            logger.info("🔄 Starting chunking phase...")
            result = await self._run_stages(
                documents, timeout=self.pipeline_timeout, log=logger.info
            )

            total_duration = time.perf_counter() - start_time
            logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds")
            logger.info(
                f"✅ Pipeline completed: {result.success_count} chunks processed, "
                f"{result.error_count} errors"
            )
            return result

        # Must precede the generic handler: _UpsertTimeoutError is an Exception.
        except _UpsertTimeoutError:
            message = (
                "Pipeline timed out after "
                f"{self._describe_timeout(self.pipeline_timeout)}"
            )
            logger.error(f"❌ {message}")
            return self._failed_pipeline_result(len(documents), message)

        except Exception as e:
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            return self._failed_pipeline_result(len(documents), f"Pipeline failed: {e}")