Coverage for src/qdrant_loader/core/pipeline/orchestrator.py: 91%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Main orchestrator for the ingestion pipeline.""" 

2 

3import traceback 

4 

5from qdrant_loader.config import Settings, SourcesConfig 

6from qdrant_loader.connectors.base import ConnectorConfigurationError 

7from qdrant_loader.connectors.factory import get_connector_instance 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.project_manager import ProjectManager 

10from qdrant_loader.core.state.state_change_detector import StateChangeDetector 

11from qdrant_loader.core.state.state_manager import StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13from qdrant_loader.utils.sensitive import sanitize_exception_message 

14 

15from .document_pipeline import DocumentPipeline 

16from .source_filter import SourceFilter 

17from .source_processor import SourceProcessor 

18from .workers.upsert_worker import PipelineResult 

19 

# Module-level structured logger; all orchestration events are emitted through it.
logger = LoggingConfig.get_logger(__name__)

21 

22 

class PipelineComponents:
    """Simple aggregate that bundles the collaborators the orchestrator needs.

    Holds a document pipeline, a source processor, a source filter, and a
    state manager. Each constructor argument is stored verbatim under an
    attribute of the same name; no validation or copying is performed.
    """

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
    ):
        # Bind all collaborators in one shot; attribute names intentionally
        # mirror the parameter names so callers can pass them by keyword.
        self.__dict__.update(
            document_pipeline=document_pipeline,
            source_processor=source_processor,
            source_filter=source_filter,
            state_manager=state_manager,
        )

37 

38 

class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline.

    Coordinates source filtering, document collection, change detection,
    pipeline processing, and state persistence. Supports three entry modes
    via :meth:`process_documents`: an explicit ``sources_config`` (legacy),
    a single ``project_id``, or all projects managed by ``project_manager``.
    """

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        # Optional: only required for project-scoped / all-projects processing.
        self.project_manager = project_manager
        # Result of the most recent pipeline run; read by _process_all_projects
        # immediately after each recursive process_documents() call to aggregate
        # per-project results. Reset to None at the start of every run.
        self.last_pipeline_result: PipelineResult | None = None

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection

        Returns:
            List of processed documents

        Raises:
            ValueError: If no usable configuration can be resolved (missing
                project manager, unknown project, or empty filtered config).
        """
        logger.info("🚀 Starting document ingestion")
        # Clear any stale result so callers never observe a previous run's data.
        self.last_pipeline_result = None

        try:
            # Determine sources configuration to use
            if sources_config:
                # Use provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                # Legacy mode has no project context.
                current_project_id = None
            elif project_id:
                # Use project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                project_sources_config = project_context.config.sources
                filtered_config = self.components.source_filter.filter_sources(
                    project_sources_config, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                # Delegates to _process_all_projects, which re-enters this
                # method once per project; returns directly from here.
                return await self._process_all_projects(source_type, source, force)

            # Check if filtered config is empty: a source_type filter that
            # matches nothing is treated as a caller error rather than a no-op.
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            # Collect documents from all sources
            documents = await self._collect_documents_from_sources(
                filtered_config, current_project_id
            )

            if not documents:
                logger.info("✅ No documents found from sources")
                return []

            # Detect changes in documents (bypass if force=True)
            if force:
                logger.warning(
                    f"🔄 Force mode enabled: bypassing change detection, processing all {len(documents)} documents"
                )
            else:
                documents = await self._detect_document_changes(
                    documents, filtered_config, current_project_id
                )

            # After change detection the set may be empty; in force mode the
            # earlier emptiness check guarantees documents is non-empty here.
            if not documents:
                logger.info("✅ No new or updated documents to process")
                return []

            # Process documents through the pipeline
            result = await self.components.document_pipeline.process_documents(
                documents
            )
            # Expose the result for aggregation by _process_all_projects.
            self.last_pipeline_result = result

            # Update document states for successfully processed documents
            await self._update_document_states(
                documents, result.successfully_processed_documents, current_project_id
            )

            logger.info(
                f"✅ Ingestion completed: {result.success_count} chunks processed successfully"
            )
            return documents

        except Exception as e:
            # Sanitize both message and traceback before logging so secrets
            # (tokens, credentials) never reach the log sink; then re-raise.
            logger.error(
                f"❌ Pipeline orchestration failed: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
                sanitized_traceback=sanitize_exception_message(traceback.format_exc()),
            )
            raise

    async def _process_all_projects(
        self,
        source_type: str | None = None,
        source: str | None = None,
        force: bool = False,
    ) -> list[Document]:
        """Process documents from all configured projects.

        Iterates every project id known to the project manager, re-entering
        :meth:`process_documents` per project, and aggregates the per-project
        ``PipelineResult`` objects into one combined result stored on
        ``self.last_pipeline_result``. Failures in one project are logged and
        recorded but do not stop the remaining projects.

        Args:
            source_type: Optional source-type filter forwarded per project.
            source: Optional source-name filter forwarded per project.
            force: Bypass change detection in every project when True.

        Returns:
            All documents collected across the projects that succeeded.

        Raises:
            ValueError: If no project manager is configured.
        """
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents = []
        aggregated_result = PipelineResult()
        failed_projects: list[str] = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
                # process_documents() stores its PipelineResult on the instance;
                # read it back immediately, before the next iteration overwrites it.
                project_result = self.last_pipeline_result
                all_documents.extend(project_documents)

                # May be None when the project yielded no documents to process.
                if project_result is not None:
                    aggregated_result.success_count += project_result.success_count
                    aggregated_result.error_count += project_result.error_count
                    aggregated_result.successfully_processed_documents.update(
                        project_result.successfully_processed_documents
                    )
                    aggregated_result.failed_document_ids.update(
                        project_result.failed_document_ids
                    )
                    aggregated_result.errors.extend(project_result.errors)

                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except ConnectorConfigurationError as e:
                # Misconfigured connector: record and skip this project.
                # NOTE(review): unlike the generic handler below, this path does
                # not bump aggregated_result.error_count — confirm intentional.
                logger.error(
                    f"Configuration error in project {project_id}: "
                    f"{sanitize_exception_message(e)}. "
                    "Skipping this project — check connector settings.",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitize_exception_message(
                        traceback.format_exc()
                    ),
                )
                aggregated_result.errors.append(
                    f"Configuration error in project {project_id}: "
                    f"{sanitize_exception_message(e)}"
                )
                failed_projects.append(project_id)
                continue
            except Exception as e:
                # Any other failure: sanitize once, reuse for both the
                # aggregated error record and the log entry.
                safe_error = sanitize_exception_message(e)
                sanitized_traceback = sanitize_exception_message(traceback.format_exc())
                aggregated_result.error_count += 1
                aggregated_result.errors.append(
                    "project_id="
                    f"{project_id}; "
                    "error_type="
                    f"{type(e).__name__}; "
                    "message="
                    f"{safe_error}; "
                    "traceback="
                    f"{sanitized_traceback}"
                )
                logger.error(
                    f"Failed to process project {project_id}: {safe_error}",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitized_traceback,
                )
                failed_projects.append(project_id)
                # Continue processing other projects
                continue

        # Publish the combined result for callers inspecting the instance.
        self.last_pipeline_result = aggregated_result

        total_count = len(project_ids)
        failed_count = len(failed_projects)
        success_count = total_count - failed_count
        if failed_count > 0:
            logger.warning(
                f"Completed processing projects: {success_count}/{total_count} succeeded, "
                f"{failed_count} failed. Check errors above for details.",
                total_projects=total_count,
                successful_projects=success_count,
                failed_projects=failed_count,
            )
        else:
            logger.info(
                f"Completed processing all projects: {len(all_documents)} total documents"
            )
        return all_documents

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources.

        Processes each configured source type (Confluence, Git, Jira,
        PublicDocs, LocalFile) in a fixed order through the source processor,
        then — when a project context exists — rewrites every document's
        metadata via the project manager's metadata injection.

        Args:
            filtered_config: Sources configuration already narrowed by filters.
            project_id: Project whose metadata should be injected, if any.

        Returns:
            The documents gathered from every configured source.
        """
        documents = []

        # Process each source type with project context
        if filtered_config.confluence:
            confluence_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.confluence, get_connector_instance, "Confluence"
                )
            )
            documents.extend(confluence_docs)

        if filtered_config.git:
            git_docs = await self.components.source_processor.process_source_type(
                filtered_config.git, get_connector_instance, "Git"
            )
            documents.extend(git_docs)

        if filtered_config.jira:
            jira_docs = await self.components.source_processor.process_source_type(
                filtered_config.jira, get_connector_instance, "Jira"
            )
            documents.extend(jira_docs)

        if filtered_config.publicdocs:
            publicdocs_docs = (
                await self.components.source_processor.process_source_type(
                    filtered_config.publicdocs, get_connector_instance, "PublicDocs"
                )
            )
            documents.extend(publicdocs_docs)

        if filtered_config.localfile:
            localfile_docs = await self.components.source_processor.process_source_type(
                filtered_config.localfile, get_connector_instance, "LocalFile"
            )
            documents.extend(localfile_docs)

        # Inject project metadata into documents if project context is available
        if project_id and self.project_manager:
            for document in documents:
                enhanced_metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )
                document.metadata = enhanced_metadata

        logger.info(f"📄 Collected {len(documents)} documents from all sources")
        return documents

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new/updated ones.

        Args:
            documents: Candidate documents collected from the sources.
            filtered_config: Sources configuration used for change detection.
            project_id: Accepted for interface symmetry with the sibling
                helpers but currently unused by this method's body.

        Returns:
            The subset of ``documents`` classified as new or updated
            (deleted documents are reported in the log only).

        Raises:
            Exception: Re-raises any failure from the change detector after
                logging a sanitized message.
        """
        if not documents:
            return []

        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure state manager is initialized before use
            # NOTE(review): reaches into the private `_initialized` flag —
            # presumably the StateManager exposes no public initialized check.
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            # StateChangeDetector is an async context manager; its lifecycle
            # (setup/teardown) is scoped to this detection call.
            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

            logger.info(
                f"🔍 Change detection: {len(changes['new'])} new, "
                f"{len(changes['updated'])} updated, {len(changes['deleted'])} deleted"
            )

            # Return new and updated documents
            return changes["new"] + changes["updated"]

        except Exception as e:
            logger.error(
                f"Error during change detection: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
            )
            raise

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set,
        project_id: str | None = None,
    ):
        """Update document states for successfully processed documents.

        Args:
            documents: All documents that were submitted to the pipeline.
            successfully_processed_doc_ids: Ids of documents the pipeline
                completed; only these have their state persisted.
            project_id: Project context forwarded to the state manager.

        Note:
            Per-document state-update failures are logged and swallowed so one
            bad record does not abort persistence for the remaining documents.
        """
        # Keep only the documents the pipeline reported as fully processed.
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure state manager is initialized before use
        # NOTE(review): same private `_initialized` access as in
        # _detect_document_changes — confirm StateManager has no public API.
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                # Deliberate best-effort: log and continue with the next doc.
                logger.error(
                    f"Failed to update document state for {doc.id}: {sanitize_exception_message(e)}",
                    error_type=type(e).__name__,
                )