Coverage for src / qdrant_loader / core / file_conversion / file_converter.py: 67%

246 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Main file conversion service using MarkItDown.""" 

2 

3import os 

4import signal 

5import sys 

6import warnings 

7from contextlib import contextmanager 

8from pathlib import Path 

9 

# Windows compatibility shim for MarkItDown: it tries to use SIGALRM, which the
# signal module does not expose on Windows, so inert stand-ins are installed.
if sys.platform == "win32":
    if not hasattr(signal, "SIGALRM"):
        # 14 is the standard SIGALRM signal number on Unix.
        setattr(signal, "SIGALRM", 14)
        # Accepts the "seconds" argument and does nothing.
        setattr(signal, "alarm", lambda _: None)

15 

16from qdrant_loader.core.file_conversion.conversion_config import FileConversionConfig 

17from qdrant_loader.core.file_conversion.exceptions import ( 

18 ConversionTimeoutError, 

19 FileAccessError, 

20 FileSizeExceededError, 

21 MarkItDownError, 

22 UnsupportedFileTypeError, 

23) 

24from qdrant_loader.core.file_conversion.file_detector import FileDetector 

25from qdrant_loader.utils.logging import LoggingConfig 

26 

27logger = LoggingConfig.get_logger(__name__) 

28 

29 

@contextmanager
def capture_openpyxl_warnings(logger_instance, file_path: str):
    """Route selected openpyxl warnings through the structured logger.

    While the context is active, ``warnings.showwarning`` is replaced by a
    handler that intercepts ``UserWarning`` (or a subclass) issued from a file
    whose path contains ``"openpyxl"`` and whose message mentions either
    "Data Validation extension" or "Conditional Formatting extension". Each
    intercepted warning is reported with ``logger_instance.info`` instead of
    being displayed. Every other warning is forwarded to the handler that was
    installed when the context was entered.

    When the body completes without raising and at least one warning was
    intercepted, a summary entry is logged. The previous handler is restored
    on exit in all cases.

    Args:
        logger_instance: Object exposing ``info(message, **fields)``.
        file_path: Path of the file being converted; attached to log entries.

    Yields:
        None.
    """
    # (message marker, feature label) pairs, checked in order.
    known_features = (
        ("Data Validation extension", "Data Validation"),
        ("Conditional Formatting extension", "Conditional Formatting"),
    )
    captured_warnings = []

    # Remember the handler being replaced: it is delegated to for unrelated
    # warnings and put back when the context exits.
    original_showwarning = warnings.showwarning

    def warning_handler(message, category, filename, lineno, file=None, line=None):
        feature = None
        if (
            isinstance(category, type)
            and issubclass(category, UserWarning)
            and filename
            and "openpyxl" in filename
        ):
            text = str(message)
            feature = next(
                (label for marker, label in known_features if marker in text),
                None,
            )

        if feature is None:
            # Not one of the openpyxl warnings we translate: default behavior.
            original_showwarning(message, category, filename, lineno, file, line)
            return

        captured_warnings.append(feature)

        # Log through our system instead of showing the raw warning.
        logger_instance.info(
            "Excel feature not fully supported during conversion",
            file_path=file_path,
            feature_type=feature,
            source="openpyxl",
        )

    try:
        warnings.showwarning = warning_handler
        yield

        if captured_warnings:
            logger_instance.info(
                "Excel conversion completed with unsupported features",
                file_path=file_path,
                total_warnings=len(captured_warnings),
                # Sorted so the logged list is deterministic across runs.
                warning_types=sorted(set(captured_warnings)),
                source="openpyxl",
            )
    finally:
        warnings.showwarning = original_showwarning

88 

89 

class TimeoutHandler:
    """Context manager that arms a timeout around a file conversion.

    Two mechanisms are used:

    * On the main thread of a non-Windows platform that has ``SIGALRM``, an
      alarm is scheduled whose handler raises ``ConversionTimeoutError`` in the
      running code.
    * Everywhere else (Windows, or a non-main thread, where ``signal.signal``
      raises ``ValueError``) a cancellable ``threading.Timer`` is started. A
      timer thread cannot interrupt the conversion, so it only records that the
      deadline passed in ``timed_out``; it is cancelled on exit so no thread
      lingers after the conversion has finished.

    Attributes are plain instance fields: ``timeout_seconds``, ``file_path``,
    ``old_handler`` (previous SIGALRM handler, if one was replaced), ``timer``
    (the fallback timer, if one was started) and ``timed_out``.
    """

    def __init__(self, timeout_seconds: int, file_path: str):
        self.timeout_seconds = timeout_seconds
        self.file_path = file_path
        self.old_handler = None
        self.timer = None
        # Set by the fallback timer when the deadline elapses before __exit__.
        self.timed_out = False

    def _timeout_handler(self, _signum=None, _frame=None):
        """Signal handler for timeout.

        Raises:
            ConversionTimeoutError: Always.
        """
        raise ConversionTimeoutError(self.timeout_seconds, self.file_path)

    def _timeout_thread(self):
        """Sleep for the timeout, then raise via ``_timeout_handler``.

        Retained for backward compatibility only; ``__enter__`` no longer
        starts it, because an exception raised in a helper thread never
        reaches the thread performing the conversion.
        """
        import time

        time.sleep(self.timeout_seconds)
        self._timeout_handler()

    def _mark_timed_out(self):
        """Fallback timer callback: record that the deadline elapsed."""
        self.timed_out = True

    @staticmethod
    def _signal_timeout_available() -> bool:
        """Return True when a SIGALRM-based timeout can be used by the caller."""
        import threading

        return (
            sys.platform != "win32"
            and hasattr(signal, "SIGALRM")
            # signal.signal() may only be called from the main thread.
            and threading.current_thread() is threading.main_thread()
        )

    def __enter__(self):
        """Arm the timeout (SIGALRM when possible, otherwise a timer thread)."""
        if self._signal_timeout_available():
            self.old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
            signal.alarm(self.timeout_seconds)
        else:
            import threading

            self.timer = threading.Timer(self.timeout_seconds, self._mark_timed_out)
            self.timer.daemon = True
            self.timer.start()
        return self

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Disarm the timeout; exceptions from the body are not suppressed."""
        if self.timer is not None:
            # Stop the fallback timer so it does not outlive the conversion.
            self.timer.cancel()
        if self._signal_timeout_available():
            signal.alarm(0)  # Cancel the alarm
            if self.old_handler is not None:
                signal.signal(signal.SIGALRM, self.old_handler)

138 

139 

class FileConverter:
    """Service for converting files to Markdown using MarkItDown."""

    def __init__(self, config: "FileConversionConfig"):
        """Initialize the file converter.

        Args:
            config: Conversion settings; this class reads ``markitdown``,
                ``conversion_timeout``, ``max_file_size`` and
                ``is_file_size_allowed`` from it.
        """
        self.config = config
        self.file_detector = FileDetector()
        self.logger = LoggingConfig.get_logger(__name__)
        # MarkItDown instance, created on first use by _get_markitdown().
        self._markitdown = None

    def _get_markitdown(self):
        """Get MarkItDown instance with lazy loading and LLM configuration.

        The instance is cached only after it has been fully configured
        (including custom converter registration), so a failure part-way
        through never leaves a half-initialized instance behind.

        Returns:
            The cached, fully configured MarkItDown instance.

        Raises:
            MarkItDownError: If an ``ImportError`` occurs while loading
                MarkItDown or the custom converters, or if no LLM client can
                be created when LLM descriptions are enabled.
        """
        if self._markitdown is None:
            try:
                from markitdown import MarkItDown  # type: ignore

                md_config = self.config.markitdown
                if md_config.enable_llm_descriptions:
                    self.logger.debug(
                        "Initializing MarkItDown with LLM configuration",
                        llm_model=md_config.llm_model,
                        llm_endpoint=md_config.llm_endpoint,
                    )
                    # Best-effort deprecation notice; it must never block
                    # initialization, hence the deliberate broad guard.
                    try:
                        if (
                            md_config.llm_model
                            or md_config.llm_endpoint
                            or md_config.llm_api_key
                        ):
                            self.logger.warning(
                                "Using MarkItDown llm_* overrides; prefer configuring global.llm",
                                llm_model=bool(md_config.llm_model),
                                llm_endpoint=bool(md_config.llm_endpoint),
                                llm_api_key=bool(md_config.llm_api_key),
                            )
                    except Exception:
                        pass

                    llm_client = self._create_llm_client()
                    instance = MarkItDown(
                        llm_client=llm_client,
                        llm_model=md_config.llm_model,
                    )
                    self.logger.debug("MarkItDown initialized with LLM support")
                else:
                    instance = MarkItDown()
                    self.logger.debug("MarkItDown initialized without LLM support")

                self._register_custom_converters(instance)

            except ImportError as e:
                raise MarkItDownError(
                    Exception("MarkItDown library not available")
                ) from e

            # Publish only once every setup step above has succeeded.
            self._markitdown = instance
        return self._markitdown

    def _register_custom_converters(self, markitdown_instance) -> None:
        """Register qdrant-loader's custom converters at higher priority than the defaults."""
        from qdrant_loader.core.file_conversion.clean_xlsx_converter import (
            CleanXlsConverter,
            CleanXlsxConverter,
        )

        # priority=-1 beats MarkItDown's default PRIORITY_SPECIFIC_FILE_FORMAT (0.0 in
        # installed version 0.1.5; task spec mentioned 10). Strictly lower priority
        # ensures our converters are tried first during ascending-sort dispatch.
        markitdown_instance.register_converter(CleanXlsxConverter(), priority=-1)
        markitdown_instance.register_converter(CleanXlsConverter(), priority=-1)

    @staticmethod
    def _run_coroutine_blocking(coro_factory):
        """Run the coroutine built by ``coro_factory`` and return its result.

        Only the event-loop lookup is guarded; an exception raised by the
        coroutine itself propagates unchanged and the coroutine is executed
        exactly once.

        Args:
            coro_factory: Zero-argument callable returning a fresh coroutine.

        Returns:
            Whatever the coroutine returns.
        """
        import asyncio as _asyncio

        try:
            loop = _asyncio.get_event_loop()
        except RuntimeError:
            # No usable event loop for this thread.
            loop = None

        if loop is None or loop.is_closed():
            return _asyncio.run(coro_factory())

        if loop.is_running():
            # Cannot block a running loop: execute on a fresh loop in a
            # helper thread and wait for the result.
            import concurrent.futures as _cf

            with _cf.ThreadPoolExecutor(max_workers=1) as ex:
                return ex.submit(_asyncio.run, coro_factory()).result()

        return loop.run_until_complete(coro_factory())

    def _create_provider_llm_client(self):
        """Build an OpenAI-compatible client on top of the core LLM provider.

        Returns:
            An object exposing ``.chat.completions.create(model=..., messages=...)``
            that forwards to the provider's chat client.

        Raises:
            Exception: Any error from importing or configuring the provider;
                the caller treats that as "provider path unavailable".
        """
        from dataclasses import replace as _dc_replace

        # Lazy import to avoid circular import at module import time
        from importlib import import_module

        cfg_mod = import_module("qdrant_loader.config")
        settings = cfg_mod.get_settings()
        core_settings_mod = import_module("qdrant_loader_core.llm.settings")
        core_factory_mod = import_module("qdrant_loader_core.llm.factory")
        LLMSettings = core_settings_mod.LLMSettings
        create_provider = core_factory_mod.create_provider

        base_llm: LLMSettings = settings.llm_settings  # type: ignore

        # Apply legacy MarkItDown overrides (model/endpoint/api_key) when provided
        md = self.config.markitdown
        models = dict(getattr(base_llm, "models", {}) or {})
        if md.llm_model:
            models["chat"] = md.llm_model

        override_kwargs = {
            "models": models,
        }
        if md.llm_endpoint:
            override_kwargs["base_url"] = md.llm_endpoint
        if md.llm_api_key:
            override_kwargs["api_key"] = md.llm_api_key

        effective_llm = _dc_replace(base_llm, **override_kwargs)

        provider = create_provider(effective_llm)
        chat_client = provider.chat()
        run_blocking = self._run_coroutine_blocking

        # Build an OpenAI-compatible wrapper for MarkItDown
        class _ResponseMessage:
            def __init__(self, content: str):
                self.content = content

        class _ResponseChoice:
            def __init__(self, content: str):
                self.message = _ResponseMessage(content)

        class _Response:
            def __init__(self, content: str, model_name: str):
                self.choices = [_ResponseChoice(content)]
                self.model = model_name
                self.usage = None

        class _Completions:
            def __init__(self, chat_client):
                self._chat_client = chat_client

            def create(self, *, model: str, messages: list[dict], **kwargs):
                async def _run():
                    result = await self._chat_client.chat(
                        messages=messages,
                        model=model,
                        **kwargs,
                    )
                    text = (result or {}).get("text", "")
                    used_model = (result or {}).get("model", model)
                    return _Response(text, used_model)

                return run_blocking(_run)

        class _Chat:
            def __init__(self, chat_client):
                self.completions = _Completions(chat_client)

        class _OpenAICompatibleClient:
            def __init__(self, chat_client):
                self.chat = _Chat(chat_client)

        return _OpenAICompatibleClient(chat_client)

    def _create_http_llm_client(self):
        """Build an OpenAI-compatible client that POSTs to ``llm_endpoint``.

        The API key is taken from the MarkItDown config, then the
        ``OPENAI_API_KEY`` and ``LLM_API_KEY`` environment variables.

        Returns:
            An object exposing ``.chat.completions.create(model=..., messages=...)``.

        Raises:
            RuntimeError: If no ``llm_endpoint`` is configured.
        """
        import json as _json
        import urllib.request as _urlreq

        base_url = (self.config.markitdown.llm_endpoint or "").rstrip("/")
        if not base_url:
            raise RuntimeError("No llm_endpoint configured for MarkItDown fallback")
        api_key = (
            self.config.markitdown.llm_api_key
            or os.getenv("OPENAI_API_KEY")
            or os.getenv("LLM_API_KEY", "")
        )

        class _ResponseMessage:
            def __init__(self, content: str):
                self.content = content

        class _ResponseChoice:
            def __init__(self, content: str):
                self.message = _ResponseMessage(content)

        class _Response:
            def __init__(
                self,
                content: str,
                model_name: str,
                usage: dict | None,
                raw: dict,
            ):
                self.choices = [_ResponseChoice(content)]
                self.model = model_name
                self.usage = usage
                self.raw = raw

        def _join(u: str, p: str) -> str:
            return f"{u}/{p.lstrip('/')}"

        class _HTTPCompletions:
            def __init__(self, base: str, api_key: str):
                self._base = base
                self._api_key = api_key

            def create(self, *, model: str, messages: list[dict], **kwargs):
                url = _join(self._base, "/chat/completions")
                payload = {"model": model, "messages": messages}
                # Forward only the recognised sampling options that were set.
                for k in (
                    "temperature",
                    "max_tokens",
                    "top_p",
                    "frequency_penalty",
                    "presence_penalty",
                    "stop",
                ):
                    if k in kwargs and kwargs[k] is not None:
                        payload[k] = kwargs[k]
                headers = {"Content-Type": "application/json"}
                if self._api_key:
                    headers["Authorization"] = f"Bearer {self._api_key}"
                req = _urlreq.Request(
                    url,
                    data=_json.dumps(payload).encode("utf-8"),
                    headers=headers,
                    method="POST",
                )
                with _urlreq.urlopen(
                    req, timeout=60
                ) as resp:  # nosec B310 - controlled URL from config
                    body = resp.read()
                data = _json.loads(body.decode("utf-8"))
                text = ""
                choices = data.get("choices") or []
                if choices:
                    msg = (choices[0] or {}).get("message") or {}
                    text = msg.get("content", "") or ""
                usage = data.get("usage")
                used_model = data.get("model", model)
                return _Response(text, used_model, usage, data)

        class _HTTPChat:
            def __init__(self, base: str, api_key: str):
                self.completions = _HTTPCompletions(base, api_key)

        class _HTTPOpenAICompatibleClient:
            def __init__(self, base: str, api_key: str):
                self.chat = _HTTPChat(base, api_key)

        return _HTTPOpenAICompatibleClient(base_url, api_key)

    def _create_llm_client(self):
        """Create an OpenAI-compatible LLM client backed by core provider.

        Returns an object exposing `.chat.completions.create(...)` that adapts to
        the provider-agnostic ChatClient under the hood. Falls back to a plain
        HTTP client against the configured ``llm_endpoint`` when the provider
        path is unavailable.

        Raises:
            MarkItDownError: If neither client can be created.
        """
        # Attempt provider-first wiring using core settings
        try:
            return self._create_provider_llm_client()
        except Exception as e:
            # Provider path unavailable; fall back to an HTTP-based OpenAI-compatible client
            try:
                return self._create_http_llm_client()
            except Exception as e2:  # pragma: no cover
                self.logger.warning(
                    "LLM provider unavailable and HTTP OpenAI-compatible fallback failed for MarkItDown",
                    error=str(e2) or str(e),
                )
                raise MarkItDownError(
                    Exception("No LLM client available for MarkItDown")
                ) from e2

    def convert_file(self, file_path: str) -> str:
        """Convert a file to Markdown format with timeout support.

        Args:
            file_path: Path of the file to convert.

        Returns:
            The Markdown text produced by MarkItDown.

        Raises:
            ConversionTimeoutError: If the timeout handler fires.
            MarkItDownError: For any other failure, including validation
                errors, wrapping the original exception.
        """
        # Normalize path for consistent logging (Windows compatibility)
        normalized_path = file_path.replace("\\", "/")
        self.logger.info("Starting file conversion", file_path=normalized_path)

        try:
            self._validate_file(file_path)
            markitdown = self._get_markitdown()

            # Apply timeout wrapper and warning capture for conversion
            with TimeoutHandler(self.config.conversion_timeout, file_path):
                with capture_openpyxl_warnings(self.logger, file_path):
                    result = markitdown.convert(file_path)

            if hasattr(result, "text_content"):
                markdown_content = result.text_content
            else:
                markdown_content = str(result)

            self.logger.info(
                "File conversion completed",
                file_path=normalized_path,
                content_length=len(markdown_content),
                timeout_used=self.config.conversion_timeout,
            )
            return markdown_content

        except ConversionTimeoutError:
            # Re-raise timeout errors as-is
            self.logger.error(
                "File conversion timed out",
                file_path=normalized_path,
                timeout=self.config.conversion_timeout,
            )
            raise
        except Exception as e:
            self.logger.error(
                "File conversion failed", file_path=normalized_path, error=str(e)
            )
            raise MarkItDownError(e, file_path) from e

    def _validate_file(self, file_path: str) -> None:
        """Validate file for conversion.

        Raises:
            FileAccessError: If the file does not exist or is not readable.
            FileSizeExceededError: If the configured size limit rejects it.
            UnsupportedFileTypeError: If the detector does not support it.
        """
        if not os.path.exists(file_path):
            raise FileAccessError(f"File does not exist: {file_path}")

        if not os.access(file_path, os.R_OK):
            raise FileAccessError(f"File is not readable: {file_path}")

        file_size = os.path.getsize(file_path)
        if not self.config.is_file_size_allowed(file_size):
            raise FileSizeExceededError(file_size, self.config.max_file_size, file_path)

        if not self.file_detector.is_supported_for_conversion(file_path):
            file_info = self.file_detector.get_file_type_info(file_path)
            unsupported_type = file_info.get("normalized_type")
            if not unsupported_type:
                # Fall back to the extension when the detector has no type.
                unsupported_type = Path(file_path).suffix.lstrip(".") or "unknown"
            raise UnsupportedFileTypeError(unsupported_type, file_path)

    def create_fallback_document(self, file_path: str, error: Exception) -> str:
        """Create a fallback Markdown document when conversion fails.

        Missing or ``None`` type/size values reported by the detector are
        rendered as ``unknown`` and ``0`` so that building the fallback can
        not itself fail on them.

        Args:
            file_path: Path of the file that could not be converted.
            error: The exception describing the failure.

        Returns:
            Markdown text describing the file and the failure.
        """
        filename = Path(file_path).name
        file_info = self.file_detector.get_file_type_info(file_path) or {}
        file_type = file_info.get("normalized_type") or "unknown"
        file_size = file_info.get("file_size") or 0

        return f"""# {filename}

**File Information:**
- **Type**: {file_type}
- **Size**: {file_size:,} bytes
- **Path**: {file_path}

**Conversion Status**: ❌ Failed
**Error**: {str(error)}

*This document was created as a fallback when the original file could not be converted.*
"""