Coverage for src/qdrant_loader/core/pipeline/factory.py: 93%

42 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Factory for creating pipeline components.""" 

2 

3import concurrent.futures 

4from pathlib import Path 

5 

6from qdrant_loader.config import Settings 

7from qdrant_loader.core.chunking.chunking_service import ChunkingService 

8from qdrant_loader.core.embedding.embedding_service import EmbeddingService 

9from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor 

10from qdrant_loader.core.qdrant_manager import QdrantManager 

11from qdrant_loader.core.state.state_manager import StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14from .config import PipelineConfig 

15from .document_pipeline import DocumentPipeline 

16from .orchestrator import PipelineComponents 

17from .resource_manager import ResourceManager 

18from .source_filter import SourceFilter 

19from .source_processor import SourceProcessor 

20from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker 

21 

22logger = LoggingConfig.get_logger(__name__) 

23 

24 

class PipelineComponentsFactory:
    """Factory for creating pipeline components."""

    def create_components(
        self,
        settings: Settings,
        config: PipelineConfig,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        resource_manager: ResourceManager | None = None,
    ) -> PipelineComponents:
        """Create all pipeline components.

        Builds the chunking, embedding and upsert workers, wires them into a
        ``DocumentPipeline``, and bundles that with a ``SourceProcessor``, a
        ``SourceFilter`` and the state manager.

        Side effects:
            * When ``resource_manager`` is omitted, a new ``ResourceManager`` is
              created and its ``register_signal_handlers()`` is called.
            * A chunking ``ThreadPoolExecutor`` is created and handed to the
              resource manager via ``set_chunk_executor``.
            * A ``metrics`` directory is created under the current working
              directory (if it does not already exist).

        Args:
            settings: Application settings; ``settings.global_config`` supplies
                the state-management, chunking and file-conversion config.
            config: Pipeline configuration (worker counts, queue size and an
                optional upsert batch size).
            qdrant_manager: QdrantManager instance used by the upsert worker.
            state_manager: Optional state manager. When ``None``, a new
                ``StateManager`` is built from
                ``settings.global_config.state_management``.
            resource_manager: Optional resource manager. When ``None``, a new
                ``ResourceManager`` is created and its signal handlers are
                registered.

        Returns:
            PipelineComponents with all initialized components
        """
        logger.debug("Creating pipeline components")

        # Explicit ``is None`` checks: only an omitted argument triggers
        # creation, so a caller-supplied manager is always respected even if
        # it happens to evaluate as falsy.
        if resource_manager is None:
            resource_manager = ResourceManager()
            resource_manager.register_signal_handlers()

        if state_manager is None:
            state_manager = StateManager(settings.global_config.state_management)

        # Create core services
        chunking_service = ChunkingService(
            config=settings.global_config, settings=settings
        )
        embedding_service = EmbeddingService(settings)

        # Thread pool executor for chunking. It is registered with the
        # resource manager (presumably so it can be shut down during cleanup;
        # confirm in ResourceManager).
        chunk_executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=config.max_chunk_workers
        )
        resource_manager.set_chunk_executor(chunk_executor)

        # Create performance monitor
        metrics_dir = Path.cwd() / "metrics"
        metrics_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the monitor instance is neither stored nor passed to
        # any component. Presumably it is constructed only for constructor
        # side effects. Confirm, then either keep a reference or drop the call.
        IngestionMonitor(str(metrics_dir.absolute()))

        # An explicitly configured upsert batch size wins; otherwise reuse the
        # embedding service's batch size.
        upsert_batch_size = (
            int(config.upsert_batch_size)
            if config.upsert_batch_size is not None
            else embedding_service.batch_size
        )

        # Create workers; all share the resource manager's shutdown event.
        chunking_worker = ChunkingWorker(
            chunking_service=chunking_service,
            chunk_executor=chunk_executor,
            max_workers=config.max_chunk_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        embedding_worker = EmbeddingWorker(
            embedding_service=embedding_service,
            max_workers=config.max_embed_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        upsert_worker = UpsertWorker(
            qdrant_manager=qdrant_manager,
            batch_size=upsert_batch_size,
            max_workers=config.max_upsert_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        # Create document pipeline (chunk -> embed -> upsert workers)
        document_pipeline = DocumentPipeline(
            chunking_worker=chunking_worker,
            embedding_worker=embedding_worker,
            upsert_worker=upsert_worker,
        )

        # Create source processor. The ``global_config`` guard is purely
        # defensive: ``settings.global_config`` is already dereferenced
        # unconditionally above.
        source_processor = SourceProcessor(
            shutdown_event=resource_manager.shutdown_event,
            file_conversion_config=(
                settings.global_config.file_conversion
                if settings.global_config
                else None
            ),
        )

        # Create source filter
        source_filter = SourceFilter()

        # Create components container
        components = PipelineComponents(
            document_pipeline=document_pipeline,
            source_processor=source_processor,
            source_filter=source_filter,
            state_manager=state_manager,
        )

        logger.debug("Pipeline components created successfully")
        return components