Coverage for src / qdrant_loader / core / pipeline / document_pipeline.py: 64%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Document processing pipeline that coordinates chunking, embedding, and upserting."""
3import asyncio
4import time
5from dataclasses import dataclass
7from qdrant_loader.core.document import Document
8from qdrant_loader.utils.logging import LoggingConfig
10from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker
11from .workers.upsert_worker import PipelineResult
13logger = LoggingConfig.get_logger(__name__)
16@dataclass
17class BatchResult:
18 """Result of processing a bounded batch of documents."""
20 success_count: int = 0
21 failure_count: int = 0
22 skipped_count: int = 0
23 successfully_processed_documents: set[str] | None = None
24 failed_document_ids: set[str] | None = None
25 errors: list[str] | None = None
27 def __post_init__(self) -> None:
28 if self.successfully_processed_documents is None:
29 self.successfully_processed_documents = set()
30 if self.failed_document_ids is None:
31 self.failed_document_ids = set()
32 if self.errors is None:
33 self.errors = []
class DocumentPipeline:
    """Coordinate the chunking -> embedding -> upsert stages for documents.

    The three workers are injected. This class wires each stage's output into
    the next, bounds the upsert stage with a timeout, and converts timeouts
    and other ``Exception`` failures into result objects instead of raising.
    """

    def __init__(
        self,
        chunking_worker: ChunkingWorker,
        embedding_worker: EmbeddingWorker,
        upsert_worker: UpsertWorker,
    ):
        """Store the worker used for each pipeline stage."""
        self.chunking_worker = chunking_worker
        self.embedding_worker = embedding_worker
        self.upsert_worker = upsert_worker

    async def process_batch(self, batch: list[Document]) -> BatchResult:
        """Process a bounded batch of documents through the pipeline.

        Args:
            batch: List of documents to process (bounded size, typically 256)

        Returns:
            BatchResult with processing statistics. On success the counts are
            copied from the upsert worker's result (logged as chunks). On a
            timeout or any other ``Exception`` the result instead carries
            ``failure_count=len(batch)`` (a document count) and one error
            message; the ID sets are left empty.
        """
        logger.info(f"⚙️ Processing batch of {len(batch)} documents through pipeline")
        # perf_counter() is monotonic, so elapsed times cannot be skewed by
        # wall-clock adjustments the way time.time() differences can.
        start_time = time.perf_counter()

        try:
            logger.debug("🔄 Starting chunking phase for batch...")
            chunking_start = time.perf_counter()
            chunks_iter = self.chunking_worker.process_documents(batch)

            # NOTE(review): process_documents() is not awaited, so if it
            # returns a lazy iterator this duration covers only its creation,
            # not the chunking work itself - confirm against the worker.
            logger.debug("🔄 Chunking completed, transitioning to embedding phase...")
            chunking_duration = time.perf_counter() - chunking_start
            logger.debug(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds")

            embedding_start = time.perf_counter()
            embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter)

            logger.debug("🔄 Embedding phase ready, starting upsert phase...")

            try:
                pipeline_result = await asyncio.wait_for(
                    self.upsert_worker.process_embedded_chunks(embedded_chunks_iter),
                    timeout=600.0,  # 10 minute timeout per batch
                )
            except TimeoutError:
                # asyncio.wait_for raises asyncio.TimeoutError, which is the
                # builtin TimeoutError on Python 3.11+.
                logger.error("❌ Batch processing timed out after 10 minutes")
                return BatchResult(
                    failure_count=len(batch),
                    errors=["Batch processing timed out after 10 minutes"],
                )

            total_duration = time.perf_counter() - start_time
            embedding_duration = time.perf_counter() - embedding_start

            logger.debug(
                f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds"
            )
            logger.info(
                f"✅ Batch processing completed: {pipeline_result.success_count} chunks, "
                f"{pipeline_result.error_count} errors in {total_duration:.2f}s"
            )

            return BatchResult(
                success_count=pipeline_result.success_count,
                failure_count=pipeline_result.error_count,
                skipped_count=0,
                successfully_processed_documents=pipeline_result.successfully_processed_documents,
                failed_document_ids=pipeline_result.failed_document_ids,
                errors=pipeline_result.errors,
            )

        except Exception as e:
            # Boundary handler: report the failure in the result rather than
            # propagating it to the caller.
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"❌ Batch processing failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            return BatchResult(
                failure_count=len(batch),
                errors=[f"Batch processing failed: {e}"],
            )

    async def process_documents(self, documents: list[Document]) -> PipelineResult:
        """Process documents through the pipeline.

        Args:
            documents: List of documents to process

        Returns:
            PipelineResult with processing statistics. On success this is the
            upsert worker's result unchanged. On a timeout or any other
            ``Exception`` a fresh PipelineResult is returned with
            ``error_count=len(documents)`` and a single error message.
        """
        logger.info(f"⚙️ Processing {len(documents)} documents through pipeline")
        # perf_counter() is monotonic, so elapsed times cannot be skewed by
        # wall-clock adjustments the way time.time() differences can.
        start_time = time.perf_counter()

        try:
            logger.info("🔄 Starting chunking phase...")
            chunking_start = time.perf_counter()
            chunks_iter = self.chunking_worker.process_documents(documents)

            # NOTE(review): process_documents() is not awaited, so if it
            # returns a lazy iterator this duration covers only its creation,
            # not the chunking work itself - confirm against the worker.
            logger.info("🔄 Chunking completed, transitioning to embedding phase...")
            chunking_duration = time.perf_counter() - chunking_start
            logger.info(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds")

            embedding_start = time.perf_counter()
            embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter)

            logger.info("🔄 Embedding phase ready, starting upsert phase...")

            try:
                result = await asyncio.wait_for(
                    self.upsert_worker.process_embedded_chunks(embedded_chunks_iter),
                    timeout=3600.0,  # 1 hour timeout for the entire pipeline
                )
            except TimeoutError:
                # asyncio.wait_for raises asyncio.TimeoutError, which is the
                # builtin TimeoutError on Python 3.11+.
                logger.error("❌ Pipeline timed out after 1 hour")
                result = PipelineResult()
                result.error_count = len(documents)
                result.errors = ["Pipeline timed out after 1 hour"]
                return result

            total_duration = time.perf_counter() - start_time
            embedding_duration = time.perf_counter() - embedding_start

            logger.info(
                f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds"
            )
            logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds")
            logger.info(
                f"✅ Pipeline completed: {result.success_count} chunks processed, "
                f"{result.error_count} errors"
            )

            return result

        except Exception as e:
            # Boundary handler: report the failure in the result rather than
            # propagating it to the caller.
            total_duration = time.perf_counter() - start_time
            logger.error(
                f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            result = PipelineResult()
            result.error_count = len(documents)
            result.errors = [f"Pipeline failed: {e}"]
            return result