Coverage for src/qdrant_loader/core/pipeline/factory.py: 93%
42 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Factory for creating pipeline components."""
3import concurrent.futures
4from pathlib import Path
6from qdrant_loader.config import Settings
7from qdrant_loader.core.chunking.chunking_service import ChunkingService
8from qdrant_loader.core.embedding.embedding_service import EmbeddingService
9from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
10from qdrant_loader.core.qdrant_manager import QdrantManager
11from qdrant_loader.core.state.state_manager import StateManager
12from qdrant_loader.utils.logging import LoggingConfig
14from .config import PipelineConfig
15from .document_pipeline import DocumentPipeline
16from .orchestrator import PipelineComponents
17from .resource_manager import ResourceManager
18from .source_filter import SourceFilter
19from .source_processor import SourceProcessor
20from .workers import ChunkingWorker, EmbeddingWorker, UpsertWorker
22logger = LoggingConfig.get_logger(__name__)
class PipelineComponentsFactory:
    """Factory that wires together the objects making up an ingestion pipeline."""

    def create_components(
        self,
        settings: Settings,
        config: PipelineConfig,
        qdrant_manager: QdrantManager,
        state_manager: StateManager | None = None,
        resource_manager: ResourceManager | None = None,
    ) -> PipelineComponents:
        """Create all pipeline components.

        Builds the chunking/embedding services, the chunking thread pool, the
        three workers (chunking, embedding, upsert), the document pipeline,
        the source processor and the source filter, and bundles them into a
        ``PipelineComponents`` container.

        Side effects visible in this method: a ``metrics`` directory is
        created under the current working directory, and, when no resource
        manager is supplied, signal handlers are registered on the newly
        created one.

        Args:
            settings: Application settings.
            config: Pipeline configuration (worker counts, queue size,
                optional upsert batch size).
            qdrant_manager: QdrantManager instance handed to the upsert worker.
            state_manager: Optional state manager; one is created from
                ``settings.global_config.state_management`` when ``None``.
            resource_manager: Optional resource manager; one is created (and
                its signal handlers registered) when ``None``.

        Returns:
            PipelineComponents holding the document pipeline, source
            processor, source filter and state manager.
        """
        logger.debug("Creating pipeline components")

        # Compare against None explicitly: a caller-supplied manager must be
        # used even if the object happens to evaluate as falsy.
        if resource_manager is None:
            resource_manager = ResourceManager()
            resource_manager.register_signal_handlers()

        if state_manager is None:
            state_manager = StateManager(settings.global_config.state_management)

        # Core services
        chunking_service = ChunkingService(
            config=settings.global_config, settings=settings
        )
        embedding_service = EmbeddingService(settings)

        # Thread pool used for chunking; it is handed to the resource manager
        # as well as to the chunking worker below.
        chunk_executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=config.max_chunk_workers
        )
        resource_manager.set_chunk_executor(chunk_executor)

        # Performance monitor: make sure the metrics directory exists first.
        metrics_dir = Path.cwd() / "metrics"
        metrics_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the monitor instance is neither stored nor returned
        # here; presumably it is constructed for its side effects - confirm.
        IngestionMonitor(str(metrics_dir.absolute()))

        # An explicit upsert batch size wins; otherwise reuse the embedding
        # service's batch size.
        upsert_batch_size = (
            int(config.upsert_batch_size)
            if config.upsert_batch_size is not None
            else embedding_service.batch_size
        )

        # Workers all receive the resource manager's shutdown event.
        chunking_worker = ChunkingWorker(
            chunking_service=chunking_service,
            chunk_executor=chunk_executor,
            max_workers=config.max_chunk_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        embedding_worker = EmbeddingWorker(
            embedding_service=embedding_service,
            max_workers=config.max_embed_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        upsert_worker = UpsertWorker(
            qdrant_manager=qdrant_manager,
            batch_size=upsert_batch_size,
            max_workers=config.max_upsert_workers,
            queue_size=config.queue_size,
            shutdown_event=resource_manager.shutdown_event,
        )

        # Document pipeline: chunking -> embedding -> upsert workers.
        document_pipeline = DocumentPipeline(
            chunking_worker=chunking_worker,
            embedding_worker=embedding_worker,
            upsert_worker=upsert_worker,
        )

        # Source processor; the file-conversion config is only passed along
        # when a global config is present.
        source_processor = SourceProcessor(
            shutdown_event=resource_manager.shutdown_event,
            file_conversion_config=(
                settings.global_config.file_conversion
                if settings.global_config
                else None
            ),
        )

        source_filter = SourceFilter()

        # Components container returned to the caller.
        components = PipelineComponents(
            document_pipeline=document_pipeline,
            source_processor=source_processor,
            source_filter=source_filter,
            state_manager=state_manager,
        )

        logger.debug("Pipeline components created successfully")
        return components