Coverage for src/qdrant_loader/core/pipeline/source_processor.py: 37%

35 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Source processor for handling different source types.""" 

2 

3import asyncio 

4from collections.abc import Mapping 

5 

6from qdrant_loader.config.source_config import SourceConfig 

7from qdrant_loader.connectors.base import BaseConnector 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.file_conversion import FileConversionConfig 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12logger = LoggingConfig.get_logger(__name__) 

13 

14 

class SourceProcessor:
    """Handles processing of different source types."""

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        """Initialize the processor.

        Args:
            shutdown_event: Event used to request cooperative shutdown between
                sources; a fresh Event is created when none is supplied.
            file_conversion_config: Optional config forwarded to connectors
                that support file conversion.
        """
        self.shutdown_event = shutdown_event or asyncio.Event()
        self.file_conversion_config = file_conversion_config

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_class: type[BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Process documents from a specific source type.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_class: The connector class to use for this source type
            source_type: The type of source being processed

        Returns:
            List of documents from all sources of this type
        """
        # Lazy %-style args: the message is only formatted when the level is on.
        logger.debug(
            "Processing %s sources: %s", source_type, list(source_configs.keys())
        )

        all_documents: list[Document] = []

        for source_name, source_config in source_configs.items():
            # Honor a shutdown request between sources; already-collected
            # documents are still returned.
            if self.shutdown_event.is_set():
                logger.info(
                    "Shutdown requested, skipping %s source: %s",
                    source_type,
                    source_name,
                )
                break

            try:
                logger.debug("Processing %s source: %s", source_type, source_name)

                # Create connector instance and use as async context manager
                connector = connector_class(source_config)

                # Forward the file conversion config only when both sides opt
                # in: the connector exposes the setter AND the source config
                # enables conversion (getattr defaults to False when the
                # attribute is absent, matching the original hasattr check).
                if (
                    self.file_conversion_config
                    and hasattr(connector, "set_file_conversion_config")
                    and getattr(source_config, "enable_file_conversion", False)
                ):
                    logger.debug(
                        "Setting file conversion config for %s source: %s",
                        source_type,
                        source_name,
                    )
                    connector.set_file_conversion_config(self.file_conversion_config)

                # Use the connector as an async context manager to ensure
                # proper initialization and teardown.
                async with connector:
                    # Get documents from this source
                    documents = await connector.get_documents()

                    logger.debug(
                        "Retrieved %d documents from %s source: %s",
                        len(documents),
                        source_type,
                        source_name,
                    )
                    all_documents.extend(documents)

            except Exception as e:
                # logger.exception == error + traceback; continue processing
                # other sources even if one fails.
                logger.exception(
                    "Failed to process %s source %s: %s",
                    source_type,
                    source_name,
                    e,
                )
                continue

        if all_documents:
            logger.info(
                "📥 %s: %d documents from %d sources",
                source_type,
                len(all_documents),
                len(source_configs),
            )
        return all_documents