Coverage for src/qdrant_loader/core/pipeline/document_pipeline.py: 28%

46 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Document processing pipeline that coordinates chunking, embedding, and upserting.""" 

2 

3import asyncio 

4import time 

5from qdrant_loader.core.document import Document 

6from qdrant_loader.utils.logging import LoggingConfig 

7 

8from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker 

9from .workers.upsert_worker import PipelineResult 

10 

11logger = LoggingConfig.get_logger(__name__) 

12 

13 

class DocumentPipeline:
    """Coordinates the chunking -> embedding -> upsert pipeline.

    The three worker stages are wired together as a stream: the chunking
    and embedding calls build (async) iterators that are only consumed
    when the upsert worker is awaited, so the stages overlap rather than
    run strictly one after another.
    """

    # Default upper bound, in seconds, for one full pipeline run.
    DEFAULT_TIMEOUT = 3600.0

    def __init__(
        self,
        chunking_worker: ChunkingWorker,
        embedding_worker: EmbeddingWorker,
        upsert_worker: UpsertWorker,
    ):
        """Store the workers implementing each pipeline stage.

        Args:
            chunking_worker: Splits documents into chunks.
            embedding_worker: Computes embeddings for chunks.
            upsert_worker: Writes embedded chunks to Qdrant.
        """
        self.chunking_worker = chunking_worker
        self.embedding_worker = embedding_worker
        self.upsert_worker = upsert_worker

    @staticmethod
    def _error_result(documents: list[Document], message: str) -> PipelineResult:
        """Build a PipelineResult that marks every input document as failed."""
        result = PipelineResult()
        result.error_count = len(documents)
        result.errors = [message]
        return result

    async def process_documents(
        self,
        documents: list[Document],
        timeout: float = DEFAULT_TIMEOUT,
    ) -> PipelineResult:
        """Process documents through the chunk -> embed -> upsert pipeline.

        Args:
            documents: List of documents to process.
            timeout: Maximum number of seconds to allow for the whole run
                (defaults to ``DEFAULT_TIMEOUT``); prevents a stalled
                worker from hanging the pipeline indefinitely.

        Returns:
            PipelineResult with processing statistics. On timeout or
            failure a result is still returned — ``error_count`` is set to
            the number of input documents and the failure is recorded in
            ``errors``; this method does not raise.
        """
        logger.info(f"⚙️ Processing {len(documents)} documents through pipeline")
        start_time = time.time()

        try:
            # Wire up the streaming stages. These calls are not awaited —
            # they appear to only construct the stage iterators, with the
            # actual chunking/embedding work happening while the upsert
            # worker consumes them below. For that reason no per-stage
            # timing is reported here: an earlier version logged a
            # "chunking phase" duration at this point, which could only
            # have measured iterator construction, not chunking itself.
            logger.info("🔄 Starting chunking stage...")
            chunks_iter = self.chunking_worker.process_documents(documents)

            logger.info("🔄 Chunking stage wired, adding embedding stage...")
            embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter)

            logger.info("🔄 Embedding stage wired, starting upsert (drives the pipeline)...")

            # Bound the whole run so a stuck worker cannot hang forever.
            try:
                result = await asyncio.wait_for(
                    self.upsert_worker.process_embedded_chunks(embedded_chunks_iter),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.error(f"❌ Pipeline timed out after {timeout:.0f} seconds")
                return self._error_result(
                    documents, f"Pipeline timed out after {timeout:.0f} seconds"
                )

            total_duration = time.time() - start_time
            logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds")
            logger.info(
                f"✅ Pipeline completed: {result.success_count} chunks processed, "
                f"{result.error_count} errors"
            )
            return result

        except Exception as e:
            total_duration = time.time() - start_time
            logger.error(
                f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}",
                exc_info=True,
            )
            # Convert the failure into a result instead of propagating.
            return self._error_result(documents, f"Pipeline failed: {e}")