Coverage for src/qdrant_loader/core/file_conversion/file_converter.py: 99%

131 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Main file conversion service using MarkItDown.""" 

2 

3import os 

4import signal 

5import sys 

6import warnings 

7from contextlib import contextmanager 

8from pathlib import Path 

9 

# Windows compatibility fix: MarkItDown tries to use SIGALRM, which the
# Windows build of the ``signal`` module does not provide. Install inert
# stand-ins so that attribute access and calls do not fail there.
if sys.platform == "win32" and not hasattr(signal, "SIGALRM"):

    def _noop_alarm(_seconds):
        """Stand-in for ``signal.alarm`` on Windows: schedules nothing."""
        # The real signal.alarm returns the seconds left on the previously
        # scheduled alarm (0 when none was pending); mirror that contract
        # instead of returning None.
        return 0

    signal.SIGALRM = 14  # Standard SIGALRM signal number on Unix
    signal.alarm = _noop_alarm

15 

16from qdrant_loader.core.file_conversion.conversion_config import FileConversionConfig 

17from qdrant_loader.core.file_conversion.exceptions import ( 

18 ConversionTimeoutError, 

19 FileAccessError, 

20 FileSizeExceededError, 

21 MarkItDownError, 

22 UnsupportedFileTypeError, 

23) 

24from qdrant_loader.core.file_conversion.file_detector import FileDetector 

25from qdrant_loader.utils.logging import LoggingConfig 

26 

27logger = LoggingConfig.get_logger(__name__) 

28 

29 

@contextmanager
def capture_openpyxl_warnings(logger_instance, file_path: str):
    """Route known openpyxl "unsupported extension" warnings through our logger.

    While the context is active, ``warnings.showwarning`` is replaced by a
    handler that intercepts ``UserWarning`` (or a subclass) reported from a
    file whose name contains ``"openpyxl"`` and whose message mentions the
    Data Validation or Conditional Formatting extension. Each intercepted
    warning is logged with ``logger_instance.info`` instead of being shown;
    every other warning is forwarded to the handler that was installed before
    entering the context.

    If the managed body finishes without raising and at least one warning was
    intercepted, one summary entry is logged. The previous handler is always
    restored on exit.

    Args:
        logger_instance: Object exposing ``info(message, **fields)``.
        file_path: Path of the file being converted; attached to log entries.
    """
    # (message fragment, feature name used in the log). Order matters: the
    # first matching fragment wins, Data Validation first.
    known_features = (
        ("Data Validation extension", "Data Validation"),
        ("Conditional Formatting extension", "Conditional Formatting"),
    )
    captured_warnings = []

    # Remember the current handler so unrelated warnings keep their normal
    # behavior and so it can be restored afterwards.
    original_showwarning = warnings.showwarning

    def warning_handler(message, category, filename, lineno, file=None, line=None):
        warning_type = None
        if (
            isinstance(category, type)
            and issubclass(category, UserWarning)
            and filename
            and "openpyxl" in filename
        ):
            text = str(message)
            warning_type = next(
                (name for fragment, name in known_features if fragment in text),
                None,
            )

        if warning_type is None:
            # Not one of the openpyxl warnings we handle: default behavior.
            original_showwarning(message, category, filename, lineno, file, line)
            return

        captured_warnings.append(warning_type)

        # Log through our system instead of showing the raw warning.
        logger_instance.info(
            "Excel feature not fully supported during conversion",
            file_path=file_path,
            feature_type=warning_type,
            source="openpyxl",
        )

    try:
        warnings.showwarning = warning_handler
        yield

        # Only reached when the managed body did not raise.
        if captured_warnings:
            logger_instance.info(
                "Excel conversion completed with unsupported features",
                file_path=file_path,
                total_warnings=len(captured_warnings),
                # Sorted so the log entry is deterministic across runs.
                warning_types=sorted(set(captured_warnings)),
                source="openpyxl",
            )
    finally:
        warnings.showwarning = original_showwarning

89 

90 

class TimeoutHandler:
    """Context manager for handling conversion timeouts.

    On platforms other than Windows a ``SIGALRM`` alarm is armed on entry and
    disarmed on exit; when it fires, ``ConversionTimeoutError`` is raised in
    the main thread. Python only allows signal handlers to be installed from
    the main thread, so when used from any other thread no alarm is armed and
    the managed block runs without a timeout.

    On Windows a daemon thread waits for the timeout instead. Leaving the
    context cancels that thread. NOTE: if the timeout elapses first, the
    exception is raised inside the timer thread, so it does not interrupt the
    code running in the ``with`` block.

    Args:
        timeout_seconds: Timeout in seconds.
        file_path: Path of the file being converted (used in the error).
    """

    def __init__(self, timeout_seconds: int, file_path: str):
        self.timeout_seconds = timeout_seconds
        self.file_path = file_path
        self.old_handler = None
        self.timer = None
        # Event set by __exit__ to stop the Windows timer thread early.
        self._cancelled = None

    def _timeout_handler(self, signum=None, frame=None):
        """Signal handler for timeout.

        Raises:
            ConversionTimeoutError: Always.
        """
        raise ConversionTimeoutError(self.timeout_seconds, self.file_path)

    def _timeout_thread(self):
        """Thread-based timeout for Windows.

        Waits ``timeout_seconds`` and then triggers the timeout handler,
        unless the wait was cancelled by ``__exit__`` in the meantime.
        """
        cancelled = self._cancelled
        if cancelled is None:
            # Called without __enter__ having prepared an event: plain sleep.
            import time

            time.sleep(self.timeout_seconds)
        elif cancelled.wait(self.timeout_seconds):
            # The with-block finished before the timeout elapsed.
            return
        self._timeout_handler()

    @staticmethod
    def _can_use_alarm() -> bool:
        """Return True when SIGALRM exists and we run in the main thread."""
        import threading

        return (
            hasattr(signal, "SIGALRM")
            and threading.current_thread() is threading.main_thread()
        )

    def __enter__(self):
        """Set up timeout handler (Unix signals or Windows threading)."""
        if sys.platform == "win32":
            # Windows doesn't support SIGALRM, use threading instead
            import threading

            self._cancelled = threading.Event()
            self.timer = threading.Thread(target=self._timeout_thread, daemon=True)
            self.timer.start()
        elif self._can_use_alarm():
            # Unix/Linux/macOS: use signal-based timeout. signal.signal()
            # raises ValueError outside the main thread, hence the guard.
            self.old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
            signal.alarm(self.timeout_seconds)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up timeout handler."""
        if sys.platform == "win32":
            # Wake the timer thread so it exits without raising.
            if self._cancelled is not None:
                self._cancelled.set()
        elif self._can_use_alarm():
            # Only the main thread may have armed the alarm; touching it from
            # another thread could cancel an alarm owned by the main thread.
            signal.alarm(0)  # Cancel the alarm
            if self.old_handler is not None:
                signal.signal(signal.SIGALRM, self.old_handler)

139 

140 

class FileConverter:
    """Service for converting files to Markdown using MarkItDown.

    The MarkItDown instance is created lazily on first use, optionally wired
    to an OpenAI-compatible LLM client when the configuration enables LLM
    descriptions.
    """

    def __init__(self, config: "FileConversionConfig"):
        """Initialize the file converter.

        Args:
            config: Conversion settings (timeout, size limit, MarkItDown/LLM
                options) read by the methods of this class.
        """
        self.config = config
        self.file_detector = FileDetector()
        self.logger = LoggingConfig.get_logger(__name__)
        # Created on demand by _get_markitdown().
        self._markitdown = None

    def _get_markitdown(self):
        """Get MarkItDown instance with lazy loading and LLM configuration.

        Returns:
            The cached MarkItDown instance, created on the first call.

        Raises:
            MarkItDownError: If an ImportError occurs while importing or
                constructing MarkItDown, or if the LLM client cannot be
                created.
        """
        if self._markitdown is not None:
            return self._markitdown

        try:
            from markitdown import MarkItDown  # type: ignore

            markitdown_config = self.config.markitdown
            if markitdown_config.enable_llm_descriptions:
                self.logger.debug(
                    "Initializing MarkItDown with LLM configuration",
                    llm_model=markitdown_config.llm_model,
                    llm_endpoint=markitdown_config.llm_endpoint,
                )

                llm_client = self._create_llm_client()

                self._markitdown = MarkItDown(
                    llm_client=llm_client,
                    llm_model=markitdown_config.llm_model,
                )
                self.logger.debug("MarkItDown initialized with LLM support")
            else:
                self._markitdown = MarkItDown()
                self.logger.debug("MarkItDown initialized without LLM support")

        except ImportError as e:
            raise MarkItDownError(
                Exception("MarkItDown library not available")
            ) from e
        return self._markitdown

    def _create_llm_client(self):
        """Create LLM client based on configuration.

        The API key comes from the configuration; when it is empty the
        ``OPENAI_API_KEY`` and then ``LLM_API_KEY`` environment variables are
        consulted, falling back to the placeholder ``"dummy-key"``.

        Returns:
            An ``openai.OpenAI`` client pointed at the configured endpoint.

        Raises:
            MarkItDownError: If the ``openai`` package cannot be imported.
        """
        try:
            api_key = self.config.markitdown.llm_api_key
            if not api_key:
                self.logger.warning(
                    "No LLM API key configured for MarkItDown LLM integration"
                )
                # Fallback to environment variable for backward compatibility
                api_key = os.getenv("OPENAI_API_KEY") or os.getenv(
                    "LLM_API_KEY", "dummy-key"
                )

            # Every endpoint is accessed through the OpenAI-compatible client,
            # so no endpoint-specific branching is needed.
            from openai import OpenAI  # type: ignore

            return OpenAI(
                base_url=self.config.markitdown.llm_endpoint,
                api_key=api_key,
            )
        except ImportError as e:
            self.logger.warning(
                "OpenAI library not available for LLM integration", error=str(e)
            )
            raise MarkItDownError(
                Exception("OpenAI library required for LLM integration")
            ) from e

    def convert_file(self, file_path: str) -> str:
        """Convert a file to Markdown format with timeout support.

        Args:
            file_path: Path of the file to convert.

        Returns:
            The Markdown text produced by MarkItDown.

        Raises:
            ConversionTimeoutError: If the conversion timed out.
            MarkItDownError: For any other failure, including validation
                errors; the original exception is chained as the cause.
        """
        # Normalize path for consistent logging (Windows compatibility)
        normalized_path = file_path.replace("\\", "/")
        self.logger.info("Starting file conversion", file_path=normalized_path)

        try:
            self._validate_file(file_path)
            markitdown = self._get_markitdown()

            # Apply timeout wrapper and warning capture for conversion
            with TimeoutHandler(self.config.conversion_timeout, file_path):
                with capture_openpyxl_warnings(self.logger, file_path):
                    result = markitdown.convert(file_path)

            # Fall back to the string form when the result has no text_content.
            if hasattr(result, "text_content"):
                markdown_content = result.text_content
            else:
                markdown_content = str(result)

            self.logger.info(
                "File conversion completed",
                file_path=normalized_path,
                content_length=len(markdown_content),
                timeout_used=self.config.conversion_timeout,
            )
            return markdown_content

        except ConversionTimeoutError:
            # Re-raise timeout errors as-is
            self.logger.error(
                "File conversion timed out",
                file_path=normalized_path,
                timeout=self.config.conversion_timeout,
            )
            raise
        except Exception as e:
            self.logger.error(
                "File conversion failed", file_path=normalized_path, error=str(e)
            )
            raise MarkItDownError(e, file_path) from e

    def _validate_file(self, file_path: str) -> None:
        """Validate file for conversion.

        Raises:
            FileAccessError: If the file does not exist or is not readable.
            FileSizeExceededError: If the configuration rejects the file size.
            UnsupportedFileTypeError: If the detector does not support the
                file for conversion.
        """
        if not os.path.exists(file_path):
            raise FileAccessError(f"File does not exist: {file_path}")

        if not os.access(file_path, os.R_OK):
            raise FileAccessError(f"File is not readable: {file_path}")

        file_size = os.path.getsize(file_path)
        if not self.config.is_file_size_allowed(file_size):
            raise FileSizeExceededError(file_size, self.config.max_file_size, file_path)

        if not self.file_detector.is_supported_for_conversion(file_path):
            file_info = self.file_detector.get_file_type_info(file_path)
            raise UnsupportedFileTypeError(
                file_info.get("normalized_type", "unknown"), file_path
            )

    def create_fallback_document(self, file_path: str, error: Exception) -> str:
        """Create a fallback Markdown document when conversion fails.

        Args:
            file_path: Path of the file that could not be converted.
            error: The exception describing the failure.

        Returns:
            A Markdown document listing the file's type, size and path
            together with the error message.
        """
        filename = Path(file_path).name
        file_info = self.file_detector.get_file_type_info(file_path)

        # Treat a missing key and an explicit None alike: formatting None
        # with the "," spec would raise TypeError.
        file_type = file_info.get("normalized_type") or "unknown"
        file_size = file_info.get("file_size") or 0

        return f"""# {filename}

**File Information:**
- **Type**: {file_type}
- **Size**: {file_size:,} bytes
- **Path**: {file_path}

**Conversion Status**: ❌ Failed
**Error**: {str(error)}

*This document was created as a fallback when the original file could not be converted.*
"""