Coverage for src/qdrant_loader/connectors/localfile/connector.py: 69%

80 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1import os 

2from datetime import UTC, datetime 

3from urllib.parse import unquote, urlparse 

4 

5from qdrant_loader.connectors.base import BaseConnector 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.file_conversion import ( 

8 FileConversionConfig, 

9 FileConversionError, 

10 FileConverter, 

11 FileDetector, 

12) 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15from .config import LocalFileConfig 

16from .file_processor import LocalFileFileProcessor 

17from .metadata_extractor import LocalFileMetadataExtractor 

18 

19 

class LocalFileConnector(BaseConnector):
    """Connector for ingesting local files.

    Walks the directory referenced by ``config.base_url`` (a ``file://`` URL),
    filters files through ``LocalFileFileProcessor.should_process_file`` and
    turns every accepted file into a ``Document``. When
    ``config.enable_file_conversion`` is set and a converter has been provided
    via :meth:`set_file_conversion_config`, files the detector reports as
    convertible are converted to markdown instead of being read as text.
    """

    def __init__(self, config: LocalFileConfig):
        """Initialise the connector from its configuration.

        Args:
            config: Local file source configuration. ``base_url`` is parsed
                as a URL and only its path component is used.
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

        # Parse base_url (file://...) to get the local path with Windows support
        parsed = urlparse(str(config.base_url))
        self.base_path = self._fix_windows_file_path(parsed.path)
        self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)

        # The converter needs the global conversion config, which is only
        # supplied later through set_file_conversion_config().
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None

        # Build the file processor exactly once, with the detector only when
        # conversion is enabled.
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for LocalFile connector")
            self.file_detector = FileDetector()
            self.file_processor = LocalFileFileProcessor(
                config, self.base_path, self.file_detector
            )
        else:
            self.logger.debug("File conversion disabled for LocalFile connector")
            self.file_processor = LocalFileFileProcessor(config, self.base_path)

        # Flag the connector as ready only after every component exists.
        self._initialized = True

    def _fix_windows_file_path(self, path: str) -> str:
        """Decode a URL path and strip the slash urlparse puts before a drive.

        urlparse() keeps a leading slash in front of Windows drive letters:
            file:///C:/Users/... -> path = "/C:/Users/..."
        This method percent-decodes the path and, when it starts with
        "/<ASCII letter>:", drops that leading slash. The check is purely
        textual and is applied regardless of the platform the code runs on.

        Args:
            path: Raw path from urlparse()

        Returns:
            The decoded path, without the leading slash if it had a drive
            prefix ("/C:/x" -> "C:/x", "/C:" -> "C:"); otherwise the decoded
            path unchanged.
        """
        # First decode URL encoding (e.g., %20 -> space)
        path = unquote(path)

        # Only an ASCII letter can be a drive designator; requiring one keeps
        # paths such as "/1:/data" intact.
        has_drive_prefix = (
            len(path) >= 3
            and path[0] == "/"
            and path[1].isascii()
            and path[1].isalpha()
            and path[2] == ":"
        )
        return path[1:] if has_drive_prefix else path

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Does nothing when file conversion is disabled for this source.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    async def get_documents(self) -> list[Document]:
        """Get all documents from the local file source.

        Returns:
            One ``Document`` per file under ``base_path`` that the file
            processor accepts. Files that raise while being processed are
            logged and skipped.
        """
        documents = []
        for root, _, files in os.walk(self.base_path):
            for file in files:
                file_path = os.path.join(root, file)
                if not self.file_processor.should_process_file(file_path):
                    continue
                try:
                    documents.append(self._build_document(file_path))
                except Exception as e:
                    # Per-file boundary: one unreadable file must not abort
                    # the whole ingestion run.
                    self.logger.error(
                        "Failed to process file",
                        file_path=file_path.replace("\\", "/"),
                        error=str(e),
                    )
        return documents

    def _build_document(self, file_path: str) -> Document:
        """Read or convert a single file and wrap it in a ``Document``.

        Args:
            file_path: Path of the file, as joined from the os.walk() results.

        Returns:
            The document for the file, with conversion metadata added when
            the file went through the converter.
        """
        file_name = os.path.basename(file_path)
        # Extension without the dot, lower-cased
        file_ext = os.path.splitext(file_name)[1].lower().lstrip(".")
        # Relative path with forward slashes, used for log output only.
        # Computed once up front so no backslash ends up inside an f-string
        # expression (a SyntaxError before Python 3.12).
        display_path = os.path.relpath(file_path, self.base_path).replace("\\", "/")

        # `converter` is non-None exactly when this file needs conversion.
        converter = None
        if (
            self.config.enable_file_conversion
            and self.file_detector
            and self.file_converter
            and self.file_detector.is_supported_for_conversion(file_path)
        ):
            converter = self.file_converter

        if converter is not None:
            self.logger.debug("File needs conversion", file_path=display_path)
            content, conversion_method, conversion_failed = self._convert_file(
                converter, file_path, display_path
            )
            content_type = "md"  # Converted and fallback documents are markdown
        else:
            # Read file content normally; undecodable bytes are dropped
            with open(file_path, encoding="utf-8", errors="ignore") as f:
                content = f.read()
            content_type = file_ext
            conversion_method = None
            conversion_failed = False

        # Get file modification time
        updated_at = datetime.fromtimestamp(os.path.getmtime(file_path), tz=UTC)

        metadata = self.metadata_extractor.extract_all_metadata(file_path, content)

        # Add file conversion metadata if applicable
        if converter is not None:
            metadata.update(
                {
                    "conversion_method": conversion_method,
                    "conversion_failed": conversion_failed,
                    "original_file_type": file_ext,
                }
            )

        self.logger.debug(f"Processed local file: {display_path}")

        # Create consistent URL with forward slashes for cross-platform compatibility.
        # NOTE(review): for a Windows drive path this produces "file://C:/..."
        # (two slashes). Left unchanged because the URL format may be relied on
        # elsewhere -- confirm before altering.
        normalized_path = os.path.realpath(file_path).replace("\\", "/")
        return Document(
            title=file_name,
            content=content,
            content_type=content_type,
            metadata=metadata,
            source_type="localfile",
            source=self.config.source,
            url=f"file://{normalized_path}",
            is_deleted=False,
            updated_at=updated_at,
        )

    def _convert_file(
        self, converter: FileConverter, file_path: str, display_path: str
    ) -> tuple[str, str, bool]:
        """Convert a file to markdown, falling back to a placeholder document.

        Args:
            converter: Converter to use for this file.
            file_path: Path of the file to convert.
            display_path: Forward-slash relative path used in log messages.

        Returns:
            ``(content, conversion_method, conversion_failed)``. On a
            ``FileConversionError`` the content comes from
            ``converter.create_fallback_document`` and ``conversion_failed``
            is True.
        """
        try:
            content = converter.convert_file(file_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=display_path,
                error=str(e),
            )
            return (
                converter.create_fallback_document(file_path, e),
                "markitdown_fallback",
                True,
            )
        self.logger.info("File conversion successful", file_path=display_path)
        return content, "markitdown", False