Coverage for src / qdrant_loader / core / pipeline / source_processor.py: 97%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Source processor for handling different source types.""" 

2 

3import asyncio 

4from collections.abc import Callable, Mapping 

5 

6from qdrant_loader.config.source_config import SourceConfig 

7from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.file_conversion import FileConversionConfig 

10from qdrant_loader.utils.logging import LoggingConfig 

11from qdrant_loader.utils.sensitive import sanitize_exception_message 

12 

# Module-level structured logger, keyed to this module's dotted path.
logger = LoggingConfig.get_logger(__name__)

14 

15 

class SourceProcessor:
    """Handles processing of different source types."""

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        # A fresh event is created when the caller does not supply one, so
        # shutdown checks never have to guard against None.
        self.shutdown_event = asyncio.Event() if not shutdown_event else shutdown_event
        self.file_conversion_config = file_conversion_config

    def _wants_file_conversion(self, connector, source_config) -> bool:
        """Return a truthy value when file conversion should be configured.

        Requires a conversion config on this processor, a connector that can
        accept it, and a source config that opts in.
        """
        if not self.file_conversion_config:
            return False
        if not hasattr(connector, "set_file_conversion_config"):
            return False
        return getattr(source_config, "enable_file_conversion", False)

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_factory: Callable[[SourceConfig], BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Collect documents from every configured source of one type.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_factory: Factory function that creates a connector from a source config
            source_type: The type of source being processed

        Returns:
            List of documents from all sources of this type
        """
        logger.debug(f"Processing {source_type} sources: {list(source_configs.keys())}")

        collected: list[Document] = []

        for source_name, source_config in source_configs.items():
            # Honor a pending shutdown before starting the next source.
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Processing {source_type} source: {source_name}")

                connector = connector_factory(source_config)

                if self._wants_file_conversion(connector, source_config):
                    logger.debug(
                        f"Setting file conversion config for {source_type} source: {source_name}"
                    )
                    connector.set_file_conversion_config(self.file_conversion_config)

                # The async context manager guarantees the connector is
                # initialized before the fetch and torn down afterwards.
                async with connector:
                    documents = await connector.get_documents()

                    logger.debug(
                        f"Retrieved {len(documents)} documents from {source_type} source: {source_name}"
                    )
                    collected.extend(documents)

            except ConnectorConfigurationError:
                # Configuration problems are fatal: surface them so the
                # pipeline stops with a clear message instead of silently
                # producing 0 documents.
                raise
            except Exception as e:
                safe_error = sanitize_exception_message(e)
                logger.error(
                    f"Failed to process {source_type} source {source_name}: {safe_error}",
                    error_type=type(e).__name__,
                )
                # One bad source must not abort the remaining ones.
                continue

        if collected:
            logger.info(
                f"📥 {source_type}: {len(collected)} documents from {len(source_configs)} sources"
            )
        return collected