Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 78%

133 statements (coverage.py v7.10.6, created at 2025-09-08 06:05 +0000)

1"""Main orchestrator for the ingestion pipeline.""" 

2 

3from qdrant_loader.config import Settings, SourcesConfig 

4from qdrant_loader.connectors.confluence import ConfluenceConnector 

5from qdrant_loader.connectors.git import GitConnector 

6from qdrant_loader.connectors.jira import JiraConnector 

7from qdrant_loader.connectors.localfile import LocalFileConnector 

8from qdrant_loader.connectors.publicdocs import PublicDocsConnector 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.project_manager import ProjectManager 

11from qdrant_loader.core.state.state_change_detector import StateChangeDetector 

12from qdrant_loader.core.state.state_manager import StateManager 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15from .document_pipeline import DocumentPipeline 

16from .source_filter import SourceFilter 

17from .source_processor import SourceProcessor 

18 

19logger = LoggingConfig.get_logger(__name__) 

20 

21 


class PipelineComponents:
    """Container for pipeline components."""

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
    ):
        self.document_pipeline = document_pipeline
        self.source_processor = source_processor
        self.source_filter = source_filter
        self.state_manager = state_manager
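
# A hedged construction sketch (illustrative only): the component constructor
# calls below are assumptions; the real signatures live in the sibling modules
# imported above, not in this file.
#
# components = PipelineComponents(
#     document_pipeline=DocumentPipeline(...),
#     source_processor=SourceProcessor(...),
#     source_filter=SourceFilter(...),
#     state_manager=StateManager(...),
# )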


class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline."""

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        self.project_manager = project_manager

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents
        """
        logger.info("🚀 Starting document ingestion")

        try:
            # Determine which sources configuration to use
            if sources_config:
                # Use the provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                current_project_id = None
            elif project_id:
                # Use the project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                project_sources_config = project_context.config.sources
                filtered_config = self.components.source_filter.filter_sources(
                    project_sources_config, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                return await self._process_all_projects(source_type, source, force)

            # Fail fast if filtering by source type matched no sources
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            # Collect documents from all sources
            documents = await self._collect_documents_from_sources(
                filtered_config, current_project_id
            )

            if not documents:
                logger.info("✅ No documents found from sources")
                return []

            # Detect changes in documents (bypassed if force=True)
            if force:
                logger.warning(
                    f"🔄 Force mode enabled: bypassing change detection, processing all {len(documents)} documents"
                )
            else:
                documents = await self._detect_document_changes(
                    documents, filtered_config, current_project_id
                )

                if not documents:
                    logger.info("✅ No new or updated documents to process")
                    return []

            # Process documents through the pipeline
            result = await self.components.document_pipeline.process_documents(
                documents
            )

            # Update document states for successfully processed documents
            await self._update_document_states(
                documents, result.successfully_processed_documents, current_project_id
            )

            logger.info(
                f"✅ Ingestion completed: {result.success_count} chunks processed successfully"
            )
            return documents

        except Exception as e:
            logger.error(f"❌ Pipeline orchestration failed: {e}", exc_info=True)
            raise

    async def _process_all_projects(
        self,
        source_type: str | None = None,
        source: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured projects."""
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
                all_documents.extend(project_documents)
                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except Exception as e:
                logger.error(
                    f"Failed to process project {project_id}: {e}", exc_info=True
                )
                # Continue processing the remaining projects
                continue

        logger.info(
            f"Completed processing all projects: {len(all_documents)} total documents"
        )
        return all_documents

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources."""
        documents = []

        # Process each source type with project context
        if filtered_config.confluence:
            confluence_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.confluence, ConfluenceConnector, "Confluence"
                )
            )
            documents.extend(confluence_docs)

        if filtered_config.git:
            git_docs = await self.components.source_processor.process_source_type(
                filtered_config.git, GitConnector, "Git"
            )
            documents.extend(git_docs)

        if filtered_config.jira:
            jira_docs = await self.components.source_processor.process_source_type(
                filtered_config.jira, JiraConnector, "Jira"
            )
            documents.extend(jira_docs)

        if filtered_config.publicdocs:
            publicdocs_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.publicdocs, PublicDocsConnector, "PublicDocs"
                )
            )
            documents.extend(publicdocs_docs)

        if filtered_config.localfile:
            localfile_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.localfile, LocalFileConnector, "LocalFile"
                )
            )
            documents.extend(localfile_docs)

        # Inject project metadata into documents if project context is available
        if project_id and self.project_manager:
            for document in documents:
                enhanced_metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )
                document.metadata = enhanced_metadata

        logger.info(f"📄 Collected {len(documents)} documents from all sources")
        return documents

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new/updated ones."""
        if not documents:
            return []

        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure the state manager is initialized before use
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

                logger.info(
                    f"🔍 Change detection: {len(changes['new'])} new, "
                    f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted"
                )

                # Return new and updated documents
                return changes["new"] + changes["updated"]

        except Exception as e:
            logger.error(f"Error during change detection: {e}", exc_info=True)
            raise

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set,
        project_id: str | None = None,
    ):
        """Update document states for successfully processed documents."""
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure the state manager is initialized before use
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                logger.error(f"Failed to update document state for {doc.id}: {e}")
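
For orientation, a minimal usage sketch of the orchestrator above. It is an
illustration, not the project's documented entry point: it assumes the caller
already has Settings, a SourcesConfig, and a fully wired PipelineComponents
(none of which are constructed in this module), and the "git" type key is an
assumption based on the filtered_config attributes checked above.

import asyncio

from qdrant_loader.config import Settings, SourcesConfig
from qdrant_loader.core.pipeline.orchestrator import (
    PipelineComponents,
    PipelineOrchestrator,
)


async def ingest_git_sources(
    settings: Settings,
    components: PipelineComponents,
    sources_config: SourcesConfig,
) -> int:
    """Run one forced ingestion pass over Git sources and return the count."""
    # Without a project manager, an explicit sources_config is required;
    # passing neither raises ValueError (see process_documents above).
    orchestrator = PipelineOrchestrator(settings=settings, components=components)
    documents = await orchestrator.process_documents(
        sources_config=sources_config,
        source_type="git",  # assumed type key; omit to process every source type
        force=True,  # bypass change detection and reprocess all documents
    )
    return len(documents)


# From synchronous code, something like:
# asyncio.run(ingest_git_sources(settings, components, sources_config))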