Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 89%

140 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Refactored async ingestion pipeline using the new modular architecture.""" 

2 

3from pathlib import Path 

4 

5from qdrant_loader.config import Settings, SourcesConfig 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.monitoring import prometheus_metrics 

8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor 

9from qdrant_loader.core.project_manager import ProjectManager 

10from qdrant_loader.core.qdrant_manager import QdrantManager 

11from qdrant_loader.core.state.state_manager import StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .pipeline import ( 

15 PipelineComponentsFactory, 

16 PipelineConfig, 

17 PipelineOrchestrator, 

18 ResourceManager, 

19) 

20 

21logger = LoggingConfig.get_logger(__name__) 

22 

23 

24class AsyncIngestionPipeline: 

25 """Refactored async ingestion pipeline using modular architecture. 

26 

27 This class maintains backward compatibility with the original interface 

28 while using the new modular pipeline architecture internally. 

29 """ 

30 

31 def __init__( 

32 self, 

33 settings: Settings, 

34 qdrant_manager: QdrantManager, 

35 state_manager: StateManager | None = None, 

36 embedding_cache=None, # Placeholder for future cache (maintained for compatibility) 

37 max_chunk_workers: int = 10, 

38 max_embed_workers: int = 4, 

39 max_upsert_workers: int = 4, 

40 queue_size: int = 1000, 

41 upsert_batch_size: int | None = None, 

42 enable_metrics: bool = False, 

43 metrics_dir: Path | None = None, # New parameter for workspace support 

44 ): 

45 """Initialize the async ingestion pipeline. 

46 

47 Args: 

48 settings: Application settings 

49 qdrant_manager: QdrantManager instance 

50 state_manager: Optional state manager 

51 embedding_cache: Placeholder for future cache (unused) 

52 max_chunk_workers: Maximum number of chunking workers 

53 max_embed_workers: Maximum number of embedding workers 

54 max_upsert_workers: Maximum number of upsert workers 

55 queue_size: Queue size for workers 

56 upsert_batch_size: Batch size for upserts 

57 enable_metrics: Whether to enable metrics server 

58 metrics_dir: Custom metrics directory (for workspace support) 

59 """ 

60 self.settings = settings 

61 self.qdrant_manager = qdrant_manager 

62 self.embedding_cache = embedding_cache # Maintained for compatibility 

63 

64 # Validate global configuration 

65 if not settings.global_config: 

66 raise ValueError( 

67 "Global configuration not available. Please check your configuration file." 

68 ) 

69 

70 # Create pipeline configuration 

71 self.pipeline_config = PipelineConfig( 

72 max_chunk_workers=max_chunk_workers, 

73 max_embed_workers=max_embed_workers, 

74 max_upsert_workers=max_upsert_workers, 

75 queue_size=queue_size, 

76 upsert_batch_size=upsert_batch_size, 

77 enable_metrics=enable_metrics, 

78 ) 

79 

80 # Create resource manager 

81 self.resource_manager = ResourceManager() 

82 self.resource_manager.register_signal_handlers() 

83 

84 # Create state manager if not provided 

85 self.state_manager = state_manager or StateManager( 

86 settings.global_config.state_management 

87 ) 

88 

89 # Initialize project manager for multi-project support 

90 if not settings.global_config.qdrant: 

91 raise ValueError( 

92 "Qdrant configuration is required for project manager initialization" 

93 ) 

94 

95 self.project_manager = ProjectManager( 

96 projects_config=settings.projects_config, 

97 global_collection_name=settings.global_config.qdrant.collection_name, 

98 ) 

99 

100 # Create pipeline components using factory 

101 factory = PipelineComponentsFactory() 

102 self.components = factory.create_components( 

103 settings=settings, 

104 config=self.pipeline_config, 

105 qdrant_manager=qdrant_manager, 

106 state_manager=self.state_manager, 

107 resource_manager=self.resource_manager, 

108 ) 

109 

110 # Create orchestrator with project manager support 

111 self.orchestrator = PipelineOrchestrator( 

112 settings, self.components, self.project_manager 

113 ) 

114 

115 # Initialize performance monitor with custom or default metrics directory 

116 if metrics_dir: 

117 # Use provided metrics directory (workspace mode) 

118 final_metrics_dir = metrics_dir 

119 else: 

120 # Use default metrics directory 

121 final_metrics_dir = Path.cwd() / "metrics" 

122 

123 final_metrics_dir.mkdir(parents=True, exist_ok=True) 

124 logger.info(f"Initializing metrics directory at {final_metrics_dir}") 

125 self.monitor = IngestionMonitor(str(final_metrics_dir.absolute())) 

126 

127 # Start metrics server if enabled 

128 if enable_metrics: 

129 prometheus_metrics.start_metrics_server() 

130 

131 logger.info("AsyncIngestionPipeline initialized with new modular architecture") 

132 

133 # Track cleanup state to prevent duplicate cleanup 

134 self._cleanup_performed = False 
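        # (Both cleanup() and _sync_cleanup() check this flag, so whichever
        # path runs first wins and the other becomes a no-op.)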

    async def initialize(self):
        """Initialize the pipeline (maintained for compatibility)."""
        logger.debug("Pipeline initialization called")

        try:
            # Initialize state manager first
            if not self.state_manager._initialized:
                logger.debug("Initializing state manager")
                await self.state_manager.initialize()
                logger.debug("State manager initialization completed")

            # Initialize project manager
            if not self.project_manager._initialized:
                logger.debug("Initializing project manager")
                if self.state_manager._session_factory:
                    try:
                        async with self.state_manager._session_factory() as session:
                            await self.project_manager.initialize(session)
                        logger.debug("Project manager initialization completed")
                    except Exception as e:
                        logger.error(
                            f"Failed to initialize project manager: {e}", exc_info=True
                        )
                        raise
                else:
                    logger.error("State manager session factory is not available")
                    raise RuntimeError("State manager session factory is not available")
        except Exception as e:
            logger.error(f"Pipeline initialization failed: {e}", exc_info=True)
            raise
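
    # Note: initialize() is idempotent -- the state manager and project
    # manager guard on their private _initialized flags, so
    # process_documents() can safely call it on every run.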

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated, use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        # Ensure the pipeline is initialized
        await self.initialize()

        # Reset metrics for new run
        self.monitor.clear_metrics()

        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
                "force": force,
            },
        )

        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
                force=force,
            )

            # Update metrics (maintained for compatibility)
            if documents:
                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                        "force": force,
                    },
                )
                # Note: Success/error counts are handled internally by the new architecture
                self.monitor.end_batch("document_batch", len(documents), 0, [])

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                f"Document processing completed. Processed {len(documents)} documents"
            )
            return documents

        except Exception as e:
            logger.error(f"Document processing failed: {e}", exc_info=True)
            self.monitor.end_operation("ingestion_process", error=str(e))
            raise
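
    # (Each run calls clear_metrics() first, so the monitor's saved output
    # always reflects only the most recent ingestion run.)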

    async def cleanup(self):
        """Clean up resources."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop metrics server
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning(f"Error stopping metrics server: {e}")

            # Use resource manager for cleanup
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")
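
    # Cleanup is intentionally split: the async path above delegates to
    # ResourceManager.cleanup(), while _sync_cleanup() below serves the
    # destructor and signal-handler paths, where awaiting is not possible.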

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Can't await in __del__, so use the sync cleanup method
            self._sync_cleanup()
        except Exception as e:
            logger.error(f"Error in destructor cleanup: {e}")

    def _sync_cleanup(self):
        """Synchronous cleanup for destructor and signal handlers."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error(f"Error saving metrics: {e}")

        # Stop metrics server
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error(f"Error stopping metrics server: {e}")

        # Use resource manager sync cleanup
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error(f"Error in resource manager cleanup: {e}")

        logger.info("Pipeline cleanup completed (sync)")

    # Backward compatibility properties
    @property
    def _shutdown_event(self):
        """Backward compatibility property for shutdown event."""
        return self.resource_manager.shutdown_event

    @property
    def _active_tasks(self):
        """Backward compatibility property for active tasks."""
        return self.resource_manager.active_tasks

    @property
    def _cleanup_done(self):
        """Backward compatibility property for cleanup status."""
        return self.resource_manager.cleanup_done

    # Legacy methods maintained for compatibility
    def _cleanup(self):
        """Legacy cleanup method (redirects to sync cleanup)."""
        self._sync_cleanup()

    async def _async_cleanup(self):
        """Legacy async cleanup method (redirects to resource manager)."""
        await self.resource_manager.cleanup()

    def _handle_sigint(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigint(signum, frame)

    def _handle_sigterm(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigterm(signum, frame)

    def _cancel_all_tasks(self):
        """Legacy task cancellation (redirects to resource manager)."""
        self.resource_manager._cancel_all_tasks()

    def _force_immediate_exit(self):
        """Legacy force exit (redirects to resource manager)."""
        self.resource_manager._force_immediate_exit()
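
Usage sketch (illustrative, not part of the measured module): a minimal async
driver, assuming Settings and QdrantManager are constructed by the
application's own configuration loading; "my-project" is a hypothetical
project id.

    from qdrant_loader.config import Settings
    from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline
    from qdrant_loader.core.qdrant_manager import QdrantManager

    async def main(settings: Settings, qdrant_manager: QdrantManager) -> None:
        # Defaults: 10 chunk workers, 4 embed workers, 4 upsert workers.
        pipeline = AsyncIngestionPipeline(settings, qdrant_manager)
        try:
            # force=False (the default) keeps change detection active.
            documents = await pipeline.process_documents(project_id="my-project")
            print(f"Ingested {len(documents)} documents")
        finally:
            # Saves metrics, stops any metrics server, and releases resources.
            await pipeline.cleanup()

Run with asyncio.run(main(settings, qdrant_manager)) once both objects exist;
calling cleanup() explicitly is preferable to relying on __del__, which is not
guaranteed to run at interpreter shutdown.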