Coverage for src / qdrant_loader / connectors / localfile / connector.py: 78%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:48 +0000

1import os 

2from datetime import UTC, datetime 

3from urllib.parse import unquote, urlparse 

4 

5from qdrant_loader.connectors.base import BaseConnector 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.file_conversion import ( 

8 FileConversionConfig, 

9 FileConversionError, 

10 FileConverter, 

11 FileDetector, 

12) 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15from .config import LocalFileConfig 

16from .file_processor import LocalFileFileProcessor 

17from .metadata_extractor import LocalFileMetadataExtractor 

18 

19 

20class LocalFileConnector(BaseConnector): 

21 """Connector for ingesting local files.""" 

22 

def __init__(self, config: LocalFileConfig):
    """Set up the connector from a local-file source configuration.

    The ``file://`` base URL in ``config`` is parsed into a filesystem path,
    then the file processor, metadata extractor and logger are created.
    When ``config.enable_file_conversion`` is true a ``FileDetector`` is
    created and the file processor is rebuilt with it. The converter itself
    stays ``None`` until ``set_file_conversion_config`` is called.

    Args:
        config: Configuration for this local-file source.
    """
    super().__init__(config)
    self.config = config

    # Only the path component of the URL is used; it is URL-decoded and a
    # leading slash in front of a Windows drive letter is removed.
    url_parts = urlparse(str(config.base_url))
    self.base_path = self._fix_windows_file_path(url_parts.path)

    self.file_processor = LocalFileFileProcessor(config, self.base_path)
    self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)
    self.logger = LoggingConfig.get_logger(__name__)
    self._initialized = True

    # Conversion components start out absent.
    self.file_converter = None
    self.file_detector = None

    if not self.config.enable_file_conversion:
        self.logger.debug("File conversion disabled for LocalFile connector")
        return

    self.logger.debug("File conversion enabled for LocalFile connector")
    # The converter is not built here: its configuration comes from the
    # global config during ingestion (see set_file_conversion_config).
    self.file_detector = FileDetector()
    # NOTE(review): this replaces the processor built a few lines above, so
    # the first instance is discarded. Presumably the constructor has no
    # side effects and a single construction would do -- confirm.
    self.file_processor = LocalFileFileProcessor(
        config, self.base_path, self.file_detector
    )

47 

48 def _fix_windows_file_path(self, path: str) -> str: 

49 """Fix Windows file path from URL parsing. 

50 

51 urlparse() adds a leading slash to Windows drive letters, e.g.: 

52 file:///C:/Users/... -> path = "/C:/Users/..." 

53 This method removes the leading slash for Windows paths and handles URL decoding. 

54 

55 Args: 

56 path: Raw path from urlparse() 

57 

58 Returns: 

59 Fixed path suitable for the current platform 

60 """ 

61 # First decode URL encoding (e.g., %20 -> space) 

62 path = unquote(path) 

63 

64 # Handle Windows paths: remove leading slash if it's a drive letter 

65 if len(path) >= 3 and path[0] == "/" and path[2] == ":": 

66 # This looks like a Windows path with leading slash: "/C:/..." or "/C:" -> "C:/..." or "C:" 

67 path = path[1:] 

68 

69 return path 

70 

def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
    """Build the file converter from the global conversion settings.

    Does nothing when file conversion is disabled for this connector.

    Args:
        file_conversion_config: Global file conversion configuration
    """
    if not self.config.enable_file_conversion:
        return

    self.file_converter = FileConverter(file_conversion_config)
    self.logger.debug("File converter initialized with global config")

80 

async def get_documents(self) -> list[Document]:
    """Get all documents from the local file source.

    Walks ``self.base_path`` recursively and builds one ``Document`` for
    every file accepted by ``self.file_processor.should_process_file``.

    Per file:
      * With conversion enabled, ``.doc`` and ``.ppt`` files are skipped
        with a warning.
      * If conversion is enabled, detector and converter are both set, and
        the detector reports the file as convertible, the file is converted
        and stored with content type ``"md"``. A ``FileConversionError``
        is replaced by the converter's fallback document.
      * Otherwise the file is read as UTF-8 text (undecodable bytes are
        dropped) and the content type is the extension without the dot.

    Any exception raised while handling a single file is logged and that
    file is skipped; the remaining files are still processed.

    Returns:
        The documents that were built successfully.
    """
    # Extensions skipped up front when conversion is enabled.
    unsupported_legacy_extensions = {".doc", ".ppt"}

    documents = []
    for root, _, files in os.walk(self.base_path):
        for file in files:
            file_path = os.path.join(root, file)
            if not self.file_processor.should_process_file(file_path):
                continue
            try:
                # Get relative path from base directory
                rel_path = os.path.relpath(file_path, self.base_path)
                # Forward-slash form used by every log line below. Computed
                # once, which also keeps backslashes out of f-string
                # replacement fields (a SyntaxError before Python 3.12).
                log_path = rel_path.replace("\\", "/")
                file_extension = os.path.splitext(file)[1].lower()
                # Same extension without the leading dot.
                file_type = file_extension.lstrip(".")

                if (
                    self.config.enable_file_conversion
                    and file_extension in unsupported_legacy_extensions
                ):
                    file_info = (
                        self.file_detector.get_file_type_info(file_path)
                        if self.file_detector
                        else {
                            "mime_type": None,
                            "file_extension": file_extension,
                        }
                    )
                    self.logger.warning(
                        "Skipping file: old doc/ppt are not supported for MarkItDown conversion",
                        file_path=log_path,
                        mime_type=file_info.get("mime_type"),
                        file_extension=file_info.get("file_extension"),
                    )
                    continue

                # Check if file needs conversion
                needs_conversion = (
                    self.config.enable_file_conversion
                    and self.file_detector
                    and self.file_converter
                    and self.file_detector.is_supported_for_conversion(file_path)
                )

                if needs_conversion:
                    self.logger.debug(
                        "File needs conversion",
                        file_path=log_path,
                    )
                    converter = self.file_converter
                    assert converter is not None  # Type checker hint
                    try:
                        # Convert file to markdown
                        content = converter.convert_file(file_path)
                        content_type = "md"  # Converted files are markdown
                        conversion_method = "markitdown"
                        conversion_failed = False
                        self.logger.info(
                            "File conversion successful",
                            file_path=log_path,
                        )
                    except FileConversionError as e:
                        self.logger.warning(
                            "File conversion failed, creating fallback document",
                            file_path=log_path,
                            error=str(e),
                        )
                        # Create fallback document
                        content = converter.create_fallback_document(file_path, e)
                        content_type = "md"  # Fallback is also markdown
                        conversion_method = "markitdown_fallback"
                        conversion_failed = True
                else:
                    # Read file content normally
                    with open(file_path, encoding="utf-8", errors="ignore") as f:
                        content = f.read()
                    content_type = file_type
                    conversion_method = None
                    conversion_failed = False

                # Get file modification time
                file_mtime = os.path.getmtime(file_path)
                updated_at = datetime.fromtimestamp(file_mtime, tz=UTC)

                metadata = self.metadata_extractor.extract_all_metadata(
                    file_path, content
                )

                # Add file conversion metadata if applicable
                if needs_conversion:
                    metadata.update(
                        {
                            "conversion_method": conversion_method,
                            "conversion_failed": conversion_failed,
                            "original_file_type": file_type,
                        }
                    )

                self.logger.debug(f"Processed local file: {log_path}")

                # Create consistent URL with forward slashes for cross-platform compatibility
                normalized_path = os.path.realpath(file_path).replace("\\", "/")
                doc = Document(
                    title=os.path.basename(file_path),
                    content=content,
                    content_type=content_type,
                    metadata=metadata,
                    source_type="localfile",
                    source=self.config.source,
                    url=f"file://{normalized_path}",
                    is_deleted=False,
                    updated_at=updated_at,
                )
                documents.append(doc)
            except Exception as e:
                # Deliberate best effort: one unreadable or malformed file
                # must not abort the whole walk.
                self.logger.error(
                    "Failed to process file",
                    file_path=file_path.replace("\\", "/"),
                    error=str(e),
                )
                continue
    return documents