Coverage for src/qdrant_loader/core/pipeline/document_pipeline.py: 28%
46 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Document processing pipeline that coordinates chunking, embedding, and upserting."""
3import asyncio
4import time
6from qdrant_loader.core.document import Document
7from qdrant_loader.utils.logging import LoggingConfig
9from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker
10from .workers.upsert_worker import PipelineResult
12logger = LoggingConfig.get_logger(__name__)
class DocumentPipeline:
    """Wire the chunking -> embedding -> upsert workers into one pipeline.

    The three workers are injected by the caller. This class only chains their
    outputs together, bounds the run with an overall timeout, and turns
    failures (timeout or any ``Exception``) into a ``PipelineResult`` carrying
    error information instead of letting them propagate.
    """

    def __init__(
        self,
        chunking_worker: ChunkingWorker,
        embedding_worker: EmbeddingWorker,
        upsert_worker: UpsertWorker,
    ):
        """Store the worker used by each pipeline stage.

        Args:
            chunking_worker: Worker whose ``process_documents`` produces chunks
            embedding_worker: Worker whose ``process_chunks`` consumes those chunks
            upsert_worker: Worker whose ``process_embedded_chunks`` is awaited
                to produce the final ``PipelineResult``
        """
        self.chunking_worker = chunking_worker
        self.embedding_worker = embedding_worker
        self.upsert_worker = upsert_worker

    @staticmethod
    def _failure_result(document_count: int, message: str) -> PipelineResult:
        """Build a ``PipelineResult`` that counts every input document as an error."""
        result = PipelineResult()
        result.error_count = document_count
        result.errors = [message]
        return result

    async def process_documents(self, documents: list[Document]) -> PipelineResult:
        """Process documents through the pipeline.

        The output of the chunking worker is handed to the embedding worker,
        whose output is handed to the upsert worker. Only the upsert call is
        awaited, under a one-hour timeout.

        Args:
            documents: List of documents to process

        Returns:
            PipelineResult with processing statistics. On timeout, or when any
            ``Exception`` is raised, a fresh ``PipelineResult`` is returned
            with ``error_count`` set to ``len(documents)`` and a single
            message in ``errors``; the exception is logged, not re-raised.
        """
        logger.info(f"⚙️ Processing {len(documents)} documents through pipeline")
        start_time = time.time()

        try:
            # Step 1: Chunk documents
            logger.info("🔄 Starting chunking phase...")
            chunking_start = time.time()
            # NOTE(review): this call is not awaited and its result is named
            # as an iterator. If the worker is lazy, no chunking has happened
            # yet and the duration logged below only covers creating the
            # iterator -- confirm against ChunkingWorker.
            chunks_iter = self.chunking_worker.process_documents(documents)

            # Step 2: Generate embeddings
            logger.info("🔄 Chunking completed, transitioning to embedding phase...")
            chunking_duration = time.time() - chunking_start
            logger.info(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds")

            embedding_start = time.time()
            embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter)

            # Step 3: Upsert to Qdrant
            logger.info("🔄 Embedding phase ready, starting upsert phase...")

            # Add timeout for the entire pipeline to prevent indefinite hanging
            try:
                result = await asyncio.wait_for(
                    self.upsert_worker.process_embedded_chunks(embedded_chunks_iter),
                    timeout=3600.0,  # 1 hour timeout for the entire pipeline
                )
            except asyncio.TimeoutError:
                # asyncio.wait_for raises asyncio.TimeoutError. That is the
                # builtin TimeoutError only on Python 3.11+; on older versions
                # it is a separate class, so name the asyncio one explicitly
                # to make this branch work on every interpreter.
                logger.error("❌ Pipeline timed out after 1 hour")
                return self._failure_result(
                    len(documents), "Pipeline timed out after 1 hour"
                )

            total_duration = time.time() - start_time
            embedding_duration = time.time() - embedding_start

            logger.info(
                f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds"
            )
            logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds")
            logger.info(
                f"✅ Pipeline completed: {result.success_count} chunks processed, "
                f"{result.error_count} errors"
            )

            return result

        except Exception as e:
            total_duration = time.time() - start_time
            logger.error(
                f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            # Return a result with error information
            return self._failure_result(len(documents), f"Pipeline failed: {e}")