Coverage for src/qdrant_loader/core/async_ingestion_pipeline.py: 91%
129 statements (coverage.py v7.13.5, created at 2026-04-10 09:40 +0000)
1"""Refactored async ingestion pipeline using the new modular architecture."""
3from pathlib import Path
5from qdrant_loader.config import Settings, SourcesConfig
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.monitoring import prometheus_metrics
8from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
9from qdrant_loader.core.project_manager import ProjectManager
10from qdrant_loader.core.qdrant_manager import QdrantManager
11from qdrant_loader.core.state.state_manager import StateManager
12from qdrant_loader.utils.logging import LoggingConfig
13from qdrant_loader.utils.sensitive import sanitize_exception_message
15from .pipeline import (
16 PipelineComponentsFactory,
17 PipelineConfig,
18 PipelineOrchestrator,
19 ResourceManager,
20)
22logger = LoggingConfig.get_logger(__name__)


class AsyncIngestionPipeline:
    """Async ingestion pipeline using the modular architecture.

    This class provides a streamlined interface to the modular pipeline
    architecture, handling document ingestion and processing workflows.
    A minimal usage sketch appears at the end of this listing.
    """

    def __init__(
        self,
        settings: Settings,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        max_chunk_workers: int = 10,
        max_embed_workers: int = 4,
        max_upsert_workers: int = 4,
        queue_size: int = 1000,
        upsert_batch_size: int | None = None,
        enable_metrics: bool = False,
        metrics_dir: Path | None = None,  # New parameter for workspace support
    ):
        """Initialize the async ingestion pipeline.

        Args:
            settings: Application settings
            qdrant_manager: QdrantManager instance
            state_manager: Optional state manager
            max_chunk_workers: Maximum number of chunking workers
            max_embed_workers: Maximum number of embedding workers
            max_upsert_workers: Maximum number of upsert workers
            queue_size: Queue size for workers
            upsert_batch_size: Batch size for upserts
            enable_metrics: Whether to enable the metrics server
            metrics_dir: Custom metrics directory (for workspace support)
        """
        self.settings = settings
        self.qdrant_manager = qdrant_manager

        # Validate that global configuration is available for pipeline operation.
        if not settings.global_config:
            raise ValueError(
                "Global configuration not available. Please check your configuration file."
            )

        # Create pipeline configuration with worker and batch size settings.
        self.pipeline_config = PipelineConfig(
            max_chunk_workers=max_chunk_workers,
            max_embed_workers=max_embed_workers,
            max_upsert_workers=max_upsert_workers,
            queue_size=queue_size,
            upsert_batch_size=upsert_batch_size,
            enable_metrics=enable_metrics,
        )

        # Create resource manager to handle cleanup and signal handling.
        self.resource_manager = ResourceManager()
        self.resource_manager.register_signal_handlers()

        # Create a state manager instance if not provided by the caller.
        self.state_manager = state_manager or StateManager(
            settings.global_config.state_management
        )

        # Initialize project manager to support multi-project configurations.
        if not settings.global_config.qdrant:
            raise ValueError(
                "Qdrant configuration is required for project manager initialization"
            )

        self.project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )

        # Create pipeline components using the factory.
        factory = PipelineComponentsFactory()
        self.components = factory.create_components(
            settings=settings,
            config=self.pipeline_config,
            qdrant_manager=qdrant_manager,
            state_manager=self.state_manager,
            resource_manager=self.resource_manager,
        )

        # Create orchestrator with project manager support.
        self.orchestrator = PipelineOrchestrator(
            settings, self.components, self.project_manager
        )

        # Initialize performance monitor with custom or default metrics directory.
        if metrics_dir:
            # Use provided metrics directory (workspace mode);
            # accept both Path and str inputs.
            final_metrics_dir = (
                metrics_dir if isinstance(metrics_dir, Path) else Path(metrics_dir)
            )
        else:
            # Use default metrics directory.
            final_metrics_dir = Path.cwd() / "metrics"

        final_metrics_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initializing metrics directory at {final_metrics_dir}")
        self.monitor = IngestionMonitor(str(final_metrics_dir.absolute()))

        # Start the metrics server if enabled.
        if enable_metrics:
            prometheus_metrics.start_metrics_server()

        logger.info("AsyncIngestionPipeline initialized with new modular architecture")

        # Track cleanup state to prevent duplicate cleanup.
        self._cleanup_performed = False

    async def initialize(self):
        """Initialize the pipeline."""
        logger.debug("Starting pipeline initialization")

        try:
            # Initialize the state manager first.
            if not self.state_manager.is_initialized:
                logger.debug("Initializing state manager")
                await self.state_manager.initialize()

            # Initialize the project manager.
            if not self.project_manager._initialized:
                logger.debug("Initializing project manager")
                # Prefer direct use of the session factory to match existing tests/mocks.
                session_factory = getattr(self.state_manager, "_session_factory", None)
                if session_factory is None:
                    logger.error(
                        "State manager session factory is not available during initialization",
                        suggestion="Check database configuration and ensure proper state manager setup",
                    )
                    raise RuntimeError("State manager session factory is not available")

                try:
                    async with session_factory() as session:  # type: ignore
                        await self.project_manager.initialize(session)
                    logger.debug("Project manager initialization completed")
                except Exception as e:
                    logger.error(
                        "Failed to initialize project manager during pipeline startup",
                        error=sanitize_exception_message(e),
                        error_type=type(e).__name__,
                        suggestion="Check database connectivity and project configuration",
                    )
                    raise
        except Exception as e:
            logger.error(
                "Pipeline initialization failed during startup sequence",
                error=sanitize_exception_message(e),
                error_type=type(e).__name__,
                suggestion="Check configuration, database connectivity, and system resources",
            )
            raise

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured sources.

        Args:
            sources_config: Sources configuration to use (deprecated; use project_id instead)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        # Ensure the pipeline is initialized.
        await self.initialize()

        # Reset metrics for the new run.
        self.monitor.clear_metrics()
        self.monitor.start_operation(
            "ingestion_process",
            metadata={
                "source_type": source_type,
                "source": source,
                "project_id": project_id,
                "force": force,
            },
        )

        documents = []  # Initialize to avoid UnboundLocalError in the exception handler
        try:
            logger.debug("Starting document processing with new pipeline architecture")

            # Use the orchestrator to process documents with project support.
            documents = await self.orchestrator.process_documents(
                sources_config=sources_config,
                source_type=source_type,
                source=source,
                project_id=project_id,
                force=force,
            )

            # Update metrics.
            if documents:
                pipeline_result = getattr(
                    self.orchestrator, "last_pipeline_result", None
                )
                total_chunks = getattr(pipeline_result, "success_count", 0)

                def _safe_document_size(doc: Document) -> int:
                    try:
                        return int(doc.metadata.get("size", 0))
                    except (TypeError, ValueError):
                        return 0

                total_size_bytes = sum(_safe_document_size(doc) for doc in documents)

                self.monitor.start_batch(
                    "document_batch",
                    batch_size=len(documents),
                    metadata={
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                        "force": force,
                    },
                )
                # Note: success/error counts are handled internally by the new architecture.
                self.monitor.end_batch(
                    "document_batch",
                    len(documents),
                    0,
                    [],
                    total_chunks=total_chunks,
                    total_size_bytes=total_size_bytes,
                )

            self.monitor.end_operation("ingestion_process")

            logger.debug(
                f"Document processing completed. Processed {len(documents)} documents"
            )
            return documents

        except Exception as e:
            safe_error = sanitize_exception_message(e)
            logger.error(
                "Document processing pipeline failed during ingestion",
                error=safe_error,
                error_type=type(e).__name__,
                documents_attempted=len(documents),
                suggestion="Check data source connectivity, document formats, and system resources",
            )
            self.monitor.end_operation(
                "ingestion_process", success=False, error=safe_error
            )
            raise

    async def cleanup(self):
        """Clean up resources."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources")
        self._cleanup_performed = True

        try:
            # Save metrics.
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()

            # Stop the metrics server.
            try:
                prometheus_metrics.stop_metrics_server()
            except Exception as e:
                logger.warning(
                    f"Error stopping metrics server: {sanitize_exception_message(e)}"
                )

            # Use the resource manager for cleanup.
            if hasattr(self, "resource_manager"):
                await self.resource_manager.cleanup()

            logger.info("Pipeline cleanup completed")
        except Exception as e:
            logger.error(
                f"Error during pipeline cleanup: {sanitize_exception_message(e)}"
            )

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            # Cannot await in __del__, so use the synchronous cleanup method.
            self._sync_cleanup()
        except Exception as e:
            logger.error(
                f"Error in destructor cleanup: {sanitize_exception_message(e)}"
            )

    def _sync_cleanup(self):
        """Synchronous cleanup for the destructor and signal handlers."""
        if self._cleanup_performed:
            return

        logger.info("Cleaning up pipeline resources (sync)")
        self._cleanup_performed = True

        # Save metrics.
        try:
            if hasattr(self, "monitor"):
                self.monitor.save_metrics()
        except Exception as e:
            logger.error(f"Error saving metrics: {sanitize_exception_message(e)}")

        # Stop the metrics server.
        try:
            prometheus_metrics.stop_metrics_server()
        except Exception as e:
            logger.error(
                f"Error stopping metrics server: {sanitize_exception_message(e)}"
            )

        # Use the resource manager's synchronous cleanup.
        try:
            if hasattr(self, "resource_manager"):
                self.resource_manager._cleanup()
        except Exception as e:
            logger.error(
                f"Error in resource manager cleanup: {sanitize_exception_message(e)}"
            )

        logger.info("Pipeline cleanup completed (sync)")
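

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). It shows the
# intended lifecycle of AsyncIngestionPipeline: construct, process_documents,
# cleanup. How Settings and QdrantManager are obtained is an assumption here;
# the hypothetical _load_settings() helper stands in for the application's
# real configuration loading, and the QdrantManager constructor arguments
# should be checked against the actual API.
# ---------------------------------------------------------------------------
import asyncio


def _load_settings() -> Settings:
    """Hypothetical stand-in for real Settings construction (assumption)."""
    raise NotImplementedError("Load Settings from your configuration here")


async def _example_run() -> None:
    settings = _load_settings()
    qdrant_manager = QdrantManager(settings)  # assumed constructor; verify signature
    pipeline = AsyncIngestionPipeline(
        settings=settings,
        qdrant_manager=qdrant_manager,
        enable_metrics=False,
        metrics_dir=Path.cwd() / "metrics",  # workspace mode accepts Path or str
    )
    try:
        # Restrict ingestion to a single project; pass force=True to bypass
        # change detection and re-process every document.
        documents = await pipeline.process_documents(project_id="my-project")
        logger.info(f"Ingested {len(documents)} documents")
    finally:
        await pipeline.cleanup()


if __name__ == "__main__":
    asyncio.run(_example_run())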