Coverage for src / qdrant_loader / core / pipeline / source_processor.py: 62%
74 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Source processor for handling different source types."""
3import asyncio
4from collections.abc import AsyncIterator, Callable, Mapping
5from datetime import datetime
7from qdrant_loader.config.source_config import SourceConfig
8from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.file_conversion import FileConversionConfig
11from qdrant_loader.utils.logging import LoggingConfig
12from qdrant_loader.utils.sensitive import sanitize_exception_message
14logger = LoggingConfig.get_logger(__name__)
class SourceProcessor:
    """Handles processing of different source types.

    For every configured source a connector is created through a
    caller-supplied factory, entered as an async context manager, and then
    drained either eagerly (``process_source_type``) or lazily
    (``stream_source_documents``).
    """

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        """Initialize the processor.

        Args:
            shutdown_event: Event checked before each source is started; once
                it is set the remaining sources are skipped. A new, unset
                event is created when none is supplied.
            file_conversion_config: Optional configuration passed on to
                connectors that expose ``set_file_conversion_config`` and
                whose source config enables file conversion.
        """
        self.shutdown_event = shutdown_event or asyncio.Event()
        self.file_conversion_config = file_conversion_config

    def _apply_file_conversion_config(
        self,
        connector: BaseConnector,
        source_config: SourceConfig,
        source_type: str,
        source_name: str,
    ) -> None:
        """Pass the file conversion config to *connector* when applicable.

        Nothing happens unless a config was given to this processor, the
        connector exposes ``set_file_conversion_config`` and the source config
        has a truthy ``enable_file_conversion`` attribute.
        """
        applicable = (
            self.file_conversion_config
            and hasattr(connector, "set_file_conversion_config")
            and hasattr(source_config, "enable_file_conversion")
            and source_config.enable_file_conversion
        )
        if not applicable:
            return

        logger.debug(
            f"Setting file conversion config for {source_type} source: {source_name}"
        )
        connector.set_file_conversion_config(self.file_conversion_config)

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Process documents from a specific source type.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being processed

        Returns:
            List of documents from all sources of this type

        Raises:
            ConnectorConfigurationError: Re-raised unchanged so the caller
                stops instead of silently receiving no documents. Any other
                exception is logged (sanitized) and that source is skipped.
        """
        logger.debug(f"Processing {source_type} sources: {list(source_configs.keys())}")

        all_documents = []

        for source_name, source_config in source_configs.items():
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Processing {source_type} source: {source_name}")

                connector = connector_factory(source_config)
                self._apply_file_conversion_config(
                    connector, source_config, source_type, source_name
                )

                # Use the connector as an async context manager to ensure
                # proper initialization and cleanup.
                async with connector:
                    documents = []
                    if isinstance(connector, BaseConnector):
                        try:
                            async for document in connector.stream_documents():
                                documents.append(document)
                        except NotImplementedError:
                            # Connector does not implement streaming; fall
                            # back to eager fetch. Anything gathered so far
                            # is replaced by the eager result.
                            documents = await connector.get_documents()
                    else:
                        documents = await connector.get_documents()

                    logger.debug(
                        f"Retrieved {len(documents)} documents from {source_type} source: {source_name}"
                    )
                    all_documents.extend(documents)

            except ConnectorConfigurationError:
                # Fatal configuration error - re-raise immediately so the
                # pipeline stops with a clear message instead of silently
                # producing 0 documents.
                raise
            except Exception as e:
                # Best effort: log the sanitized failure and move on to the
                # next source so one bad source does not abort the others.
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to process {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )

        if all_documents:
            # The source count is the number of configured sources, which
            # includes any that failed or were skipped.
            logger.info(
                f"📥 {source_type}: {len(all_documents)} documents from {len(source_configs)} sources"
            )
        return all_documents

    async def stream_source_documents(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
        since: datetime | None = None,
    ) -> AsyncIterator[Document]:
        """Stream documents from a specific source type (WS-1).

        Yields documents one at a time as they are fetched from the source.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being streamed
            since: Forwarded unchanged to ``connector.stream_documents``;
                presumably a lower time bound for incremental runs, with the
                exact semantics defined by each connector (TODO confirm).

        Yields:
            Documents in the order the connectors produce them, source by source.

        Raises:
            ConnectorConfigurationError: Re-raised unchanged. Any other
                exception is logged (sanitized) and that source is abandoned;
                documents already yielded from it are not retracted.
        """
        logger.debug(f"Streaming {source_type} sources: {list(source_configs.keys())}")

        for source_name, source_config in source_configs.items():
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Streaming {source_type} source: {source_name}")

                connector = connector_factory(source_config)
                self._apply_file_conversion_config(
                    connector, source_config, source_type, source_name
                )

                async with connector:
                    document_count = 0
                    # NOTE(review): unlike process_source_type there is no
                    # get_documents() fallback here; a NotImplementedError
                    # from the connector lands in the generic handler below.
                    async for document in connector.stream_documents(since=since):
                        yield document
                        document_count += 1
                        # Periodic progress trace for long-running sources.
                        if document_count % 100 == 0:
                            logger.debug(
                                f"Streamed {document_count} documents from {source_type} source: {source_name}"
                            )

                    if document_count > 0:
                        logger.info(
                            f"✅ Streamed {document_count} documents from {source_type} source: {source_name}"
                        )

            except ConnectorConfigurationError:
                # Fatal configuration error: let it reach the caller.
                raise
            except Exception as e:
                # Best effort: log the sanitized failure and move on to the
                # next source.
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to stream {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )