Coverage for src/qdrant_loader/core/pipeline/document_pipeline.py: 28%

46 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Document processing pipeline that coordinates chunking, embedding, and upserting.""" 

2 

3import asyncio 

4import time 

5 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.utils.logging import LoggingConfig 

8 

9from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker 

10from .workers.upsert_worker import PipelineResult 

11 

12logger = LoggingConfig.get_logger(__name__) 

13 

14 

15class DocumentPipeline: 

16 """Handles the chunking -> embedding -> upsert pipeline.""" 

17 

18 def __init__( 

19 self, 

20 chunking_worker: ChunkingWorker, 

21 embedding_worker: EmbeddingWorker, 

22 upsert_worker: UpsertWorker, 

23 ): 

24 self.chunking_worker = chunking_worker 

25 self.embedding_worker = embedding_worker 

26 self.upsert_worker = upsert_worker 

27 

28 async def process_documents(self, documents: list[Document]) -> PipelineResult: 

29 """Process documents through the pipeline. 

30 

31 Args: 

32 documents: List of documents to process 

33 

34 Returns: 

35 PipelineResult with processing statistics 

36 """ 

37 logger.info(f"⚙️ Processing {len(documents)} documents through pipeline") 

38 start_time = time.time() 

39 

40 try: 

41 # Step 1: Chunk documents 

42 logger.info("🔄 Starting chunking phase...") 

43 chunking_start = time.time() 

44 chunks_iter = self.chunking_worker.process_documents(documents) 

45 

46 # Step 2: Generate embeddings 

47 logger.info("🔄 Chunking completed, transitioning to embedding phase...") 

48 chunking_duration = time.time() - chunking_start 

49 logger.info(f"⏱️ Chunking phase took {chunking_duration:.2f} seconds") 

50 

51 embedding_start = time.time() 

52 embedded_chunks_iter = self.embedding_worker.process_chunks(chunks_iter) 

53 

54 # Step 3: Upsert to Qdrant 

55 logger.info("🔄 Embedding phase ready, starting upsert phase...") 

56 

57 # Add timeout for the entire pipeline to prevent indefinite hanging 

58 try: 

59 result = await asyncio.wait_for( 

60 self.upsert_worker.process_embedded_chunks(embedded_chunks_iter), 

61 timeout=3600.0, # 1 hour timeout for the entire pipeline 

62 ) 

63 except TimeoutError: 

64 logger.error("❌ Pipeline timed out after 1 hour") 

65 result = PipelineResult() 

66 result.error_count = len(documents) 

67 result.errors = ["Pipeline timed out after 1 hour"] 

68 return result 

69 

70 total_duration = time.time() - start_time 

71 embedding_duration = time.time() - embedding_start 

72 

73 logger.info( 

74 f"⏱️ Embedding + Upsert phase took {embedding_duration:.2f} seconds" 

75 ) 

76 logger.info(f"⏱️ Total pipeline duration: {total_duration:.2f} seconds") 

77 logger.info( 

78 f"✅ Pipeline completed: {result.success_count} chunks processed, " 

79 f"{result.error_count} errors" 

80 ) 

81 

82 return result 

83 

84 except Exception as e: 

85 total_duration = time.time() - start_time 

86 logger.error( 

87 f"❌ Document pipeline failed after {total_duration:.2f} seconds: {e}", 

88 exc_info=True, 

89 ) 

90 # Return a result with error information 

91 result = PipelineResult() 

92 result.error_count = len(documents) 

93 result.errors = [f"Pipeline failed: {e}"] 

94 return result
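
For context, the sketch below shows how this class would typically be driven: build a DocumentPipeline from the three workers and await process_documents. It is a minimal illustration only; the run_pipeline helper is hypothetical, and the workers are assumed to be constructed elsewhere (their constructors are not part of this module), so only the DocumentPipeline API visible in the source above is used.

from qdrant_loader.core.pipeline.document_pipeline import DocumentPipeline


async def run_pipeline(documents, chunking_worker, embedding_worker, upsert_worker):
    # Hypothetical driver: the three workers are assumed to be created by the
    # caller (e.g. by the pipeline's factory/configuration code).
    pipeline = DocumentPipeline(
        chunking_worker=chunking_worker,
        embedding_worker=embedding_worker,
        upsert_worker=upsert_worker,
    )
    # Runs chunking -> embedding -> upsert and returns a PipelineResult.
    result = await pipeline.process_documents(documents)
    print(f"{result.success_count} chunks processed, {result.error_count} errors")
    return result

# From synchronous code this would be invoked with something like
# asyncio.run(run_pipeline(documents, chunking_worker, embedding_worker, upsert_worker)).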