Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 90%
119 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Refactored async ingestion pipeline using the new modular architecture."""
3from pathlib import Path
5from qdrant_loader.config import Settings, SourcesConfig
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.monitoring import prometheus_metrics
8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
9from qdrant_loader.core.project_manager import ProjectManager
10from qdrant_loader.core.qdrant_manager import QdrantManager
11from qdrant_loader.core.state.state_manager import StateManager
12from qdrant_loader.utils.logging import LoggingConfig
14from .pipeline import (
15 PipelineComponentsFactory,
16 PipelineConfig,
17 PipelineOrchestrator,
18 ResourceManager,
19)
21logger = LoggingConfig.get_logger(__name__)
24class AsyncIngestionPipeline:
25 """Async ingestion pipeline using modular architecture.
27 This class provides a streamlined interface for the modular pipeline
28 architecture, handling document ingestion and processing workflows.
29 """
31 def __init__(
32 self,
33 settings: Settings,
34 qdrant_manager: QdrantManager,
35 state_manager: StateManager | None = None,
36 max_chunk_workers: int = 10,
37 max_embed_workers: int = 4,
38 max_upsert_workers: int = 4,
39 queue_size: int = 1000,
40 upsert_batch_size: int | None = None,
41 enable_metrics: bool = False,
42 metrics_dir: Path | None = None, # New parameter for workspace support
43 ):
44 """Initialize the async ingestion pipeline.
46 Args:
47 settings: Application settings
48 qdrant_manager: QdrantManager instance
49 state_manager: Optional state manager
51 max_chunk_workers: Maximum number of chunking workers
52 max_embed_workers: Maximum number of embedding workers
53 max_upsert_workers: Maximum number of upsert workers
54 queue_size: Queue size for workers
55 upsert_batch_size: Batch size for upserts
56 enable_metrics: Whether to enable metrics server
57 metrics_dir: Custom metrics directory (for workspace support)
58 """
59 self.settings = settings
60 self.qdrant_manager = qdrant_manager
62 # Validate that global configuration is available for pipeline operation.
63 if not settings.global_config:
64 raise ValueError(
65 "Global configuration not available. Please check your configuration file."
66 )
68 # Create pipeline configuration with worker and batch size settings.
69 self.pipeline_config = PipelineConfig(
70 max_chunk_workers=max_chunk_workers,
71 max_embed_workers=max_embed_workers,
72 max_upsert_workers=max_upsert_workers,
73 queue_size=queue_size,
74 upsert_batch_size=upsert_batch_size,
75 enable_metrics=enable_metrics,
76 )
78 # Create resource manager to handle cleanup and signal handling.
79 self.resource_manager = ResourceManager()
80 self.resource_manager.register_signal_handlers()
82 # Create state manager instance if not provided by caller.
83 self.state_manager = state_manager or StateManager(
84 settings.global_config.state_management
85 )
87 # Initialize project manager to support multi-project configurations.
88 if not settings.global_config.qdrant:
89 raise ValueError(
90 "Qdrant configuration is required for project manager initialization"
91 )
93 self.project_manager = ProjectManager(
94 projects_config=settings.projects_config,
95 global_collection_name=settings.global_config.qdrant.collection_name,
96 )
98 # Create pipeline components using factory
99 factory = PipelineComponentsFactory()
100 self.components = factory.create_components(
101 settings=settings,
102 config=self.pipeline_config,
103 qdrant_manager=qdrant_manager,
104 state_manager=self.state_manager,
105 resource_manager=self.resource_manager,
106 )
108 # Create orchestrator with project manager support
109 self.orchestrator = PipelineOrchestrator(
110 settings, self.components, self.project_manager
111 )
113 # Initialize performance monitor with custom or default metrics directory
114 if metrics_dir:
115 # Use provided metrics directory (workspace mode)
116 # Accept both Path and str inputs
117 final_metrics_dir = (
118 metrics_dir if isinstance(metrics_dir, Path) else Path(metrics_dir)
119 )
120 else:
121 # Use default metrics directory
122 final_metrics_dir = Path.cwd() / "metrics"
124 final_metrics_dir.mkdir(parents=True, exist_ok=True)
125 logger.info(f"Initializing metrics directory at {final_metrics_dir}")
126 self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))
128 # Start metrics server if enabled
129 if enable_metrics:
130 prometheus_metrics.start_metrics_server()
132 logger.info("AsyncIngestionPipeline initialized with new modular architecture")
134 # Track cleanup state to prevent duplicate cleanup
135 self._cleanup_performed = False
137 async def initialize(self):
138 """Initialize the pipeline."""
139 logger.debug("Starting pipeline initialization")
141 try:
142 # Initialize state manager first
143 if not self.state_manager.is_initialized:
144 logger.debug("Initializing state manager")
145 await self.state_manager.initialize()
147 # Initialize project manager
148 if not self.project_manager._initialized:
149 logger.debug("Initializing project manager")
150 # Prefer direct use of session factory to match existing tests/mocks
151 session_factory = getattr(self.state_manager, "_session_factory", None)
152 if session_factory is None:
153 logger.error(
154 "State manager session factory is not available during initialization",
155 suggestion="Check database configuration and ensure proper state manager setup",
156 )
157 raise RuntimeError("State manager session factory is not available")
159 try:
160 async with session_factory() as session: # type: ignore
161 await self.project_manager.initialize(session)
162 logger.debug("Project manager initialization completed")
163 except Exception as e:
164 # Standardized error logging: user-friendly message + technical details + stack trace
165 logger.error(
166 "Failed to initialize project manager during pipeline startup",
167 error=str(e),
168 error_type=type(e).__name__,
169 suggestion="Check database connectivity and project configuration",
170 exc_info=True,
171 )
172 raise
173 except Exception as e:
174 # Standardized error logging: user-friendly message + technical details + stack trace
175 logger.error(
176 "Pipeline initialization failed during startup sequence",
177 error=str(e),
178 error_type=type(e).__name__,
179 suggestion="Check configuration, database connectivity, and system resources",
180 exc_info=True,
181 )
182 raise
184 async def process_documents(
185 self,
186 sources_config: SourcesConfig | None = None,
187 source_type: str | None = None,
188 source: str | None = None,
189 project_id: str | None = None,
190 force: bool = False,
191 ) -> list[Document]:
192 """Process documents from all configured sources.
194 Args:
195 sources_config: Sources configuration to use (deprecated, use project_id instead)
196 source_type: Filter by source type
197 source: Filter by specific source name
198 project_id: Process documents for a specific project
199 force: Force processing of all documents, bypassing change detection
201 Returns:
202 List of processed documents
203 """
204 # Ensure the pipeline is initialized
205 await self.initialize()
207 # Reset metrics for new run
208 self.monitor.clear_metrics()
210 self.monitor.start_operation(
211 "ingestion_process",
212 metadata={
213 "source_type": source_type,
214 "source": source,
215 "project_id": project_id,
216 "force": force,
217 },
218 )
220 documents = [] # Initialize to avoid UnboundLocalError in exception handler
221 try:
222 logger.debug("Starting document processing with new pipeline architecture")
224 # Use the orchestrator to process documents with project support
225 documents = await self.orchestrator.process_documents(
226 sources_config=sources_config,
227 source_type=source_type,
228 source=source,
229 project_id=project_id,
230 force=force,
231 )
233 # Update metrics
234 if documents:
235 self.monitor.start_batch(
236 "document_batch",
237 batch_size=len(documents),
238 metadata={
239 "source_type": source_type,
240 "source": source,
241 "project_id": project_id,
242 "force": force,
243 },
244 )
245 # Note: Success/error counts are handled internally by the new architecture
246 self.monitor.end_batch("document_batch", len(documents), 0, [])
248 self.monitor.end_operation("ingestion_process")
250 logger.debug(
251 f"Document processing completed. Processed {len(documents)} documents"
252 )
253 return documents
255 except Exception as e:
256 # Standardized error logging: user-friendly message + technical details + stack trace
257 logger.error(
258 "Document processing pipeline failed during ingestion",
259 error=str(e),
260 error_type=type(e).__name__,
261 documents_attempted=len(documents),
262 suggestion="Check data source connectivity, document formats, and system resources",
263 exc_info=True,
264 )
265 self.monitor.end_operation("ingestion_process", error=str(e))
266 raise
268 async def cleanup(self):
269 """Clean up resources."""
270 if self._cleanup_performed:
271 return
273 logger.info("Cleaning up pipeline resources")
274 self._cleanup_performed = True
276 try:
277 # Save metrics
278 if hasattr(self, "monitor"):
279 self.monitor.save_metrics()
281 # Stop metrics server
282 try:
283 prometheus_metrics.stop_metrics_server()
284 except Exception as e:
285 logger.warning(f"Error stopping metrics server: {e}")
287 # Use resource manager for cleanup
288 if hasattr(self, "resource_manager"):
289 await self.resource_manager.cleanup()
291 logger.info("Pipeline cleanup completed")
292 except Exception as e:
293 logger.error(f"Error during pipeline cleanup: {e}")
295 def __del__(self):
296 """Destructor to ensure cleanup."""
297 try:
298 # Can't await in __del__, so use the sync cleanup method
299 self._sync_cleanup()
300 except Exception as e:
301 logger.error(f"Error in destructor cleanup: {e}")
303 def _sync_cleanup(self):
304 """Synchronous cleanup for destructor and signal handlers."""
305 if self._cleanup_performed:
306 return
308 logger.info("Cleaning up pipeline resources (sync)")
309 self._cleanup_performed = True
311 # Save metrics
312 try:
313 if hasattr(self, "monitor"):
314 self.monitor.save_metrics()
315 except Exception as e:
316 logger.error(f"Error saving metrics: {e}")
318 # Stop metrics server
319 try:
320 prometheus_metrics.stop_metrics_server()
321 except Exception as e:
322 logger.error(f"Error stopping metrics server: {e}")
324 # Use resource manager sync cleanup
325 try:
326 if hasattr(self, "resource_manager"):
327 self.resource_manager._cleanup()
328 except Exception as e:
329 logger.error(f"Error in resource manager cleanup: {e}")
331 logger.info("Pipeline cleanup completed (sync)")