Coverage for src/qdrant_loader/connectors/localfile/connector.py: 66%

74 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1import os 

2from datetime import UTC, datetime 

3from urllib.parse import urlparse 

4 

5from qdrant_loader.connectors.base import BaseConnector 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.file_conversion import ( 

8 FileConverter, 

9 FileDetector, 

10 FileConversionConfig, 

11 FileConversionError, 

12) 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15from .config import LocalFileConfig 

16from .file_processor import LocalFileFileProcessor 

17from .metadata_extractor import LocalFileMetadataExtractor 

18 

19 

class LocalFileConnector(BaseConnector):
    """Connector for ingesting local files.

    The directory to ingest is derived from ``config.base_url`` (a ``file://``
    URL). Every file under that directory that the file processor accepts is
    turned into a :class:`Document`. When file conversion is enabled and a
    converter has been supplied via :meth:`set_file_conversion_config`,
    files the detector reports as convertible are converted to markdown
    instead of being read as plain text.
    """

    def __init__(self, config: LocalFileConfig):
        """Create the connector and its helper components.

        Args:
            config: Local file source configuration. ``base_url`` must be a
                ``file://`` URL pointing at the directory to ingest.
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

        # Parse base_url (file://...) to get the local path
        self.base_path = self._path_from_file_url(str(config.base_url))
        self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)

        # Initialize file conversion components if enabled. The converter
        # itself is created later, once the global conversion config is known.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for LocalFile connector")
            self.file_detector = FileDetector()
            # Build the file processor once, already wired to the detector.
            self.file_processor = LocalFileFileProcessor(
                config, self.base_path, self.file_detector
            )
        else:
            self.logger.debug("File conversion disabled for LocalFile connector")
            self.file_processor = LocalFileFileProcessor(config, self.base_path)

        self._initialized = True

    @staticmethod
    def _path_from_file_url(url: str) -> str:
        """Return the local filesystem path addressed by a ``file://`` URL.

        The path component of a URL is percent-encoded, so it must be decoded
        before it is handed to ``os`` functions; otherwise a directory such as
        ``/data/my docs`` (``file:///data/my%20docs``) is never found.
        ``url2pathname`` performs that decoding and also applies the
        platform's path conventions (e.g. drive letters on Windows).
        """
        # Local import: keeps this block self-contained without touching the
        # module-level import section.
        from urllib.request import url2pathname

        return url2pathname(urlparse(url).path)

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Has no effect when file conversion is disabled for this source.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    async def get_documents(self) -> list[Document]:
        """Get all documents from the local file source.

        Walks ``self.base_path`` recursively. A failure while processing a
        single file is logged and that file is skipped, so one bad file does
        not abort the whole ingestion.

        Returns:
            One document per successfully processed file.
        """
        documents: list[Document] = []
        for root, _, files in os.walk(self.base_path):
            for file in files:
                file_path = os.path.join(root, file)
                if not self.file_processor.should_process_file(file_path):
                    continue
                try:
                    documents.append(self._build_document(file_path))
                except Exception as e:
                    # Deliberately broad: this is the per-file isolation
                    # boundary; the error is reported and the walk continues.
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )
        return documents

    def _read_content(
        self, file_path: str, rel_path: str, file_ext: str
    ) -> tuple[str, str, dict]:
        """Load a file's content, converting it to markdown when applicable.

        Args:
            file_path: Path of the file to load.
            rel_path: Path relative to the base directory (used for logging).
            file_ext: Lower-cased file extension without the leading dot.

        Returns:
            ``(content, content_type, conversion_metadata)``. The metadata
            dict is empty when the file was read as plain text.
        """
        detector = self.file_detector
        converter = self.file_converter
        needs_conversion = bool(
            self.config.enable_file_conversion
            and detector is not None
            and converter is not None
            and detector.is_supported_for_conversion(file_path)
        )

        if not needs_conversion:
            # Read file content normally; undecodable bytes are dropped.
            with open(file_path, encoding="utf-8", errors="ignore") as f:
                return f.read(), file_ext, {}

        self.logger.debug("File needs conversion", file_path=rel_path)
        try:
            # Convert file to markdown
            content = converter.convert_file(file_path)
            conversion_method = "markitdown"
            conversion_failed = False
            self.logger.info("File conversion successful", file_path=rel_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=rel_path,
                error=str(e),
            )
            # Create fallback document
            content = converter.create_fallback_document(file_path, e)
            conversion_method = "markitdown_fallback"
            conversion_failed = True

        conversion_metadata = {
            "conversion_method": conversion_method,
            "conversion_failed": conversion_failed,
            "original_file_type": file_ext,
        }
        # Converted files and fallback documents are both markdown.
        return content, "md", conversion_metadata

    def _build_document(self, file_path: str) -> Document:
        """Create the :class:`Document` for a single file.

        Args:
            file_path: Path of the file to ingest.

        Returns:
            The document describing the file's content and metadata.
        """
        # Get relative path from base directory
        rel_path = os.path.relpath(file_path, self.base_path)
        file_name = os.path.basename(file_path)
        # Get file extension without the dot
        file_ext = os.path.splitext(file_name)[1].lower().lstrip(".")

        content, content_type, conversion_metadata = self._read_content(
            file_path, rel_path, file_ext
        )

        # Get file modification time
        updated_at = datetime.fromtimestamp(os.path.getmtime(file_path), tz=UTC)

        metadata = self.metadata_extractor.extract_all_metadata(file_path, content)
        # Add file conversion metadata if applicable
        if conversion_metadata:
            metadata.update(conversion_metadata)

        self.logger.debug(f"Processed local file: {rel_path}")

        return Document(
            title=file_name,
            content=content,
            content_type=content_type,
            metadata=metadata,
            source_type="localfile",
            source=self.config.source,
            url=f"file://{os.path.realpath(file_path)}",
            is_deleted=False,
            updated_at=updated_at,
        )