# Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 78% (133 statements)
# Report generated by coverage.py v7.10.6 at 2025-09-08 06:05 +0000

1"""Main orchestrator for the ingestion pipeline."""
3from qdrant_loader.config import Settings, SourcesConfig
4from qdrant_loader.connectors.confluence import ConfluenceConnector
5from qdrant_loader.connectors.git import GitConnector
6from qdrant_loader.connectors.jira import JiraConnector
7from qdrant_loader.connectors.localfile import LocalFileConnector
8from qdrant_loader.connectors.publicdocs import PublicDocsConnector
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.project_manager import ProjectManager
11from qdrant_loader.core.state.state_change_detector import StateChangeDetector
12from qdrant_loader.core.state.state_manager import StateManager
13from qdrant_loader.utils.logging import LoggingConfig
15from .document_pipeline import DocumentPipeline
16from .source_filter import SourceFilter
17from .source_processor import SourceProcessor
19logger = LoggingConfig.get_logger(__name__)


class PipelineComponents:
    """Container for pipeline components."""

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
    ):
        self.document_pipeline = document_pipeline
        self.source_processor = source_processor
        self.source_filter = source_filter
        self.state_manager = state_manager
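
    # Note: this container does no wiring of its own; callers construct each
    # component and pass it in. See the sketch at the end of this file for an
    # assumed construction example.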


class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline."""

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        self.project_manager = project_manager

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        logger.info("🚀 Starting document ingestion")

        try:
            # Determine which sources configuration to use
            if sources_config:
                # Use the provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                current_project_id = None
            elif project_id:
                # Use the project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                project_sources_config = project_context.config.sources
                filtered_config = self.components.source_filter.filter_sources(
                    project_sources_config, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                return await self._process_all_projects(source_type, source, force)

            # Fail fast if the requested source type matched no sources
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            # Collect documents from all sources
            documents = await self._collect_documents_from_sources(
                filtered_config, current_project_id
            )

            if not documents:
                logger.info("✅ No documents found from sources")
                return []

            # Detect changes in documents (bypassed when force=True)
            if force:
                logger.warning(
                    f"🔄 Force mode enabled: bypassing change detection, processing all {len(documents)} documents"
                )
            else:
                documents = await self._detect_document_changes(
                    documents, filtered_config, current_project_id
                )

            if not documents:
                logger.info("✅ No new or updated documents to process")
                return []

            # Process documents through the pipeline
            result = await self.components.document_pipeline.process_documents(
                documents
            )

            # Update document states for successfully processed documents
            await self._update_document_states(
                documents, result.successfully_processed_documents, current_project_id
            )

            logger.info(
                f"✅ Ingestion completed: {result.success_count} chunks processed successfully"
            )
            return documents

        except Exception as e:
            logger.error(f"❌ Pipeline orchestration failed: {e}", exc_info=True)
            raise
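
    # Illustrative call patterns (a sketch; assumes an `orchestrator` wired as
    # in the end-of-file example, and that source type keys match the
    # SourcesConfig attribute names such as "git"):
    #
    #     # Re-ingest one project in full, skipping change detection
    #     docs = await orchestrator.process_documents(project_id="my-project", force=True)
    #
    #     # Ingest only Git sources across all configured projects
    #     docs = await orchestrator.process_documents(source_type="git")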

    async def _process_all_projects(
        self,
        source_type: str | None = None,
        source: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured projects."""
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
                all_documents.extend(project_documents)
                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except Exception as e:
                logger.error(
                    f"Failed to process project {project_id}: {e}", exc_info=True
                )
                # Continue processing other projects
                continue

        logger.info(
            f"Completed processing all projects: {len(all_documents)} total documents"
        )
        return all_documents
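
    # Per-project failures above are logged and skipped rather than re-raised,
    # so one misconfigured project cannot block ingestion of the others.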

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources."""
        documents = []

        # Process each source type with project context
        if filtered_config.confluence:
            confluence_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.confluence, ConfluenceConnector, "Confluence"
                )
            )
            documents.extend(confluence_docs)

        if filtered_config.git:
            git_docs = await self.components.source_processor.process_source_type(
                filtered_config.git, GitConnector, "Git"
            )
            documents.extend(git_docs)

        if filtered_config.jira:
            jira_docs = await self.components.source_processor.process_source_type(
                filtered_config.jira, JiraConnector, "Jira"
            )
            documents.extend(jira_docs)

        if filtered_config.publicdocs:
            publicdocs_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.publicdocs, PublicDocsConnector, "PublicDocs"
                )
            )
            documents.extend(publicdocs_docs)

        if filtered_config.localfile:
            localfile_docs = await self.components.source_processor.process_source_type(
                filtered_config.localfile, LocalFileConnector, "LocalFile"
            )
            documents.extend(localfile_docs)

        # Inject project metadata into documents if project context is available
        if project_id and self.project_manager:
            for document in documents:
                enhanced_metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )
                document.metadata = enhanced_metadata

        logger.info(f"📄 Collected {len(documents)} documents from all sources")
        return documents
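
    # A new source type would follow the same shape (illustrative only; the
    # FooConnector class and the `foo` field on SourcesConfig are hypothetical):
    #
    #     if filtered_config.foo:
    #         foo_docs = await self.components.source_processor.process_source_type(
    #             filtered_config.foo, FooConnector, "Foo"
    #         )
    #         documents.extend(foo_docs)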

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new/updated ones."""
        if not documents:
            return []

        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure the state manager is initialized before use
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

            logger.info(
                f"🔍 Change detection: {len(changes['new'])} new, "
                f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted"
            )

            # Return new and updated documents
            return changes["new"] + changes["updated"]

        except Exception as e:
            logger.error(f"Error during change detection: {e}", exc_info=True)
            raise
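
    # As used above, the detector yields a mapping of document lists keyed by
    # "new", "updated", and "deleted"; only the first two are re-processed:
    #
    #     changes = {"new": [doc_a], "updated": [doc_b], "deleted": []}
    #     changes["new"] + changes["updated"]  # -> [doc_a, doc_b]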

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set,
        project_id: str | None = None,
    ):
        """Update document states for successfully processed documents."""
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure the state manager is initialized before use
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                logger.error(f"Failed to update document state for {doc.id}: {e}")
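

# Example end-to-end wiring (a sketch, not part of the module: the constructor
# arguments below are assumptions; see each component's module for the real
# signatures):
#
#     import asyncio
#
#     async def main() -> None:
#         settings = Settings(...)  # hypothetical: load from your config
#         components = PipelineComponents(
#             document_pipeline=DocumentPipeline(...),
#             source_processor=SourceProcessor(...),
#             source_filter=SourceFilter(),
#             state_manager=StateManager(...),
#         )
#         orchestrator = PipelineOrchestrator(
#             settings, components, project_manager=ProjectManager(...)
#         )
#         await orchestrator.process_documents()  # process every project
#
#     asyncio.run(main())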