Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 90%

119 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Refactored async ingestion pipeline using the new modular architecture.""" 

2 

3from pathlib import Path 

4 

5from qdrant_loader.config import Settings, SourcesConfig 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.monitoring import prometheus_metrics 

8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor 

9from qdrant_loader.core.project_manager import ProjectManager 

10from qdrant_loader.core.qdrant_manager import QdrantManager 

11from qdrant_loader.core.state.state_manager import StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .pipeline import ( 

15 PipelineComponentsFactory, 

16 PipelineConfig, 

17 PipelineOrchestrator, 

18 ResourceManager, 

19) 

20 

21logger = LoggingConfig.get_logger(__name__) 

22 

23 

24class AsyncIngestionPipeline: 

25 """Async ingestion pipeline using modular architecture. 

26 

27 This class provides a streamlined interface for the modular pipeline 

28 architecture, handling document ingestion and processing workflows. 

29 """ 

    def __init__(
        self,
        settings: Settings,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        max_chunk_workers: int = 10,
        max_embed_workers: int = 4,
        max_upsert_workers: int = 4,
        queue_size: int = 1000,
        upsert_batch_size: int | None = None,
        enable_metrics: bool = False,
        metrics_dir: Path | None = None,  # New parameter for workspace support
    ):
        """Initialize the async ingestion pipeline.

        Args:
            settings: Application settings
            qdrant_manager: QdrantManager instance
            state_manager: Optional state manager
            max_chunk_workers: Maximum number of chunking workers
            max_embed_workers: Maximum number of embedding workers
            max_upsert_workers: Maximum number of upsert workers
            queue_size: Queue size for workers
            upsert_batch_size: Batch size for upserts
            enable_metrics: Whether to enable metrics server
            metrics_dir: Custom metrics directory (for workspace support)
        """

        self.settings = settings
        self.qdrant_manager = qdrant_manager

        # Validate that global configuration is available for pipeline operation.
        if not settings.global_config:
            raise ValueError(
                "Global configuration not available. Please check your configuration file."
            )

        # Create pipeline configuration with worker and batch size settings.
        self.pipeline_config = PipelineConfig(
            max_chunk_workers=max_chunk_workers,
            max_embed_workers=max_embed_workers,
            max_upsert_workers=max_upsert_workers,
            queue_size=queue_size,
            upsert_batch_size=upsert_batch_size,
            enable_metrics=enable_metrics,
        )

        # Create resource manager to handle cleanup and signal handling.
        self.resource_manager = ResourceManager()
        self.resource_manager.register_signal_handlers()

        # Create state manager instance if not provided by caller.
        self.state_manager = state_manager or StateManager(
            settings.global_config.state_management
        )

        # Initialize project manager to support multi-project configurations.
        if not settings.global_config.qdrant:
            raise ValueError(
                "Qdrant configuration is required for project manager initialization"
            )

        self.project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )

        # Create pipeline components using factory
        factory = PipelineComponentsFactory()
        self.components = factory.create_components(
            settings=settings,
            config=self.pipeline_config,
            qdrant_manager=qdrant_manager,
            state_manager=self.state_manager,
            resource_manager=self.resource_manager,
        )

        # Create orchestrator with project manager support
        self.orchestrator = PipelineOrchestrator(
            settings, self.components, self.project_manager
        )

        # Initialize performance monitor with custom or default metrics directory
        if metrics_dir:
            # Use provided metrics directory (workspace mode)
            # Accept both Path and str inputs
            final_metrics_dir = (
                metrics_dir if isinstance(metrics_dir, Path) else Path(metrics_dir)
            )
        else:
            # Use default metrics directory
            final_metrics_dir = Path.cwd() / "metrics"

        final_metrics_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initializing metrics directory at {final_metrics_dir}")
        self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))

        # Start metrics server if enabled
        if enable_metrics:
            prometheus_metrics.start_metrics_server()

        logger.info("AsyncIngestionPipeline initialized with new modular architecture")

        # Track cleanup state to prevent duplicate cleanup
        self._cleanup_performed = False

    async def initialize(self):
        """Initialize the pipeline."""
        logger.debug("Starting pipeline initialization")

        try:
            # Initialize state manager first
            if not self.state_manager.is_initialized:
                logger.debug("Initializing state manager")
                await self.state_manager.initialize()

            # Initialize project manager
            if not self.project_manager._initialized:
                logger.debug("Initializing project manager")
                # Prefer direct use of session factory to match existing tests/mocks
                session_factory = getattr(self.state_manager, "_session_factory", None)
                if session_factory is None:
                    logger.error(
                        "State manager session factory is not available during initialization",
                        suggestion="Check database configuration and ensure proper state manager setup",
                    )
                    raise RuntimeError("State manager session factory is not available")

                try:
                    async with session_factory() as session:  # type: ignore
                        await self.project_manager.initialize(session)
                    logger.debug("Project manager initialization completed")
                except Exception as e:
                    # Standardized error logging: user-friendly message + technical details + stack trace
                    logger.error(
                        "Failed to initialize project manager during pipeline startup",
                        error=str(e),
                        error_type=type(e).__name__,
                        suggestion="Check database connectivity and project configuration",
                        exc_info=True,
                    )
                    raise
        except Exception as e:
            # Standardized error logging: user-friendly message + technical details + stack trace
            logger.error(
                "Pipeline initialization failed during startup sequence",
                error=str(e),
                error_type=type(e).__name__,
                suggestion="Check configuration, database connectivity, and system resources",
                exc_info=True,
            )
            raise

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated, use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """

        # Ensure the pipeline is initialized
        await self.initialize()

        # Reset metrics for new run
        self.monitor.clear_metrics()

        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
                "force": force,
            },
        )

        documents = []  # Initialize to avoid UnboundLocalError in exception handler
        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
                force=force,
            )

            # Update metrics
            if documents:
                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                        "force": force,
                    },
                )
                # Note: Success/error counts are handled internally by the new architecture
                self.monitor.end_batch("document_batch", len(documents), 0, [])

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                f"Document processing completed. Processed {len(documents)} documents"
            )
            return documents

        except Exception as e:
            # Standardized error logging: user-friendly message + technical details + stack trace
            logger.error(
                "Document processing pipeline failed during ingestion",
                error=str(e),
                error_type=type(e).__name__,
                documents_attempted=len(documents),
                suggestion="Check data source connectivity, document formats, and system resources",
                exc_info=True,
            )
            self.monitor.end_operation("ingestion_process", error=str(e))
            raise

    async def cleanup(self):
        """Clean up resources."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop metrics server
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning(f"Error stopping metrics server: {e}")

            # Use resource manager for cleanup
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Can't await in __del__, so use the sync cleanup method
            self._sync_cleanup()
        except Exception as e:
            logger.error(f"Error in destructor cleanup: {e}")

    def _sync_cleanup(self):
        """Synchronous cleanup for destructor and signal handlers."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error(f"Error saving metrics: {e}")

        # Stop metrics server
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error(f"Error stopping metrics server: {e}")

        # Use resource manager sync cleanup
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error(f"Error in resource manager cleanup: {e}")

        logger.info("Pipeline cleanup completed (sync)")