Coverage for src/qdrant_loader/core/pipeline/source_processor.py: 97%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""Source processor for handling different source types."""
3import asyncio
4from collections.abc import Callable, Mapping
6from qdrant_loader.config.source_config import SourceConfig
7from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.file_conversion import FileConversionConfig
10from qdrant_loader.utils.logging import LoggingConfig
11from qdrant_loader.utils.sensitive import sanitize_exception_message
13logger = LoggingConfig.get_logger(__name__)
class SourceProcessor:
    """Handles processing of different source types."""

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        # Fall back to a fresh event so callers may omit a shared shutdown signal.
        self.shutdown_event = shutdown_event or asyncio.Event()
        self.file_conversion_config = file_conversion_config

    async def _collect_into(
        self,
        sink: list[Document],
        source_type: str,
        source_name: str,
        source_config: SourceConfig,
        connector_factory: Callable[[SourceConfig], BaseConnector],
    ) -> None:
        """Fetch documents from a single source and append them to *sink*.

        Raises whatever the connector raises; the caller decides how to
        handle failures per source.
        """
        logger.debug(f"Processing {source_type} source: {source_name}")

        # Build the connector; it is used below as an async context manager.
        connector = connector_factory(source_config)

        # Forward the file-conversion config only when we have one, the
        # connector supports it, and this source opted in.
        if (
            self.file_conversion_config
            and hasattr(connector, "set_file_conversion_config")
            and getattr(source_config, "enable_file_conversion", False)
        ):
            logger.debug(
                f"Setting file conversion config for {source_type} source: {source_name}"
            )
            connector.set_file_conversion_config(self.file_conversion_config)

        # Async context manager guarantees proper connector initialization
        # and teardown around the fetch.
        async with connector:
            documents = await connector.get_documents()
            logger.debug(
                f"Retrieved {len(documents)} documents from {source_type} source: {source_name}"
            )
            sink.extend(documents)

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Process documents from a specific source type.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being processed

        Returns:
            List of documents from all sources of this type
        """
        logger.debug(f"Processing {source_type} sources: {list(source_configs.keys())}")

        collected: list[Document] = []

        for source_name, source_config in source_configs.items():
            # Stop scheduling further sources once shutdown is requested.
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                await self._collect_into(
                    collected, source_type, source_name, source_config, connector_factory
                )
            except ConnectorConfigurationError:
                # Fatal configuration error – re-raise immediately so the
                # pipeline stops with a clear message instead of silently
                # producing 0 documents.
                raise
            except Exception as e:
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to process {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )
                # Continue processing other sources even if one fails
                continue

        if collected:
            logger.info(
                f"📥 {source_type}: {len(collected)} documents from {len(source_configs)} sources"
            )
        return collected