Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 79%
131 statements
coverage.py v7.9.1, created at 2025-06-18 09:27 +0000
1"""Main orchestrator for the ingestion pipeline."""
3from qdrant_loader.config import Settings, SourcesConfig
4from qdrant_loader.connectors.confluence import ConfluenceConnector
5from qdrant_loader.connectors.git import GitConnector
6from qdrant_loader.connectors.jira import JiraConnector
7from qdrant_loader.connectors.localfile import LocalFileConnector
8from qdrant_loader.connectors.publicdocs import PublicDocsConnector
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.project_manager import ProjectManager
11from qdrant_loader.core.state.state_change_detector import StateChangeDetector
12from qdrant_loader.core.state.state_manager import StateManager
13from qdrant_loader.utils.logging import LoggingConfig
15from .document_pipeline import DocumentPipeline
16from .source_filter import SourceFilter
17from .source_processor import SourceProcessor
19logger = LoggingConfig.get_logger(__name__)


class PipelineComponents:
    """Container for pipeline components."""

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
    ):
        self.document_pipeline = document_pipeline
        self.source_processor = source_processor
        self.source_filter = source_filter
        self.state_manager = state_manager
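

# PipelineComponents is a plain dependency container. An illustrative wiring
# (hypothetical arguments; the real construction lives in the pipeline factory)
# might look like:
#
#     components = PipelineComponents(
#         document_pipeline=DocumentPipeline(...),
#         source_processor=SourceProcessor(...),
#         source_filter=SourceFilter(),
#         state_manager=StateManager(...),
#     )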


class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline."""

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        self.project_manager = project_manager
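        # Note: project_manager is optional. Without it, only the explicit
        # sources_config path of process_documents is usable; the project-based
        # paths raise ValueError.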

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project

        Returns:
            List of processed documents
        """
        logger.info("Starting document ingestion")

        try:
            # Determine sources configuration to use
            if sources_config:
                # Use provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                current_project_id = None
            elif project_id:
                # Use project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                project_sources_config = project_context.config.sources
                filtered_config = self.components.source_filter.filter_sources(
                    project_sources_config, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                return await self._process_all_projects(source_type, source)
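
            # At this point the first two branches have set filtered_config and
            # current_project_id; the all-projects branch has already returned.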

            # Check if filtered config is empty
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            # Collect documents from all sources
            documents = await self._collect_documents_from_sources(
                filtered_config, current_project_id
            )

            if not documents:
                logger.info("No documents found from sources")
                return []

            # Detect changes in documents
            documents = await self._detect_document_changes(
                documents, filtered_config, current_project_id
            )

            if not documents:
                logger.info("No new or updated documents to process")
                return []

            # Process documents through the pipeline
            result = await self.components.document_pipeline.process_documents(
                documents
            )

            # Update document states for successfully processed documents
            await self._update_document_states(
                documents, result.successfully_processed_documents, current_project_id
            )

            logger.info(
                f"Ingestion completed: {result.success_count} chunks processed successfully"
            )
            return documents

        except Exception as e:
            logger.error(f"Pipeline orchestration failed: {e}", exc_info=True)
            raise

    async def _process_all_projects(
        self, source_type: str | None = None, source: str | None = None
    ) -> list[Document]:
        """Process documents from all configured projects."""
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                )
                all_documents.extend(project_documents)
                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except Exception as e:
                logger.error(
                    f"Failed to process project {project_id}: {e}", exc_info=True
                )
                # Continue processing other projects
                continue

        logger.info(
            f"Completed processing all projects: {len(all_documents)} total documents"
        )
        return all_documents

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources."""
        documents = []

        # Process each source type with project context
        if filtered_config.confluence:
            confluence_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.confluence, ConfluenceConnector, "Confluence"
                )
            )
            documents.extend(confluence_docs)

        if filtered_config.git:
            git_docs = await self.components.source_processor.process_source_type(
                filtered_config.git, GitConnector, "Git"
            )
            documents.extend(git_docs)

        if filtered_config.jira:
            jira_docs = await self.components.source_processor.process_source_type(
                filtered_config.jira, JiraConnector, "Jira"
            )
            documents.extend(jira_docs)

        if filtered_config.publicdocs:
            publicdocs_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.publicdocs, PublicDocsConnector, "PublicDocs"
                )
            )
            documents.extend(publicdocs_docs)

        if filtered_config.localfile:
            localfile_docs = await self.components.source_processor.process_source_type(
                filtered_config.localfile, LocalFileConnector, "LocalFile"
            )
            documents.extend(localfile_docs)

        # Inject project metadata into documents if project context is available
        if project_id and self.project_manager:
            for document in documents:
                enhanced_metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )
                document.metadata = enhanced_metadata

        logger.info(f"Collected {len(documents)} documents from all sources")
        return documents

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new/updated ones."""
        if not documents:
            return []

        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure state manager is initialized before use
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

                logger.info(
                    f"Change detection: {len(changes['new'])} new, "
                    f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted"
                )
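
                # Deleted documents are only reported in the summary above;
                # cleaning them up is assumed to happen elsewhere in the pipeline.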
                # Return new and updated documents
                return changes["new"] + changes["updated"]

        except Exception as e:
            logger.error(f"Error during change detection: {e}", exc_info=True)
            raise

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set,
        project_id: str | None = None,
    ):
        """Update document states for successfully processed documents."""
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]
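
        # Documents that failed processing are left out of the state update, so
        # change detection should surface them again on a subsequent run.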

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure state manager is initialized before use
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                logger.error(f"Failed to update document state for {doc.id}: {e}")
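

# Illustrative usage (hypothetical wiring; in the package, construction is
# handled by the pipeline factory rather than done by hand):
#
#     orchestrator = PipelineOrchestrator(settings, components, project_manager)
#     docs = await orchestrator.process_documents(project_id="my-project")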