Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 95%
128 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Refactored async ingestion pipeline using the new modular architecture."""
3from pathlib import Path
5from qdrant_loader.config import Settings, SourcesConfig
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.monitoring import prometheus_metrics
8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
9from qdrant_loader.core.qdrant_manager import QdrantManager
10from qdrant_loader.core.state.state_manager import StateManager
11from qdrant_loader.core.project_manager import ProjectManager
12from qdrant_loader.utils.logging import LoggingConfig
14from .pipeline import (
15 PipelineComponentsFactory,
16 PipelineConfig,
17 PipelineOrchestrator,
18 ResourceManager,
19)
# Module-level logger, namespaced to this module's import path.
logger = LoggingConfig.get_logger(__name__)
class AsyncIngestionPipeline:
    """Refactored async ingestion pipeline using modular architecture.

    This class maintains backward compatibility with the original interface
    while using the new modular pipeline architecture internally: the heavy
    lifting is delegated to a ``PipelineOrchestrator`` built from components
    produced by ``PipelineComponentsFactory``.
    """

    def __init__(
        self,
        settings: Settings,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        embedding_cache=None,  # Placeholder for future cache (maintained for compatibility)
        max_chunk_workers: int = 10,
        max_embed_workers: int = 4,
        max_upsert_workers: int = 4,
        queue_size: int = 1000,
        upsert_batch_size: int | None = None,
        enable_metrics: bool = False,
        metrics_dir: Path | None = None,  # New parameter for workspace support
    ):
        """Initialize the async ingestion pipeline.

        Args:
            settings: Application settings
            qdrant_manager: QdrantManager instance
            state_manager: Optional state manager
            embedding_cache: Placeholder for future cache (unused)
            max_chunk_workers: Maximum number of chunking workers
            max_embed_workers: Maximum number of embedding workers
            max_upsert_workers: Maximum number of upsert workers
            queue_size: Queue size for workers
            upsert_batch_size: Batch size for upserts
            enable_metrics: Whether to enable metrics server
            metrics_dir: Custom metrics directory (for workspace support)

        Raises:
            ValueError: If the global configuration or the Qdrant
                configuration section is missing.
        """
        # Track cleanup state to prevent duplicate cleanup. Set this BEFORE
        # anything that can raise: __del__ calls _sync_cleanup(), and a
        # partially constructed instance must not hit AttributeError there.
        # (Previously this was assigned at the very end of __init__.)
        self._cleanup_performed = False

        self.settings = settings
        self.qdrant_manager = qdrant_manager
        self.embedding_cache = embedding_cache  # Maintained for compatibility

        # Validate global configuration
        if not settings.global_config:
            raise ValueError(
                "Global configuration not available. Please check your configuration file."
            )

        # Create pipeline configuration
        self.pipeline_config = PipelineConfig(
            max_chunk_workers=max_chunk_workers,
            max_embed_workers=max_embed_workers,
            max_upsert_workers=max_upsert_workers,
            queue_size=queue_size,
            upsert_batch_size=upsert_batch_size,
            enable_metrics=enable_metrics,
        )

        # Create resource manager and hook SIGINT/SIGTERM so in-flight work
        # is cancelled cleanly on shutdown.
        self.resource_manager = ResourceManager()
        self.resource_manager.register_signal_handlers()

        # Create state manager if not provided
        self.state_manager = state_manager or StateManager(
            settings.global_config.state_management
        )

        # Initialize project manager for multi-project support
        if not settings.global_config.qdrant:
            raise ValueError(
                "Qdrant configuration is required for project manager initialization"
            )

        self.project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )

        # Create pipeline components using factory
        factory = PipelineComponentsFactory()
        self.components = factory.create_components(
            settings=settings,
            config=self.pipeline_config,
            qdrant_manager=qdrant_manager,
            state_manager=self.state_manager,
            resource_manager=self.resource_manager,
        )

        # Create orchestrator with project manager support
        self.orchestrator = PipelineOrchestrator(
            settings, self.components, self.project_manager
        )

        # Initialize performance monitor with custom or default metrics directory
        if metrics_dir:
            # Use provided metrics directory (workspace mode)
            final_metrics_dir = metrics_dir
        else:
            # Use default metrics directory
            final_metrics_dir = Path.cwd() / "metrics"

        final_metrics_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Initializing metrics directory at %s", final_metrics_dir)
        self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))

        # Start metrics server if enabled
        if enable_metrics:
            prometheus_metrics.start_metrics_server()

        logger.info("AsyncIngestionPipeline initialized with new modular architecture")

    async def initialize(self):
        """Initialize the pipeline (maintained for compatibility).

        Lazily initializes the state manager and the project manager; safe
        to call multiple times (each step is guarded by an ``_initialized``
        flag on the respective manager).
        """
        logger.debug("Pipeline initialization called")

        # Initialize state manager first: the project manager needs a DB
        # session produced by the state manager's session factory.
        if not self.state_manager._initialized:
            logger.debug("Initializing state manager")
            await self.state_manager.initialize()

        # Initialize project manager
        if not self.project_manager._initialized:
            logger.debug("Initializing project manager")
            if self.state_manager._session_factory:
                async with self.state_manager._session_factory() as session:
                    await self.project_manager.initialize(session)

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated, use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project

        Returns:
            List of processed documents

        Raises:
            Exception: Re-raises any failure from the orchestrator after
                recording it in the ingestion monitor.
        """
        # Ensure the pipeline is initialized
        await self.initialize()

        # Reset metrics for new run
        self.monitor.clear_metrics()

        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
            },
        )

        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
            )

            # Update metrics (maintained for compatibility)
            if documents:
                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                # Note: Success/error counts are handled internally by the new architecture
                self.monitor.end_batch("document_batch", len(documents), 0, [])

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                "Document processing completed. Processed %d documents", len(documents)
            )
            return documents

        except Exception as e:
            logger.error("Document processing failed: %s", e, exc_info=True)
            self.monitor.end_operation("ingestion_process", error=str(e))
            raise

    async def cleanup(self):
        """Clean up resources (idempotent; subsequent calls are no-ops)."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop metrics server (no-op/benign if it was never started)
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning("Error stopping metrics server: %s", e)

            # Use resource manager for cleanup
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error("Error during pipeline cleanup: %s", e)

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Can't await in __del__, so use the sync cleanup method
            self._sync_cleanup()
        except Exception as e:
            logger.error("Error in destructor cleanup: %s", e)

    def _sync_cleanup(self):
        """Synchronous cleanup for destructor and signal handlers."""
        # getattr guard: the destructor may run on an instance whose
        # __init__ never completed, where the flag could be absent.
        if getattr(self, "_cleanup_performed", False):
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error("Error saving metrics: %s", e)

        # Stop metrics server
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error("Error stopping metrics server: %s", e)

        # Use resource manager sync cleanup
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error("Error in resource manager cleanup: %s", e)

        logger.info("Pipeline cleanup completed (sync)")

    # Backward compatibility properties
    @property
    def _shutdown_event(self):
        """Backward compatibility property for shutdown event."""
        return self.resource_manager.shutdown_event

    @property
    def _active_tasks(self):
        """Backward compatibility property for active tasks."""
        return self.resource_manager.active_tasks

    @property
    def _cleanup_done(self):
        """Backward compatibility property for cleanup status."""
        return self.resource_manager.cleanup_done

    # Legacy methods maintained for compatibility
    def _cleanup(self):
        """Legacy cleanup method (redirects to sync cleanup)."""
        self._sync_cleanup()

    async def _async_cleanup(self):
        """Legacy async cleanup method (redirects to resource manager)."""
        await self.resource_manager.cleanup()

    def _handle_sigint(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigint(signum, frame)

    def _handle_sigterm(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigterm(signum, frame)

    def _cancel_all_tasks(self):
        """Legacy task cancellation (redirects to resource manager)."""
        self.resource_manager._cancel_all_tasks()

    def _force_immediate_exit(self):
        """Legacy force exit (redirects to resource manager)."""
        self.resource_manager._force_immediate_exit()