Coverage for src / qdrant_loader / core / pipeline / orchestrator.py: 75%
256 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Main orchestrator for the ingestion pipeline."""
3import traceback
4from collections.abc import AsyncIterator
5from datetime import datetime
7from qdrant_loader.config import Settings, SourcesConfig
8from qdrant_loader.connectors.base import ConnectorConfigurationError
9from qdrant_loader.connectors.factory import get_connector_instance
10from qdrant_loader.core.document import Document
11from qdrant_loader.core.project_manager import ProjectManager
12from qdrant_loader.core.qdrant_manager import QdrantManager
13from qdrant_loader.core.state.state_change_detector import StateChangeDetector
14from qdrant_loader.core.state.state_manager import StateManager
15from qdrant_loader.utils.logging import LoggingConfig
16from qdrant_loader.utils.sensitive import sanitize_exception_message
18from .document_pipeline import DocumentPipeline
19from .source_filter import SourceFilter
20from .source_processor import SourceProcessor
21from .workers.upsert_worker import PipelineResult
# Module-level logger obtained from the project's logging configuration; every
# method of this module logs through it.
logger = LoggingConfig.get_logger(__name__)
class PipelineComponents:
    """Container for pipeline components.

    A plain holder that bundles the collaborators the orchestrator needs so
    they can be passed around as one object. Each constructor argument is
    stored, unchanged, on an attribute of the same name.
    """

    document_pipeline: DocumentPipeline
    source_processor: SourceProcessor
    source_filter: SourceFilter
    state_manager: StateManager
    qdrant_manager: QdrantManager

    def __init__(
        self,
        document_pipeline: DocumentPipeline,
        source_processor: SourceProcessor,
        source_filter: SourceFilter,
        state_manager: StateManager,
        qdrant_manager: QdrantManager,
    ):
        # Storage-side collaborators.
        self.state_manager = state_manager
        self.qdrant_manager = qdrant_manager
        # Source-side collaborators.
        self.source_filter = source_filter
        self.source_processor = source_processor
        # Chunking / embedding / upsert pipeline.
        self.document_pipeline = document_pipeline
class PipelineOrchestrator:
    """Main orchestrator for the ingestion pipeline.

    Depending on the arguments given to ``process_documents`` it ingests an
    explicitly supplied ``SourcesConfig``, the sources of a single project, or
    the sources of every project known to the ``ProjectManager``. Documents are
    streamed from the connectors in bounded batches, filtered through change
    detection unless ``force`` is set, handed to the document pipeline, and the
    state of successfully processed documents is recorded.
    """

    def __init__(
        self,
        settings: Settings,
        components: PipelineComponents,
        project_manager: ProjectManager | None = None,
    ):
        """Store the settings, pipeline components and optional project manager.

        Args:
            settings: Application settings, kept as ``self.settings``.
            components: Pipeline building blocks used by the orchestrator.
            project_manager: Needed for project-scoped and all-projects runs.
        """
        self.settings = settings
        self.components = components
        self.project_manager = project_manager
        # Result of the most recent run. ``process_documents`` resets it to
        # None on entry, so it stays None when that run fails.
        self.last_pipeline_result: PipelineResult | None = None

    async def _stream_batches_from_sources(
        self,
        filtered_config: SourcesConfig,
        batch_size: int = 256,
        since: datetime | None = None,
        project_id: str | None = None,
        seen_uris: set[str] | None = None,
    ) -> AsyncIterator[list[Document]]:
        """Stream source documents in bounded micro-batches.

        Source types are visited in a fixed order (Confluence, Git, Jira,
        PublicDocs, LocalFile). All of them feed one shared buffer, so a batch
        may contain documents from two source types. Every batch except
        possibly the last holds exactly ``batch_size`` documents (for a
        positive ``batch_size``), which keeps memory usage bounded.

        Args:
            filtered_config: Sources to read from.
            batch_size: Number of documents per yielded batch.
            since: Forwarded to the connectors for time-based filtering.
            project_id: When set and a project manager is available, project
                metadata is injected into every streamed document.
            seen_uris: Optional set that receives a
                ``"<source_type>:<source>:<url without trailing />"`` key for
                every streamed document, for later deletion reconciliation.

        Yields:
            Lists of at most ``batch_size`` documents.
        """
        batch: list[Document] = []

        async def _process_source_type(source_type_name: str, source_configs):
            # Append the documents of one source type to the shared ``batch``
            # buffer, yielding a copy every time it fills up.
            if not source_configs:
                return

            async for (
                document
            ) in self.components.source_processor.stream_source_documents(
                source_configs,
                get_connector_instance,
                source_type_name,
                since=since,
            ):
                # Inject project metadata when running with project context
                if project_id and self.project_manager:
                    try:
                        document.metadata = (
                            self.project_manager.inject_project_metadata(
                                project_id, document.metadata
                            )
                        )
                    except Exception:
                        # Don't let metadata injection break streaming; log and continue
                        logger.debug(
                            "Project metadata injection failed for document",
                            document_id=document.id,
                            project_id=project_id,
                        )

                # Track seen URIs for potential post-stream reconciliation.
                # Best-effort: documents without a usable URL are skipped.
                if seen_uris is not None:
                    try:
                        uri = f"{document.source_type}:{document.source}:{document.url.rstrip('/')}"
                    except (AttributeError, TypeError):
                        logger.debug(
                            "Could not build URI for reconciliation tracking",
                            document_id=getattr(document, "id", None),
                        )
                    else:
                        seen_uris.add(uri)

                batch.append(document)
                if len(batch) >= batch_size:
                    # Yield a copy because the shared buffer is reused.
                    yield batch.copy()
                    batch.clear()

        source_types = (
            ("Confluence", filtered_config.confluence),
            ("Git", filtered_config.git),
            ("Jira", filtered_config.jira),
            ("PublicDocs", filtered_config.publicdocs),
            ("LocalFile", filtered_config.localfile),
        )
        for source_type_name, source_configs in source_types:
            if not source_configs:
                continue
            async for full_batch in _process_source_type(
                source_type_name, source_configs
            ):
                yield full_batch

        # Flush whatever is left after the last source type.
        if batch:
            yield batch

    async def process_documents(
        self,
        sources_config: SourcesConfig | None = None,
        source_type: str | None = None,
        source: str | None = None,
        project_id: str | None = None,
        force: bool = False,
        since: datetime | None = None,
    ) -> list[Document]:
        """Main entry point for document processing.

        Args:
            sources_config: Sources configuration to use (for backward compatibility)
            source_type: Filter by source type
            source: Filter by specific source name
            project_id: Process documents for a specific project
            force: Force processing of all documents, bypassing change detection
            since: Only stream documents updated after this timestamp (connector-level
                filtering). Connectors that do not yet support time-based filtering will
                fall back to full fetch with hash-based change detection.

        Returns:
            List of processed documents

        Raises:
            ValueError: If a project manager is needed but missing, if
                ``project_id`` is unknown or has no sources, or if
                ``source_type`` matches no configured sources.
            Exception: Any other failure is logged (sanitized) and re-raised.
        """
        logger.info("🚀 Starting document ingestion")
        self.last_pipeline_result = None

        try:
            # Determine sources configuration to use
            if sources_config:
                # Use provided sources config (backward compatibility)
                logger.debug("Using provided sources configuration")
                filtered_config = self.components.source_filter.filter_sources(
                    sources_config, source_type, source
                )
                current_project_id = None
            elif project_id:
                # Use project-specific sources configuration
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available for project-specific processing"
                    )

                project_context = self.project_manager.get_project_context(project_id)
                if (
                    not project_context
                    or not project_context.config
                    or not project_context.config.sources
                ):
                    raise ValueError(
                        f"Project '{project_id}' not found or has no configuration"
                    )

                logger.debug(f"Using project configuration for project: {project_id}")
                filtered_config = self.components.source_filter.filter_sources(
                    project_context.config.sources, source_type, source
                )
                current_project_id = project_id
            else:
                # Process all projects
                if not self.project_manager:
                    raise ValueError(
                        "Project manager not available and no sources configuration provided"
                    )

                logger.debug("Processing all projects")
                return await self._process_all_projects(
                    source_type, source, force, since
                )

            # Check if filtered config is empty
            if source_type and not any(
                [
                    filtered_config.git,
                    filtered_config.confluence,
                    filtered_config.jira,
                    filtered_config.publicdocs,
                    filtered_config.localfile,
                ]
            ):
                raise ValueError(f"No sources found for type '{source_type}'")

            if force:
                # Forced runs bypass change detection entirely.
                return await self._ingest_filtered_sources(
                    filtered_config, current_project_id, force, since, None
                )

            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            # ``async with`` guarantees the detector is exited and receives the
            # real exception info if ingestion fails.
            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                return await self._ingest_filtered_sources(
                    filtered_config,
                    current_project_id,
                    force,
                    since,
                    change_detector,
                )

        except Exception as e:
            logger.error(
                f"❌ Pipeline orchestration failed: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
                sanitized_traceback=sanitize_exception_message(traceback.format_exc()),
            )
            raise

    async def _ingest_filtered_sources(
        self,
        filtered_config: SourcesConfig,
        project_id: str | None,
        force: bool,
        since: datetime | None,
        change_detector: StateChangeDetector | None,
    ) -> list[Document]:
        """Stream, classify, process and record documents from ``filtered_config``.

        Sets ``self.last_pipeline_result`` to the aggregated batch results.

        Args:
            filtered_config: Already-filtered sources to ingest.
            project_id: Project context used for metadata injection, batch
                classification and state updates (None for an explicitly
                supplied configuration).
            force: When true, every streamed document is processed.
            since: Forwarded to the connectors for time-based filtering.
            change_detector: Entered detector used to classify each batch into
                new/updated documents; None in force mode.

        Returns:
            Documents the document pipeline reported as successfully processed,
            in stream order.
        """
        total_documents = 0  # counted before change classification
        processed_documents: list[Document] = []
        aggregated_result = PipelineResult()
        seen_uris: set[str] = set()

        # Prefer calling the new signature but fall back to the legacy 3-arg
        # signature for backwards compatibility / tests that replace this
        # method. Calling an async generator function does not run its body,
        # so a TypeError here is expected to come from a signature mismatch.
        try:
            stream_iter = self._stream_batches_from_sources(
                filtered_config,
                256,
                since,
                project_id=project_id,
                seen_uris=seen_uris,
            )
        except TypeError:
            stream_iter = self._stream_batches_from_sources(
                filtered_config, 256, since
            )

        async for batch in stream_iter:
            total_documents += len(batch)

            # Keep only new/updated documents unless forcing a full re-ingest.
            if not force and change_detector is not None:
                batch = await change_detector.classify_batch(
                    batch, filtered_config, project_id
                )

            if not batch:
                continue

            batch_result = await self.components.document_pipeline.process_batch(
                batch
            )
            aggregated_result.success_count += batch_result.success_count
            aggregated_result.error_count += batch_result.failure_count
            aggregated_result.errors.extend(batch_result.errors)
            aggregated_result.successfully_processed_documents.update(
                batch_result.successfully_processed_documents
            )
            aggregated_result.failed_document_ids.update(
                batch_result.failed_document_ids
            )

            succeeded_ids = batch_result.successfully_processed_documents
            if succeeded_ids:
                await self._update_document_states(batch, succeeded_ids, project_id)
                processed_documents.extend(
                    doc for doc in batch if doc.id in succeeded_ids
                )

        if total_documents == 0 and not force:
            logger.warning(
                "⚠️ EMPTY SNAPSHOT in non-force mode. About to enter change detection "
                "which may classify existing corpus as deleted if source API returned partial/null results. "
                "This is a known risk (WS-3: add explicit snapshot_is_complete signal or per-source enable_deletion_detection). "
                "Proceeding carefully."
            )

        if total_documents == 0 and force:
            # Record the (empty) result, like the non-force path does.
            self.last_pipeline_result = aggregated_result
            logger.info("✅ No documents found from sources")
            return []

        if not force and not processed_documents:
            self.last_pipeline_result = aggregated_result
            if aggregated_result.error_count > 0:
                logger.error(
                    "No documents were successfully processed",
                    error_count=aggregated_result.error_count,
                )
            else:
                logger.info("No new or updated documents to process")
            return []

        # Deletion detection / reconciliation note:
        # Streaming classification only detects new/updated documents
        # per-batch. Full deletion detection (documents present in the state
        # DB but absent from the current snapshot across all batches)
        # requires a post-stream reconciliation (WS-3). For now we only log
        # the number of seen URIs; a reconciliation pass could compare the
        # previous state URIs with ``seen_uris`` and call
        # ``_process_deleted_documents``.
        if not force:
            logger.debug(
                "Post-stream reconciliation not enabled. Seen URIs collected for potential WS-3 reconciliation",
                seen_count=len(seen_uris),
            )

        self.last_pipeline_result = aggregated_result
        logger.info(
            f"✅ Ingestion completed: {aggregated_result.success_count} chunks processed successfully"
        )
        return processed_documents

    async def _process_all_projects(
        self,
        source_type: str | None = None,
        source: str | None = None,
        force: bool = False,
        since: datetime | None = None,
    ) -> list[Document]:
        """Process documents from all configured projects.

        Each project is ingested through ``process_documents``. A failing
        project is logged, counted in ``error_count``, recorded in ``errors``
        and skipped, so the remaining projects still run. On return,
        ``self.last_pipeline_result`` holds the aggregate over all projects.

        Raises:
            ValueError: If no project manager is available.
        """
        if not self.project_manager:
            raise ValueError("Project manager not available")

        all_documents: list[Document] = []
        aggregated_result = PipelineResult()
        failed_projects: list[str] = []
        project_ids = self.project_manager.list_project_ids()

        logger.info(f"Processing {len(project_ids)} projects")

        for project_id in project_ids:
            try:
                logger.debug(f"Processing project: {project_id}")
                project_documents = await self.process_documents(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    force=force,
                    since=since,
                )
                project_result = self.last_pipeline_result
                all_documents.extend(project_documents)

                if project_result is not None:
                    aggregated_result.success_count += project_result.success_count
                    aggregated_result.error_count += project_result.error_count
                    aggregated_result.successfully_processed_documents.update(
                        project_result.successfully_processed_documents
                    )
                    aggregated_result.failed_document_ids.update(
                        project_result.failed_document_ids
                    )
                    aggregated_result.errors.extend(project_result.errors)

                logger.debug(
                    f"Processed {len(project_documents)} documents from project: {project_id}"
                )
            except ConnectorConfigurationError as e:
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Configuration error in project {project_id}: "
                    f"{safe_error}. "
                    "Skipping this project — check connector settings.",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitize_exception_message(
                        traceback.format_exc()
                    ),
                )
                # Count configuration failures like any other project failure
                # so error_count agrees with ``errors`` / ``failed_projects``.
                aggregated_result.error_count += 1
                aggregated_result.errors.append(
                    f"Configuration error in project {project_id}: {safe_error}"
                )
                failed_projects.append(project_id)
            except Exception as e:
                # Record the failure and continue with the remaining projects.
                safe_error = sanitize_exception_message(e)
                sanitized_traceback = sanitize_exception_message(traceback.format_exc())
                aggregated_result.error_count += 1
                aggregated_result.errors.append(
                    f"project_id={project_id}; "
                    f"error_type={type(e).__name__}; "
                    f"message={safe_error}; "
                    f"traceback={sanitized_traceback}"
                )
                logger.error(
                    f"Failed to process project {project_id}: {safe_error}",
                    error_type=type(e).__name__,
                    sanitized_traceback=sanitized_traceback,
                )
                failed_projects.append(project_id)

        self.last_pipeline_result = aggregated_result

        total_count = len(project_ids)
        failed_count = len(failed_projects)
        success_count = total_count - failed_count
        if failed_count > 0:
            logger.warning(
                f"Completed processing projects: {success_count}/{total_count} succeeded, "
                f"{failed_count} failed. Check errors above for details.",
                total_projects=total_count,
                successful_projects=success_count,
                failed_projects=failed_count,
            )
        else:
            logger.info(
                f"Completed processing all projects: {len(all_documents)} total documents"
            )
        return all_documents

    async def _collect_documents_from_sources(
        self, filtered_config: SourcesConfig, project_id: str | None = None
    ) -> list[Document]:
        """Collect documents from all configured sources (non-streaming).

        Source types are fetched in the order Confluence, Git, Jira,
        PublicDocs, LocalFile. When ``project_id`` is given and a project
        manager is available, project metadata is injected into every
        collected document. Errors from injection propagate to the caller.

        Returns:
            All collected documents.
        """
        documents: list[Document] = []

        for source_type_name, source_configs in (
            ("Confluence", filtered_config.confluence),
            ("Git", filtered_config.git),
            ("Jira", filtered_config.jira),
            ("PublicDocs", filtered_config.publicdocs),
            ("LocalFile", filtered_config.localfile),
        ):
            if not source_configs:
                continue
            documents.extend(
                await self.components.source_processor.process_source_type(
                    source_configs, get_connector_instance, source_type_name
                )
            )

        # Inject project metadata into documents if project context is available
        if project_id and self.project_manager:
            for document in documents:
                document.metadata = self.project_manager.inject_project_metadata(
                    project_id, document.metadata
                )

        logger.info(f"📄 Collected {len(documents)} documents from all sources")
        return documents

    async def _detect_document_changes(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Detect changes in documents and return only new/updated ones.

        Documents reported as deleted are handed to
        ``_process_deleted_documents`` before returning.

        Returns:
            New documents followed by updated documents.

        Raises:
            Exception: Any failure is logged (sanitized) and re-raised.
        """
        logger.debug(f"Starting change detection for {len(documents)} documents")

        try:
            # Ensure state manager is initialized before use
            if not self.components.state_manager._initialized:
                logger.debug("Initializing state manager for change detection")
                await self.components.state_manager.initialize()

            async with StateChangeDetector(
                self.components.state_manager
            ) as change_detector:
                changes = await change_detector.detect_changes(
                    documents, filtered_config
                )

                # Missing or None categories are treated as empty.
                new_documents = list(changes.get("new") or [])
                updated_documents = list(changes.get("updated") or [])
                deleted_documents = list(changes.get("deleted") or [])

                logger.info(
                    f"🔍 Change detection: {len(new_documents)} new, "
                    f"{len(updated_documents)} updated, "
                    f"{len(deleted_documents)} deleted"
                )

                if deleted_documents:
                    await self._process_deleted_documents(
                        deleted_documents,
                        project_id,
                    )

                documents_to_process = new_documents + updated_documents

                if not documents_to_process and deleted_documents:
                    logger.info(
                        "No new or updated documents to process, "
                        "but deleted documents were handled"
                    )

                return documents_to_process

        except Exception as e:
            logger.error(
                f"Error during change detection: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
            )
            raise

    async def _process_deleted_documents(
        self,
        deleted_documents: list[Document],
        project_id: str | None = None,
    ) -> None:
        """Process deleted documents by updating state and removing points from Qdrant.

        Delegates to ``StateManager.mark_documents_deleted_atomic``, which is
        given the Qdrant manager so that state and points are handled together.

        Raises:
            Exception: Any failure is logged (sanitized) and re-raised.
        """
        if not deleted_documents:
            return

        logger.info(f"Processing {len(deleted_documents)} deleted documents")

        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for deleted document processing")
            await self.components.state_manager.initialize()

        # Use an atomic operation that marks state and deletes points together.
        try:
            deleted_ids = (
                await self.components.state_manager.mark_documents_deleted_atomic(
                    deleted_documents, self.components.qdrant_manager, project_id
                )
            )
            if deleted_ids:
                logger.info(
                    f"Deleted {len(deleted_ids)} document points from Qdrant and updated state"
                )
        except Exception as e:
            logger.error(
                f"Failed to process deleted documents atomically: {sanitize_exception_message(e)}",
                error_type=type(e).__name__,
            )
            raise

    async def _update_document_states(
        self,
        documents: list[Document],
        successfully_processed_doc_ids: set[str],
        project_id: str | None = None,
    ) -> None:
        """Update document states for successfully processed documents.

        Only documents whose ``id`` is in ``successfully_processed_doc_ids``
        are updated. Failures are logged per document and do not stop the
        remaining updates (best-effort).
        """
        successfully_processed_docs = [
            doc for doc in documents if doc.id in successfully_processed_doc_ids
        ]

        logger.debug(
            f"Updating document states for {len(successfully_processed_docs)} documents"
        )

        # Ensure state manager is initialized before use
        if not self.components.state_manager._initialized:
            logger.debug("Initializing state manager for document state updates")
            await self.components.state_manager.initialize()

        for doc in successfully_processed_docs:
            try:
                await self.components.state_manager.update_document_state(
                    doc, project_id
                )
                logger.debug(f"Updated document state for {doc.id}")
            except Exception as e:
                logger.error(
                    f"Failed to update document state for {doc.id}: {sanitize_exception_message(e)}",
                    error_type=type(e).__name__,
                )