Coverage for src/qdrant_loader/core/pipeline/document_pipeline.py: 28%
46 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Document processing pipeline that coordinates chunking, embedding, and upserting."""
3import asyncio
4import time
5from qdrant_loader.core.document import Document
6from qdrant_loader.utils.logging import LoggingConfig
8from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker
9from .workers.upsert_worker import PipelineResult
11logger = LoggingConfig.get_logger(__name__)
class DocumentPipeline:
    """Drive a batch of documents through chunking, embedding, and upserting.

    The three stages are delegated to the worker objects supplied at
    construction time; this class only wires them together, logs progress
    and timings, and converts failures into a ``PipelineResult``.
    """

    def __init__(
        self,
        chunking_worker: ChunkingWorker,
        embedding_worker: EmbeddingWorker,
        upsert_worker: UpsertWorker,
    ):
        """Store the stage workers used by ``process_documents``.

        Args:
            chunking_worker: Worker whose ``process_documents`` is called first.
            embedding_worker: Worker whose ``process_chunks`` consumes the
                chunking output.
            upsert_worker: Worker whose ``process_embedded_chunks`` is awaited
                to produce the final result.
        """
        self.chunking_worker = chunking_worker
        self.embedding_worker = embedding_worker
        self.upsert_worker = upsert_worker

    @staticmethod
    def _failure_result(failed_count: int, message: str) -> PipelineResult:
        """Build a ``PipelineResult`` describing a pipeline-wide failure.

        Args:
            failed_count: Value stored in ``error_count``.
            message: Single entry stored in ``errors``.

        Returns:
            A fresh ``PipelineResult`` with the two fields above populated.
        """
        failure = PipelineResult()
        failure.error_count = failed_count
        failure.errors = [message]
        return failure

    async def process_documents(self, documents: list[Document]) -> PipelineResult:
        """Run ``documents`` through the chunk -> embed -> upsert stages.

        Args:
            documents: Documents to push through the pipeline.

        Returns:
            The ``PipelineResult`` produced by the upsert worker. If the upsert
            stage does not finish within one hour, or any exception is raised
            while the stages are being wired or awaited, a ``PipelineResult``
            is returned instead whose ``error_count`` equals ``len(documents)``
            and whose ``errors`` holds a single descriptive message.
        """
        logger.info(f"⚙️ Processing {len(documents)} documents through pipeline")
        pipeline_began = time.time()

        try:
            # Stage 1: hand the documents to the chunking worker.
            logger.info("🔄 Starting chunking phase...")
            chunking_began = time.time()
            chunk_stream = self.chunking_worker.process_documents(documents)

            # Stage 2: feed the chunk stream to the embedding worker.
            # NOTE(review): this duration is taken as soon as the chunking call
            # returns; if that call yields a lazy iterator the figure covers
            # setup only, not the chunking work itself -- confirm.
            logger.info("🔄 Chunking completed, transitioning to embedding phase...")
            chunking_duration = time.time() - chunking_began
            logger.info(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds")

            embedding_began = time.time()
            embedded_stream = self.embedding_worker.process_chunks(chunk_stream)

            # Stage 3: await the upsert worker, which yields the final result.
            logger.info("🔄 Embedding phase ready, starting upsert phase...")

            # Bound the awaited upsert stage so the pipeline cannot hang forever.
            one_hour_in_seconds = 3600.0
            try:
                result = await asyncio.wait_for(
                    self.upsert_worker.process_embedded_chunks(embedded_stream),
                    timeout=one_hour_in_seconds,
                )
            except asyncio.TimeoutError:
                logger.error("❌ Pipeline timed out after 1 hour")
                return self._failure_result(
                    len(documents), "Pipeline timed out after 1 hour"
                )

            total_duration = time.time() - pipeline_began
            embedding_duration = time.time() - embedding_began

            logger.info(
                f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds"
            )
            logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds")
            logger.info(
                f"✅ Pipeline completed: {result.success_count} chunks processed, "
                f"{result.error_count} errors"
            )
            return result

        except Exception as e:
            # Top-level boundary: log with traceback and report the failure
            # through the result object instead of propagating the exception.
            total_duration = time.time() - pipeline_began
            logger.error(
                f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            return self._failure_result(len(documents), f"Pipeline failed: {e}")