Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 91% (161 statements)
1"""Main orchestrator for the ingestion pipeline."""
3import traceback
5from qdrant_loader.config import Settings, SourcesConfig
6from qdrant_loader.connectors.base import ConnectorConfigurationError
7from qdrant_loader.connectors.factory import get_connector_instance
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.project_manager import ProjectManager
10from qdrant_loader.core.state.state_change_detector import StateChangeDetector
11from qdrant_loader.core.state.state_manager import StateManager
12from qdrant_loader.utils.logging import LoggingConfig
13from qdrant_loader.utils.sensitive import sanitize_exception_message
15from .document_pipeline import DocumentPipeline
16from .source_filter import SourceFilter
17from .source_processor import SourceProcessor
18from .workers.upsert_worker import PipelineResult
20logger = LoggingConfig.get_logger(__name__)
23class PipelineComponents:
24 """Container for pipeline components."""
26 def __init__(
27 self,
28 document_pipeline: DocumentPipeline,
29 source_processor: SourceProcessor,
30 source_filter: SourceFilter,
31 state_manager: StateManager,
32 ):
33 self.document_pipeline = document_pipeline
34 self.source_processor = source_processor
35 self.source_filter = source_filter
36 self.state_manager = state_manager
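

# Illustrative wiring sketch (not part of the original module). The component
# constructors are not defined in this file, so their arguments are assumed
# to be built elsewhere; only the PipelineComponents signature above is real.
#
#   components = PipelineComponents(
#       document_pipeline=document_pipeline,
#       source_processor=source_processor,
#       source_filter=source_filter,
#       state_manager=state_manager,
#   )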


class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline."""

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        self.project_manager = project_manager
        self.last_pipeline_result = None

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        logger.info("🚀 Starting document ingestion")
        self.last_pipeline_result = None

        try:
            # Determine which sources configuration to use
            if sources_config:
                # Use the provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                current_project_id = None
            elif project_id:
                # Use the project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                project_sources_config = project_context.config.sources
                filtered_config = self.components.source_filter.filter_sources(
                    project_sources_config, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                return await self._process_all_projects(source_type, source, force)

            # Check whether the filtered config is empty
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            # Collect documents from all sources
            documents = await self._collect_documents_from_sources(
                filtered_config, current_project_id
            )

            if not documents:
                logger.info("✅ No documents found from sources")
                return []

            # Detect changes in documents (bypassed when force=True)
            if force:
                logger.warning(
                    f"🔄 Force mode enabled: bypassing change detection, processing all {len(documents)} documents"
                )
            else:
                documents = await self._detect_document_changes(
                    documents, filtered_config, current_project_id
                )

            if not documents:
                logger.info("✅ No new or updated documents to process")
                return []

            # Process documents through the pipeline
            result = await self.components.document_pipeline.process_documents(
                documents
            )
            self.last_pipeline_result = result

            # Update document states for successfully processed documents
            await self._update_document_states(
                documents, result.successfully_processed_documents, current_project_id
            )

            logger.info(
                f"✅ Ingestion completed: {result.success_count} chunks processed successfully"
            )
            return documents

        except Exception as e:
            logger.error(
                f"❌ Pipeline orchestration failed: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
                sanitized_traceback=sanitize_exception_message(traceback.format_exc()),
            )
            raise
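
    # Example call (hypothetical caller code; the orchestrator variable is an
    # assumption, the keyword arguments come from the signature above):
    #
    #   docs = await orchestrator.process_documents(
    #       project_id="my-project", source_type="git", force=True
    #   )
    #   result = orchestrator.last_pipeline_result  # PipelineResult or None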

    async def _process_all_projects(
        self,
        source_type: str | None = None,
        source: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured projects."""
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents = []
        aggregated_result = PipelineResult()
        failed_projects: list[str] = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
                project_result = self.last_pipeline_result
                all_documents.extend(project_documents)

                if project_result is not None:
                    aggregated_result.success_count += project_result.success_count
                    aggregated_result.error_count += project_result.error_count
                    aggregated_result.successfully_processed_documents.update(
                        project_result.successfully_processed_documents
                    )
                    aggregated_result.failed_document_ids.update(
                        project_result.failed_document_ids
                    )
                    aggregated_result.errors.extend(project_result.errors)

                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except ConnectorConfigurationError as e:
                logger.error(
                    f"Configuration error in project {project_id}: "
                    f"{sanitize_exception_message(e)}. "
                    "Skipping this project — check connector settings.",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitize_exception_message(
                        traceback.format_exc()
                    ),
                )
                aggregated_result.errors.append(
                    f"Configuration error in project {project_id}: "
                    f"{sanitize_exception_message(e)}"
                )
                failed_projects.append(project_id)
                continue
            except Exception as e:
                safe_error = sanitize_exception_message(e)
                sanitized_traceback = sanitize_exception_message(traceback.format_exc())
                aggregated_result.error_count += 1
                aggregated_result.errors.append(
                    f"project_id={project_id}; "
                    f"error_type={type(e).__name__}; "
                    f"message={safe_error}; "
                    f"traceback={sanitized_traceback}"
                )
                logger.error(
                    f"Failed to process project {project_id}: {safe_error}",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitized_traceback,
                )
                failed_projects.append(project_id)
                # Continue processing the remaining projects
                continue

        self.last_pipeline_result = aggregated_result

        total_count = len(project_ids)
        failed_count = len(failed_projects)
        success_count = total_count - failed_count
        if failed_count > 0:
            logger.warning(
                f"Completed processing projects: {success_count}/{total_count} succeeded, "
                f"{failed_count} failed. Check errors above for details.",
                total_projects=total_count,
                successful_projects=success_count,
                failed_projects=failed_count,
            )
        else:
            logger.info(
                f"Completed processing all projects: {len(all_documents)} total documents"
            )
        return all_documents
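
    # Sketch of inspecting the aggregated result after an all-projects run
    # (hypothetical caller code; the attribute names match the PipelineResult
    # usage above):
    #
    #   await orchestrator.process_documents()  # no project_id: all projects
    #   agg = orchestrator.last_pipeline_result
    #   if agg is not None and agg.error_count:
    #       for entry in agg.errors:
    #           logger.warning(entry)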

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources."""
        documents = []

        # Process each source type with project context
        if filtered_config.confluence:
            confluence_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.confluence, get_connector_instance, "Confluence"
                )
            )
            documents.extend(confluence_docs)

        if filtered_config.git:
            git_docs = await self.components.source_processor.process_source_type(
                filtered_config.git, get_connector_instance, "Git"
            )
            documents.extend(git_docs)

        if filtered_config.jira:
            jira_docs = await self.components.source_processor.process_source_type(
                filtered_config.jira, get_connector_instance, "Jira"
            )
            documents.extend(jira_docs)

        if filtered_config.publicdocs:
            publicdocs_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.publicdocs, get_connector_instance, "PublicDocs"
                )
            )
            documents.extend(publicdocs_docs)

        if filtered_config.localfile:
            localfile_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.localfile, get_connector_instance, "LocalFile"
                )
            )
            documents.extend(localfile_docs)

        # Inject project metadata into documents when project context is available
        if project_id and self.project_manager:
            for document in documents:
                enhanced_metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )
                document.metadata = enhanced_metadata

        logger.info(f"📄 Collected {len(documents)} documents from all sources")
        return documents

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new or updated ones."""
        if not documents:
            return []

        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure the state manager is initialized before use
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

            logger.info(
                f"🔍 Change detection: {len(changes['new'])} new, "
                f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted"
            )

            # Return new and updated documents
            return changes["new"] + changes["updated"]

        except Exception as e:
            logger.error(
                f"Error during change detection: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
            )
            raise

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set,
        project_id: str | None = None,
    ):
        """Update document states for successfully processed documents."""
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure the state manager is initialized before use
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                logger.error(
                    f"Failed to update document state for {doc.id}: {sanitize_exception_message(e)}",
                    error_type=type(e).__name__,
                )
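

# Usage sketch (illustrative, not part of the original module): one way to
# drive the orchestrator end to end. Building Settings, PipelineComponents,
# and the ProjectManager is assumed to happen elsewhere, and "docs" is a
# hypothetical project id; only the PipelineOrchestrator API above is taken
# from this file.
async def _example_ingest(
    settings: Settings,
    components: PipelineComponents,
    project_manager: ProjectManager,
) -> list[Document]:
    orchestrator = PipelineOrchestrator(settings, components, project_manager)
    # Ingest a single, hypothetical project, bypassing change detection.
    documents = await orchestrator.process_documents(project_id="docs", force=True)
    result = orchestrator.last_pipeline_result  # PipelineResult or None
    if result is not None:
        logger.info(f"{result.success_count} chunks processed")
    return documents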