Coverage for src / qdrant_loader / core / pipeline / orchestrator.py: 75%

256 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Main orchestrator for the ingestion pipeline.""" 

2 

3import traceback 

4from collections.abc import AsyncIterator 

5from datetime import datetime 

6 

7from qdrant_loader.config import Settings, SourcesConfig 

8from qdrant_loader.connectors.base import ConnectorConfigurationError 

9from qdrant_loader.connectors.factory import get_connector_instance 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.core.project_manager import ProjectManager 

12from qdrant_loader.core.qdrant_manager import QdrantManager 

13from qdrant_loader.core.state.state_change_detector import StateChangeDetector 

14from qdrant_loader.core.state.state_manager import StateManager 

15from qdrant_loader.utils.logging import LoggingConfig 

16from qdrant_loader.utils.sensitive import sanitize_exception_message 

17 

18from .document_pipeline import DocumentPipeline 

19from .source_filter import SourceFilter 

20from .source_processor import SourceProcessor 

21from .workers.upsert_worker import PipelineResult 

22 

# Module-level structured logger used throughout this module.
logger = LoggingConfig.get_logger(__name__)

24 

25 

class PipelineComponents:
    """Bundle the collaborators that the pipeline orchestrator delegates to.

    Every constructor argument is stored unchanged on the attribute of the
    same name; nothing is validated, copied or wrapped.
    """

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
        qdrant_manager: QdrantManager,
    ):
        # Processes batches of documents handed over by the orchestrator.
        self.document_pipeline = document_pipeline
        # Pulls documents out of the configured source connectors.
        self.source_processor = source_processor
        # Narrows a sources configuration by source type and/or source name.
        self.source_filter = source_filter
        # Persists per-document ingestion state used for change detection.
        self.state_manager = state_manager
        # Handed to the state manager when deleted documents' points are removed.
        self.qdrant_manager = qdrant_manager

42 

43 

class PipelineOrchestrator:
    """Drive an ingestion run end to end.

    Sequences source streaming, optional change detection, batch processing
    and document-state updates, delegating each step to ``components``.
    """

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        self.settings = settings
        self.components = components
        # Optional; project-scoped and all-project runs raise ValueError without it.
        self.project_manager = project_manager
        # Aggregated result of the most recent run (reset when a run starts).
        self.last_pipeline_result: PipelineResult | None = None

57 

58 async def _stream_batches_from_sources( 

59 self, 

60 filtered_config: SourcesConfig, 

61 batch_size: int = 256, 

62 since: datetime | None = None, 

63 project_id: str | None = None, 

64 seen_uris: set[str] | None = None, 

65 ) -> AsyncIterator[list[Document]]: 

66 """Stream source documents in bounded micro-batches. 

67 

68 This helper collects documents from each source type and yields 

69 batches of a fixed size, keeping memory usage bounded. 

70 """ 

71 batch: list[Document] = [] 

72 

73 # Note: previous implementation contained vestigial async inner 

74 # helpers `_flush_batch` and `_append_document` which attempted to 

75 # yield from inside non-generator contexts. These were dead code 

76 # and confusing. Batching is handled inline in `_process_source_type`. 

77 

78 async def _process_source_type(source_type_name: str, source_configs): 

79 if not source_configs: 

80 return 

81 

82 async for ( 

83 document 

84 ) in self.components.source_processor.stream_source_documents( 

85 source_configs, 

86 get_connector_instance, 

87 source_type_name, 

88 since=since, 

89 ): 

90 # Inject project metadata when running with project context 

91 if project_id and self.project_manager: 

92 try: 

93 document.metadata = ( 

94 self.project_manager.inject_project_metadata( 

95 project_id, document.metadata 

96 ) 

97 ) 

98 except Exception: 

99 # Don't let metadata injection break streaming; log and continue 

100 logger.debug( 

101 "Project metadata injection failed for document", 

102 document_id=document.id, 

103 project_id=project_id, 

104 ) 

105 

106 # Track seen URIs for potential post-stream reconciliation 

107 if seen_uris is not None: 

108 try: 

109 uri = f"{document.source_type}:{document.source}:{document.url.rstrip('/') }" 

110 seen_uris.add(uri) 

111 except Exception: 

112 pass 

113 

114 batch.append(document) 

115 if len(batch) >= batch_size: 

116 yield batch.copy() 

117 batch.clear() 

118 

119 if filtered_config.confluence: 

120 async for yielded_batch in _process_source_type( 

121 "Confluence", filtered_config.confluence 

122 ): 

123 yield yielded_batch 

124 

125 if filtered_config.git: 

126 async for yielded_batch in _process_source_type("Git", filtered_config.git): 

127 yield yielded_batch 

128 

129 if filtered_config.jira: 

130 async for yielded_batch in _process_source_type( 

131 "Jira", filtered_config.jira 

132 ): 

133 yield yielded_batch 

134 

135 if filtered_config.publicdocs: 

136 async for yielded_batch in _process_source_type( 

137 "PublicDocs", filtered_config.publicdocs 

138 ): 

139 yield yielded_batch 

140 

141 if filtered_config.localfile: 

142 async for yielded_batch in _process_source_type( 

143 "LocalFile", filtered_config.localfile 

144 ): 

145 yield yielded_batch 

146 

147 if batch: 

148 yield batch 

149 

150 async def process_documents( 

151 self, 

152 sources_config: SourcesConfig | None = None, 

153 source_type: str | None = None, 

154 source: str | None = None, 

155 project_id: str | None = None, 

156 force: bool = False, 

157 since: datetime | None = None, 

158 ) -> list[Document]: 

159 """Main entry point for document processing. 

160 

161 Args: 

162 sources_config: Sources configuration to use (for backward compatibility) 

163 source_type: Filter by source type 

164 source: Filter by specific source name 

165 project_id: Process documents for a specific project 

166 force: Force processing of all documents, bypassing change detection 

167 since: Only stream documents updated after this timestamp (connector-level 

168 filtering). Connectors that do not yet support time-based filtering will 

169 fall back to full fetch with hash-based change detection. 

170 

171 Returns: 

172 List of processed documents 

173 """ 

174 logger.info("🚀 Starting document ingestion") 

175 self.last_pipeline_result = None 

176 

177 try: 

178 # Determine sources configuration to use 

179 if sources_config: 

180 # Use provided sources config (backward compatibility) 

181 logger.debug("Using provided sources configuration") 

182 filtered_config = self.components.source_filter.filter_sources( 

183 sources_config, source_type, source 

184 ) 

185 current_project_id = None 

186 elif project_id: 

187 # Use project-specific sources configuration 

188 if not self.project_manager: 

189 raise ValueError( 

190 "Project manager not available for project-specific processing" 

191 ) 

192 

193 project_context = self.project_manager.get_project_context(project_id) 

194 if ( 

195 not project_context 

196 or not project_context.config 

197 or not project_context.config.sources 

198 ): 

199 raise ValueError( 

200 f"Project '{project_id}' not found or has no configuration" 

201 ) 

202 

203 logger.debug(f"Using project configuration for project: {project_id}") 

204 project_sources_config = project_context.config.sources 

205 filtered_config = self.components.source_filter.filter_sources( 

206 project_sources_config, source_type, source 

207 ) 

208 current_project_id = project_id 

209 else: 

210 # Process all projects 

211 if not self.project_manager: 

212 raise ValueError( 

213 "Project manager not available and no sources configuration provided" 

214 ) 

215 

216 logger.debug("Processing all projects") 

217 return await self._process_all_projects( 

218 source_type, source, force, since 

219 ) 

220 

221 # Check if filtered config is empty 

222 if source_type and not any( 

223 [ 

224 filtered_config.git, 

225 filtered_config.confluence, 

226 filtered_config.jira, 

227 filtered_config.publicdocs, 

228 filtered_config.localfile, 

229 ] 

230 ): 

231 raise ValueError(f"No sources found for type '{source_type}'") 

232 

233 # Stream documents in bounded micro-batches and process each batch 

234 total_documents = 0 

235 processed_documents: list[Document] = [] 

236 aggregated_result = PipelineResult() 

237 batch_count = 0 

238 

239 if not force and not self.components.state_manager._initialized: 

240 logger.debug("Initializing state manager for change detection") 

241 await self.components.state_manager.initialize() 

242 

243 change_detector = None 

244 if not force: 

245 change_detector = await StateChangeDetector( 

246 self.components.state_manager 

247 ).__aenter__() 

248 

249 seen_uris: set[str] = set() 

250 try: 

251 # Prefer calling the new signature but fall back to the 

252 # legacy 3-arg signature for backwards compatibility / tests. 

253 try: 

254 stream_iter = self._stream_batches_from_sources( 

255 filtered_config, 

256 256, 

257 since, 

258 project_id=current_project_id, 

259 seen_uris=seen_uris, 

260 ) 

261 except TypeError: 

262 # Callable likely expects the old signature 

263 stream_iter = self._stream_batches_from_sources( 

264 filtered_config, 256, since 

265 ) 

266 

267 async for batch in stream_iter: 

268 total_documents += len(batch) 

269 batch_count += 1 

270 

271 if not force and change_detector is not None: 

272 batch = await change_detector.classify_batch( 

273 batch, filtered_config, current_project_id 

274 ) 

275 

276 if not batch: 

277 continue 

278 

279 batch_result = ( 

280 await self.components.document_pipeline.process_batch(batch) 

281 ) 

282 aggregated_result.success_count += batch_result.success_count 

283 aggregated_result.error_count += batch_result.failure_count 

284 aggregated_result.errors.extend(batch_result.errors) 

285 aggregated_result.successfully_processed_documents.update( 

286 batch_result.successfully_processed_documents 

287 ) 

288 aggregated_result.failed_document_ids.update( 

289 batch_result.failed_document_ids 

290 ) 

291 

292 if batch_result.successfully_processed_documents: 

293 await self._update_document_states( 

294 batch, 

295 batch_result.successfully_processed_documents, 

296 current_project_id, 

297 ) 

298 processed_documents.extend( 

299 [ 

300 doc 

301 for doc in batch 

302 if doc.id 

303 in batch_result.successfully_processed_documents 

304 ] 

305 ) 

306 

307 if total_documents == 0 and not force: 

308 logger.warning( 

309 "⚠️ EMPTY SNAPSHOT in non-force mode. About to enter change detection " 

310 "which may classify existing corpus as deleted if source API returned partial/null results. " 

311 "This is a known risk (WS-3: add explicit snapshot_is_complete signal or per-source enable_deletion_detection). " 

312 "Proceeding carefully." 

313 ) 

314 

315 if total_documents == 0 and force: 

316 logger.info("✅ No documents found from sources") 

317 return [] 

318 

319 if not force and not processed_documents: 

320 self.last_pipeline_result = aggregated_result 

321 if aggregated_result.error_count > 0: 

322 logger.error( 

323 "No documents were successfully processed", 

324 error_count=aggregated_result.error_count, 

325 ) 

326 else: 

327 logger.info("No new or updated documents to process") 

328 return [] 

329 

330 # Deletion detection / reconciliation note: 

331 # Streaming classification only detects new/updated documents 

332 # per-batch. Full deletion detection (documents present in the 

333 # state DB but absent from the current snapshot across all 

334 # batches) requires a post-stream reconciliation (WS-3). 

335 # For now we only log that reconciliation is possible and 

336 # record the set of seen URIs; implementors can enable a 

337 # reconciliation pass that compares previous state URIs to 

338 # `seen_uris` and call `_process_deleted_documents`. 

339 if not force: 

340 logger.debug( 

341 "Post-stream reconciliation not enabled. Seen URIs collected for potential WS-3 reconciliation", 

342 seen_count=len(seen_uris), 

343 ) 

344 

345 self.last_pipeline_result = aggregated_result 

346 logger.info( 

347 f"✅ Ingestion completed: {aggregated_result.success_count} chunks processed successfully" 

348 ) 

349 return processed_documents 

350 finally: 

351 if change_detector is not None: 

352 await change_detector.__aexit__(None, None, None) 

353 

354 except Exception as e: 

355 logger.error( 

356 f"❌ Pipeline orchestration failed: {sanitize_exception_message(e)}", 

357 error_type=type(e).__name__, 

358 sanitized_traceback=sanitize_exception_message(traceback.format_exc()), 

359 ) 

360 raise 

361 

362 async def _process_all_projects( 

363 self, 

364 source_type: str | None = None, 

365 source: str | None = None, 

366 force: bool = False, 

367 since: datetime | None = None, 

368 ) -> list[Document]: 

369 """Process documents from all configured projects.""" 

370 if not self.project_manager: 

371 raise ValueError("Project manager not available") 

372 

373 all_documents = [] 

374 aggregated_result = PipelineResult() 

375 failed_projects: list[str] = [] 

376 project_ids = self.project_manager.list_project_ids() 

377 

378 logger.info(f"Processing {len(project_ids)} projects") 

379 

380 for project_id in project_ids: 

381 try: 

382 logger.debug(f"Processing project: {project_id}") 

383 project_documents = await self.process_documents( 

384 project_id=project_id, 

385 source_type=source_type, 

386 source=source, 

387 force=force, 

388 since=since, 

389 ) 

390 project_result = self.last_pipeline_result 

391 all_documents.extend(project_documents) 

392 

393 if project_result is not None: 

394 aggregated_result.success_count += project_result.success_count 

395 aggregated_result.error_count += project_result.error_count 

396 aggregated_result.successfully_processed_documents.update( 

397 project_result.successfully_processed_documents 

398 ) 

399 aggregated_result.failed_document_ids.update( 

400 project_result.failed_document_ids 

401 ) 

402 aggregated_result.errors.extend(project_result.errors) 

403 

404 logger.debug( 

405 f"Processed {len(project_documents)} documents from project: {project_id}" 

406 ) 

407 except ConnectorConfigurationError as e: 

408 logger.error( 

409 f"Configuration error in project {project_id}: " 

410 f"{sanitize_exception_message(e)}. " 

411 "Skipping this project — check connector settings.", 

412 error_type=type(e).__name__, 

413 sanitized_traceback=sanitize_exception_message( 

414 traceback.format_exc() 

415 ), 

416 ) 

417 aggregated_result.errors.append( 

418 f"Configuration error in project {project_id}: " 

419 f"{sanitize_exception_message(e)}" 

420 ) 

421 failed_projects.append(project_id) 

422 continue 

423 except Exception as e: 

424 safe_error = sanitize_exception_message(e) 

425 sanitized_traceback = sanitize_exception_message(traceback.format_exc()) 

426 aggregated_result.error_count += 1 

427 aggregated_result.errors.append( 

428 "project_id=" 

429 f"{project_id}; " 

430 "error_type=" 

431 f"{type(e).__name__}; " 

432 "message=" 

433 f"{safe_error}; " 

434 "traceback=" 

435 f"{sanitized_traceback}" 

436 ) 

437 logger.error( 

438 f"Failed to process project {project_id}: {safe_error}", 

439 error_type=type(e).__name__, 

440 sanitized_traceback=sanitized_traceback, 

441 ) 

442 failed_projects.append(project_id) 

443 # Continue processing other projects 

444 continue 

445 

446 self.last_pipeline_result = aggregated_result 

447 

448 total_count = len(project_ids) 

449 failed_count = len(failed_projects) 

450 success_count = total_count - failed_count 

451 if failed_count > 0: 

452 logger.warning( 

453 f"Completed processing projects: {success_count}/{total_count} succeeded, " 

454 f"{failed_count} failed. Check errors above for details.", 

455 total_projects=total_count, 

456 successful_projects=success_count, 

457 failed_projects=failed_count, 

458 ) 

459 else: 

460 logger.info( 

461 f"Completed processing all projects: {len(all_documents)} total documents" 

462 ) 

463 return all_documents 

464 

465 async def _collect_documents_from_sources( 

466 self, filtered_config: SourcesConfig, project_id: str | None = None 

467 ) -> list[Document]: 

468 """Collect documents from all configured sources.""" 

469 documents = [] 

470 

471 # Process each source type with project context 

472 if filtered_config.confluence: 

473 confluence_docs = ( 

474 await self.components.source_processor.process_source_type( 

475 filtered_config.confluence, get_connector_instance, "Confluence" 

476 ) 

477 ) 

478 documents.extend(confluence_docs) 

479 

480 if filtered_config.git: 

481 git_docs = await self.components.source_processor.process_source_type( 

482 filtered_config.git, get_connector_instance, "Git" 

483 ) 

484 documents.extend(git_docs) 

485 

486 if filtered_config.jira: 

487 jira_docs = await self.components.source_processor.process_source_type( 

488 filtered_config.jira, get_connector_instance, "Jira" 

489 ) 

490 documents.extend(jira_docs) 

491 

492 if filtered_config.publicdocs: 

493 publicdocs_docs = ( 

494 await self.components.source_processor.process_source_type( 

495 filtered_config.publicdocs, get_connector_instance, "PublicDocs" 

496 ) 

497 ) 

498 documents.extend(publicdocs_docs) 

499 

500 if filtered_config.localfile: 

501 localfile_docs = await self.components.source_processor.process_source_type( 

502 filtered_config.localfile, get_connector_instance, "LocalFile" 

503 ) 

504 documents.extend(localfile_docs) 

505 

506 # Inject project metadata into documents if project context is available 

507 if project_id and self.project_manager: 

508 for document in documents: 

509 enhanced_metadata = self.project_manager.inject_project_metadata( 

510 project_id, document.metadata 

511 ) 

512 document.metadata = enhanced_metadata 

513 

514 logger.info(f"📄 Collected {len(documents)} documents from all sources") 

515 return documents 

516 

517 async def _detect_document_changes( 

518 self, 

519 documents: list[Document], 

520 filtered_config: SourcesConfig, 

521 project_id: str | None = None, 

522 ) -> list[Document]: 

523 """Detect changes in documents and return only new/updated ones.""" 

524 

525 logger.debug(f"Starting change detection for {len(documents)} documents") 

526 

527 try: 

528 # Ensure state manager is initialized before use 

529 if not self.components.state_manager._initialized: 

530 logger.debug("Initializing state manager for change detection") 

531 await self.components.state_manager.initialize() 

532 

533 async with StateChangeDetector( 

534 self.components.state_manager 

535 ) as change_detector: 

536 changes = await change_detector.detect_changes( 

537 documents, filtered_config 

538 ) 

539 

540 new_documents = list(changes.get("new") or []) 

541 updated_documents = list(changes.get("updated") or []) 

542 deleted_documents = list(changes.get("deleted") or []) 

543 

544 logger.info( 

545 f"🔍 Change detection: {len(new_documents)} new, " 

546 f"{len(updated_documents)} updated, " 

547 f"{len(deleted_documents)} deleted" 

548 ) 

549 

550 if deleted_documents: 

551 await self._process_deleted_documents( 

552 deleted_documents, 

553 project_id, 

554 ) 

555 

556 documents_to_process = new_documents + updated_documents 

557 

558 if not documents_to_process and deleted_documents: 

559 logger.info( 

560 "No new or updated documents to process, " 

561 "but deleted documents were handled" 

562 ) 

563 

564 return documents_to_process 

565 

566 except Exception as e: 

567 logger.error( 

568 f"Error during change detection: {sanitize_exception_message(e)}", 

569 error_type=type(e).__name__, 

570 ) 

571 raise 

572 

573 async def _process_deleted_documents( 

574 self, 

575 deleted_documents: list[Document], 

576 project_id: str | None = None, 

577 ) -> None: 

578 """Process deleted documents by updating state and removing points from Qdrant.""" 

579 if not deleted_documents: 

580 return 

581 

582 logger.info(f"Processing {len(deleted_documents)} deleted documents") 

583 

584 if not self.components.state_manager._initialized: 

585 logger.debug("Initializing state manager for deleted document processing") 

586 await self.components.state_manager.initialize() 

587 

588 # Use an atomic operation that marks state and deletes points together. 

589 try: 

590 deleted_ids = ( 

591 await self.components.state_manager.mark_documents_deleted_atomic( 

592 deleted_documents, self.components.qdrant_manager, project_id 

593 ) 

594 ) 

595 if deleted_ids: 

596 logger.info( 

597 f"Deleted {len(deleted_ids)} document points from Qdrant and updated state" 

598 ) 

599 except Exception as e: 

600 logger.error( 

601 f"Failed to process deleted documents atomically: {sanitize_exception_message(e)}", 

602 error_type=type(e).__name__, 

603 ) 

604 raise 

605 

606 async def _update_document_states( 

607 self, 

608 documents: list[Document], 

609 successfully_processed_doc_ids: set, 

610 project_id: str | None = None, 

611 ): 

612 """Update document states for successfully processed documents.""" 

613 successfully_processed_docs = [ 

614 doc for doc in documents if doc.id in successfully_processed_doc_ids 

615 ] 

616 

617 logger.debug( 

618 f"Updating document states for {len(successfully_processed_docs)} documents" 

619 ) 

620 

621 # Ensure state manager is initialized before use 

622 if not self.components.state_manager._initialized: 

623 logger.debug("Initializing state manager for document state updates") 

624 await self.components.state_manager.initialize() 

625 

626 for doc in successfully_processed_docs: 

627 try: 

628 await self.components.state_manager.update_document_state( 

629 doc, project_id 

630 ) 

631 logger.debug(f"Updated document state for {doc.id}") 

632 except Exception as e: 

633 logger.error( 

634 f"Failed to update document state for {doc.id}: {sanitize_exception_message(e)}", 

635 error_type=type(e).__name__, 

636 )