Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 89% (140 statements)
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Refactored async ingestion pipeline using the new modular architecture."""
3from pathlib import Path
5from qdrant_loader.config import Settings, SourcesConfig
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.monitoring import prometheus_metrics
8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
9from qdrant_loader.core.project_manager import ProjectManager
10from qdrant_loader.core.qdrant_manager import QdrantManager
11from qdrant_loader.core.state.state_manager import StateManager
12from qdrant_loader.utils.logging import LoggingConfig
14from .pipeline import (
15 PipelineComponentsFactory,
16 PipelineConfig,
17 PipelineOrchestrator,
18 ResourceManager,
19)
21logger = LoggingConfig.get_logger(__name__)


class AsyncIngestionPipeline:
    """Refactored async ingestion pipeline using modular architecture.

    This class maintains backward compatibility with the original interface
    while using the new modular pipeline architecture internally.
    """

    def __init__(
        self,
        settings: Settings,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        embedding_cache=None,  # Placeholder for future cache (maintained for compatibility)
        max_chunk_workers: int = 10,
        max_embed_workers: int = 4,
        max_upsert_workers: int = 4,
        queue_size: int = 1000,
        upsert_batch_size: int | None = None,
        enable_metrics: bool = False,
        metrics_dir: Path | None = None,  # New parameter for workspace support
    ):
        """Initialize the async ingestion pipeline.

        Args:
            settings: Application settings
            qdrant_manager: QdrantManager instance
            state_manager: Optional state manager
            embedding_cache: Placeholder for future cache (unused)
            max_chunk_workers: Maximum number of chunking workers
            max_embed_workers: Maximum number of embedding workers
            max_upsert_workers: Maximum number of upsert workers
            queue_size: Queue size for workers
            upsert_batch_size: Batch size for upserts
            enable_metrics: Whether to enable the metrics server
            metrics_dir: Custom metrics directory (for workspace support)
        """
        self.settings = settings
        self.qdrant_manager = qdrant_manager
        self.embedding_cache = embedding_cache  # Maintained for compatibility

        # Validate global configuration
        if not settings.global_config:
            raise ValueError(
                "Global configuration not available. Please check your configuration file."
            )

        # Create pipeline configuration
        self.pipeline_config = PipelineConfig(
            max_chunk_workers=max_chunk_workers,
            max_embed_workers=max_embed_workers,
            max_upsert_workers=max_upsert_workers,
            queue_size=queue_size,
            upsert_batch_size=upsert_batch_size,
            enable_metrics=enable_metrics,
        )

        # Create resource manager
        self.resource_manager = ResourceManager()
        self.resource_manager.register_signal_handlers()

        # Create state manager if not provided
        self.state_manager = state_manager or StateManager(
            settings.global_config.state_management
        )

        # Initialize project manager for multi-project support
        if not settings.global_config.qdrant:
            raise ValueError(
                "Qdrant configuration is required for project manager initialization"
            )

        self.project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )

        # Create pipeline components using the factory
        factory = PipelineComponentsFactory()
        self.components = factory.create_components(
            settings=settings,
            config=self.pipeline_config,
            qdrant_manager=qdrant_manager,
            state_manager=self.state_manager,
            resource_manager=self.resource_manager,
        )

        # Create orchestrator with project manager support
        self.orchestrator = PipelineOrchestrator(
            settings, self.components, self.project_manager
        )

        # Initialize the performance monitor with a custom or default metrics directory
        if metrics_dir:
            # Use the provided metrics directory (workspace mode)
            final_metrics_dir = metrics_dir
        else:
            # Use the default metrics directory
            final_metrics_dir = Path.cwd() / "metrics"

        final_metrics_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initializing metrics directory at {final_metrics_dir}")
        self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))

        # Start the metrics server if enabled
        if enable_metrics:
            prometheus_metrics.start_metrics_server()

        logger.info("AsyncIngestionPipeline initialized with new modular architecture")

        # Track cleanup state to prevent duplicate cleanup
        self._cleanup_performed = False

    async def initialize(self):
        """Initialize the pipeline (maintained for compatibility)."""
        logger.debug("Pipeline initialization called")

        try:
            # Initialize the state manager first
            if not self.state_manager._initialized:
                logger.debug("Initializing state manager")
                await self.state_manager.initialize()
                logger.debug("State manager initialization completed")

            # Initialize the project manager
            if not self.project_manager._initialized:
                logger.debug("Initializing project manager")
                if self.state_manager._session_factory:
                    try:
                        async with self.state_manager._session_factory() as session:
                            await self.project_manager.initialize(session)
                            logger.debug("Project manager initialization completed")
                    except Exception as e:
                        logger.error(
                            f"Failed to initialize project manager: {e}", exc_info=True
                        )
                        raise
                else:
                    logger.error("State manager session factory is not available")
                    raise RuntimeError("State manager session factory is not available")
        except Exception as e:
            logger.error(f"Pipeline initialization failed: {e}", exc_info=True)
            raise
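
    # Note (added commentary, not in the original source): process_documents()
    # calls initialize() itself, so explicit calls are optional, and repeated
    # calls are cheap because both managers check their _initialized flags:
    #
    #     await pipeline.initialize()  # safe to call more than once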

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated, use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        # Ensure the pipeline is initialized
        await self.initialize()

        # Reset metrics for the new run
        self.monitor.clear_metrics()

        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
                "force": force,
            },
        )

        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
                force=force,
            )

            # Update metrics (maintained for compatibility)
            if documents:
                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                        "force": force,
                    },
                )
                # Note: Success/error counts are handled internally by the new architecture
                self.monitor.end_batch("document_batch", len(documents), 0, [])

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                f"Document processing completed. Processed {len(documents)} documents"
            )
            return documents

        except Exception as e:
            logger.error(f"Document processing failed: {e}", exc_info=True)
            self.monitor.end_operation("ingestion_process", error=str(e))
            raise
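
    # Usage sketch (added commentary; filter values below are hypothetical and
    # assume an async context such as asyncio.run):
    #
    #     # Ingest everything configured for one project, bypassing change detection:
    #     docs = await pipeline.process_documents(project_id="docs-site", force=True)
    #
    #     # Or narrow the run to a single source of a given type:
    #     docs = await pipeline.process_documents(source_type="git", source="my-repo")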

    async def cleanup(self):
        """Clean up resources."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop the metrics server
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning(f"Error stopping metrics server: {e}")

            # Use the resource manager for cleanup
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")
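
    # Lifecycle sketch (illustrative, not part of the original source): pair
    # process_documents() with cleanup() so metrics are saved even on failure;
    # cleanup() is idempotent thanks to the _cleanup_performed guard.
    #
    #     try:
    #         await pipeline.process_documents(project_id="docs-site")
    #     finally:
    #         await pipeline.cleanup()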

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Can't await in __del__, so use the sync cleanup method
            self._sync_cleanup()
        except Exception as e:
            logger.error(f"Error in destructor cleanup: {e}")

    def _sync_cleanup(self):
        """Synchronous cleanup for destructor and signal handlers."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error(f"Error saving metrics: {e}")

        # Stop the metrics server
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error(f"Error stopping metrics server: {e}")

        # Use the resource manager's sync cleanup
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error(f"Error in resource manager cleanup: {e}")

        logger.info("Pipeline cleanup completed (sync)")

    # Backward compatibility properties
    @property
    def _shutdown_event(self):
        """Backward compatibility property for shutdown event."""
        return self.resource_manager.shutdown_event

    @property
    def _active_tasks(self):
        """Backward compatibility property for active tasks."""
        return self.resource_manager.active_tasks

    @property
    def _cleanup_done(self):
        """Backward compatibility property for cleanup status."""
        return self.resource_manager.cleanup_done

    # Legacy methods maintained for compatibility
    def _cleanup(self):
        """Legacy cleanup method (redirects to sync cleanup)."""
        self._sync_cleanup()

    async def _async_cleanup(self):
        """Legacy async cleanup method (redirects to resource manager)."""
        await self.resource_manager.cleanup()

    def _handle_sigint(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigint(signum, frame)

    def _handle_sigterm(self, signum, frame):
        """Legacy signal handler (redirects to resource manager)."""
        self.resource_manager._handle_sigterm(signum, frame)

    def _cancel_all_tasks(self):
        """Legacy task cancellation (redirects to resource manager)."""
        self.resource_manager._cancel_all_tasks()

    def _force_immediate_exit(self):
        """Legacy force exit (redirects to resource manager)."""
        self.resource_manager._force_immediate_exit()