Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 79%

131 statements  

coverage.py v7.9.1, created at 2025-06-18 09:27 +0000

1"""Main orchestrator for the ingestion pipeline.""" 

2 

3from qdrant_loader.config import Settings, SourcesConfig 

4from qdrant_loader.connectors.confluence import ConfluenceConnector 

5from qdrant_loader.connectors.git import GitConnector 

6from qdrant_loader.connectors.jira import JiraConnector 

7from qdrant_loader.connectors.localfile import LocalFileConnector 

8from qdrant_loader.connectors.publicdocs import PublicDocsConnector 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.project_manager import ProjectManager 

11from qdrant_loader.core.state.state_change_detector import StateChangeDetector 

12from qdrant_loader.core.state.state_manager import StateManager 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15from .document_pipeline import DocumentPipeline 

16from .source_filter import SourceFilter 

17from .source_processor import SourceProcessor 

18 

19logger = LoggingConfig.get_logger(__name__) 

20 

21 

22class PipelineComponents: 

23 """Container for pipeline components.""" 

24 

25 def __init__( 

26 self, 

27 document_pipeline: DocumentPipeline, 

28 source_processor: SourceProcessor, 

29 source_filter: SourceFilter, 

30 state_manager: StateManager, 

31 ): 

32 self.document_pipeline = document_pipeline 

33 self.source_processor = source_processor 

34 self.source_filter = source_filter 

35 self.state_manager = state_manager 

36 

37 

38class PipelineOrchestrator: 

39 """Main orchestrator for the ingestion pipeline.""" 

40 

41 def __init__( 

42 self, 

43 settings: Settings, 

44 components: PipelineComponents, 

45 project_manager: ProjectManager | None = None, 

46 ): 

47 self.settings = settings 

48 self.components = components 

49 self.project_manager = project_manager 

50 

51 async def process_documents( 

52 self, 

53 sources_config: SourcesConfig | None = None, 

54 source_type: str | None = None, 

55 source: str | None = None, 

56 project_id: str | None = None, 

57 ) -> list[Document]: 

58 """Main entry point for document processing. 

59 

60 Args: 

61 sources_config: Sources configuration to use (for backward compatibility) 

62 source_type: Filter by source type 

63 source: Filter by specific source name 

64 project_id: Process documents for a specific project 

65 

66 Returns: 

67 List of processed documents 

68 """ 

69 logger.info("๐Ÿš€ Starting document ingestion") 

70 

71 try: 

72 # Determine sources configuration to use 

73 if sources_config: 

74 # Use provided sources config (backward compatibility) 

75 logger.debug("Using provided sources configuration") 

76 filtered_config = self.components.source_filter.filter_sources( 

77 sources_config, source_type, source 

78 ) 

79 current_project_id = None 

80 elif project_id: 

81 # Use project-specific sources configuration 

82 if not self.project_manager: 

83 raise ValueError( 

84 "Project manager not available for project-specific processing" 

85 ) 

86 

87 project_context = self.project_manager.get_project_context(project_id) 

88 if ( 

89 not project_context 

90 or not project_context.config 

91 or not project_context.config.sources 

92 ): 

93 raise ValueError( 

94 f"Project '{project_id}' not found or has no configuration" 

95 ) 

96 

97 logger.debug(f"Using project configuration for project: {project_id}") 

98 project_sources_config = project_context.config.sources 

99 filtered_config = self.components.source_filter.filter_sources( 

100 project_sources_config, source_type, source 

101 ) 

102 current_project_id = project_id 

103 else: 

104 # Process all projects 

105 if not self.project_manager: 

106 raise ValueError( 

107 "Project manager not available and no sources configuration provided" 

108 ) 

109 

110 logger.debug("Processing all projects") 

111 return await self._process_all_projects(source_type, source) 

112 

113 # Check if filtered config is empty 

114 if source_type and not any( 

115 [ 

116 filtered_config.git, 

117 filtered_config.confluence, 

118 filtered_config.jira, 

119 filtered_config.publicdocs, 

120 filtered_config.localfile, 

121 ] 

122 ): 

123 raise ValueError(f"No sources found for type '{source_type}'") 

124 

125 # Collect documents from all sources 

126 documents = await self._collect_documents_from_sources( 

127 filtered_config, current_project_id 

128 ) 

129 

130 if not documents: 

131 logger.info("โœ… No documents found from sources") 

132 return [] 

133 

134 # Detect changes in documents 

135 documents = await self._detect_document_changes( 

136 documents, filtered_config, current_project_id 

137 ) 

138 

139 if not documents: 

140 logger.info("โœ… No new or updated documents to process") 

141 return [] 

142 

143 # Process documents through the pipeline 

144 result = await self.components.document_pipeline.process_documents( 

145 documents 

146 ) 

147 

148 # Update document states for successfully processed documents 

149 await self._update_document_states( 

150 documents, result.successfully_processed_documents, current_project_id 

151 ) 

152 

153 logger.info( 

154 f"โœ… Ingestion completed: {result.success_count} chunks processed successfully" 

155 ) 

156 return documents 

157 

158 except Exception as e: 

159 logger.error(f"โŒ Pipeline orchestration failed: {e}", exc_info=True) 

160 raise 

161 

162 async def _process_all_projects( 

163 self, source_type: str | None = None, source: str | None = None 

164 ) -> list[Document]: 

165 """Process documents from all configured projects.""" 

166 if not self.project_manager: 

167 raise ValueError("Project manager not available") 

168 

169 all_documents = [] 

170 project_ids = self.project_manager.list_project_ids() 

171 

172 logger.info(f"Processing {len(project_ids)} projects") 

173 

174 for project_id in project_ids: 

175 try: 

176 logger.debug(f"Processing project: {project_id}") 

177 project_documents = await self.process_documents( 

178 project_id=project_id, 

179 source_type=source_type, 

180 source=source, 

181 ) 

182 all_documents.extend(project_documents) 

183 logger.debug( 

184 f"Processed {len(project_documents)} documents from project: {project_id}" 

185 ) 

186 except Exception as e: 

187 logger.error( 

188 f"Failed to process project {project_id}: {e}", exc_info=True 

189 ) 

190 # Continue processing other projects 

191 continue 

192 

193 logger.info( 

194 f"Completed processing all projects: {len(all_documents)} total documents" 

195 ) 

196 return all_documents 

197 

198 async def _collect_documents_from_sources( 

199 self, filtered_config: SourcesConfig, project_id: str | None = None 

200 ) -> list[Document]: 

201 """Collect documents from all configured sources.""" 

202 documents = [] 

203 

204 # Process each source type with project context 

205 if filtered_config.confluence: 

206 confluence_docs = ( 

207 await self.components.source_processor.process_source_type( 

208 filtered_config.confluence, ConfluenceConnector, "Confluence" 

209 ) 

210 ) 

211 documents.extend(confluence_docs) 

212 

213 if filtered_config.git: 

214 git_docs = await self.components.source_processor.process_source_type( 

215 filtered_config.git, GitConnector, "Git" 

216 ) 

217 documents.extend(git_docs) 

218 

219 if filtered_config.jira: 

220 jira_docs = await self.components.source_processor.process_source_type( 

221 filtered_config.jira, JiraConnector, "Jira" 

222 ) 

223 documents.extend(jira_docs) 

224 

225 if filtered_config.publicdocs: 

226 publicdocs_docs = ( 

227 await self.components.source_processor.process_source_type( 

228 filtered_config.publicdocs, PublicDocsConnector, "PublicDocs" 

229 ) 

230 ) 

231 documents.extend(publicdocs_docs) 

232 

233 if filtered_config.localfile: 

234 localfile_docs = await self.components.source_processor.process_source_type( 

235 filtered_config.localfile, LocalFileConnector, "LocalFile" 

236 ) 

237 documents.extend(localfile_docs) 

238 

239 # Inject project metadata into documents if project context is available 

240 if project_id and self.project_manager: 

241 for document in documents: 

242 enhanced_metadata = self.project_manager.inject_project_metadata( 

243 project_id, document.metadata 

244 ) 

245 document.metadata = enhanced_metadata 

246 

247 logger.info(f"๐Ÿ“„ Collected {len(documents)} documents from all sources") 

248 return documents 

249 

250 async def _detect_document_changes( 

251 self, 

252 documents: list[Document], 

253 filtered_config: SourcesConfig, 

254 project_id: str | None = None, 

255 ) -> list[Document]: 

256 """Detect changes in documents and return only new/updated ones.""" 

257 if not documents: 

258 return [] 

259 

260 logger.debug(f"Starting change detection for {len(documents)} documents") 

261 

262 try: 

263 # Ensure state manager is initialized before use 

264 if not self.components.state_manager._initialized: 

265 logger.debug("Initializing state manager for change detection") 

266 await self.components.state_manager.initialize() 

267 

268 async with StateChangeDetector( 

269 self.components.state_manager 

270 ) as change_detector: 

271 changes = await change_detector.detect_changes( 

272 documents, filtered_config 

273 ) 

274 

275 logger.info( 

276 f"๐Ÿ” Change detection: {len(changes['new'])} new, " 

277 f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted" 

278 ) 

279 

280 # Return new and updated documents 

281 return changes["new"] + changes["updated"] 

282 

283 except Exception as e: 

284 logger.error(f"Error during change detection: {e}", exc_info=True) 

285 raise 

286 

287 async def _update_document_states( 

288 self, 

289 documents: list[Document], 

290 successfully_processed_doc_ids: set, 

291 project_id: str | None = None, 

292 ): 

293 """Update document states for successfully processed documents.""" 

294 successfully_processed_docs = [ 

295 doc for doc in documents if doc.id in successfully_processed_doc_ids 

296 ] 

297 

298 logger.debug( 

299 f"Updating document states for {len(successfully_processed_docs)} documents" 

300 ) 

301 

302 # Ensure state manager is initialized before use 

303 if not self.components.state_manager._initialized: 

304 logger.debug("Initializing state manager for document state updates") 

305 await self.components.state_manager.initialize() 

306 

307 for doc in successfully_processed_docs: 

308 try: 

309 await self.components.state_manager.update_document_state( 

310 doc, project_id 

311 ) 

312 logger.debug(f"Updated document state for {doc.id}") 

313 except Exception as e: 

314 logger.error(f"Failed to update document state for {doc.id}: {e}")
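
A short usage sketch follows; it is not part of the module above. It assumes an already-constructed PipelineOrchestrator (wiring up PipelineComponents depends on constructor signatures in the sibling modules, which are not shown here), and the project ID and source-type values are hypothetical.

# Usage sketch (not part of orchestrator.py). Assumes an orchestrator has
# already been built; "my-project" and "git" are hypothetical values.
async def run_ingestion(orchestrator: PipelineOrchestrator) -> list[Document]:
    # No arguments: every configured project is processed via
    # _process_all_projects(), which requires a project manager.
    all_docs = await orchestrator.process_documents()

    # Scope to one project; raises ValueError if the project is unknown
    # or has no sources configuration.
    project_docs = await orchestrator.process_documents(project_id="my-project")

    # Scope further to one source type; raises ValueError when the
    # filtered config has no sources of that type.
    git_docs = await orchestrator.process_documents(
        project_id="my-project", source_type="git"
    )
    return all_docs + project_docs + git_docs


# From synchronous code: asyncio.run(run_ingestion(orchestrator))

Passing sources_config directly instead skips the project manager entirely; per the docstring, that path is kept for backward compatibility.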