Coverage for src/qdrant_loader/core/file_conversion/file_converter.py: 67%

238 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Main file conversion service using MarkItDown.""" 

2 

3import os 

4import signal 

5import sys 

6import warnings 

7from contextlib import contextmanager 

8from pathlib import Path 

9 

# Windows compatibility fix: monkey-patch the signal module for MarkItDown.
# MarkItDown calls signal.SIGALRM / signal.alarm() internally, neither of which
# exists on Windows, so we install harmless stand-ins before MarkItDown is
# imported anywhere. This must run at module import time, ahead of any lazy
# MarkItDown import in FileConverter.
if sys.platform == "win32" and not hasattr(signal, "SIGALRM"):
    # MarkItDown tries to use SIGALRM on Windows, so we provide a dummy.
    signal.SIGALRM = 14  # Standard SIGALRM signal number on Unix
    signal.alarm = lambda _: None  # No-op: Windows has no alarm(2) syscall

15 

16from qdrant_loader.core.file_conversion.conversion_config import FileConversionConfig 

17from qdrant_loader.core.file_conversion.exceptions import ( 

18 ConversionTimeoutError, 

19 FileAccessError, 

20 FileSizeExceededError, 

21 MarkItDownError, 

22 UnsupportedFileTypeError, 

23) 

24from qdrant_loader.core.file_conversion.file_detector import FileDetector 

25from qdrant_loader.utils.logging import LoggingConfig 

26 

27logger = LoggingConfig.get_logger(__name__) 

28 

29 

30@contextmanager 

31def capture_openpyxl_warnings(logger_instance, file_path: str): 

32 """Context manager to capture openpyxl warnings and route them through our logging system.""" 

33 captured_warnings = [] 

34 

35 # Custom warning handler 

36 def warning_handler(message, category, filename, lineno, file=None, line=None): 

37 # Check if this is an openpyxl warning we want to capture 

38 if ( 

39 category is UserWarning 

40 and filename 

41 and "openpyxl" in filename 

42 and ( 

43 "Data Validation extension" in str(message) 

44 or "Conditional Formatting extension" in str(message) 

45 ) 

46 ): 

47 

48 # Extract the specific warning type 

49 warning_type = "Unknown Excel feature" 

50 if "Data Validation extension" in str(message): 

51 warning_type = "Data Validation" 

52 elif "Conditional Formatting extension" in str(message): 

53 warning_type = "Conditional Formatting" 

54 

55 # Track captured warning 

56 captured_warnings.append(warning_type) 

57 

58 # Log through our system instead of showing the raw warning 

59 logger_instance.info( 

60 "Excel feature not fully supported during conversion", 

61 file_path=file_path, 

62 feature_type=warning_type, 

63 source="openpyxl", 

64 ) 

65 else: 

66 # For non-openpyxl warnings, use the default behavior 

67 original_showwarning(message, category, filename, lineno, file, line) 

68 

69 # Store original warning handler 

70 original_showwarning = warnings.showwarning 

71 

72 try: 

73 # Install our custom warning handler 

74 warnings.showwarning = warning_handler 

75 yield 

76 

77 # Log summary if any warnings were captured 

78 if captured_warnings: 

79 logger_instance.info( 

80 "Excel conversion completed with unsupported features", 

81 file_path=file_path, 

82 total_warnings=len(captured_warnings), 

83 warning_types=list(set(captured_warnings)), 

84 source="openpyxl", 

85 ) 

86 finally: 

87 # Restore original warning handler 

88 warnings.showwarning = original_showwarning 

89 

90 

class TimeoutHandler:
    """Context manager for handling conversion timeouts.

    On POSIX platforms this arms ``SIGALRM`` so the timeout interrupts the
    main thread. On Windows (no SIGALRM) it starts a daemon timer thread.

    Fix over the previous version: the Windows timer used ``time.sleep`` and
    could never be cancelled, so it always fired eventually — even after a
    successful conversion — raising ``ConversionTimeoutError`` in a background
    thread. The timer now waits on a ``threading.Event`` that ``__exit__``
    sets, cancelling the pending timeout cleanly.

    NOTE(review): on Windows the timeout exception is still raised in the
    worker thread and cannot interrupt the main thread mid-conversion; this
    preserves the original (limited) Windows semantics.
    """

    def __init__(self, timeout_seconds: int, file_path: str):
        self.timeout_seconds = timeout_seconds
        self.file_path = file_path
        self.old_handler = None  # previous SIGALRM handler (POSIX only)
        self.timer = None  # timer thread (Windows only)
        self._cancelled = None  # threading.Event set on exit (Windows only)

    def _timeout_handler(self, _signum=None, _frame=None):
        """Signal handler for timeout."""
        raise ConversionTimeoutError(self.timeout_seconds, self.file_path)

    def _timeout_thread(self):
        """Thread-based timeout for Windows; aborts silently if cancelled."""
        # Event.wait returns True if set (cancelled) before the timeout.
        if not self._cancelled.wait(self.timeout_seconds):
            self._timeout_handler()

    def __enter__(self):
        """Set up timeout handler (Unix signals or Windows threading)."""
        if sys.platform == "win32":
            # Windows doesn't support SIGALRM; use a cancellable timer thread.
            import threading

            self._cancelled = threading.Event()
            self.timer = threading.Thread(target=self._timeout_thread, daemon=True)
            self.timer.start()
        else:
            # Unix/Linux/macOS: use signal-based timeout.
            if hasattr(signal, "SIGALRM"):
                self.old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
                signal.alarm(self.timeout_seconds)
        return self

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Clean up timeout handler."""
        if sys.platform == "win32":
            # Cancel the pending timer so it never fires after completion.
            if self._cancelled is not None:
                self._cancelled.set()
        else:
            # Unix/Linux/macOS: cancel the alarm and restore the old handler.
            if hasattr(signal, "SIGALRM"):
                signal.alarm(0)
                if self.old_handler is not None:
                    signal.signal(signal.SIGALRM, self.old_handler)

139 

140 

class FileConverter:
    """Service for converting files to Markdown using MarkItDown.

    MarkItDown is imported lazily on first use. When LLM descriptions are
    enabled, an OpenAI-compatible chat client is constructed — preferring the
    core provider wiring, falling back to a plain HTTP client against the
    configured endpoint.
    """

    def __init__(self, config: FileConversionConfig):
        """Initialize the file converter."""
        self.config = config
        self.file_detector = FileDetector()
        self.logger = LoggingConfig.get_logger(__name__)
        self._markitdown = None  # lazily-created MarkItDown instance

    def _get_markitdown(self):
        """Get MarkItDown instance with lazy loading and LLM configuration.

        Raises:
            MarkItDownError: if the markitdown library is not installed.
        """
        if self._markitdown is None:
            try:
                from markitdown import MarkItDown  # type: ignore

                # Configure MarkItDown with LLM settings if enabled
                if self.config.markitdown.enable_llm_descriptions:
                    self.logger.debug(
                        "Initializing MarkItDown with LLM configuration",
                        llm_model=self.config.markitdown.llm_model,
                        llm_endpoint=self.config.markitdown.llm_endpoint,
                    )

                    # Warn when legacy MarkItDown overrides are in effect;
                    # best-effort only, never block initialization.
                    try:
                        if (
                            self.config.markitdown.llm_model
                            or self.config.markitdown.llm_endpoint
                            or self.config.markitdown.llm_api_key
                        ):
                            self.logger.warning(
                                "Using MarkItDown llm_* overrides; prefer configuring global.llm",
                                llm_model=bool(self.config.markitdown.llm_model),
                                llm_endpoint=bool(self.config.markitdown.llm_endpoint),
                                llm_api_key=bool(self.config.markitdown.llm_api_key),
                            )
                    except Exception:
                        pass

                    # Create LLM client backed by provider (OpenAI-compatible wrapper)
                    llm_client = self._create_llm_client()

                    self._markitdown = MarkItDown(
                        llm_client=llm_client,
                        llm_model=self.config.markitdown.llm_model,
                    )
                    self.logger.debug("MarkItDown initialized with LLM support")
                else:
                    self._markitdown = MarkItDown()
                    self.logger.debug("MarkItDown initialized without LLM support")

            except ImportError as e:
                raise MarkItDownError(
                    Exception("MarkItDown library not available")
                ) from e
        return self._markitdown

    def _create_llm_client(self):
        """Create an OpenAI-compatible LLM client backed by core provider.

        Returns an object exposing `.chat.completions.create(...)` that adapts to
        the provider-agnostic ChatClient under the hood. Falls back to a plain
        HTTP OpenAI-compatible client when core is unavailable.

        Raises:
            MarkItDownError: when neither the provider path nor the HTTP
                fallback can produce a usable client.
        """
        # Attempt provider-first wiring using core settings
        try:
            from dataclasses import replace as _dc_replace

            # Lazy import to avoid circular import at module import time
            from importlib import import_module

            cfg_mod = import_module("qdrant_loader.config")
            settings = cfg_mod.get_settings()
            core_settings_mod = import_module("qdrant_loader_core.llm.settings")
            core_factory_mod = import_module("qdrant_loader_core.llm.factory")
            LLMSettings = core_settings_mod.LLMSettings
            create_provider = core_factory_mod.create_provider

            base_llm: LLMSettings = settings.llm_settings  # type: ignore

            # Apply legacy MarkItDown overrides (model/endpoint/api_key) when provided
            md = self.config.markitdown
            models = dict(getattr(base_llm, "models", {}) or {})
            if md.llm_model:
                models["chat"] = md.llm_model

            override_kwargs = {
                "models": models,
            }
            if md.llm_endpoint:
                override_kwargs["base_url"] = md.llm_endpoint
            if md.llm_api_key:
                override_kwargs["api_key"] = md.llm_api_key

            effective_llm = _dc_replace(base_llm, **override_kwargs)

            provider = create_provider(effective_llm)
            chat_client = provider.chat()

            # Build an OpenAI-compatible wrapper for MarkItDown
            class _ResponseMessage:
                def __init__(self, content: str):
                    self.content = content

            class _ResponseChoice:
                def __init__(self, content: str):
                    self.message = _ResponseMessage(content)

            class _Response:
                def __init__(self, content: str, model_name: str):
                    self.choices = [_ResponseChoice(content)]
                    self.model = model_name
                    self.usage = None

            class _Completions:
                def __init__(self, chat_client):
                    self._chat_client = chat_client

                def create(self, *, model: str, messages: list[dict], **kwargs):
                    import asyncio as _asyncio

                    async def _run():
                        result = await self._chat_client.chat(
                            messages=messages,
                            model=model,
                            **kwargs,
                        )
                        text = (result or {}).get("text", "")
                        used_model = (result or {}).get("model", model)
                        return _Response(text, used_model)

                    # MarkItDown calls this synchronously; bridge to the async
                    # ChatClient. If an event loop is already running in this
                    # thread, run the coroutine on a fresh thread instead.
                    try:
                        loop = _asyncio.get_event_loop()
                        if loop.is_running():
                            import concurrent.futures as _cf

                            with _cf.ThreadPoolExecutor(max_workers=1) as ex:
                                fut = ex.submit(_asyncio.run, _run())
                                return fut.result()
                        else:
                            return loop.run_until_complete(_run())
                    except RuntimeError:
                        return _asyncio.run(_run())

            class _Chat:
                def __init__(self, chat_client):
                    self.completions = _Completions(chat_client)

            class _OpenAICompatibleClient:
                def __init__(self, chat_client):
                    self.chat = _Chat(chat_client)

            return _OpenAICompatibleClient(chat_client)
        except Exception as e:
            # Provider path unavailable; fall back to an HTTP-based OpenAI-compatible client
            try:
                import json as _json
                import urllib.request as _urlreq

                base_url = (self.config.markitdown.llm_endpoint or "").rstrip("/")
                if not base_url:
                    raise RuntimeError(
                        "No llm_endpoint configured for MarkItDown fallback"
                    )
                api_key = (
                    self.config.markitdown.llm_api_key
                    or os.getenv("OPENAI_API_KEY")
                    or os.getenv("LLM_API_KEY", "")
                )

                class _ResponseMessage:
                    def __init__(self, content: str):
                        self.content = content

                class _ResponseChoice:
                    def __init__(self, content: str):
                        self.message = _ResponseMessage(content)

                class _Response:
                    def __init__(
                        self,
                        content: str,
                        model_name: str,
                        usage: dict | None,
                        raw: dict,
                    ):
                        self.choices = [_ResponseChoice(content)]
                        self.model = model_name
                        self.usage = usage
                        self.raw = raw

                def _join(u: str, p: str) -> str:
                    return f"{u}/{p.lstrip('/')}"

                class _HTTPCompletions:
                    def __init__(self, base: str, api_key: str):
                        self._base = base
                        self._api_key = api_key

                    def create(self, *, model: str, messages: list[dict], **kwargs):
                        url = _join(self._base, "/chat/completions")
                        payload = {"model": model, "messages": messages}
                        # Forward only well-known OpenAI sampling parameters.
                        for k in (
                            "temperature",
                            "max_tokens",
                            "top_p",
                            "frequency_penalty",
                            "presence_penalty",
                            "stop",
                        ):
                            if k in kwargs and kwargs[k] is not None:
                                payload[k] = kwargs[k]
                        headers = {"Content-Type": "application/json"}
                        if self._api_key:
                            headers["Authorization"] = f"Bearer {self._api_key}"
                        req = _urlreq.Request(
                            url,
                            data=_json.dumps(payload).encode("utf-8"),
                            headers=headers,
                            method="POST",
                        )
                        with _urlreq.urlopen(
                            req, timeout=60
                        ) as resp:  # nosec B310 - controlled URL from config
                            body = resp.read()
                        data = _json.loads(body.decode("utf-8"))
                        text = ""
                        choices = data.get("choices") or []
                        if choices:
                            msg = (choices[0] or {}).get("message") or {}
                            text = msg.get("content", "") or ""
                        usage = data.get("usage")
                        used_model = data.get("model", model)
                        return _Response(text, used_model, usage, data)

                class _HTTPChat:
                    def __init__(self, base: str, api_key: str):
                        self.completions = _HTTPCompletions(base, api_key)

                class _HTTPOpenAICompatibleClient:
                    def __init__(self, base: str, api_key: str):
                        self.chat = _HTTPChat(base, api_key)

                return _HTTPOpenAICompatibleClient(base_url, api_key)
            except Exception as e2:  # pragma: no cover
                self.logger.warning(
                    "LLM provider unavailable and HTTP OpenAI-compatible fallback failed for MarkItDown",
                    error=str(e2) or str(e),
                )
                # Chain explicitly so the fallback failure is preserved.
                raise MarkItDownError(
                    Exception("No LLM client available for MarkItDown")
                ) from e2

    def convert_file(self, file_path: str) -> str:
        """Convert a file to Markdown format with timeout support.

        Raises:
            ConversionTimeoutError: when conversion exceeds the configured timeout.
            MarkItDownError: for any other conversion failure.
        """
        # Normalize path for consistent logging (Windows compatibility)
        normalized_path = file_path.replace("\\", "/")
        self.logger.info("Starting file conversion", file_path=normalized_path)

        try:
            self._validate_file(file_path)
            markitdown = self._get_markitdown()

            # Apply timeout wrapper and warning capture for conversion
            with TimeoutHandler(self.config.conversion_timeout, file_path):
                with capture_openpyxl_warnings(self.logger, file_path):
                    result = markitdown.convert(file_path)

            if hasattr(result, "text_content"):
                markdown_content = result.text_content
            else:
                markdown_content = str(result)

            self.logger.info(
                "File conversion completed",
                file_path=normalized_path,
                content_length=len(markdown_content),
                timeout_used=self.config.conversion_timeout,
            )
            return markdown_content

        except ConversionTimeoutError:
            # Re-raise timeout errors as-is
            self.logger.error(
                "File conversion timed out",
                file_path=normalized_path,
                timeout=self.config.conversion_timeout,
            )
            raise
        except Exception as e:
            self.logger.error(
                "File conversion failed", file_path=normalized_path, error=str(e)
            )
            raise MarkItDownError(e, file_path) from e

    def _validate_file(self, file_path: str) -> None:
        """Validate file for conversion.

        Raises:
            FileAccessError: file missing or unreadable.
            FileSizeExceededError: file larger than the configured maximum.
            UnsupportedFileTypeError: type not supported for conversion.
        """
        if not os.path.exists(file_path):
            raise FileAccessError(f"File does not exist: {file_path}")

        if not os.access(file_path, os.R_OK):
            raise FileAccessError(f"File is not readable: {file_path}")

        file_size = os.path.getsize(file_path)
        if not self.config.is_file_size_allowed(file_size):
            raise FileSizeExceededError(file_size, self.config.max_file_size, file_path)

        if not self.file_detector.is_supported_for_conversion(file_path):
            file_info = self.file_detector.get_file_type_info(file_path)
            raise UnsupportedFileTypeError(
                file_info.get("normalized_type", "unknown"), file_path
            )

    def create_fallback_document(self, file_path: str, error: Exception) -> str:
        """Create a fallback Markdown document when conversion fails."""
        filename = Path(file_path).name
        file_info = self.file_detector.get_file_type_info(file_path)

        # Fix: the document title now uses the computed filename (it was
        # previously assigned but never referenced).
        return f"""# {filename}

**File Information:**
- **Type**: {file_info.get("normalized_type", "unknown")}
- **Size**: {file_info.get("file_size", 0):,} bytes
- **Path**: {file_path}

**Conversion Status**: ❌ Failed
**Error**: {str(error)}

*This document was created as a fallback when the original file could not be converted.*
"""