Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 91%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Refactored async ingestion pipeline using the new modular architecture.""" 

2 

3from pathlib import Path 

4 

5from qdrant_loader.config import Settings, SourcesConfig 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.monitoring import prometheus_metrics 

8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor 

9from qdrant_loader.core.project_manager import ProjectManager 

10from qdrant_loader.core.qdrant_manager import QdrantManager 

11from qdrant_loader.core.state.state_manager import StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13from qdrant_loader.utils.sensitive import sanitize_exception_message 

14 

15from .pipeline import ( 

16 PipelineComponentsFactory, 

17 PipelineConfig, 

18 PipelineOrchestrator, 

19 ResourceManager, 

20) 

21 

22logger = LoggingConfig.get_logger(__name__) 

23 

24 

class AsyncIngestionPipeline:
    """Async ingestion pipeline using modular architecture.

    This class provides a streamlined interface for the modular pipeline
    architecture, handling document ingestion and processing workflows.

    Typical lifecycle: construct, then ``await process_documents(...)``
    (which initializes lazily via ``initialize``), then ``await cleanup()``.
    Cleanup is idempotent and is also attempted from ``__del__``.
    """

32 def __init__( 

33 self, 

34 settings: Settings, 

35 qdrant_manager: QdrantManager, 

36 state_manager: StateManager | None = None, 

37 max_chunk_workers: int = 10, 

38 max_embed_workers: int = 4, 

39 max_upsert_workers: int = 4, 

40 queue_size: int = 1000, 

41 upsert_batch_size: int | None = None, 

42 enable_metrics: bool = False, 

43 metrics_dir: Path | None = None, # New parameter for workspace support 

44 ): 

45 """Initialize the async ingestion pipeline. 

46 

47 Args: 

48 settings: Application settings 

49 qdrant_manager: QdrantManager instance 

50 state_manager: Optional state manager 

51 

52 max_chunk_workers: Maximum number of chunking workers 

53 max_embed_workers: Maximum number of embedding workers 

54 max_upsert_workers: Maximum number of upsert workers 

55 queue_size: Queue size for workers 

56 upsert_batch_size: Batch size for upserts 

57 enable_metrics: Whether to enable metrics server 

58 metrics_dir: Custom metrics directory (for workspace support) 

59 """ 

60 self.settings = settings 

61 self.qdrant_manager = qdrant_manager 

62 

63 # Validate that global configuration is available for pipeline operation. 

64 if not settings.global_config: 

65 raise ValueError( 

66 "Global configuration not available. Please check your configuration file." 

67 ) 

68 

69 # Create pipeline configuration with worker and batch size settings. 

70 self.pipeline_config = PipelineConfig( 

71 max_chunk_workers=max_chunk_workers, 

72 max_embed_workers=max_embed_workers, 

73 max_upsert_workers=max_upsert_workers, 

74 queue_size=queue_size, 

75 upsert_batch_size=upsert_batch_size, 

76 enable_metrics=enable_metrics, 

77 ) 

78 

79 # Create resource manager to handle cleanup and signal handling. 

80 self.resource_manager = ResourceManager() 

81 self.resource_manager.register_signal_handlers() 

82 

83 # Create state manager instance if not provided by caller. 

84 self.state_manager = state_manager or StateManager( 

85 settings.global_config.state_management 

86 ) 

87 

88 # Initialize project manager to support multi-project configurations. 

89 if not settings.global_config.qdrant: 

90 raise ValueError( 

91 "Qdrant configuration is required for project manager initialization" 

92 ) 

93 

94 self.project_manager = ProjectManager( 

95 projects_config=settings.projects_config, 

96 global_collection_name=settings.global_config.qdrant.collection_name, 

97 ) 

98 

99 # Create pipeline components using factory 

100 factory = PipelineComponentsFactory() 

101 self.components = factory.create_components( 

102 settings=settings, 

103 config=self.pipeline_config, 

104 qdrant_manager=qdrant_manager, 

105 state_manager=self.state_manager, 

106 resource_manager=self.resource_manager, 

107 ) 

108 

109 # Create orchestrator with project manager support 

110 self.orchestrator = PipelineOrchestrator( 

111 settings, self.components, self.project_manager 

112 ) 

113 

114 # Initialize performance monitor with custom or default metrics directory 

115 if metrics_dir: 

116 # Use provided metrics directory (workspace mode) 

117 # Accept both Path and str inputs 

118 final_metrics_dir = ( 

119 metrics_dir if isinstance(metrics_dir, Path) else Path(metrics_dir) 

120 ) 

121 else: 

122 # Use default metrics directory 

123 final_metrics_dir = Path.cwd() / "metrics" 

124 

125 final_metrics_dir.mkdir(parents=True, exist_ok=True) 

126 logger.info(f"Initializing metrics directory at {final_metrics_dir}") 

127 self.monitor = IngestionMonitor(str(final_metrics_dir.absolute())) 

128 

129 # Start metrics server if enabled 

130 if enable_metrics: 

131 prometheus_metrics.start_metrics_server() 

132 

133 logger.info("AsyncIngestionPipeline initialized with new modular architecture") 

134 

135 # Track cleanup state to prevent duplicate cleanup 

136 self._cleanup_performed = False 

137 

    async def initialize(self) -> None:
        """Initialize the pipeline.

        Initializes the state manager first (if not already initialized),
        then the project manager using a database session obtained from the
        state manager's session factory. Safe to call repeatedly: already
        initialized components are skipped.

        Raises:
            RuntimeError: If the state manager exposes no session factory.
            Exception: Any error raised by state/project manager
                initialization is logged and re-raised.
        """
        logger.debug("Starting pipeline initialization")

        try:
            # Initialize state manager first — the project manager depends on
            # its database session factory below.
            if not self.state_manager.is_initialized:
                logger.debug("Initializing state manager")
                await self.state_manager.initialize()

            # Initialize project manager
            if not self.project_manager._initialized:
                logger.debug("Initializing project manager")
                # Prefer direct use of session factory to match existing tests/mocks
                session_factory = getattr(self.state_manager, "_session_factory", None)
                if session_factory is None:
                    logger.error(
                        "State manager session factory is not available during initialization",
                        suggestion="Check database configuration and ensure proper state manager setup",
                    )
                    raise RuntimeError("State manager session factory is not available")

                try:
                    # Open a session scoped to project-manager initialization only.
                    async with session_factory() as session:  # type: ignore
                        await self.project_manager.initialize(session)
                    logger.debug("Project manager initialization completed")

                except Exception as e:
                    logger.error(
                        "Failed to initialize project manager during pipeline startup",
                        error=sanitize_exception_message(e),
                        error_type=type(e).__name__,
                        suggestion="Check database connectivity and project configuration",
                    )
                    raise
        except Exception as e:
            logger.error(
                "Pipeline initialization failed during startup sequence",
                error=sanitize_exception_message(e),
                error_type=type(e).__name__,
                suggestion="Check configuration, database connectivity, and system resources",
            )
            raise

181 

182 async def process_documents( 

183 self, 

184 sources_config: SourcesConfig | None = None, 

185 source_type: str | None = None, 

186 source: str | None = None, 

187 project_id: str | None = None, 

188 force: bool = False, 

189 ) -> list[Document]: 

190 """Process documents from all configured sources. 

191 

192 Args: 

193 sources_config: Sources configuration to use (deprecated, use project_id instead) 

194 source_type: Filter by source type 

195 source: Filter by specific source name 

196 project_id: Process documents for a specific project 

197 force: Force processing of all documents, bypassing change detection 

198 

199 Returns: 

200 List of processed documents 

201 """ 

202 # Ensure the pipeline is initialized 

203 await self.initialize() 

204 

205 # Reset metrics for new run 

206 self.monitor.clear_metrics() 

207 self.monitor.start_operation( 

208 "ingestion_process", 

209 metadata={ 

210 "source_type": source_type, 

211 "source": source, 

212 "project_id": project_id, 

213 "force": force, 

214 }, 

215 ) 

216 

217 documents = [] # Initialize to avoid UnboundLocalError in exception handler 

218 try: 

219 logger.debug("Starting document processing with new pipeline architecture") 

220 

221 # Use the orchestrator to process documents with project support 

222 documents = await self.orchestrator.process_documents( 

223 sources_config=sources_config, 

224 source_type=source_type, 

225 source=source, 

226 project_id=project_id, 

227 force=force, 

228 ) 

229 

230 # Update metrics 

231 if documents: 

232 pipeline_result = getattr( 

233 self.orchestrator, "last_pipeline_result", None 

234 ) 

235 total_chunks = getattr(pipeline_result, "success_count", 0) 

236 

237 def _safe_document_size(doc: Document) -> int: 

238 try: 

239 return int(doc.metadata.get("size", 0)) 

240 except (TypeError, ValueError): 

241 return 0 

242 

243 total_size_bytes = sum(_safe_document_size(doc) for doc in documents) 

244 

245 self.monitor.start_batch( 

246 "document_batch", 

247 batch_size=len(documents), 

248 metadata={ 

249 "source_type": source_type, 

250 "source": source, 

251 "project_id": project_id, 

252 "force": force, 

253 }, 

254 ) 

255 # Note: Success/error counts are handled internally by the new architecture 

256 self.monitor.end_batch( 

257 "document_batch", 

258 len(documents), 

259 0, 

260 [], 

261 total_chunks=total_chunks, 

262 total_size_bytes=total_size_bytes, 

263 ) 

264 

265 self.monitor.end_operation("ingestion_process") 

266 

267 logger.debug( 

268 f"Document processing completed. Processed {len(documents)} documents" 

269 ) 

270 return documents 

271 

272 except Exception as e: 

273 safe_error = sanitize_exception_message(e) 

274 logger.error( 

275 "Document processing pipeline failed during ingestion", 

276 error=safe_error, 

277 error_type=type(e).__name__, 

278 documents_attempted=len(documents), 

279 suggestion="Check data source connectivity, document formats, and system resources", 

280 ) 

281 self.monitor.end_operation( 

282 "ingestion_process", success=False, error=safe_error 

283 ) 

284 raise 

285 

286 async def cleanup(self): 

287 """Clean up resources.""" 

288 if self._cleanup_performed: 

289 return 

290 

291 logger.info("Cleaning up pipeline resources") 

292 self._cleanup_performed = True 

293 

294 try: 

295 # Save metrics 

296 if hasattr(self, "monitor"): 

297 self.monitor.save_metrics() 

298 

299 # Stop metrics server 

300 try: 

301 prometheus_metrics.stop_metrics_server() 

302 except Exception as e: 

303 logger.warning( 

304 f"Error stopping metrics server: {sanitize_exception_message(e)}" 

305 ) 

306 

307 # Use resource manager for cleanup 

308 if hasattr(self, "resource_manager"): 

309 await self.resource_manager.cleanup() 

310 

311 logger.info("Pipeline cleanup completed") 

312 except Exception as e: 

313 logger.error( 

314 f"Error during pipeline cleanup: {sanitize_exception_message(e)}" 

315 ) 

316 

317 def __del__(self): 

318 """Destructor to ensure cleanup.""" 

319 try: 

320 # Can't await in __del__, so use the sync cleanup method 

321 self._sync_cleanup() 

322 except Exception as e: 

323 logger.error( 

324 f"Error in destructor cleanup: {sanitize_exception_message(e)}" 

325 ) 

326 

327 def _sync_cleanup(self): 

328 """Synchronous cleanup for destructor and signal handlers.""" 

329 if self._cleanup_performed: 

330 return 

331 

332 logger.info("Cleaning up pipeline resources (sync)") 

333 self._cleanup_performed = True 

334 

335 # Save metrics 

336 try: 

337 if hasattr(self, "monitor"): 

338 self.monitor.save_metrics() 

339 except Exception as e: 

340 logger.error(f"Error saving metrics: {sanitize_exception_message(e)}") 

341 

342 # Stop metrics server 

343 try: 

344 prometheus_metrics.stop_metrics_server() 

345 except Exception as e: 

346 logger.error( 

347 f"Error stopping metrics server: {sanitize_exception_message(e)}" 

348 ) 

349 

350 # Use resource manager sync cleanup 

351 try: 

352 if hasattr(self, "resource_manager"): 

353 self.resource_manager._cleanup() 

354 except Exception as e: 

355 logger.error( 

356 f"Error in resource manager cleanup: {sanitize_exception_message(e)}" 

357 ) 

358 

359 logger.info("Pipeline cleanup completed (sync)")