Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 95%

128 statements (coverage.py v7.8.2, created at 2025-06-04 05:50 +0000)

1"""Refactored async ingestion pipeline using the new modular architecture.""" 

2 

3from pathlib import Path 

4 

5from qdrant_loader.config import Settings, SourcesConfig 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.monitoring import prometheus_metrics 

8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor 

9from qdrant_loader.core.qdrant_manager import QdrantManager 

10from qdrant_loader.core.state.state_manager import StateManager 

11from qdrant_loader.core.project_manager import ProjectManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .pipeline import ( 

15 PipelineComponentsFactory, 

16 PipelineConfig, 

17 PipelineOrchestrator, 

18 ResourceManager, 

19) 

20 

21logger = LoggingConfig.get_logger(__name__) 

22 


class AsyncIngestionPipeline:
    """Refactored async ingestion pipeline using modular architecture.

    This class maintains backward compatibility with the original interface
    while using the new modular pipeline architecture internally.
    """

    def __init__(
        self,
        settings: Settings,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        embedding_cache=None,  # Placeholder for future cache (maintained for compatibility)
        max_chunk_workers: int = 10,
        max_embed_workers: int = 4,
        max_upsert_workers: int = 4,
        queue_size: int = 1000,
        upsert_batch_size: int | None = None,
        enable_metrics: bool = False,
        metrics_dir: Path | None = None,  # New parameter for workspace support
    ):
        """Initialize the async ingestion pipeline.

        Args:
            settings: Application settings
            qdrant_manager: QdrantManager instance
            state_manager: Optional state manager
            embedding_cache: Placeholder for future cache (unused)
            max_chunk_workers: Maximum number of chunking workers
            max_embed_workers: Maximum number of embedding workers
            max_upsert_workers: Maximum number of upsert workers
            queue_size: Queue size for workers
            upsert_batch_size: Batch size for upserts
            enable_metrics: Whether to enable metrics server
            metrics_dir: Custom metrics directory (for workspace support)
        """
        self.settings = settings
        self.qdrant_manager = qdrant_manager
        self.embedding_cache = embedding_cache  # Maintained for compatibility

        # Validate global configuration
        if not settings.global_config:
            raise ValueError(
                "Global configuration not available. Please check your configuration file."
            )

        # Create pipeline configuration
        self.pipeline_config = PipelineConfig(
            max_chunk_workers=max_chunk_workers,
            max_embed_workers=max_embed_workers,
            max_upsert_workers=max_upsert_workers,
            queue_size=queue_size,
            upsert_batch_size=upsert_batch_size,
            enable_metrics=enable_metrics,
        )

        # Create resource manager
        self.resource_manager = ResourceManager()
        self.resource_manager.register_signal_handlers()

        # Create state manager if not provided
        self.state_manager = state_manager or StateManager(
            settings.global_config.state_management
        )

        # Initialize project manager for multi-project support
        if not settings.global_config.qdrant:
            raise ValueError(
                "Qdrant configuration is required for project manager initialization"
            )

        self.project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )

        # Create pipeline components using factory
        factory = PipelineComponentsFactory()
        self.components = factory.create_components(
            settings=settings,
            config=self.pipeline_config,
            qdrant_manager=qdrant_manager,
            state_manager=self.state_manager,
            resource_manager=self.resource_manager,
        )

        # Create orchestrator with project manager support
        self.orchestrator = PipelineOrchestrator(
            settings, self.components, self.project_manager
        )

        # Initialize performance monitor with custom or default metrics directory
        if metrics_dir:
            # Use provided metrics directory (workspace mode)
            final_metrics_dir = metrics_dir
        else:
            # Use default metrics directory
            final_metrics_dir = Path.cwd() / "metrics"

        final_metrics_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initializing metrics directory at {final_metrics_dir}")
        self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))

        # Start metrics server if enabled
        if enable_metrics:
            prometheus_metrics.start_metrics_server()

        logger.info("AsyncIngestionPipeline initialized with new modular architecture")

        # Track cleanup state to prevent duplicate cleanup
        self._cleanup_performed = False

    async def initialize(self):
        """Initialize the pipeline (maintained for compatibility)."""
        logger.debug("Pipeline initialization called")

        # Initialize state manager first
        if not self.state_manager._initialized:
            logger.debug("Initializing state manager")
            await self.state_manager.initialize()

        # Initialize project manager
        if not self.project_manager._initialized:
            logger.debug("Initializing project manager")
            if self.state_manager._session_factory:
                async with self.state_manager._session_factory() as session:
                    await self.project_manager.initialize(session)

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated, use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project

        Returns:
            List of processed documents
        """
        # Ensure the pipeline is initialized
        await self.initialize()

        # Reset metrics for new run
        self.monitor.clear_metrics()

        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
            },
        )

        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
            )

            # Update metrics (maintained for compatibility)
            if documents:
                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                # Note: Success/error counts are handled internally by the new architecture
                self.monitor.end_batch("document_batch", len(documents), 0, [])

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                f"Document processing completed. Processed {len(documents)} documents"
            )
            return documents

        except Exception as e:
            logger.error(f"Document processing failed: {e}", exc_info=True)
            self.monitor.end_operation("ingestion_process", error=str(e))
            raise

    async def cleanup(self):
        """Clean up resources."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop metrics server
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning(f"Error stopping metrics server: {e}")

            # Use resource manager for cleanup
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Can't await in __del__, so use the sync cleanup method
            self._sync_cleanup()
        except Exception as e:
            logger.error(f"Error in destructor cleanup: {e}")

    def _sync_cleanup(self):
        """Synchronous cleanup for destructor and signal handlers."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error(f"Error saving metrics: {e}")

        # Stop metrics server
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error(f"Error stopping metrics server: {e}")

        # Use resource manager sync cleanup
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error(f"Error in resource manager cleanup: {e}")

        logger.info("Pipeline cleanup completed (sync)")

    # Backward compatibility properties
    @property
    def _shutdown_event(self):
        """Backward compatibility property for shutdown event."""
        return self.resource_manager.shutdown_event

    @property
    def _active_tasks(self):
        """Backward compatibility property for active tasks."""
        return self.resource_manager.active_tasks

    @property
    def _cleanup_done(self):
        """Backward compatibility property for cleanup status."""
        return self.resource_manager.cleanup_done

    # Legacy methods maintained for compatibility
    def _cleanup(self):
        """Legacy cleanup method (redirects to sync cleanup)."""
        self._sync_cleanup()

    async def _async_cleanup(self):
        """Legacy async cleanup method (redirects to resource manager)."""
        await self.resource_manager.cleanup()

    def _handle_sigint(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigint(signum, frame)

    def _handle_sigterm(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigterm(signum, frame)

    def _cancel_all_tasks(self):
        """Legacy task cancellation (redirects to resource manager)."""
        self.resource_manager._cancel_all_tasks()

    def _force_immediate_exit(self):
        """Legacy force exit (redirects to resource manager)."""
        self.resource_manager._force_immediate_exit()
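
For reference, the sketch below shows how this class is typically driven end to end: construct the pipeline, run process_documents, and always call cleanup afterwards. It is a minimal illustration only; this file does not show how Settings and QdrantManager are built, so those two construction lines (and the example project_id value) are placeholder assumptions rather than the project's documented setup. The pipeline calls themselves follow the interface defined above.

import asyncio

from qdrant_loader.config import Settings
from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline
from qdrant_loader.core.qdrant_manager import QdrantManager


async def main() -> None:
    # Placeholder setup: the real project loads Settings from its configuration
    # file and builds QdrantManager from it; the exact calls here are assumptions.
    settings = Settings()  # assumed construction, illustrative only
    qdrant_manager = QdrantManager(settings)  # assumed construction, illustrative only

    pipeline = AsyncIngestionPipeline(
        settings=settings,
        qdrant_manager=qdrant_manager,
        max_chunk_workers=10,
        max_embed_workers=4,
        max_upsert_workers=4,
        enable_metrics=False,
    )
    try:
        # project_id scopes the run to a single project; sources_config is
        # documented above as deprecated in favour of project_id.
        documents = await pipeline.process_documents(project_id="my-project")
        print(f"Ingested {len(documents)} documents")
    finally:
        # cleanup() is idempotent (guarded by _cleanup_performed), so it is safe
        # to call even if processing raised an exception.
        await pipeline.cleanup()


if __name__ == "__main__":
    asyncio.run(main())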