Coverage for src/qdrant_loader/core/pipeline/source_processor.py: 62%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Source processor for handling different source types.""" 

2 

3import asyncio 

4from collections.abc import AsyncIterator, Callable, Mapping 

5from datetime import datetime 

6 

7from qdrant_loader.config.source_config import SourceConfig 

8from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.file_conversion import FileConversionConfig 

11from qdrant_loader.utils.logging import LoggingConfig 

12from qdrant_loader.utils.sensitive import sanitize_exception_message 

13 

# Module-level logger obtained from the project's LoggingConfig helper.
# NOTE(review): callers below pass keyword fields (e.g. ``error_type=``) to
# ``logger.error``, which a stdlib ``logging.Logger`` would reject, so this is
# presumably a structured logger — confirm against LoggingConfig.get_logger.
logger = LoggingConfig.get_logger(__name__)

15 

16 

class SourceProcessor:
    """Handles processing of different source types.

    Documents are fetched from every configured source of one type, either
    eagerly (``process_source_type``) or lazily (``stream_source_documents``).
    Sources are processed sequentially; a failing source is logged and
    skipped, while a ``ConnectorConfigurationError`` aborts processing. The
    shutdown event is checked before each source is started.
    """

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        """Create a source processor.

        Args:
            shutdown_event: Event checked before each source; once set, the
                remaining sources are skipped. A new, unset event is created
                when omitted.
            file_conversion_config: Optional file conversion settings handed to
                connectors that support them and whose source config enables
                file conversion.
        """
        self.shutdown_event = shutdown_event or asyncio.Event()
        self.file_conversion_config = file_conversion_config

    def _create_connector(
        self,
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_config: SourceConfig,
        source_type: str,
        source_name: str,
    ) -> BaseConnector:
        """Build the connector for one source and attach file conversion config.

        The file conversion config is attached only when this processor has
        one, the connector exposes ``set_file_conversion_config`` and the
        source config has a truthy ``enable_file_conversion`` attribute.
        Shared by the eager and streaming paths so they cannot drift apart.
        """
        connector = connector_factory(source_config)

        if (
            self.file_conversion_config
            and hasattr(connector, "set_file_conversion_config")
            and getattr(source_config, "enable_file_conversion", False)
        ):
            logger.debug(
                f"Setting file conversion config for {source_type} source: {source_name}"
            )
            connector.set_file_conversion_config(self.file_conversion_config)

        return connector

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Process documents from a specific source type.

        Each connector is entered as an async context manager. ``BaseConnector``
        instances are drained through ``stream_documents()``; if that raises
        ``NotImplementedError`` the connector's ``get_documents()`` is used
        instead. Other connector objects always use ``get_documents()``.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being processed

        Returns:
            List of documents from all sources of this type. Sources that
            failed, or were skipped because shutdown was requested, contribute
            no documents.

        Raises:
            ConnectorConfigurationError: Re-raised immediately so the pipeline
                stops instead of silently producing zero documents.
        """
        logger.debug(f"Processing {source_type} sources: {list(source_configs.keys())}")

        all_documents: list[Document] = []

        for source_name, source_config in source_configs.items():
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Processing {source_type} source: {source_name}")

                connector = self._create_connector(
                    connector_factory, source_config, source_type, source_name
                )

                # Use the connector as an async context manager to ensure proper initialization
                async with connector:
                    if isinstance(connector, BaseConnector):
                        try:
                            documents = [
                                document
                                async for document in connector.stream_documents()
                            ]
                        except NotImplementedError:
                            # Connector does not implement streaming; fall back to eager fetch
                            documents = await connector.get_documents()
                    else:
                        documents = await connector.get_documents()

                    logger.debug(
                        f"Retrieved {len(documents)} documents from {source_type} source: {source_name}"
                    )
                    all_documents.extend(documents)

            except ConnectorConfigurationError:
                # Fatal configuration error – re-raise immediately so the
                # pipeline stops with a clear message instead of silently
                # producing 0 documents.
                raise
            except Exception as e:
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to process {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )
                # Continue processing other sources even if one fails
                continue

        if all_documents:
            logger.info(
                f"📥 {source_type}: {len(all_documents)} documents from {len(source_configs)} sources"
            )
        return all_documents

    async def stream_source_documents(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
        since: datetime | None = None,
    ) -> AsyncIterator[Document]:
        """Stream documents from a specific source type (WS-1).

        Yields documents one at a time as they are fetched from the source.
        ``since`` is forwarded to ``connector.stream_documents``. Like
        ``process_source_type``, a connector whose ``stream_documents`` raises
        ``NotImplementedError`` before producing anything falls back to
        ``get_documents()``.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being processed
            since: Optional timestamp forwarded to ``stream_documents``

        Yields:
            Documents from each source, in source order.

        Raises:
            ConnectorConfigurationError: Re-raised immediately so the pipeline
                stops instead of silently producing zero documents.
        """
        logger.debug(f"Streaming {source_type} sources: {list(source_configs.keys())}")

        for source_name, source_config in source_configs.items():
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Streaming {source_type} source: {source_name}")

                connector = self._create_connector(
                    connector_factory, source_config, source_type, source_name
                )

                async with connector:
                    document_count = 0
                    try:
                        async for document in connector.stream_documents(since=since):
                            yield document
                            document_count += 1
                            if document_count % 100 == 0:
                                logger.debug(
                                    f"Streamed {document_count} documents from {source_type} source: {source_name}"
                                )
                    except NotImplementedError:
                        if document_count:
                            # Documents were already yielded; an eager re-fetch
                            # would emit them twice, so treat it as a failure.
                            raise
                        logger.debug(
                            f"{source_type} source {source_name} does not support "
                            "streaming; falling back to get_documents()"
                        )
                        # NOTE(review): get_documents() takes no `since`, so this
                        # fallback yields the source's full document set —
                        # presumably downstream change detection filters it; confirm.
                        for document in await connector.get_documents():
                            yield document
                            document_count += 1

                    if document_count > 0:
                        logger.info(
                            f"✅ Streamed {document_count} documents from {source_type} source: {source_name}"
                        )

            except ConnectorConfigurationError:
                raise
            except Exception as e:
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to stream {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )
                # Continue streaming other sources even if one fails
                continue